RxJava目前有两种发布和订阅模式

第一种 cold模式,这种模式观察者订阅被观察模式时,被观察者的动作会重放,举例说明:

@NonNullFlowable<@NonNull Object> observeOn =Flowable.create(e -> {e.onNext(1); Thread.sleep(1000);e.onNext(2); Thread.sleep(1000);e.onNext(3); Thread.sleep(1000);e.onNext(4); Thread.sleep(1000);e.onNext(5); Thread.sleep(1000);e.onNext(6); Thread.sleep(1000);}, BackpressureStrategy.BUFFER).observeOn(Schedulers.computation());Thread.sleep(1000 * 1);observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));Thread.sleep(1000 * 1);observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

这种会大音两次1,2,3,4,5,6,即使时不同的线程,第二次会等第一次完成过后开始(因为未设置subscribeOn,所以是单线程的,第二次会在第一次完成后再开始)。而且是当观察者订阅被观察者的时候触发被观察者的动作。

RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6

我们create中lambda的方法会再次运行

相似的ReplaySubject

还有一种当订阅后会重新处理已发送的数据ReplaySubject

ReplaySubject<Integer> replaySubject = ReplaySubject.create(1);replaySubject.onNext(1);replaySubject.onNext(2);replaySubject.onNext(3);System.out.println("subscribe 1");replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));replaySubject.onNext(4);replaySubject.onNext(5);System.out.println("subscribe 2");replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));replaySubject.onNext(6);

控制台输出为

subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6

但是Flowable.create和ReplaySubject的模式不太一样。
对于Flowable来说,新的subscribe来临时,会重新执行create方法里面的FlowableOnSubscribe的apply方法。
对于ReplaySubject是把前面onNext()的数据保存到list中,然后新的subscribe来临时重新遍历list消费。这里需要注意ReplaySubject有内存泄漏的风险。见io.reactivex.rxjava3.subjects.ReplaySubject.buffer。

第二种是HOT模式。

1、使用cold + publish()方法修改cold为hot。

@NonNullConnectableFlowable<@NonNull Object> publish = Flowable.create(e -> {e.onNext(1); Thread.sleep(1000);System.out.println(1+ " " + Thread.currentThread().getName());e.onNext(2); Thread.sleep(1000);System.out.println(2+ " " + Thread.currentThread().getName());e.onNext(3); Thread.sleep(1000);System.out.println(3+ " " + Thread.currentThread().getName());e.onNext(4); Thread.sleep(1000);System.out.println(4+ " " + Thread.currentThread().getName());e.onNext(5); Thread.sleep(1000);System.out.println(5+ " " + Thread.currentThread().getName());e.onNext(6); Thread.sleep(1000);System.out.println(6+ " " + Thread.currentThread().getName());}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.computation()).publish();System.out.println("connect");publish.connect();System.out.println("subscribe 1");publish.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));Thread.sleep(3000 * 1);System.out.println("subscribe 2");publish.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

下面是输出,可以看到没有出现重放。

connect
subscribe 1
main + next + 1
1 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 2
2 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 3
3 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 4
subscribe 2
4 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-2 + next + 5
5 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
6 RxComputationThreadPool-1

2、或者使用Subject对象
这里使用了PublishSubject,他的观察者只处理订阅过后的数据。subject包含了其他类型的对象,可以参考RxJava 的 Subject

     PublishSubject<Integer> publishSubject = PublishSubject.create();publishSubject.onNext(1);publishSubject.onNext(2);publishSubject.onNext(3);System.out.println("subscribe 1");publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));publishSubject.onNext(4);publishSubject.onNext(5);System.out.println("subscribe 2");publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));publishSubject.onNext(6);

执行过后的打印

subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6

RxJava 两种生产和消费模式,(冷)cold和(热)hot相关推荐

  1. kotlin for android----------MVP模式下(OKHttp和 Retrofit+RxJava)网络请求的两种实现方式...

    2019独角兽企业重金招聘Python工程师标准>>> 今天要说的干货是:以Kotlin,在MVP模式下(OKHttp和 Retrofit+RxJava)网络请求两种实现方式的一个小 ...

  2. 10-2 5-2 查询至少生产两种不同的计算机(PC或便携式电脑)且机器速度至少为133的厂商 (20 分)(思路加详解+测试用例)

    一:题目 本题目要求编写SQL语句, 查询至少生产两种不同型号的计算机(PC或便携式电脑)且机器速度至少为133的厂商,输出结果按照制造商升序排列. 提示: 本题有三种情况: (1) 至少生产两种不同 ...

  3. RocketMQ(七) RocketMQ的两种消费模式

    目录 一.概述 二.集群消费 三.广播消费 一.概述 RocketMQ主要提供了两种消费模式:集群消费以及广播消费.我们只需要在定义消费者的时候通过setMessageModel(MessageMod ...

  4. 查询至少生产两种不同的计算机(PC或便携式电脑)且机器速度至少为133的厂商

    10-2 5-2 查询至少生产两种不同的计算机(PC或便携式电脑)且机器速度至少为133的厂商 (10 分) 本题目要求编写SQL语句, 查询至少生产两种不同型号的计算机(PC或便携式电脑)且机器速度 ...

  5. 世链财经|教你两种方法创建冷钱包的方法

    来源:世链财经(www.shilain.com) 冷钱包是指网络不能访问到你私钥的钱包(即离线钱包).它的优点是very安全,不用担心私钥被盗.但是操作麻烦,而且也存在物理安全风险(比如电脑丢失损坏等 ...

  6. sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案

    下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...

  7. 注塑模具是一种生产塑胶制品的工具

    注塑模具由动模和定模两部分组成,动模安装在注射成型机的移动模板上,定模安装在注射成型机的固定模板上.在注射成型时动模与定模闭合构成浇注系统和型腔. 注塑模具依成型特性区分为热固性塑胶模具.热塑性塑胶模 ...

  8. 并发框架Disruptor(核心概念 入门 高性能原理-伪共享 CAS 环形数据 生产和消费模式 高级使用 )

    并发框架Disruptor 并发框架Disruptor Disruptor概述 背景 什么是Disruptor 为什么使用Disruptor Disruptor 的核心概念 Ring Buffer S ...

  9. 2021年大数据Spark(九):Spark On Yarn两种模式总结

    目录 Spark On Yarn两种模式 引入 一.当一个MR应用提交运行到Hadoop YARN上时 二.当一个Spark应用提交运行在集群上时 注意 client 模式 cluster 模式 总结 ...

  10. mysql不停止重启服务器_不停止MySQL服务增加从库的两种方式

    现在生产环境MySQL数据库是一主一从,由于业务量访问不断增大,故再增加一台从库.前提是不能影响线上业务使用,也就是说不能重启MySQL服务,为了避免出现其他情况,选择在网站访问量低峰期时间段操作. ...

最新文章

  1. Windows 7 下月停止支持,微软重申将全屏通知敦促用户升级系统
  2. MybatisPlus代码生成器配置
  3. SQL中的CASE WHEN用法
  4. iOS经典面试题之深入分析block相关高频面试题
  5. TCP 端口监听队列原理
  6. 在java继承机制中 父类中的私有_Java中子类能继承父类的私有属性吗?
  7. 一个强大的粘性标签库
  8. SQL条件!=null查不出数据
  9. 第11月第18天 RACSequence
  10. 三星投8万亿扩充10nm、7nm产能:骁龙835爆发
  11. [Cogs728] [网络流24题#3] 最小路径覆盖 [网络流,最大流,二分图匹配]
  12. 电脑有英伟达独显,有了英伟达显卡驱动还要英特尔显卡驱动吗?
  13. shell脚本shc加密解密
  14. TeamViewer15免费版更换账户登录设备
  15. 7-3 求最小码距(完整版) (10 分)
  16. 安卓开发笔记——APP闪退解决方案
  17. 8.10-8.13 shell的cut,sort_wc_uniq,tee_tr_split,特殊符号
  18. CSS水平居中,垂直居中,水平垂直居中
  19. Linux/Android——usb触摸屏驱动 - usbtouchscreen (一)
  20. 什么是web前端开发工程师和h5有什么关系

热门文章

  1. PageOffice 在线打开 word 文件并添加水印
  2. win10状态栏卡死,屏幕正常
  3. GAMES101 13节 光线追踪 笔记
  4. html作品实验报告,html网页制作实验报告.docx
  5. SP10108 BALLOT - Distributing Ballot Boxes(二分查找)
  6. UiPath如何下载
  7. async异步问题(waterfall、parallel、eachSeries、map、whilst)
  8. 桌面便签哪个软件好用?求推荐一款便签软件?
  9. python基本类型关键字_python中的关键字---1(基础数据类)
  10. 基于element-ui实现待支付订单显示定时器