转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/76443347
本文出自【赵彦军的博客】

以前写过 Rxjava 系列教程, 如下所示

  • RxJava 和 RxAndroid 一 (基础)
  • RxJava 和 RxAndroid 二(操作符的使用)
  • RxJava 和 RxAndroid 三(生命周期控制和内存优化)
  • RxJava 和 RxAndroid 四(RxBinding的使用)
  • RxJava 和 RxAndroid 五(线程调度)

上面的这些教程覆盖了 rxjava 的方方面面,很详细。只是当时写的时候是基于 rxjava 1.X 的版本写的,后来 rxjava 进入了快速迭代的时期,很快就出现了 2.x 版本。根据 Rxjava 官方的GitHub 来看,2.x 相对于 1.x 做了很多改进,删除了不少的类,同时也增加了一些新的类。基于以上背景,以前的这些文章,就显得有些不足,为了紧跟 rxjava 的步伐,下面的这篇博客,就是对 rxjava 的重新认识。

Rxjava、RxAndroid

Rxjava : https://github.com/ReactiveX/RxJava

RxAndroid : https://github.com/ReactiveX/RxAndroid

添加依赖

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.2'

create() :创建

create操作符应该是最常见的操作符了,主要用于产生一个Obserable被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者Observable称为发射器(上游事件),观察者Observer称为接收器(下游事件)。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete(); //结束e.onNext( 4 );}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.e("zhao", "onSubscribe: " + d.isDisposed());}@Overridepublic void onNext(@NonNull Integer integer) {Log.e("zhao", "onNext: " + integer);}@Overridepublic void onError(@NonNull Throwable e) {Log.e("zhao", "onError: ");}@Overridepublic void onComplete() {Log.e("zhao", "onComplete: ");}});

结果是:

E/zhao: onSubscribe: false
E/zhao: onNext: 1
E/zhao: onNext: 2
E/zhao: onNext: 3
E/zhao: onComplete: 

需要注意的几点是:

1)在发射完 3 之后, 调用 e.onComplete() 方法,结束 发射数据。4 没有发射出来。

2) 另外一个值得注意的点是,在RxJava 2.x中,可以看到发射事件方法相比1.x多了一个throws Excetion,意味着我们做一些特定操作再也不用try-catch了。

3) 并且2.x 中有一个Disposable概念,这个东西可以直接调用切断,可以看到,当它的isDisposed()返回为false的时候,接收器能正常接收事件,但当其为true的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。

在上面接收数据的时候,我们用了 Observer 对象,需要实现 4 个 方法。这显得过于累赘,我们可以用 Consumer 对象来代替 Observer 对象,代码如下:

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();e.onNext(4);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("zhao", "accept: " + integer);}});

效果如下:

 E/zhao: accept: 1E/zhao: accept: 2E/zhao: accept: 3

需要注意的是:

1)、Consumer 对象完全代替了Observer ,效果是一样的。Consumer 顾名思义是消费者的意思,是消费数据的对象。Consumer 对象是 Rxjava 2.x 才出现的,老版本没有。

map 操作符

map基本算是 RxJava 中一个最简单的操作符了,熟悉 RxJava 1.x 的知道,它的作用是对发射时间发送的每一个事件应用一个函数,是的每一个事件都按照指定的函数去变化,而在2.x中它的作用几乎一致。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).map(new Function<Integer, String>() {@Overridepublic String apply(@NonNull Integer integer) throws Exception {// map 操作符,就是转换输入、输出 的类型;本例中输入是 Integer , 输出是 String 类型Log.e("zhao", "apply: " + integer + "  线程:" + Thread.currentThread().getName());return "This is result " + integer;}}).subscribeOn(Schedulers.io()) //在子线程发射.observeOn(AndroidSchedulers.mainThread())  //在主线程接收.subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {Log.e("zhao", "accept: " + s + "  线程:" + Thread.currentThread().getName());}
});

结果是:

E/zhao: apply: 1  线程:RxCachedThreadScheduler-1
E/zhao: apply: 2  线程:RxCachedThreadScheduler-1
E/zhao: apply: 3  线程:RxCachedThreadScheduler-1
E/zhao: accept: This is result 1  线程:main
E/zhao: accept: This is result 2  线程:main
E/zhao: accept: This is result 3  线程:main

flatMap 操作符

FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器Observable 通过某种方法转换为多个Observables,然后再把这些分散的Observables装进一个单一的发射器Observable。但有个需要注意的是,flatMap并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的ConcatMap。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).flatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(@NonNull Integer integer) throws Exception {List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + integer);}//随机生成一个时间int delayTime = (int) (1 + Math.random() * 10);return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("zhao", "accept: " + s);}});

效果如下:

E/zhao: accept: I am value 1
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2

一切都如我们预期中的有意思,为了区分concatMap(下一个会讲),我在代码中特意动了一点小手脚,我采用一个随机数,生成一个时间,然后通过delay(后面会讲)操作符,做一个小延时操作,而查看Log日志也确认验证了我们上面的说法,它是无序的。

concatMap 操作符

上面其实就说了,concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序,所以,我们就直接把 flatMap 替换为 concatMap 验证吧。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).concatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(@NonNull Integer integer) throws Exception {List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + integer);}//随机生成一个时间int delayTime = (int) (1 + Math.random() * 10);return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("zhao", "accept: " + s);}});

效果如下:

E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3

zip 操作符

构建一个 String 发射器 和 Integer 发射器

  //创建 String 发射器
private Observable<String> getStringObservable() {return Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {e.onNext("A");e.onNext("B");e.onNext("C");}});}//创建 String 发射器
private Observable<Integer> getIntegerObservable() {return Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onNext(4);e.onNext(5);}});}

使用 zip 操作符

Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {@Overridepublic String apply(@NonNull String s, @NonNull Integer integer) throws Exception {return s + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("zhao", "accept: " + s);}});

效果如下:

E/zhao: accept: A1
E/zhao: accept: B2
E/zhao: accept: C3

需要注意的是:

1) zip 组合事件的过程就是分别从发射器A和发射器B各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1永远是和A 结合的,2永远是和B结合的。

2) 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,所以如截图中,5很孤单,没有人愿意和它交往,孤独终老的单身狗。

interval 操作符

interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大

//方法1
Flowable.interval(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.e("zhao", "accept11>: " + aLong);}});//方法2
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Log.e("zhao", "accept:22> " + aLong);}});

效果如下:


E/zhao: accept11>: 0
E/zhao: accept11>: 1
E/zhao: accept11>: 2
E/zhao: accept11>: 3
E/zhao: accept11>: 4

倒计时

既然 interval 操作符会产生从 0 到无穷大的序列,那么我们我们会返回来思考一下,如果倒过来想, 就会发现可以用 interval 方法,实现一个倒计时的功能。

创建一个倒计时的 Observable

/*** 产生一个倒计时的 Observable* @param time* @return*/public Observable<Long> countdown(final long time) {return Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, Long>() {@Overridepublic Long apply(@NonNull Long aLong) throws Exception {return time - aLong;}}).take( time + 1 );}

实现倒计时的功能

countdown(4).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Log.e("zhao", "accept: 倒计时: " + aLong);}});

效果如下:

E/zhao: accept: 倒计时: 4
E/zhao: accept: 倒计时: 3
E/zhao: accept: 倒计时: 2
E/zhao: accept: 倒计时: 1
E/zhao: accept: 倒计时: 0

repeat 操作符:重复的发射数据

repeat 重复地发射数据

  • repeat( ) //无限重复
  • repeat( int time ) //设定重复的次数
Observable.just(1, 2).repeat( 3 ) //重复3次.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("zhao", "accept: " + integer);}});

效果:

E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 1
E/zhao: accept: 2

range :发射特定的整数序列

range 发射特定整数序列的 Observable

  • range( int start , int end ) //start :开始的值 , end :结束的值

要求: end >= start

 Observable.range( 1 , 5 ).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("zhao", "accept: " + integer);}});

效果:

E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
E/zhao: accept: 4
E/zhao: accept: 5

fromArray : 遍历数组

Integer[] items = {0, 1, 2, 3, 4, 5};Observable.fromArray(items).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("zhao", "accept: " + integer);}});

效果是:

E/zhao: accept: 0
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
E/zhao: accept: 4
E/zhao: accept: 5

fromIterable : 遍历集合

List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");Observable.fromIterable(list).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("zhao", "accept: " + s);}});

效果

E/zhao: accept: a
E/zhao: accept: b
E/zhao: accept: c

toList : 把数据转换成 List 集合

Observable.just(1, 2, 3, 4).toList().subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {Log.e("zhao", "accept: " + integers);}});

效果是

accept: [1, 2, 3, 4]

把数组转化成 List 集合

Integer[] items = {0, 1, 2, 3, 4, 5};Observable.fromArray( items )  //遍历数组.toList()  //把遍历后的数组转化成 List .subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {Log.e("zhao", "accept: " + integers);}});

效果是:

 accept: [0, 1, 2, 3, 4, 5]

delay : 延迟发射数据

Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS)  //延迟3秒钟,然后在发射数据.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("zhao", "accept: " + integer);}});

效果:

E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3

背压 BackPressure

背压产生的原因: 被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息。在 Rxjava 1.x 版本很容易就会报错,使程序发生崩溃。

...Caused by: rx.exceptions.MissingBackpressureException
...
...

为了解决这个问题,在RxJava2里,引入了Flowable这个类:Observable不包含 backpressure 处理,而 Flowable 包含。

下面我们来模拟一个触发背压的实例 , 发射器每1毫秒发射一个数据,接收器每一秒处理一个数据。数据产生是数据处理的1000 倍。

首先用 RxJava 2.x 版本的 Observable 来实现。

Observable.interval(1, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Thread.sleep(1000);Log.e("zhao", "onNext: " + aLong);}});

经过测试,app 很健壮,没有发生崩溃,日志每1秒打印一次。在上面我们说到 2.x 版本中 Observable 不再支持背压,发神器生成的数据全部缓存在内存中。

Observable :

  • 不支持 backpressure 处理,不会发生 MissingBackpressureException 异常。

  • 所有没有处理的数据都缓存在内存中,等待被订阅者处理。

  • 坏处是:当产生的数据过快,内存中缓存的数据越来越多,占用大量内存。

然后用 RxJava 2.x 版本的 Flowable 来实现。

Flowable.interval(1, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Thread.sleep(1000);Log.e("zhao", "onNext: " + aLong);}});

运行起来发生崩溃,崩溃日志如下:

io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
...
...Caused by: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests

很明显发生了 MissingBackpressureException 异常 , 128 代表是 Flowable 最多缓存 128 个数据,缓存次超过 128 个数据,就会报错。可喜的是,Rxjava 已经给我们提供了解决背压的策略。

onBackpressureDrop

onBackpressureDrop() :当缓冲区数据满 128 个时候,再新来的数据就会被丢弃,如果此时有数据被消费了,那么就会把当前最新产生的数据,放到缓冲区。简单来说 Drop 就是直接把存不下的事件丢弃。

onBackpressureDrop 测试

Flowable.interval( 1 , TimeUnit.MILLISECONDS).onBackpressureDrop() //onBackpressureDrop 一定要放在 interval 后面否则不会生效.subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Thread.sleep(1000);Log.e("zhao", "onNext: " + aLong);}});

效果如下:

E/zhao: onNext: 0
E/zhao: onNext: 1
...
E/zhao: onNext: 126
E/zhao: onNext: 127
E/zhao: onNext: 96129
E/zhao: onNext: 96130
E/zhao: onNext: 96131

从日志上分析来看,发射器发射的 0 ~ 127 总共 128 个数据是连续的,下一个数据就是 96129 , 128 ~ 96128 的数据被丢弃了。

注意事项

1、onBackpressureDrop 一定要放在 interval 后面否则不会生效

onBackpressureLatest

onBackpressureLatest 就是只保留最新的事件。

onBackpressureBuffer

  • onBackpressureBuffer:默认情况下缓存所有的数据,不会丢弃数据,这个方法可以解决背压问题,但是它有像 Observable 一样的缺点,缓存数据太多,占用太多内存。

  • onBackpressureBuffer(int capacity) :设置缓存队列大小,但是如果缓冲数据超过了设置的值,就会报错,发生崩溃。

onBackpressureBuffer(int capacity) 测试

Flowable.interval( 1 , TimeUnit.MILLISECONDS).onBackpressureBuffer( 1000 ) //设置缓冲队列大小为 1000.subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Thread.sleep(1000);Log.e("zhao", "onNext: " + aLong);}});

运行起来后,过了几秒钟,发生崩溃,日志如下:

io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
···
Caused by: io.reactivex.exceptions.MissingBackpressureException: Buffer is full

通过日志可以看出,缓冲区已经满了。

注意事项

1、onBackpressureBuffer 一定要放在 interval 后面否则不会生效

参考资料

RxJava2 源码分析

如何形象地描述 RxJava 中的背压和流控机制?

给初学者的RxJava2.0教程(八): Flowable缓存


个人微信号:zhaoyanjun125 , 欢迎关注

RxJava 2.x 使用最佳实践相关推荐

  1. RxJava系列7(最佳实践)

    RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) RxJava系列4(过滤操作符) RxJava系列5(组合操作符) RxJava系列6(从微观角 ...

  2. fir.im Weekly - 2016 年 Android 最佳实践列表

    2016 年已经过去一半,你在年初制定的成长计划都实现了吗? 学海无涯,技术成长不是一簇而就的事情.本期 fir.im Weekly 推荐 王下邀月熊_Chevalier的 我的编程之路--知识管理与 ...

  3. Android开发最佳实践

    原文链接:https://github.com/futurice/android-best-practices 转载来源:http://blog.csdn.net/asce1885/article/d ...

  4. 2016里一些Android最佳实践列表——Opinionated

    本文是一篇属于Opinionated的文章,只是代表了作者的个人观点,笔者看到Medium有两人发了都是关于最佳实践的Checklist,就把二者集成了下,并且加入了一些个人的看法,基本的知识点分布方 ...

  5. Java 设计模式最佳实践:1~5

    原文:Design Patterns and Best Practices in Java 协议:CC BY-NC-SA 4.0 译者:飞龙 本文来自[ApacheCN Java 译文集],采用译后编 ...

  6. Android开发最佳实践---Futurice之见

    原文链接:https://github.com/futurice/android-best-practices 本文是Futurice公司的Android开发人员总结的最佳实践,遵循这些准则可以避免重 ...

  7. 编写高性能Java代码的最佳实践

    编写高性能Java代码的最佳实践 摘要:本文首先介绍了负载测试.基于APM工具的应用程序和服务器监控,随后介绍了编写高性能Java代码的一些最佳实践.最后研究了JVM特定的调优技巧.数据库端的优化和架 ...

  8. 提示和技巧:光线跟踪最佳实践

    提示和技巧:光线跟踪最佳实践 Tips and Tricks: Ray Tracing Best Practices 本文介绍了在游戏和其他实时图形应用程序中实现光线跟踪的最佳实践.我们尽可能简短地介 ...

  9. SQL Server 最佳实践分析器使用小结

    Best Practices Analyzer Tool for Microsoft SQL Server 2000是Microsoft SQL Server开发团队开发的一个数据库管理工具,可以让你 ...

最新文章

  1. 在Docker应用场景下 如何使用新技术快速实现DevOps
  2. MySQL完整备份,还原
  3. Nginx之windows下搭建
  4. 计算机二级access什么时候报名_全国计算机等级考试什么时候报名
  5. python mysql 基于 sqlalvhrmy_Python基于DB-API操作MySQL数据库过程解析
  6. 2.1.1 物理层接口特性、数据通信模型、物理层基本概念(数据、信号、码元 、信源、信道、信宿 、速率、波特、带宽)(转载)
  7. Android实践--监測网络状态
  8. unity 竖屏不能全屏显示
  9. 电路中常用的拉普拉斯变换
  10. GB28181协议介绍
  11. Syncthing - 远程文件实时同步(P2P)工具简介、安装及使用
  12. android 市场 上传,安卓市场APP上传流程及审核要求
  13. 使用C++开发的NES(FC)模拟器
  14. 16k Star!一个开源的命令行视频播放器
  15. 登陆远程kvm_KVM远程VMM管理
  16. 使用Pyppeteer进行gmail模拟登录
  17. V2X和D2D的链路级sidelink上的区别
  18. 苹果充电器怎么辨别真假_苹果MagSafe充电器怎么样?能搭配安卓机使用吗?
  19. 每日一讲:C语言getchar函数的用法
  20. 豆芽邀请码51136推荐,Mixgo CE在WIN7 下使用Mixly2.0

热门文章

  1. python编程入门指南 代码库在哪下-致Python初学者 Anaconda入门使用指南完整版
  2. python内置collections模块的使用
  3. python的GUI库PyQt5的使用
  4. ML-2 机器学习算法
  5. 把一个质量为M0 的物体加速到 时间慢100倍 需要多大能量
  6. 分类模型的性能评价指标(Classification Model Performance Evaluation Metric)
  7. Laravel 任务调度(Console)
  8. java调用存储过程
  9. SQLAlchemy 用于 MySQL创建表时的bug修正
  10. ORACLE 中为什么要把列名都转换成大写字母?