操作符

rxjava之所以强大原因就在这里了,操作符可对原始发射器发出的数据进行多种变换继而重新发送。

Map
它的作用是对发射时间发送的每一个事件应用一个函数,每一个事件都按照指定的函数去变化

Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext(1 + "");emitter.onNext(2 + "");emitter.onNext(3 + "");}}).map(new Function<String, Integer>() {@Overridepublic Integer apply(String s) throws Exception {return new Integer(s) + 1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Logger.e(integer + "");}});

发送字符串1,2,3 通过map操作符转换成Integer类型值分别为2,3,4最后发射

│    2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E:
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 3
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 4
E:

Zip
一 一对应合并两种发射器的数据,当然合并后的数据类型是由我们自定义的。如果两种发射器数据量不同,那么多的数据就会被丢弃。

    Observable.zip(Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);}}), Observable.just(2, 3), new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer + integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Logger.e(integer + "");}});

前者发送了 1,2,3 后者只发送了2,3,那么合并后第一组数据的3会被丢弃。

├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 3
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ MainActivity$1.accept  (MainActivity.java:70)
E: │    MainActivity$1.accept  (MainActivity.java:73)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 5
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Concat

concat 操作符将两种发射器数据进行连接

   Observable.concat(Observable.just(1, 2, 3, 4), Observable.create(new ObservableOnSubscribe<Object>() {/*** Called for each Observer that subscribes.** @param emitter the safe emitter instance, never null* @throws Exception on error*/@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext(5);emitter.onNext(6);emitter.onNext(7);}})).subscribe(new Consumer<Object>() {/*** Consume the given value.** @param o the value* @throws Exception on error*/@Overridepublic void accept(Object o) throws Exception {Logger.e(o.toString());}});

前者发送了1,2,3,4,后者为5,6,7。那么最后的结果是两者拼接起来且顺序相同。

E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 3
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 4
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 5
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 6
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: main
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:78)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 7
E:

这里要注意的是,如果要使用creat来创造默认发射器,那么结束后需调用onComplete() ,否则第二个发射器数据不能正常发射。

FlatMap
分流发射器,就是将当前发射器发出每一组数据进行扩展。扩展成一个发射器(包含多组数据),再将这些发射器的数据组合起来, 组合后的数据并不能一定保证原始数据的顺序。

Observable.just(1, 2).flatMap(new Function<Integer, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Integer integer) throws Exception {if (integer == 1) {List<Integer> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add(integer);}return Observable.fromIterable(list).delay(5000, TimeUnit.MILLISECONDS);} else {List<Integer> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add(integer);}return Observable.fromIterable(list).delay(5000, TimeUnit.MILLISECONDS);}}}).subscribe(new Consumer<Object>() {/*** Consume the given value.** @param o the value* @throws Exception on error*/@Overridepublic void accept(Object o) throws Exception {Logger.e(o.toString());}});

笔者将两组发射器都延迟5000毫秒后执行,如果是有序,那么按照顺序依次延迟后,前者的发射器也是应该先发出,那就是111,222.

├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────

然而并不是这样,结果则是两组发射器同时发出,所以说flatMap没有控制数据的发送顺序,而是由新生成的Observable本身决定。

ConcatMap

concatMap就是与之对应的保证了顺序的分流发射器,

Observable.just(1, 2).concatMap(new Function<Integer, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Integer integer) throws Exception {if (integer == 1) {List<Integer> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add(integer);}return Observable.fromIterable(list).delay(10000, TimeUnit.MILLISECONDS);} else {List<Integer> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add(integer);}return Observable.fromIterable(list).delay(5000, TimeUnit.MILLISECONDS);}}}).subscribe(new Consumer<Object>() {/*** Consume the given value.** @param o the value* @throws Exception on error*/@Overridepublic void accept(Object o) throws Exception {Logger.e(String.valueOf(System.currentTimeMillis()));Logger.e(o.toString());}});

笔者将第一组发射器延迟10秒, 第二组延迟5秒, 如果ConcatMap不能保证顺序的话,那么首先执行的一定是第二组。

├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1533124963245
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:88)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1533124963249
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:88)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1533124963251
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-1
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:88)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-2
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1533124968254
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-2
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:88)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-2
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1533124968257
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-2
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:88)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-2
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:87)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 1533124968259
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
E: │ Thread: RxComputationThreadPool-2
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ LambdaObserver.onNext  (LambdaObserver.java:63)
E: │    MainActivity$1.accept  (MainActivity.java:88)
E: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
E: │ 2
E: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────

然而仍是先执行第一组,然后再是第二组,观察时间差值,正是第二组的延迟时间,所以他的机制应该就是保证第一组数据发射完毕才会开始处理第二组数据然后执行发射。而flatMap则是先处理完所有原始数据再根据生成的Observable本身情况去发送。 其实给apply方法里打个Log很容易就看出来区别。

。。。。。。。后续

RxJava2基础总结(二)相关推荐

  1. 【C++自我精讲】基础系列二 const

    [C++自我精讲]基础系列二 const 0 前言 分三部分:const用法.const和#define比较.const作用. 1 const用法 const常量:const可以用来定义常量,不可改变 ...

  2. java负数右移_收入囊中篇---Java程序基础(二)

    前言: 本篇是接着上一篇更新的,如果没有阅读上一篇的话,可以查阅或回顾一下. 1.收入囊中篇---Java基础必备知识(一) 2.收入囊中篇---Java程序基础(二) Java程序基础目录 1.Ja ...

  3. mysql 基础篇(二) 账号、权限管理

    mysql 基础篇(二) 账号.权限管理.备份与还原 建立账号密码: Grant all on test.* to "cj"@"localhost" ident ...

  4. JVM 内部原理(七)— Java 字节码基础之二

    JVM 内部原理(七)- Java 字节码基础之二 介绍 版本:Java SE 7 为什么需要了解 Java 字节码? 无论你是一名 Java 开发者.架构师.CxO 还是智能手机的普通用户,Java ...

  5. CV:计算机视觉技术之图像基础知识(二)—图像内核的可视化解释

    CV:计算机视觉技术之图像基础知识(二)-图像内核的可视化解释 目录 图像内核的可视化解释 测试九种卷积核 官方Demo DIY图片测试 DIY实时视频测试 相关文章 CV:计算机视觉技术之图像基础知 ...

  6. CV:计算机视觉技术之图像基础知识(二)—以python的skimage和numpy库来了解计算机视觉图像基础(图像存储原理-模糊核-锐化核-边缘检测核,进阶卷积神经网络(CNN)的必备基础)

    CV:计算机视觉技术之图像基础知识(二)-以python的skimage和numpy库来了解计算机视觉图像基础(图像存储原理-模糊核-锐化核-边缘检测核,进阶卷积神经网络(CNN)的必备基础) 目录 ...

  7. MySQL基础总结(二)

    MySQL基础总结(二) 文章目录 MySQL基础总结(二) 四.索引 7.MyISAM主键索引与辅助索引的结构 8.InnoDB主键索引与辅助索引的结构 **`主键索引`** **`辅助(非主键)索 ...

  8. 网络基础(二)及HTTP协议

    网络基础(二)及HTTP协议 文章目录 网络基础(二)及HTTP协议 一.HTTP协议 二.端口 三.udp协议 四.tcp协议 一.HTTP协议 1 . 什么是url? 平时我们俗称的 " ...

  9. 计算机应用基础第二版在线作业c,计算机应用基础作业二(答案)

    计算机应用基础作业二 一.单选题(40题,每题1分,共40分) 1.第一台电子数字计算机的运算速度为每秒______. A:5,000,000次 B:500,000次 C:50,000次 D:5000 ...

  10. (五)JS基础知识二(通过图理解原型和原型链)【三座大山之一,必考!!!】

    JS基础知识二(原型和原型链) 提问 class 继承 类型判断(instanceof) 原型 原型关系 基于原型的执行规则 原型链 说明 提问 如何准确判断一个变量是不是数组 class的原型本质 ...

最新文章

  1. 移动端开发框架Zepto.js
  2. ahk编程_AHK编程可视化的实现
  3. C# DataTable转ListModel通用类
  4. access开发精要(7)-定位记录、查找空值
  5. 关于C#中使用SQLDMO来获取数据库中的一些操作
  6. JSON自动生成相关类
  7. object 构造器java_“java”中为什么“Object”类要有一个空的构造函数?
  8. Acwing 734. 能量石
  9. mysql防止误操作之prompt命令提示符
  10. python开发_xml.etree.ElementTree_XML文件操作
  11. Python学习入门基础教程(learning Python)--5.3 Python写文件基础
  12. python2.7.12源码编译
  13. ABB机械臂手眼标定
  14. 我的职业规划500字计算机范文,职业生涯规划自我分析(职业生涯规划500字)
  15. vue项目引入三方字体
  16. 利用SSRF攻击Redis
  17. 关于word不能存档解决办法
  18. 安全狗2周年“全民大抽奖”活动
  19. 如何通过 HTML+CSS+JS 制作焦点轮播图
  20. Java十六进制操作

热门文章

  1. Peppa's menu
  2. excel图表配合下拉菜单_在下拉列表中选择Excel仪表盘图表
  3. 群晖监控备份方案,为金融企业信息安全保驾护航
  4. 《计算机网络》笔记-第1章计算机网络和因特网
  5. swift中代码生成纯色图片
  6. 区块链:分布式系统核心技术
  7. 屏幕录像功能技术探索及分享
  8. 第三方登陆——QQ登陆详解
  9. Code: 516. DB::Exception: Received from localhost:9000. failed: password is incorrect or there is no
  10. almon多项式_基于Almon变换的多项式阶数选择