RxJava 两种生产和消费模式,(冷)cold和(热)hot
RxJava目前有两种发布和订阅模式
第一种 cold模式,这种模式观察者订阅被观察模式时,被观察者的动作会重放,举例说明:
@NonNullFlowable<@NonNull Object> observeOn =Flowable.create(e -> {e.onNext(1); Thread.sleep(1000);e.onNext(2); Thread.sleep(1000);e.onNext(3); Thread.sleep(1000);e.onNext(4); Thread.sleep(1000);e.onNext(5); Thread.sleep(1000);e.onNext(6); Thread.sleep(1000);}, BackpressureStrategy.BUFFER).observeOn(Schedulers.computation());Thread.sleep(1000 * 1);observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));Thread.sleep(1000 * 1);observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
这种会大音两次1,2,3,4,5,6,即使时不同的线程,第二次会等第一次完成过后开始(因为未设置subscribeOn,所以是单线程的,第二次会在第一次完成后再开始)。而且是当观察者订阅被观察者的时候触发被观察者的动作。
RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6
我们create中lambda的方法会再次运行
相似的ReplaySubject
还有一种当订阅后会重新处理已发送的数据ReplaySubject
ReplaySubject<Integer> replaySubject = ReplaySubject.create(1);replaySubject.onNext(1);replaySubject.onNext(2);replaySubject.onNext(3);System.out.println("subscribe 1");replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));replaySubject.onNext(4);replaySubject.onNext(5);System.out.println("subscribe 2");replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));replaySubject.onNext(6);
控制台输出为
subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6
但是Flowable.create和ReplaySubject的模式不太一样。
对于Flowable来说,新的subscribe来临时,会重新执行create方法里面的FlowableOnSubscribe的apply方法。
对于ReplaySubject是把前面onNext()的数据保存到list中,然后新的subscribe来临时重新遍历list消费。这里需要注意ReplaySubject有内存泄漏的风险。见io.reactivex.rxjava3.subjects.ReplaySubject.buffer。
第二种是HOT模式。
1、使用cold + publish()方法修改cold为hot。
@NonNullConnectableFlowable<@NonNull Object> publish = Flowable.create(e -> {e.onNext(1); Thread.sleep(1000);System.out.println(1+ " " + Thread.currentThread().getName());e.onNext(2); Thread.sleep(1000);System.out.println(2+ " " + Thread.currentThread().getName());e.onNext(3); Thread.sleep(1000);System.out.println(3+ " " + Thread.currentThread().getName());e.onNext(4); Thread.sleep(1000);System.out.println(4+ " " + Thread.currentThread().getName());e.onNext(5); Thread.sleep(1000);System.out.println(5+ " " + Thread.currentThread().getName());e.onNext(6); Thread.sleep(1000);System.out.println(6+ " " + Thread.currentThread().getName());}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.computation()).publish();System.out.println("connect");publish.connect();System.out.println("subscribe 1");publish.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));Thread.sleep(3000 * 1);System.out.println("subscribe 2");publish.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
下面是输出,可以看到没有出现重放。
connect
subscribe 1
main + next + 1
1 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 2
2 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 3
3 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 4
subscribe 2
4 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-2 + next + 5
5 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
6 RxComputationThreadPool-1
2、或者使用Subject对象
这里使用了PublishSubject,他的观察者只处理订阅过后的数据。subject包含了其他类型的对象,可以参考RxJava 的 Subject
PublishSubject<Integer> publishSubject = PublishSubject.create();publishSubject.onNext(1);publishSubject.onNext(2);publishSubject.onNext(3);System.out.println("subscribe 1");publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));publishSubject.onNext(4);publishSubject.onNext(5);System.out.println("subscribe 2");publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));publishSubject.onNext(6);
执行过后的打印
subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
RxJava 两种生产和消费模式,(冷)cold和(热)hot相关推荐
- kotlin for android----------MVP模式下(OKHttp和 Retrofit+RxJava)网络请求的两种实现方式...
2019独角兽企业重金招聘Python工程师标准>>> 今天要说的干货是:以Kotlin,在MVP模式下(OKHttp和 Retrofit+RxJava)网络请求两种实现方式的一个小 ...
- 10-2 5-2 查询至少生产两种不同的计算机(PC或便携式电脑)且机器速度至少为133的厂商 (20 分)(思路加详解+测试用例)
一:题目 本题目要求编写SQL语句, 查询至少生产两种不同型号的计算机(PC或便携式电脑)且机器速度至少为133的厂商,输出结果按照制造商升序排列. 提示: 本题有三种情况: (1) 至少生产两种不同 ...
- RocketMQ(七) RocketMQ的两种消费模式
目录 一.概述 二.集群消费 三.广播消费 一.概述 RocketMQ主要提供了两种消费模式:集群消费以及广播消费.我们只需要在定义消费者的时候通过setMessageModel(MessageMod ...
- 查询至少生产两种不同的计算机(PC或便携式电脑)且机器速度至少为133的厂商
10-2 5-2 查询至少生产两种不同的计算机(PC或便携式电脑)且机器速度至少为133的厂商 (10 分) 本题目要求编写SQL语句, 查询至少生产两种不同型号的计算机(PC或便携式电脑)且机器速度 ...
- 世链财经|教你两种方法创建冷钱包的方法
来源:世链财经(www.shilain.com) 冷钱包是指网络不能访问到你私钥的钱包(即离线钱包).它的优点是very安全,不用担心私钥被盗.但是操作麻烦,而且也存在物理安全风险(比如电脑丢失损坏等 ...
- sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案
下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...
- 注塑模具是一种生产塑胶制品的工具
注塑模具由动模和定模两部分组成,动模安装在注射成型机的移动模板上,定模安装在注射成型机的固定模板上.在注射成型时动模与定模闭合构成浇注系统和型腔. 注塑模具依成型特性区分为热固性塑胶模具.热塑性塑胶模 ...
- 并发框架Disruptor(核心概念 入门 高性能原理-伪共享 CAS 环形数据 生产和消费模式 高级使用 )
并发框架Disruptor 并发框架Disruptor Disruptor概述 背景 什么是Disruptor 为什么使用Disruptor Disruptor 的核心概念 Ring Buffer S ...
- 2021年大数据Spark(九):Spark On Yarn两种模式总结
目录 Spark On Yarn两种模式 引入 一.当一个MR应用提交运行到Hadoop YARN上时 二.当一个Spark应用提交运行在集群上时 注意 client 模式 cluster 模式 总结 ...
- mysql不停止重启服务器_不停止MySQL服务增加从库的两种方式
现在生产环境MySQL数据库是一主一从,由于业务量访问不断增大,故再增加一台从库.前提是不能影响线上业务使用,也就是说不能重启MySQL服务,为了避免出现其他情况,选择在网站访问量低峰期时间段操作. ...
最新文章
- Windows 7 下月停止支持,微软重申将全屏通知敦促用户升级系统
- MybatisPlus代码生成器配置
- SQL中的CASE WHEN用法
- iOS经典面试题之深入分析block相关高频面试题
- TCP 端口监听队列原理
- 在java继承机制中 父类中的私有_Java中子类能继承父类的私有属性吗?
- 一个强大的粘性标签库
- SQL条件!=null查不出数据
- 第11月第18天 RACSequence
- 三星投8万亿扩充10nm、7nm产能:骁龙835爆发
- [Cogs728] [网络流24题#3] 最小路径覆盖 [网络流,最大流,二分图匹配]
- 电脑有英伟达独显,有了英伟达显卡驱动还要英特尔显卡驱动吗?
- shell脚本shc加密解密
- TeamViewer15免费版更换账户登录设备
- 7-3 求最小码距(完整版) (10 分)
- 安卓开发笔记——APP闪退解决方案
- 8.10-8.13 shell的cut,sort_wc_uniq,tee_tr_split,特殊符号
- CSS水平居中,垂直居中,水平垂直居中
- Linux/Android——usb触摸屏驱动 - usbtouchscreen (一)
- 什么是web前端开发工程师和h5有什么关系
热门文章
- PageOffice 在线打开 word 文件并添加水印
- win10状态栏卡死,屏幕正常
- GAMES101 13节 光线追踪 笔记
- html作品实验报告,html网页制作实验报告.docx
- SP10108 BALLOT - Distributing Ballot Boxes(二分查找)
- UiPath如何下载
- async异步问题(waterfall、parallel、eachSeries、map、whilst)
- 桌面便签哪个软件好用?求推荐一款便签软件?
- python基本类型关键字_python中的关键字---1(基础数据类)
- 基于element-ui实现待支付订单显示定时器