本文讨论几种不同方式合并RxJava的Observable数据流。

Observable介绍

Observable 序列,或简单称为Observable,表示异步数据流。这些概念遵循基于观察者模式,在该模式中,一个叫做观察者的对象订阅了Observable发出的数据。要使用RxJava需要增加依赖:

<dependency><groupId>io.reactivex</groupId><artifactId>rxjava</artifactId><version>1.2.5</version>
</dependency>

订阅是无阻塞的,因为观察者会对Observable未来发出的任何消息做出响应,这也又促进了并发性。下面是简单RxJava示例:

Observable.from(new String[] { "John", "Doe" }).subscribe(name -> System.out.println("Hello " + name))

合并Observable数据

使用响应式框架编程,常见场景是合并不同的Observable数据。举例,web应用中可能需要获得两组相互独立的异步数据流。为了避免等待前面数据流完成才请求下一个数据流,我们可以同时调用,然后订阅合并两个数据流。本节讨论几种不同方式合并多个Observable数据,并区别它们之间的差异。

Merge

使用Merge操作可以合并多个Observable数据为一个输出结果,示例代码如下:

@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {TestSubscriber<String> testSubscriber = new TestSubscriber<>();Observable.merge(Observable.from(new String[] {"Hello", "World"}),Observable.from(new String[] {"I love", "RxJava"})).subscribe(testSubscriber);testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}

MergeDelayError

mergeDelayError 方法与merge功能一致,但如果在合并过程中有错误发生,可以忽略错误继续合并,最后传播错误异常:

@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {TestSubscriber<String> testSubscriber = new TestSubscriber<>();Observable.mergeDelayError(Observable.from(new String[] { "hello", "world" }),Observable.error(new RuntimeException("Some exception")),Observable.from(new String[] { "rxjava" })).subscribe(testSubscriber);testSubscriber.assertValues("hello", "world", "rxjava");testSubscriber.assertError(RuntimeException.class);
}

上面示例输出结果,与没有错发发生结果一致:

hello
world
rxjava

注意,如果使用 merge 代替 mergeDelayError, 则字符串rxjava不会发出,因为merge遇到错误会立刻停止Observable数据流。

zip

zip 方法组合两个序列为成对(pair)数据序列:

@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {List<String> zippedStrings = new ArrayList<>();Observable.zip(Observable.from(new String[] { "Simple", "Moderate", "Complex" }), Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);assertThat(zippedStrings).isNotEmpty();assertThat(zippedStrings.size()).isEqualTo(3);assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}

Zip With Interval

下面示例给zip方法增加interrval参数,可以有效延迟第一个数据流推送数据元素:

@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {TestSubscriber<String> testSubscriber = new TestSubscriber<>();Observable<String> data = Observable.just("one", "two", "three", "four", "five");Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);Observable.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData)).toBlocking().subscribe(testSubscriber);testSubscriber.assertCompleted();testSubscriber.assertValueCount(5);testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}

总结

本文介绍几个合并RxJava的Observable数据流的方法。你还可以学习通过官方文档学习更多的方法:combineLatest, join, groupJoin, switchOnNext等。

合并RxJava的Observable数据流相关推荐

  1. RxJava2 / RxAndroid2的merge操作合并多个Observable

    RxJava2/RxAndroid2的merge操作合并多个Observable RxAndroid2/RxJava2的merge操作合并若干个Observable为单个可观测的Observable, ...

  2. RxJava使用Observable.zip的Iterable参数

    Observable.zip zip函数允许你传入多个请求,然后合并出另外的结果传出来,普通的用法就不多说了,网上一堆介绍的 然后做项目时有个疑问点,Observable.zip如果传入一个列表,合并 ...

  3. RxJava中Observable的基本用法

    1.前言 随着RxJava越来越火,相信在2016年必定会大方异彩. 虽然前前后后看了不少RxJava的文章,但都没有积累下来,又没有在实际项目中使用过. 因此特意写下这篇文章记录学习过程. 1.简介 ...

  4. Android RxJava操作符的学习---组合合并操作符---合并数据源并展示

    1. 需求场景 2. 功能说明 即,同时向2个数据源获取数据 -> 合并数据 -> 统一展示到客户端 3. 具体实现 此处采用Merge() & Zip()操作符进行讲解,其中: ...

  5. Android RxJava操作符的学习---组合 / 合并操作符

    3.3 组合 / 合并操作符 3.3.1. 作用 组合 多个被观察者(Observable) & 合并需要发送的事件 应用场景 组合多个被观察者 合并多个事件 发送事件前追加发送事件 统计发送 ...

  6. RxJava(三)-合并操作符

    1.concat和concatArray   是将多个被观察者合并后,发射出去. Observable o1 = Observable.just(1,2,3); Observable o2 = Obs ...

  7. 多个PDF文件或PDF数据流的合并

    背景 公司因人员变动和业务整改,提出将原打印(数据库保存的HTML字符串做替换)全部迁移至framework(利用framework完成数据字段替换与EL表达式相似,转换成PDF的二进制流返回),主要 ...

  8. RxJava Agera 从源码简要分析基本调用流程(2)

    2019独角兽企业重金招聘Python工程师标准>>> 版权声明:本文由晋中望原创文章,转载请注明出处:  文章原文链接:https://www.qcloud.com/communi ...

  9. ReactiveX/RxJava V3.0.0版本

    最近使用加载文本数据到内存中,提供web服务的方式,由于文件比较大,导致load数据非常消耗内存,4G的文本,每行数据封装对象存在hashmap中运行期竟然消耗了23G内存.后改用JDK8的strea ...

最新文章

  1. jetty9请求form表单太小限制
  2. NC:噬菌体中无机硫辅助代谢基因的生态学研究
  3. 【练习】c++用链栈实现计算器
  4. 软件保障与测试课程实践记录:贪吃蛇小程序
  5. netty系列之:netty架构概述
  6. PAT1132: Cut Integer
  7. 利用C++Builder自定义Windows窗体“系统菜单”
  8. Win7启用Administrator账户登录
  9. 线性表-串:KMP模式匹配算法
  10. IDEA 常用快捷键介绍
  11. ADS2015 for linux 安装教程
  12. Websphere8.5.5安装教程
  13. helper java_请教问题,helper类在java中的作用。
  14. 网易云信周梁伟专访:亿级架构IM平台的技术难点解析
  15. Chrome浏览器默认新标签页空白怎么办
  16. 计算机仿真塞曼效应实验报告,塞曼效应实验报告[完整版].doc
  17. 图标字体的优缺点和使用
  18. pdf如何转换成excel?教你几个方法
  19. java fail 方法_java中的fail是什么意思
  20. 三重邪骨手机版怎么登录服务器未响应,三重邪骨困难版

热门文章

  1. 公众号怎么做好微信营销,才能快速吸粉和变现?
  2. html 的打印和下载
  3. 决策树算法预测NBA赛事结果
  4. 将英雄对战、生存竞技的乐趣巧妙融合的动作手游——风云岛行动
  5. [宋史学习] 雍熙北伐
  6. 费孝通乡土中国阅读笔记——家族
  7. 怎么让段落自动空两格_word自动空两格 如何设置word中段首自动空两格
  8. 普通网红如何打造个人IP?个人IP打造有哪些注意事项?
  9. android APP隐藏NavigationBar,通过修改framework隐藏/显示 navigation bar
  10. 用户隐私协议弹窗html,服务协议和隐私政策 ,首次启动弹窗