@(Rxjava学习笔记)

RxJava 学习笔记(八) — Combining 结合操作

  • RxJava 学习笔记八 Combining 结合操作

    • StartWith 在数据序列的开头插入一条指定的项
    • Merge 将多个Observable合并为一个
    • MergeDelayError 将多个Observable合并为一个
    • Zip 使用一个函数组合多个Observable发射的数据集合然后再发射这个结果
    • CombineLatest 当两个Observables中的任何一个发射了一个数据时通过一个指定的函数组合每个Observable发射的最新数据一共两个数据然后发射这个函数的结果
    • Join 无论何时如果一个Observable发射了一个数据项只要在另一个Observable发射的数据项定义的时间窗口内就将两个Observable发射的数据合并发射
    • GroupJoin groupJoin操作符非常类似于join操作符区别在于join操作符中第四个参数的传入函数不一致
    • SwitchOnNext 将一个发射Observables的Observable转换成另一个Observable后者发射这些Observables最近发射的数据

1. StartWith —> 在数据序列的开头插入一条指定的项

一个Observable在发射数据之前先发射一个指定的数据序列

示例代码:

Observable.just(10,20,30).startWith(2, 3, 4).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onNext(Integer value) {System.out.println("Next:" + value);}});

输出:

Next:2
Next:3
Next:4
Next:10
Next:20
Next:30
Sequence complete.
  • Javadoc:startWith(T) (最多接受九个参数)
  • Javadoc:startWith(Iterable)

你也可以传递一个ObservablestartWith,它会将那个Observable的发射物插在原始Observable发射的数据序列之前,然后把这个当做自己的发射物集合。这可以看作是Concat的反转。

示例代码:

Observable.just(1,2,3,4,5).startWith(Observable.just(9,8,5)).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出:

9
8
5
1
2
3
4
5
  • Javadoc:startWith(Observable)

2. Merge —> 将多个Observable合并为一个

使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。

Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

正如图例上展示的,任何一个原始ObservableonError通知会被立即传递给观察者,而且会终止合并后的Observable

示例代码:

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);Observable.merge(odds, evens).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

除了传递多个Observablemerge,你还可以传递一个Observable列表List,数组,甚至是一个发射Observable序列的Observablemerge将合并它们的输出作为单个Observable的输出:

如果你传递一个发射Observables序列的Observable,你可以指定merge应该同时订阅的Observable‘的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了onCompleted通知。

  • Javadoc:merge(Observable,Observable) (接受二到九个Observable)
  • Javadoc:merge(Observable[])
  • Javadoc:merge(Iterable)
  • Javadoc:merge(Iterable,int)

3. MergeDelayError —> 将多个Observable合并为一个

它的行为有一点不同,它会保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。

示例代码:

        //产生0,5,10数列,最后会产生一个错误Observable<Long> errorObservable = Observable.error(new Exception("this is end!"));Observable < Long > observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 5;}}).take(3).mergeWith(errorObservable.delay(3500, TimeUnit.MILLISECONDS));//产生0,10,20,30,40数列Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 10;}}).take(5);Observable.mergeDelayError(observable1, observable2).subscribe(new Subscriber<Long>() {@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onNext(Long aLong) {System.out.println("Next:" + aLong);}});

输出:

Next:0
Next:0
Next:10
Next:20
Next:30
Next:40
Error: this is end!
  • Javadoc:mergeDelayError(Observable)
  • Javadoc:mergeDelayError(Observable,Observable)

4. Zip —> 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果

Zip
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。


zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射ObservableObservables

Observable<Integer> observable1 = Observable.just(10,20,30);Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer integer, Integer integer2) {return integer + integer2;}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onNext(Integer value) {System.out.println("Next:" + value);}});

输出:

Next:14
Next:28
Next:42
Sequence complete.
  • Javadoc:zip(Iterable,FuncN)
  • Javadoc:zip(Observable,FuncN)
  • Javadoc:zip(Observable,Observable,Func2) (最多可以有九个Observables参数)

ZipWith

zipWith操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable

zipzipWith默认不在任何特定的操作符上执行。

  • Javadoc:zipWith(Observable,Func2)
  • Javadoc:zipWith(Iterable,Func2)

5. CombineLatest —> 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果

CombineLatest操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

示例代码:

//产生0,5,10,15,20数列Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 5;}}).take(5);//产生0,10,20,30,40数列Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 10;}}).take(5);Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {@Overridepublic Long call(Long aLong, Long aLong2) {return aLong+aLong2;}}).subscribe(new Subscriber<Long>() {@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onNext(Long aLong) {System.out.println("Next: " + aLong);}});

输出:

Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.
  • Javadoc:combineLatest(List,FuncN)
  • Javadoc:combineLatest(Observable,Observable,Func2)

6. Join —> 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射

zip()merge()方法作用在发射数据的范畴内,在决定如何操作值之前有些场景我们需要考虑时间的。RxJavajoin()函数基于时间窗口将两个Observables发射的数据结合在一起。+

join方法的用法如下:
observableA.join(observableB,
observableA产生结果生命周期控制函数,
observableB产生结果生命周期控制函数,
observableA产生的结果与observableB产生的结果的合并规则)

示例代码:

//产生0,5,10,15,20数列Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 5;}}).take(5);//产生0,10,20,30,40数列Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 10;}}).take(5);observable1.join(observable2, new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long aLong) {//使Observable延迟600毫秒执行return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);}}, new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long aLong) {//使Observable延迟600毫秒执行return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);}}, new Func2<Long, Long, Long>() {@Overridepublic Long call(Long aLong, Long aLong2) {return aLong + aLong2;}}).subscribe(new Subscriber<Long>() {@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onNext(Long aLong) {System.out.println("Next: " + aLong);}});

输出:

Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.
  • Javadoc:Join(Observable,Func1,Func1,Func2)

7. GroupJoin —> groupJoin操作符非常类似于join操作符,区别在于join操作符中第四个参数的传入函数不一致

示例代码:

        //产生0,5,10,15,20数列Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 5;}}).take(5);//产生0,10,20,30,40数列Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 10;}}).take(5);observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long aLong) {return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);}}, new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long aLong) {return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);}}, new Func2<Long, Observable<Long>, Observable<Long>>() {@Overridepublic Observable<Long> call(Long aLong, Observable<Long> observable) {return observable.map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong2) {return aLong + aLong2;}});}}).subscribe(new Subscriber<Observable<Long>>() {@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onNext(Observable<Long> observable) {observable.subscribe(new Subscriber<Long>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(Long aLong) {System.out.println("Next: " + aLong);}});}});

输出:

Next: 5
Next: 15
Next: 10
Next: 20
Next: 25
Next: 30
Next: 35
Next: 45
Next: 40
Next: 50
Next: 60
Next: 55
Sequence complete.
  • Javadoc:groupJoin(Observable,Func1,Func1,Func2)

8. SwitchOnNext —> 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据

switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者

示例代码:

        //每隔500毫秒产生一个observableObservable<Observable<Long>> observable = Observable.timer(0, 500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long aLong) {//每隔200毫秒产生一组数据(0,10,20,30,40)return Observable.timer(0, 200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {@Overridepublic Long call(Long aLong) {return aLong * 10;}}).take(5);}}).take(2);Observable.switchOnNext(observable).subscribe(new Subscriber<Long>() {@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onNext(Long aLong) {System.out.println("Next:" + aLong);}});

输出:

Next:0
Next:10
Next:20
Next:0
Next:10
Next:20
Next:30
Next:40
Sequence complete.
  • Javadoc:switchOnNext(Observable)

RxJava 学习笔记(八) --- Combining 结合操作相关推荐

  1. Mr.J-- jQuery学习笔记(八)--CSS样式操作

    CSS属性操作 在jQuery中,关于元素的样式操作方法共有2种: (1)CSS属性操作: (2)CSS类名操作: 三种设置方法 逐个设置 $("div").css("w ...

  2. C# 学习笔记(17)操作SQL Server 上

    C# 学习笔记(17)操作SQL Server上 安装SQL Server 微软官网 https://www.microsoft.com/zh-cn/sql-server/sql-server-dow ...

  3. Java中expecial,RxJava 学习笔记 (一)

    作者: 一字马胡 转载标志 [2017-12-13] 更新日志 日期 更新内容 备注 2017-12-13 RxJava学习笔记系列 系列笔记 (一) 2017-12-15 增加系列笔记(二) 201 ...

  4. ReactJS学习笔记八:动画

    ReactJS学习笔记八:动画 分类: react学习笔记 javascript2015-07-06 20:27 321人阅读 评论(0) 收藏 举报 react动画 目录(?)[+] 这里只讨论Re ...

  5. Halcon 学习笔记八:颜色识别

    Halcon 学习笔记八:颜色识别 一.图像处理需要的知识 二.图像处理的预处理和分割过程 二.颜色识别的方法 三.例子一 四.例子二 五.例子三 一.图像处理需要的知识 1.图像处理基础(rgb(h ...

  6. ZooKeeper学习笔记(八):ZooKeeper集群写数据原理

    写数据原理 写流程直接请求发送给Leader节点 这里假设集群中有三个zookeeper服务端 ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 ...

  7. MongoDB 学习笔记八 复制、分片、备份与恢复、监控

    MongoDB 学习笔记八 复制.分片.备份与恢复.监控 MongoDB复制(副本集) 什么是复制? MongoDB 复制原理 MongoDB 副本集设置 副本集添加成员 MongoDB 分片 分片 ...

  8. Redis 学习笔记八:集群模式

    Redis 学习笔记八:集群模式 作者:Grey 原文地址: 博客园:Redis 学习笔记八:集群模式 CSDN:Redis 学习笔记八:集群模式 前面提到的Redis 学习笔记七:主从复制和哨兵只能 ...

  9. 黑马程序员_java自学学习笔记(八)----网络编程

    黑马程序员_java自学学习笔记(八)----网络编程 android培训. java培训.期待与您交流! 网络编程对于很多的初学者来说,都是很向往的一种编程技能,但是很多的初学者却因为很长一段时间无 ...

最新文章

  1. 一款让你轻松在IDEA画图的插件!
  2. 案例 | 用pdpipe搭建pandas数据分析流水线
  3. 真的已经讲烂了!java字符串转对象
  4. Appium移动自动化测试-----(一)Appium介绍
  5. 最适合游戏的显卡排行榜,显卡天梯图
  6. C语言实现拟合直线方程
  7. html 布局 拖拽 在线,可视化编辑 - 拖拽式编辑网页模板无需代码,自由拖拽布局,即可完成网站设计制作!...
  8. 微信三方平台调试过程中遇到的问题
  9. 我的世界我服务器注册密码大全,网易账号密码大全我的世界 | 手游网游页游攻略大全...
  10. 怎么定位门面位置_教您,如何在高德和腾讯定位自己的店铺位置
  11. Spring batch系列文章(一)——介绍和入门
  12. 螺旋矩阵(Java)
  13. 怎么让联想计算机升级,联想电脑怎么升级win11?联想电脑升级win11的几种方法...
  14. python二手房使用教程_利用Python对链家网北京二手房进行简单数据分析
  15. 不惧新冠疫情 DDN公司销售收入逆势增长
  16. C1任务一01-信息编码
  17. MOOS-ivp 实验三 MOOS简介(2)
  18. Python Ajax爬取微博个人博客数据
  19. python随机生成一个数字_python随机生成
  20. 某高校毕业设计-数据分析课题技术实现篇

热门文章

  1. jsp高级DOM和BOM
  2. 企业级BOM系统与外部系统集成的几种方案
  3. STM32 keil中__IO得意思
  4. 锋利的jQuery学习总结
  5. .net使用Microsoft.Office给word添加自定义水印
  6. C语言生成n个随机坐标,c语言如何生成随机数 怎样用c语言生成n个随机数?
  7. 方舟手游非官方服务器修改器,方舟非官方服务器作弊码 | 手游网游页游攻略大全...
  8. 玉米生吃好还是熟吃好 各种情况分析
  9. 移动端h5调起高德地图、百度地图实现路线及路况查询
  10. 测试大会能给我们带来什么?