前言

上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flowable之后就觉得Flowable能解决任何问题, 甚至有抛弃Observable这种想法, 这是万万不可的, 它们都有各自的优势和不足.

在这一节里我们先来学习如何使用Flowable, 它东西比较多, 也比较繁琐, 解释起来也比较麻烦, 但我还是尽量用通俗易懂的话来说清楚, 毕竟, 这是一个通俗易懂的教程.

正题

我们还是以两根水管举例子:

之前我们所的上游和下游分别是ObservableObserver, 这次不一样的是上游变成了Flowable, 下游变成了Subscriber, 但是水管之间的连接还是通过subscribe(), 我们来看看最基本的用法吧:

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR); //增加了一个参数Subscriber<Integer> downstream = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(Long.MAX_VALUE);  //注意这句代码}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};upstream.subscribe(downstream);
复制代码

这段代码中,分别创建了一个上游Flowable和下游Subscriber, 上下游工作在同一个线程中, 和之前的Observable的使用方式只有一点点的区别, 先来看看运行结果吧:

D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: onNext: 1
D/TAG: emit 2
D/TAG: onNext: 2
D/TAG: emit 3
D/TAG: onNext: 3
D/TAG: emit complete
D/TAG: onComplete
复制代码

结果也和我们预期的是一样的.

我们注意到这次和Observable有些不同. 首先是创建Flowable的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用BackpressureStrategy.ERROR这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException. 其余的策略后面再来讲解.

另外的一个区别是在下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

 s.request(Long.MAX_VALUE);
复制代码

这句代码有什么用呢, 不要它可以吗? 我们来试试:

 Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});
复制代码

这次我们取消掉了request这句代码, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requestsat io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)at zlc.season.rxjava2demo.demo.ChapterSeven$3.subscribe(ChapterSeven.java:77)at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)at io.reactivex.Flowable.subscribe(Flowable.java:12218)at zlc.season.rxjava2demo.demo.ChapterSeven.demo2(ChapterSeven.java:111)at zlc.season.rxjava2demo.MainActivity$2.onClick(MainActivity.java:36)at android.view.View.performClick(View.java:5637)at android.view.View$PerformClick.run(View.java:22429)at android.os.Handler.handleCallback(Handler.java:751)at android.os.Handler.dispatchMessage(Handler.java:95)at android.os.Looper.loop(Looper.java:154)at android.app.ActivityThread.main(ActivityThread.java:6119)at java.lang.reflect.Method.invoke(Native Method)at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
zlc.season.rxjava2demo D/TAG: emit 2
zlc.season.rxjava2demo D/TAG: emit 3
zlc.season.rxjava2demo D/TAG: emit complete
复制代码

哎哎哎, 大兄弟, 怎么一言不合就抛异常?

从运行结果中可以看到, 在上游发送第一个事件之后, 下游就抛出了一个著名的MissingBackpressureException异常, 并且下游没有收到任何其余的事件. 可是这是一个同步的订阅呀, 上下游工作在同一个线程, 上游每发送一个事件应该会等待下游处理完了才会继续发事件啊, 不可能出现上下游流速不均衡的问题呀.

带着这个疑问, 我们再来看看异步的情况:

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});
复制代码

这次我们同样去掉了request这句代码, 但是让上下游工作在不同的线程, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
zlc.season.rxjava2demo D/TAG: emit 3
zlc.season.rxjava2demo D/TAG: emit complete
复制代码

哎, 这次上游正确的发送了所有的事件, 但是下游一个事件也没有收到. 这是因为什么呢?

这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量控制速度不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10), 叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...

所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !

但是太完美的东西也就意味着陷阱也会很多, 你可能只是被它的外表所迷惑, 失去了理智, 如果你滥用或者不遵守规则, 一样会吃到苦头.

比如这里需要注意的是, 只有当上游正确的实现了如何根据下游的处理能力来发送事件的时候, 才能达到这种效果, 如果上游根本不管下游的处理能力, 一股脑的瞎他妈发事件, 仍然会产生上下游流速不均衡的问题, 这就好比小日本管他叶问要打几个, 老子直接拿出1万个鬼子, 这尼玛有种打死给我看看? 那么如何正确的去实现上游呢, 这里先卖个关子, 之后我们再来讲解.

学习了request, 我们就可以解释上面的两段代码了.

首先第一个同步的代码, 为什么上游发送第一个事件后下游就抛出了MissingBackpressureException异常, 这是因为下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们. 那如何解决这种情况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就行了, 或者根据上游发送事件的数量来request就行了, 比如这里request(3)就可以了.

然后我们再来看看第二段代码, 为什么上下游没有工作在同一个线程时, 上游却正确的发送了所有的事件呢? 这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.

是不是这样呢, 我们来验证一下:

    public static void request(long n) {mSubscription.request(n); //在外部调用request请求上游}public static void demo3() {Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");emitter.onNext(1);Log.d(TAG, "emit 2");emitter.onNext(2);Log.d(TAG, "emit 3");emitter.onNext(3);Log.d(TAG, "emit complete");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;  //把Subscription保存起来}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});}
复制代码

这里我们把Subscription保存起来, 在界面上增加了一个按钮, 点击一次就调用Subscription.request(1), 来看看运行结果:

结果似乎像那么回事, 上游发送了四个事件保存到了水缸里, 下游每request一个, 就接收一个进行处理.

刚刚我们有说到水缸的大小为128, 有朋友就问了, 你说128就128吗, 又不是唯品会周年庆, 我不信. 那就来验证一下:

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 128; i++) {Log.d(TAG, "emit " + i);emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});
复制代码

这里我们让上游一次性发送了128个事件, 下游一个也不接收, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0...
zlc.season.rxjava2demo D/TAG: emit 126
zlc.season.rxjava2demo D/TAG: emit 127
复制代码

这段代码的运行结果很正常, 没有任何错误和异常, 上游仅仅是发送了128个事件.

那来试试129个呢, 把上面代码中的128改成129试试:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0...
zlc.season.rxjava2demo D/TAG: emit 126
zlc.season.rxjava2demo D/TAG: emit 127
zlc.season.rxjava2demo D/TAG: emit 128  //这是第129个事件
zlc.season.rxjava2demo W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requestsat io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)at zlc.season.rxjava2demo.demo.ChapterSeven$7.subscribe(ChapterSeven.java:169)at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)at io.reactivex.Flowable.subscribe(Flowable.java:12218)at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)at java.util.concurrent.FutureTask.run(FutureTask.java:237)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)at java.lang.Thread.run(Thread.java:761)
复制代码

这次可以看到, 在上游发送了第129个事件的时候, 就抛出了MissingBackpressureException异常, 提醒我们发洪水啦. 当然了, 这个128也不是我凭空捏造出来的, Flowable的源码中就有这个buffersize的大小定义, 可以自行查看.

注意这里我们是把上游发送的事件全部都存进了水缸里, 下游一个也没有消费, 所以就溢出了, 如果下游去消费了事件, 可能就不会导致水缸溢出来了. 这里我们说的是可能不会, 这也很好理解, 比如刚才这个例子上游发了129个事件, 下游只要快速的消费了一个事件, 就不会溢出了, 但如果下游过了十秒钟再来消费一个, 那肯定早就溢出了.

好了, 今天的教程就到这里了, 下一节我们将会更加深入的去学习FLowable, 敬请期待.

转载于:https://juejin.im/post/5cadb23ae51d456e8b07dd11

RxJava2.0的初学者必备教程(七)相关推荐

  1. RxJava2.0的初学者必备教程(九)

    前言 好久不见朋友们,最近一段时间在忙工作上的事情,这两天正好有点时间,赶紧写下了这篇教程,免得大家说我太咸了. 正题 先来回顾一下上上节,我们讲Flowable的时候,说它采用了响应式拉的方式,我们 ...

  2. 给初学者的RxJava2.0教程(二):【线程控制】

    CSD转载地址:http://blog.csdn.net/qq_23179075/article/details/79256089 作者:Season_zlc 链接:https://www.jians ...

  3. 给初学者的RxJava2.0教程(三):【操作符:Map、FlatMap、ConcatMap】

    CSDN转载地址:http://blog.csdn.net/qq_23179075/article/details/79271365 作者:Season_zlc 链接:https://www.jian ...

  4. 给初学者的RxJava2.0教程(一):【入门】

    CSDN转载地址:http://blog.csdn.net/qq_23179075/article/details/79255971 作者:Season_zlc 链接:https://www.jian ...

  5. RxJava2.0教程(四)

    RxJava2.0教程(四) 上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flo ...

  6. EDIUS 视音频制作标准教程(第2版)21堂课高清1280X720 25Pmp4格式 含同步素材 edius初学者必备装备

    EDIUS 视音频制作标准教程(第2版)21堂课高清1280X720 25Pmp4格式 含同步素材 edius初学者必备装备 肖一峰,作为Grass Valley 公司EDISU产品工程师,有大量机会 ...

  7. 黄聪:Microsoft Enterprise Library 5.0 系列教程(七) Exception Handling Application Block

    黄聪:Microsoft Enterprise Library 5.0 系列教程(七) Exception Handling Application Block 原文:黄聪:Microsoft Ent ...

  8. slua 是c语言开发的吗,初学者必备文档:LUA新手快速学习笔记

    LUA程序设计语言 是一个简洁.轻量.可扩展的脚本语言.LUA读作/'lua/(噜啊),是葡萄牙语中"Luna"(月亮)的意思. LUA的目标是成为一个很容易嵌入其它语言中使用的语 ...

  9. MVC5+EF6 入门完整教程七

    原文:MVC5+EF6 入门完整教程七 本篇我们针对表格显示添加一些新功能. 前面我们已经讲解过表格显示数据了,现在我们添加三个常用功能: 对显示结果进行排序.过滤.分页. 文章提纲 理论基础/前置准 ...

最新文章

  1. 爬虫之js2py的使用
  2. python selenium 文件上传_Python+Selenium学习--上传文件
  3. @Configuration与@Component区别
  4. 报错解决办法 SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder
  5. Qt文档阅读笔记-QThreadPool的解释及使用
  6. CSS的class、id、css文件名的常用命名规则
  7. c语言用栈输出迷宫所有路径,如何在迷宫中使用到栈
  8. 如何在计算机上设置禁止游戏,如何禁止玩电脑游戏 屏蔽网络游戏的方法
  9. lcms质谱仪_岛津LCMS-8045三重四极杆质谱仪
  10. 如何开启显示文件后缀名(扩展名)
  11. 京东、搜狗“带狗”都好好的,为何司机对快狗打车不满意?
  12. 阿里云ECS服务器安装AMH5.3面板并搭建WordPress站点详细教程(卞懂的学习笔记)...
  13. 学大伟业:在数学竞赛学习中,你属于哪种类型?
  14. 颜色转换公式大全及转换表格(31种)
  15. 【Upload oss图片 上传失败】
  16. 在 VMware vSphere 中构建 Kubernetes 存储环境
  17. cmd命令行进行C++代码编译运行;实现进程调度和存储管理
  18. 20年嵌入式工程师经验分享:从0开发一款嵌入式产品-道合顺大数据Infinigo
  19. smalltalk五个特性
  20. 【转】拇指拇外翻的纠正训练

热门文章

  1. 模板方法模式和职责链模式
  2. 二、C++反作弊对抗实战 (进阶篇 —— 14.利用内存加载+重定向绕过inline iat hook)
  3. 不吃苦,你要青春何用
  4. python怎么做笔记本(文本编辑器)
  5. 学习笔记 | SpringBoot微信点餐系统实战课程笔记(一)、数据库设计与创建
  6. 全链路监控之pinpoint
  7. C语言——一维整型数组的 镜像对调
  8. 【AI JUST AI】自然语言交互式学习,ChatGPT成了我的最佳博客写作助手
  9. vue3+qrcode插件实现下载二维码图片(.png、.svg格式)
  10. Edittext抖动