RxJava操作符(四)Combining

原文链接 http://blog.chinaunix.net/uid-20771867-id-5197584.html

上一篇文章中我们了解了如何对数据进行过滤,在这篇文章里我们来了解一下如何组装多个Observable的数据。

一、CombineLatest
    CombineLatest操作符可以将2~9个Observable发射的数据组装起来然后再发射出来。不过还有两个前提:
    1.所有的Observable都发射过数据。
    2.满足条件1的时候任何一个Observable发射一个数据,就将所有Observable最新发射的数据按照提供的函数组装起来发射出去。
    
    Rxjava实现CombineLast操作符可以让我们直接将组装的Observable作为参数传值,也可以将所有的Observable装在一个List里面穿进去。

下面我们创建几个Observable对象,分别直接传值和使用List传值将其组装起来

点击(此处)折叠或打开

  1. private Observable<Integer> createObserver(int index) {
  2. return Observable.create(new Observable.OnSubscribe<Integer>() {
  3. @Override
  4. public void call(Subscriber<? super Integer> subscriber) {
  5. for (int i = 1; i < 6; i++) {
  6. subscriber.onNext(i * index);
  7. try {
  8. Thread.sleep(1000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }
  14. }).subscribeOn(Schedulers.newThread());
  15. }
  16. private Observable<Integer> combineLatestObserver() {
  17. return Observable.combineLatest(createObserver(1), createObserver(2), (num1, num2) -> {
  18. log("left:" + num1 + " right:" + num2);
  19. return num1 + num2;
  20. });
  21. }
  22. List<Observable<Integer>> list = new ArrayList<>();
  23. private Observable<Integer> combineListObserver() {
  24. for (int i = 1; i < 5; i++) {
  25. list.add(createObserver(i));
  26. }
  27. return Observable.combineLatest(list, args -> {
  28. int temp = 0;
  29. for (Object i : args) {
  30. log(i);
  31. temp += (Integer) i;
  32. }
  33. return temp;
  34. });
  35. }

对其进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("combineList");
  2. mLButton.setOnClickListener(e -> combineListObserver().subscribe(i -> log("combineList:" + i)));
  3. mRButton.setText("CombineLatest");
  4. mRButton.setOnClickListener(e -> combineLatestObserver().subscribe(i -> log("CombineLatest:" + i)));

运行结果如下

二、Join
    Join操作符根据时间窗口来组合两个Observable发射的数据,每个Observable都有一个自己的时间窗口,要组合的时候,在这个时间窗口内的数据都有有效的,可以拿来组合。
    Rxjava还实现了groupJoin,基本和join相同,只是最后组合函数的参数不同。

    使用join操作符需要4个参数,分别是:
    1.源Observable所要组合的目标Observable
    2.一个函数,就收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射出来数据的有效期
    3. 一个函数,就收从目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射出来数据的有效期
    4.一个函数,接收从源Observable和目标Observable发射来的数据,并返回最终组合完的数据。
      
    下面我们使用join和groupJoin操作符分别来组合两个Observable对象

点击(此处)折叠或打开

  1. private Observable<String> createObserver() {
  2. return Observable.create(new Observable.OnSubscribe<String>() {
  3. @Override
  4. public void call(Subscriber<? super String> subscriber) {
  5. for (int i = 1; i < 5; i++) {
  6. subscriber.onNext("Right-" + i);
  7. try {
  8. Thread.sleep(1000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }
  14. }).subscribeOn(Schedulers.newThread());
  15. }
  16. private Observable<String> joinObserver() {
  17. return Observable.just("Left-").join(createObserver(),
  18. integer -> Observable.timer(3000, TimeUnit.MILLISECONDS),
  19. integer -> Observable.timer(2000, TimeUnit.MILLISECONDS),
  20. (i, j) -> i + j
  21. );
  22. }
  23. private Observable<Observable<String>> groupJoinObserver() {
  24. return Observable.just("Left-")
  25. .groupJoin(createObserver(),
  26. s -> Observable.timer(3000, TimeUnit.MILLISECONDS),
  27. s -> Observable.timer(2000, TimeUnit.MILLISECONDS),
  28. (s, stringObservable) -> stringObservable.map(str -> s + str));
  29. }

分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("join");
  2. mLButton.setOnClickListener(e -> joinObserver().subscribe(i -> log("join:" + i + "\n")));
  3. mRButton.setText("groupJoin");
  4. mRButton.setOnClickListener(e -> groupJoinObserver().subscribe(i -> i.subscribe(j -> log("groupJoin:" + j + "\n"))));

运行结果如下,可以看到虽然目标Observable发射了4个数据,但是源Observable只发射了一个有效期为3秒的数据,所以最终的组合结果也只有3个数据。

三、Merege
    Merge操作符将多个Observable发射的数据整合起来发射,就如同是一个Observable发射的数据一样。但是其发射的数据有可能是交错的,如果想要没有交错,可以使用concat操作符。
    当某一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误终止merge的过程,可以使用MeregeDelayError操作符,会将错误在merge结束后再分发。

    下面我们分别使用merge和mergeDelayError操作符来进行merge操作。

点击(此处)折叠或打开

  1. private Observable<Integer> mergeObserver() {
  2. return Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6));
  3. }
  4. private Observable<Integer> mergeDelayErrorObserver() {
  5. return Observable.mergeDelayError(Observable.create(new Observable.OnSubscribe<Integer>() {
  6. @Override
  7. public void call(Subscriber<? super Integer> subscriber) {
  8. for (int i = 0; i < 5; i++) {
  9. if (i == 3) {
  10. subscriber.onError(new Throwable("error"));
  11. }
  12. subscriber.onNext(i);
  13. }
  14. }
  15. }), Observable.create(new Observable.OnSubscribe<Integer>() {
  16. @Override
  17. public void call(Subscriber<? super Integer> subscriber) {
  18. for (int i = 0; i < 5; i++) {
  19. subscriber.onNext(5 + i);
  20. }
  21. subscriber.onCompleted();
  22. }
  23. }));
  24. }

分别对其订阅

点击(此处)折叠或打开

  1. mLButton.setText("Merge");
  2. mLButton.setOnClickListener(e -> mergeObserver().subscribe(i -> log("Merge:" + i)));
  3. mRButton.setText("mergeDelayError");
  4. mRButton.setOnClickListener(e -> mergeDelayErrorObserver().subscribe(new Subscriber<Integer>() {
  5. @Override
  6. public void onCompleted() {
  7. log("onCompleted");
  8. }
  9. @Override
  10. public void onError(Throwable e) {
  11. log("mergeDelayError:" + e);
  12. }
  13. @Override
  14. public void onNext(Integer integer) {
  15. log("mergeDelayError:" + integer);
  16. }
  17. }));

运行结果如下。

四、StartWith、Switch
    StartWith操作符会在源Observable发射的数据前面插上一些数据。不仅仅只可以插入一些数据,还可以将Iterable和Observable插入进入。如果插入的是Observable,则这个Observable发射的数据会插入到
源Observable发射数据的前面。

    switch操作符在Rxjava上的实现为switchOnNext,用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。
需要注意的就是,如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新
的小Observable所发射的数据。可以看示意图中的黄色圆圈就被丢弃了。

下面使用startWith和switchOnNext操作符来组合两个Observable

点击(此处)折叠或打开

  1. private Observable<Integer> startWithObserver() {
  2. return Observable.just(1, 2, 3).startWith(-1, 0);
  3. }
  4. private Observable<String> switchObserver() {
  5. return Observable.switchOnNext(Observable.create(
  6. new Observable.OnSubscribe<Observable<String>>() {
  7. @Override
  8. public void call(Subscriber<? super Observable<String>> subscriber) {
  9. for (int i = 1; i < 3; i++) {
  10. subscriber.onNext(createObserver(i));
  11. try {
  12. Thread.sleep(2000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }
  18. }
  19. ));
  20. }
  21. private Observable<String> createObserver(int index) {
  22. return Observable.create(new Observable.OnSubscribe<String>() {
  23. @Override
  24. public void call(Subscriber<? super String> subscriber) {
  25. for (int i = 1; i < 5; i++) {
  26. subscriber.onNext(index + "-" + i);
  27. try {
  28. Thread.sleep(1000);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }).subscribeOn(Schedulers.newThread());
  35. }

分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("StartWith");
  2. mLButton.setOnClickListener(e -> startWithObserver().subscribe(i -> log("StartWith:" + i)));
  3. mRButton.setText("switch");
  4. mRButton.setOnClickListener(e -> switchObserver().subscribe(i -> log("switch:" + i)));

运行结果如下,可以看到startwith将-1和0插入到前面。使用siwtch的时候第一个小Observable只发射出了两个数据,第二个小Observable就被源Observable发射出来了,所以其接下来的两个数据被丢弃。

五、Zip
    Zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。
    Rxjava实现了zip和zipWith两个操作符。

    下面我们使用zip和zipWith操作符来组合数据

点击(此处)折叠或打开

  1. private Observable<String> zipWithObserver() {
  2. return createObserver(2).zipWith(createObserver(3), (s, s2) -> s + "-" + s2);
  3. }
  4. private Observable<String> zipWithIterableObserver() {
  5. return Observable.zip(createObserver(2), createObserver(3), createObserver(4), (s, s2, s3) -> s + "-" + s2 + "-" + s3);
  6. }
  7. private Observable<String> createObserver(int index) {
  8. return Observable.create(new Observable.OnSubscribe<String>() {
  9. @Override
  10. public void call(Subscriber<? super String> subscriber) {
  11. for (int i = 1; i <= index; i++) {
  12. log("emitted:" + index + "-" + i);
  13. subscriber.onNext(index + "-" + i);
  14. try {
  15. Thread.sleep(500);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. }).subscribeOn(Schedulers.newThread());
  22. }

分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("zipWith");
  2. mLButton.setOnClickListener(e -> zipWithObserver().subscribe(i -> log("zipWith:" + i + "\n")));
  3. mRButton.setText("zip");
  4. mRButton.setOnClickListener(e -> zipWithIterableObserver().subscribe(i -> log("zip:" + i + "\n")));

运行结果如下,可以看到,最终都发射出了两个数据,因为createObserver(2)所创建的Observable只会发射两个数据,所以其他Observable多余发射的数据都被丢弃了。
  

Combning的操作符就到这了,本文中的源码见 https://github.com/Chaoba/RxJavaDemo

RxJava操作符(四)Combining相关推荐

  1. RxJava操作符学习APP

    用于学习RxJava操作符的app 下载地址: fir.im http://fir.im/bpdu 或者直接在 Release里面下载 https://github.com/jiang111/RxJa ...

  2. Rxjava操作符之过滤操作

    前言: 本文将介绍以下过滤类操作符(基于Rxjava2.0): filter ofType take takeLast first firstOrError last lastOrError skip ...

  3. Android RxJava操作符的学习---创建操作符

    RxJava如此受欢迎的原因,在于其提供了丰富 & 功能强大的操作符,几乎能完成所有的功能需求 1. 简介 RxJava 操作符的具体简介如下: 2. 类型 RxJava功能强大,所以其对应的 ...

  4. RxJava的四种Subjects:PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的理解

    RxJava的四种Subjects:PublishSubject.ReplaySubject.BehaviorSubject.AsyncSubject的理解 Subject:它既是Observable ...

  5. RxJava系列四(过滤操作符)

    转载请注明出处:https://zhuanlan.zhihu.com/p/21966621 RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) Rx ...

  6. RxJava操作符在android中的使用场景详解(一)

    转载请注明出处:http://www.wangxinarhat.com/2016/04/19/2016-04-19-rxjava-android-operate1/ 最近学习了RxJava在andro ...

  7. RxJava操作符(三)Filtering

    在上一篇文章里,我们了解了转化操作符,能将数据转化为我们想要的格式,但是如果数据集合里面有一些我们想要过滤掉的数据怎么办?这时候我们就需要使用过滤操作符了,有点类似于sql里的where,让Obser ...

  8. RxJava 操作符 do

    看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...

  9. Android RxJava操作符的学习---条件 / 布尔操作符

    3.6 条件 / 布尔操作符 3.6.1. 作用 通过设置函数,判断被观察者(Observable)发送的事件是否符合条件 3.6.2. 类型 RxJava2中,条件 / 布尔操作符的类型包括: 下面 ...

最新文章

  1. 小学五年级计算机备考方案,小学五年级备考方案(2)
  2. 算法积分0042算法笔记——【随机化算法】计算π值和计算定积分
  3. 发布:NetBeans IDE 8.1 Beta
  4. Wing IDE 5.0 破解之寻找注册码
  5. e3是合法浮点数吗_下面4个选项中,均是合法浮点数的选项是?
  6. SDL2 undefined reference to `SDL_Init' 问题
  7. 量化策略回测TRIXKDJ
  8. 【Qt教程】1.6 - Qt5信号与槽、Single Slot emit、自定义信号、自定义槽
  9. Idea安装Eslint插件详解 提示:Plugin NativeScript was not installed解决
  10. Android Q Labs| Android Q 分区存储
  11. linux双机热备份
  12. 2016年408考研算法题
  13. Axure RP 8 Pro 破解版软件安装包
  14. 成功在中东和北非地区发布应用或游戏的 7 个技巧
  15. 2019年第十二届中国大学生计算机设计大赛总结
  16. windows server 2008 R2 SP1多国语言包官方下载
  17. ADC噪声全面分析 -01- ADC噪声的类型以及ADC特性
  18. (阅读笔记)3DRA和CTA脑动脉瘤分割方法与评价
  19. 工控安全之系统加固篇
  20. i5处理器做java够用吗,一般的游戏玩家用i5处理器就够了,这是真的吗?

热门文章

  1. android 获取设备的mac地址,Android编程获取设备MAC地址的实现方法
  2. RN-路由---基础
  3. 烟台初中计算机会考,山东烟台市2018年初中学业水平考试WORD 版有答案
  4. 单身程序猿的七夕应该怎么过?
  5. 密码学常用英文单词翻译
  6. 单片机学习笔记(五)—— 键盘
  7. windows正版系统下载地址
  8. 人工智能、机器学习和深度学习的关系
  9. 《声声慢·寻寻觅觅》 李清照
  10. 360全景偏移调整_360全景拼接之调水平