前言

本节我们来学习如何使用 Flowable。

开始


之前我们所的上游和下游分别是Observable和Observer, 这次不一样的是上游变成了Flowable, 下游变成了Subscriber, 但是水管之间的连接还是通过subscribe(), 我们来看看最基本的用法吧:

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR); //增加了一个参数Subscriber<Integer> downstream = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(Long.MAX_VALUE);  //注意这句代码}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};upstream.subscribe(downstream);

运行结果 :

D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: onNext: 1
D/TAG: emit 2
D/TAG: onNext: 2
D/TAG: emit 3
D/TAG: onNext: 3
D/TAG: emit complete
D/TAG: onComplete

我们注意到这次和 Observable 有些不同. 首先是创建 Flowable 的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用BackpressureStrategy.ERROR 这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException. 其余的策略后面再来讲解.

另外的一个区别是在下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

 s.request(Long.MAX_VALUE);

这句代码有什么用呢, 不要它可以吗? 我们来试试:

 Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

这次我们取消掉了request这句代码, 来看看运行结果:

从运行结果中可以看到, 在上游发送第一个事件之后, 下游就抛出了一个著名的MissingBackpressureException异常, 并且下游没有收到任何其余的事件. 可是这是一个同步的订阅呀, 上下游工作在同一个线程, 上游每发送一个事件应该会等待下游处理完了才会继续发事件啊, 不可能出现上下游流速不均衡的问题呀.

带着这个疑问, 我们再来看看异步的情况:

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

运行结果:

D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: emit 2
D/TAG: emit 3
D/TAG: emit complete

哎, 这次上游正确的发送了所有的事件, 但是下游一个事件也没有收到. 这是因为什么呢?

这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量和控制速度不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10), 叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打…

所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !

但是太完美的东西也就意味着陷阱也会很多, 你可能只是被它的外表所迷惑, 失去了理智, 如果你滥用或者不遵守规则, 一样会吃到苦头.

比如这里需要注意的是, 只有当上游正确的实现了如何根据下游的处理能力来发送事件的时候, 才能达到这种效果, 如果上游根本不管下游的处理能力, 一股脑的瞎他妈发事件, 仍然会产生上下游流速不均衡的问题, 这就好比小日本管他叶问要打几个, 老子直接拿出1万个鬼子, 这尼玛有种打死给我看看? 那么如何正确的去实现上游呢, 这里先卖个关子, 之后我们再来讲解.

学习了request, 我们就可以解释上面的两段代码了.

首先第一个同步的代码, 为什么上游发送第一个事件后下游就抛出了MissingBackpressureException异常, 这是因为下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们. 那如何解决这种情况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就行了, 或者根据上游发送事件的数量来request就行了, 比如这里request(3)就可以了.

然后我们再来看看第二段代码, 为什么上下游没有工作在同一个线程时, 上游却正确的发送了所有的事件呢? 这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.

是不是这样呢, 我们来验证一下:

    public static void request(long n) {mSubscription.request(n); //在外部调用request请求上游}public static void demo3() {Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;  //把Subscription保存起来}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});}

这里我们把 Subscription 保存起来, 在界面上增加了一个按钮, 点击一次就调用 Subscription.request(1), 来看看运行结果:

结果似乎像那么回事, 上游发送了四个事件保存到了水缸里, 下游每request一个, 就接收一个进行处理.

刚刚我们有说到水缸的大小为128, 有朋友就问了, 你说128就128吗, 又不是唯品会周年庆, 我不信. 那就来验证一下:

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 128; i++) {Log.d(TAG, "emit " + i);emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

运行结果:

D/TAG: onSubscribe
D/TAG: emit 0...
D/TAG: emit 126
D/TAG: emit 127

这段代码的运行结果很正常, 没有任何错误和异常, 上游仅仅是发送了128个事件.

【Android -- RxJava】RxJava2.0 教程(七),如何使用 Flowable相关推荐

  1. 给初学者的RxJava2.0教程(七)(转载)

    前言 上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flowable之后就觉得Fl ...

  2. 给初学者的RxJava2.0教程(七)

    前言 上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flowable之后就觉得Fl ...

  3. RxJava2.0教程(七)

    谢谢原作者! 前言 上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flowable ...

  4. 给初学者的RxJava2.0教程(七)(转)

    前言 上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flowable之后就觉得Fl ...

  5. RxJava2.0教程(四)

    RxJava2.0教程(四) 上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flo ...

  6. 给初学者的RxJava2.0教程(二):【线程控制】

    CSD转载地址:http://blog.csdn.net/qq_23179075/article/details/79256089 作者:Season_zlc 链接:https://www.jians ...

  7. 给初学者的RxJava2.0教程(三):【操作符:Map、FlatMap、ConcatMap】

    CSDN转载地址:http://blog.csdn.net/qq_23179075/article/details/79271365 作者:Season_zlc 链接:https://www.jian ...

  8. 给初学者的RxJava2.0教程(一):【入门】

    CSDN转载地址:http://blog.csdn.net/qq_23179075/article/details/79255971 作者:Season_zlc 链接:https://www.jian ...

  9. Android RxJava 2.0中backpressure(背压)概念的理解

    英文原文:https://github.com/ReactiveX/RxJava/wiki/Backpressure Backpressure(背压.反压力) 在rxjava中会经常遇到一种情况就是被 ...

最新文章

  1. HTTP简介、请求方法与响应状态码
  2. Linux中pthread源码在哪,pthread - 源码下载|系统编程|Linux/Unix编程|源代码 - 源码中国...
  3. numpy的常规使用(数组合并、拼接、添加)
  4. mybatis动态调用表名和字段名
  5. JavaScript实现的水珠动画效果
  6. C++的new和delete
  7. mysql数据=_mysql 数据操作
  8. 数据结构之删除线性表中的元素
  9. Ubuntu 16.04 LTS 安装Mongodb 3.4
  10. 点击微信网页的a标签直接跳转到淘宝APP打开怎么实现的?附:动图演示效果
  11. 31省份及直辖市自治区的下拉框代码
  12. julia安装源_Julia 国内镜像安装实测
  13. [乐意黎原创] 送李愿归盘谷序
  14. linux下doc转docx
  15. 每个人都会经历一段迷茫
  16. 接上一篇Trao文本行数及省略号问题
  17. JavaScript ES12新特性抢先体验
  18. Java编写程序求一个正整数(自然数)的阶乘
  19. 根据16进制输出所有汉字
  20. 股票市场市价委托类型

热门文章

  1. 塞翁失马又得马,福祸循环,人生如此相似
  2. iphone11各机型对比_iPhone11和iPhone12系列对比:新款有何改变?
  3. 玩绝地求生:刺激战场如何设置灵敏度效果最好?刺激战场灵敏度攻略分享
  4. KNN算法原理及python实现
  5. matlab图像处理关于细胞计数
  6. 旧瓶装新酒:金融科技是互联网金融的避风港吗?
  7. MySQL存储引擎(一)——相关命令
  8. mysql 创建表 key_mysql 建表时key
  9. AGX Xavier安装中文输入法
  10. Pinpoint介绍