在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域尤其成问题。 我决定发布一些简短的提示,以解决最常见的陷阱。 这是第一部分。
ObservableFlowable本质上是惰性的。 这意味着无论您在Flowable放置了多么繁琐或长时间运行的逻辑,仅当有人订阅时,它才会被评估。 并且还有某人订阅的次数。 下面的代码段对此进行了说明:

private static String slow() throws InterruptedException {logger.info("Running");TimeUnit.SECONDS.sleep(1);return "abc";
}//...Flowable flo = Flowable.fromCallable(this::slow);
logger.info("Created");
flo.subscribe();
flo.subscribe();
logger.info("Done");

这样的ObservableFlowable将不可避免地打印:

19:37:57.368 [main] - Created
19:37:57.379 [main] - Running
19:37:58.383 [main] - Running
19:37:59.388 [main] - Done

请注意,您需要两次支付sleep()的价格sleep()两次订阅)。 此外,所有逻辑都在客户端( main )线程中运行,除非通过subscriptionOn subscribeOn()请求或异步流隐式可用,否则RxJava中没有隐式线程。 问题是:我们是否可以热切地强制运行订阅逻辑,以便每当有人订阅该流时,就已经对其进行了预先计算或至少开始了计算?

完全渴望评估

最明显但有缺陷的解决方案是急于计算流返回的任何内容,并简单地将其包装为固定的Flowable

Flowable<String> eager() {final String slow = slow();return Flowable.just(slow);
}

不幸的是,这大大破坏了RxJava的目的。 首先,像subscribeOn()这样的运算符将不再起作用,并且无法将计算卸载到其他线程。 更糟糕的是,即使eager()返回了Flowable但按照定义,它将始终阻止客户端线程。 很难推理,组合和管理此类流。 通常,即使需要进行急切的评估,也应避免使用这种模式,而应选择延迟加载。

使用

下一个示例仅使用cache()运算符:

Flowable<String> eager3() throws InterruptedException {final Flowable<String> cached =Flowable.fromCallable(this::slow).cache();cached.subscribe();return cached;
}

这个想法很简单:用惰性Flowable包装计算并缓存它。 cache()运算符所做的是,它会记住第一次订阅时发出的所有事件,以便在出现第二个Subscriber ,它将接收相同的事件缓存序列。 但是cache()运算符(像大多数其他运算符一样)是惰性的,因此我们必须第一次强制订阅。 调用subscribe()将预填充缓存,此外,如果第二个订户出现在slow()计算完成之前,它将同样等待它,而不是第二次启动它。

此解决方案有效,但请记住,由于未涉及Schedulersubscribe()实际上将被阻止。 如果要在后台预填充Flowable ,请尝试subscribeOn()

Flowable<String> eager3() throws InterruptedException {final Flowable<String> cached =Flowable.fromCallable(this::slow).subscribeOn(justDontAlwaysUse_Schedulers.io()).cache();cached.subscribe();return cached;
}

是的,在生产系统上使用Schedulers.io()存在问题且难以维护,因此请避免使用自定义线程池。

错误处理

令人遗憾的是,吞噬RxJava中的异常非常容易。 如果slow()方法失败,这就是我们上一个示例中可能发生的情况。 异常不会完全被吞没,但是默认情况下,如果没有人对此感兴趣,它将在System.err上打印堆栈跟踪。 同样,未处理的异常也包装在OnErrorNotImplementedException 。 如果您执行任何形式的集中式日志记录,则不太方便,很可能会丢失。 您可以使用doOnError()操作进行日志记录,但它仍然通过例外下游RxJava认为未处理的为好,一次包装与OnErrorNotImplementedException 。 因此,让我们在subscribe()实现onError回调:

Flowable<String> eager3() throws InterruptedException {final Flowable<String> cached =Flowable.fromCallable(this::slow).cache();cached.subscribe(x -> {/* ignore */},e -> logger.error("Prepopulation error", e));return cached;
}

我们不想处理实际事件,而只是处理subscribe()错误。 此时,您可以安全地返回此类Flowable 。 急切且有希望的是,只要您订阅了它,数据就已经可用。 注意,例如,Hystrix的observe()方法也很急切,而懒惰的toObservable()相反。 这是你的选择。

翻译自: https://www.javacodegeeks.com/2017/08/eager-subscription-rxjava-faq.html

渴望订阅– RxJava常见问题解答相关推荐

  1. 渴望 英语_渴望订阅– RxJava常见问题解答

    渴望 英语 在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域特别有问题. 我决定发布一些简短的提示,以解决最常见的陷阱. 这是第一部分. Observable和Flowable本质上是惰性 ...

  2. flatmap_flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答

    flatmap RxJava 2.x中共有三个无缝相似的运算符: flatMap() , concatMap()和concatMapEager() . 它们都接受相同的参数-从原始流的单个项目到任意类 ...

  3. 并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答

    并发查询parallel 简单,有效和安全的并发是RxJava的设计原则之一. 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一. 让我们举一个简单的例子:假设我们有一堆UUID并且对于每 ...

  4. flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答...

    flowable背压 取消 RxJava缺少创建无限自然数流的工厂. 这样的流很有用,例如,当您想通过压缩两个事件的顺序来为可能的无限事件流分配唯一的序列号时: Flowable<Long> ...

  5. openwrt固定速率_固定速率与固定延迟– RxJava常见问题解答

    openwrt固定速率 如果您使用的是纯Java,从版本5开始,我们有一个方便的调度程序类,该类允许以固定速率或固定延迟运行任务: import java.util.concurrent.Execut ...

  6. rxjava 背压_背压加载文件– RxJava常见问题解答

    rxjava 背压 事实证明,将文件作为流进行处理非常有效且方便. 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码: String filePath = & ...

  7. flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答

    RxJava 2.x中共有三个无缝相似的运算符: flatMap() , concatMap()和concatMapEager() . 它们都接受相同的参数-从原始流的单个项目到任意类型的(子)流的函 ...

  8. 惯用并发:flatMap()与parallel()– RxJava常见问题解答

    简单,有效和安全的并发是RxJava的设计原则之一. 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一. 让我们举一个简单的例子:假设我们有一堆UUID并且对于每个UUID ,我们必须执行 ...

  9. 使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答

    RxJava缺少创建无限自然数流的工厂. 这样的流很有用,例如,当您想通过压缩两个事件的唯一序列号给可能的无限事件流时: Flowable<Long> naturalNumbers = / ...

最新文章

  1. VSCode如何自动换行,右侧换行间距长度,隐藏右侧代码预览(Minimap代码缩略图滚动条),比对代码差异窗口也自动换行
  2. 如何使用Python和Tkinter构建Toy Markdown编辑器
  3. python 如何把小数变成百分数格式
  4. 实例讲解如何通过Oracle成功发送邮件-入门基础
  5. oracle查看数据库剩余空间,Oracle 查看数据库空间使用情况
  6. 电子商务概论_走进经管优质线上课堂(二)之电子商务概论
  7. 刀片服务器虚拟化哪家好,刀片服务器TOP5 细数虚拟化时代利器
  8. Linux 部署ASP.NET SQLite 应用 的坎坷之旅 附demo及源码
  9. android scrollview 动态添加,使用Scrollview和LinearLayout动态添加布局
  10. 64行代码实现简单人脸识别
  11. efi分区咋移动到c盘里_曲线解决——用原版镜像中的diskpart命令创建efi分区,分区类型为主分区的原因...
  12. windows 10字体突然变小变细,模糊
  13. 2020省赛第八次训练赛题解
  14. 【谷粒商城】k8s、devops集群篇(4/4)
  15. linux安装使用7zip
  16. swfobject简单封装
  17. 站群教程SEO推广(SEO教程)
  18. python 列表 循环 实现简易的电子商城
  19. 迁移学习论文(三):Multi-Adversarial Domain Adaptation论文原理及复现工作
  20. CoppeliaSim学习笔记之差速小车的控制与传感器的驱动

热门文章

  1. 这可能是把Docker的概念讲的最清楚的一篇文章
  2. JavaFX UI控件教程(九)之Text Field
  3. 扫盲,为什么分布式一定要有Redis?
  4. JavaScript学习总结(九)——Javascript面向(基于)对象编程
  5. android volley 上传图片 和参数,Android使用Volley实现上传文件功能
  6. mysql5.5安装配置 在阿里云服务器上 本地navicat连接
  7. 转:json与map互转
  8. 牛客网JAVA专项联系共899题--个人记录学习经历
  9. aws lambda_API网关和AWS Lambda进行身份验证
  10. optionals_Java Optionals获得更具表现力的代码