public class T2 {/*** subject 是一个神奇的对象,它可以是一个Observable同时也可以是一个Observer:它作为连接这两个世界的一座桥梁。* 一个主题可以订阅一个Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable。* 很明显,作为一个Observable,观察者们或者其它主题都可以订阅它。* 串行化如果你把 Subject 当作一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),* 这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。* 要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:* mySafeSubject = new SerializedSubject( myUnsafeSubject );*/public static void main(String[] args) {T2 t2 = new T2();System.out.println("===================testPublishSubject==========================");t2.testPublishSubject();System.out.println("===================testBehaviorSubject==========================");t2.testBehaviorSubject();System.out.println("===================testReplaySubject==========================");t2.testReplaySubject();System.out.println("===================testAsyncSubject==========================");t2.testAsyncSubject();}/*PublishSubject的观察者接收到的是后续的消息输出为:===================testPublishSubject==========================observer1 - A observer1 - B   observer1 - C   observer2 - C   observer1 - D   observer2 - D   onCompletedonCompleted* */private void testPublishSubject() {Observer<String> observer1 = new Observer<String>() {@Overridepublic void onNext(String t) {System.out.print("observer1 - " + t + "\t");}@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println(e.getMessage());}};Observer<String> observer2 = new Observer<String>() {@Overridepublic void onNext(String t) {System.out.print("observer2 - " + t + "\t");}@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println(e.getMessage());}};PublishSubject<String> publishSubject = PublishSubject.create();publishSubject.subscribe(observer1);publishSubject.onNext("A");publishSubject.onNext("B");publishSubject.subscribe(observer2);publishSubject.onNext("C");publishSubject.onNext("D");publishSubject.onCompleted();System.out.println();}/** BehaviorSubject的观察者接收到的永远是最近的消息 和后续的消息* 输出为===================testBehaviorSubject==========================* default  A   B   C* B    C   D* onCompleted* error* */private void testBehaviorSubject() {Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String t) {System.out.print(t + "\t");}@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println(e.getMessage());}};//收到所有消息BehaviorSubject<String> subject1 = BehaviorSubject.create("default");subject1.subscribe(observer);subject1.onNext("A");subject1.onNext("B");subject1.onNext("C");System.out.println();//不能收到default、ABehaviorSubject<String> subject2 = BehaviorSubject.create("default");subject2.onNext("A");subject2.onNext("B");subject2.subscribe(observer);subject2.onNext("C");subject2.onNext("D");System.out.println();//只能收到onCompletedBehaviorSubject<String> subject3 = BehaviorSubject.create("default");subject3.onNext("A");subject3.onNext("B");subject3.onCompleted();subject3.subscribe(observer);System.out.println();// 只能收到errorBehaviorSubject<String> subject4 = BehaviorSubject.create("default");subject4.onNext("A");subject3.onNext("B");subject4.onError(new RuntimeException("error"));subject4.subscribe(observer);System.out.println();}/** ReplaySubject会缓存所有消息,所以观察者都会收到所有消息* 输出:===================testReplaySubject==========================* A B   A   B   C   C   D   D   onCompleted* onCompleted* */private void testReplaySubject() {Observer<String> observer1 = new Observer<String>() {@Overridepublic void onNext(String t) {System.out.print(t + "\t");}@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println(e.getMessage());}};Observer<String> observer2 = new Observer<String>() {@Overridepublic void onNext(String t) {System.out.print(t + "\t");}@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println(e.getMessage());}};ReplaySubject<String> publishSubject = ReplaySubject.create();publishSubject.subscribe(observer1);publishSubject.onNext("A");publishSubject.onNext("B");publishSubject.subscribe(observer2);publishSubject.onNext("C");publishSubject.onNext("D");publishSubject.onCompleted();System.out.println();}/**当Observable完成时AsyncSubject只会发布最后一条消息给已经订阅的每一个观察者,* 如果没有调用onCompleted则被观察者不会发送任何消息给观察者* 输出===================testAsyncSubject==========================* C   onCompleted* */private void testAsyncSubject() {Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String t) {System.out.print(t + "\t");}@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println(e.getMessage());}};AsyncSubject<String> publishSubject1 = AsyncSubject.create();publishSubject1.subscribe(observer);publishSubject1.onNext("A");publishSubject1.onNext("B");publishSubject1.onNext("C");AsyncSubject<String> publishSubject2 = AsyncSubject.create();publishSubject2.subscribe(observer);publishSubject2.onNext("A");publishSubject2.onNext("B");publishSubject2.onNext("C");publishSubject2.onCompleted();System.out.println();}
}

RxJava之PublishSubject、BehaviorSubject、ReplaySubject和AsyncSubject相关推荐

  1. RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject

    本文转载自公众号" zcx的工作室", 作者: zcx本人 原文链接:https://mp.weixin.qq.com/s/i14brW_Ok8JYGoBIcfhs5Q 原文标题: ...

  2. RxJava的四种Subjects:PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的理解

    RxJava的四种Subjects:PublishSubject.ReplaySubject.BehaviorSubject.AsyncSubject的理解 Subject:它既是Observable ...

  3. RxJava开发精要2-为什么是Observables?

    原文出自<RxJava Essentials> 原文作者 : Ivan Morgillo 译文出自 : 开发技术前线 www.devtf.cn 转载声明: 本译文已授权开发者头条享有独家转 ...

  4. RxJava中常见的几种Subject

    RxJava是什么? 原文是这样描述的: RxJava is a Java VM implementation of Reactive Extensions: a library for compos ...

  5. RxJava Agera 从源码简要分析基本调用流程(2)

    2019独角兽企业重金招聘Python工程师标准>>> 版权声明:本文由晋中望原创文章,转载请注明出处:  文章原文链接:https://www.qcloud.com/communi ...

  6. RxSwift Subjects: ReplaySubject replaySubject BehaviorSub map

    ​​​​​​​ ReplaySubject  replaySubject BehaviorSub variable 变量 已经被弃用,推荐使用Behaviorsubject ReplaySubject

  7. 学着造轮子-RxLifeCycle

    使用RxJava的一个很大的优势就是线程的灵活切换,特别是Android开发,工作线程请求,主线程监听,这已经是最普通的常规操作,但是Activity和Fragment都是有生命周期的,如何让我们的请 ...

  8. 响应式编程在Android中的应用

    响应式编程简介 响应式编程是一种基于异步数据流概念的编程模式.数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流. 响应式编程的一个关键概念是事件.事件可以被 ...

  9. 带你入门学习Rxjava--上手教程

    相信各位看官对RxJava早有耳闻,那么关于什么是RxJava我就不再赘述了,不知道的可自行百度.网上的RxJava的入门门槛稍高,但入门不难,可以跟本文一起来学习 注: 本文针对rxjava 1.x ...

最新文章

  1. 如何安装“散装版”(如自己开发的)插件到VSCode
  2. android 不可点击状态,Android开机指引后notification应为不可点击状态
  3. gogs mysql 报错_docker上对gogs二次开发
  4. linux /proc/net/arp
  5. java面试题六 float变量合法声明
  6. 树莓派Pico的一些有趣的基本实验
  7. php mysqli报错,php安装扩展mysqli的实现步骤及报错解决办法
  8. 简述Struts2 Convention零配置
  9. (4)vue.js 基础语法
  10. python简说(十八)导入模块
  11. python batch_size_深度学习中的batch的大小对学习效果有何影响?
  12. mysql4函数_MySQL-4.常见函数
  13. matlab 提取图像轮廓(图像边缘提取)
  14. IBM服务器管理口IMM使用指南
  15. 远程控制家里电脑的N种方案
  16. 计算机存储单位试题,计算机存储试题及答案
  17. [java8] Stream流
  18. 华硕ac68u无线最佳设置_华硕AC86U,AC88U的掉线、断流问题何时彻底解决?
  19. 扫地机器人朋友圈文案_匠龙扫地机器人广告片创意脚本文案
  20. 关于图片转base64的加密解密

热门文章

  1. 基于Docker搭建私有镜像仓库
  2. Android 获取当前的时间。年月日,时分秒,星期几
  3. oracle 10g 报错:ORA-00257: archiver error. Connect internal only, until freed
  4. 支付宝小程序中Navigator和导航栏之间的区别以及用法场景的分析
  5. jquery.uploadify参数
  6. js调用WebService(复杂对象传入)[原创]
  7. 反转链表--清晰易懂的两种方法
  8. git clone 速度过慢
  9. jQuery 属性,css,文档操作
  10. Ubuntu14.04LTS打造实用的环境