以Observable为例,先上代码:

//①
ObservableJust<String> observable = (ObservableJust<String>) Observable.just("hello rxjava2");
//②ObservableSubscribeOn<String> subscribe = (ObservableSubscribeOn<String>) observable.subscribeOn(Schedulers.io());
//③ObservableObserveOn<String> observerOn = (ObservableObserveOn<String>) subscribe.observeOn(AndroidSchedulers.mainThread());
//④ObservableDoFinally<String> doFinally = (ObservableDoFinally<String>) observerOn.doFinally(new Action() {@Overridepublic void run() throws Exception {System.out.println("doFinally");}});
//⑤ObservableDoOnLifecycle<String> doOnSubscribe = (ObservableDoOnLifecycle<String>) doFinally.doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) throws Exception {System.out.println("doOnSubscribe: " + disposable.hashCode());}});
//⑥doOnSubscribe.subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("onSubscribe: "+d.hashCode());/*  if (!d.isDisposed()){System.out.println("onSubscribe: dispose");d.dispose();}*/}@Overridepublic void onNext(String s) {System.out.println("onNext: "+s);Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();}@Overridepublic void onError(Throwable e) {System.out.println("onError: "+e.getMessage());Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show();}@Overridepublic void onComplete() {System.out.println("onComplete");Toast.makeText(MainActivity.this, "onComplete", Toast.LENGTH_SHORT).show();}});

Observable传递

这里每次调用一个操作符,返回的都是Observable的直接子类或者间接之类.以just为例:

 public static <T> Observable<T> just(T item) {ObjectHelper.requireNonNull(item, "The item is null");return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));}

这里重新new了一个Observable的子类对象ObservableJust.

结论如下:

  1. 每个操作符都会对应返回一个Observable的子类对象,类名格式ObservableXXX然后去调用下一个操作符.比如interval操作符,返回的是ObservableInterval的实例对象.
  2. 对于Observable的创建型操作符,返回的是其直接子类,而其他操作符,返回的是AbstractObservableWithUpstream的子类对象.AbstractObservableWithUpstream的构造函数中,第一个参数就是Observable对象,这一点非常重要,这个参数是上一个操作符返回的Observable对象.这保证了整个调用流程的起始处的Observable对象能在整个流程中传递.

最后一步订阅subscribe(Observer).如果没有最下游的观察者对数据做接收,整个调用流程是不会执行的.
先从⑥开始看ObservableDoOnLifecycle的subscribe方法做了什么.

@Overrideprotected void subscribeActual(Observer<? super T> observer) {source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose));}

source就是上游操作符返回的Observable的子类对象,通过AbstractObservableWithUpstream的构造函数传递给下游的.这里去调用了上一个Observable对象的subscribe方法.这个调用由下至上,直到整个流程的起始处.

Observable对象先从上游逐步通过下游的Observable对象的构造函数传递给下游,再通过下游的subscribe方法,逐步去调用上游的subscribe方法.

Observer传递

订阅发生在最后一步调用subscribe(Observer).从第⑤步ObservableDoOnLifecycle的subscribe方法开始看.

 @Overrideprotected void subscribeActual(Observer<? super T> observer) {source.subscribe(new DoFinallyObserver<T>(observer, onFinally));}

重新创建一个DoFinallyObserver对象,并把第⑥步的Observer参数传入后,交给上游的Observable.这个调用流程会逐步传递到最上游的ObservableJust的subscribe方法.

//ObservableJust.java@Overrideprotected void subscribeActual(Observer<? super T> observer) {//参数observer是下游传上来的ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);observer.onSubscribe(sd);sd.run();}

首先调用了 observer.onSubscribe(sd);可以得到结论:

Observer的onSubscribe在主线程执行,无论上下游怎么切换线程.在请求网络时,可以在这个地方弹出进度提示或者做一些初始化操作.

ScalarDisposable.run()方法调用了下游的Observer传递数据,这个调用会逐步往下传递,直到最下游的Observer,如果没遇到错误或者异常情况.

Observer对象先从最下游的订阅处开始往上传递到最上游,再携带数据逐步往下游传递.

数据传递

从上面可以知道,数据是被Observer携带,逐步往下游传递

Observable.subscribe(Consumer,Consumer,Action)

有多个重载的方法

 //方法一@SchedulerSupport(SchedulerSupport.NONE)public final Disposable subscribe() {return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}//方法二@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public final Disposable subscribe(Consumer<? super T> onNext) {return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}//方法三@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());}//方法四@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete) {return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());}//方法五@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe) {//创建LambdaObserver对象LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);subscribe(ls);return ls;}//方法六@SchedulerSupport(SchedulerSupport.NONE)@Overridepublic final void subscribe(Observer<? super T> observer) {observer = RxJavaPlugins.onSubscribe(this, observer);
// 省略subscribeActual(observer);//省略}

前五个方法最终在第五个方法内部重新创建了一个Observer类型对象LambdaObserver,然后调用了第六个方法.

RxJava2:Observable和Observer如何传递相关推荐

  1. java observer update_Java_观察者模式(Observable和Observer)

    java.util.Observable中有两个方法对Observer特别重要 ①setChanged()方法 /** * Sets the changed flag for this {@code  ...

  2. 你会用RxJS吗?【初识 RxJS中的Observable和Observer】

    概念 RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库. RxJS 中管理和解决异步事件的几个关键点: Observable: 表示未来值或事件的可调用集合的概念. Observer ...

  3. 设计模式之观察者模式(Observable与Observer)

    1.什么是观察者模式 简单情形:有A.B.C.D等四个独立的对象,其中B.C.D这三个对象想在A对象发生改变的第一时间知道这种改变,以便做出相应的响应或者对策. 上面的这种情形,就是观察者模式. 当然 ...

  4. Observable与Observer

    在Java中通过Observable类和Observer接口实现了观察者模式.一个Observer对象监视着一个Observable对象的变化,当Observable对象发生变化时,Observer得 ...

  5. java observer update_Java_观察者模式(Observable和Observer) -转

    一.观察者模式介绍 在Java中通过Observable类和Observer接口实现了观察者模式.一个Observer对象监视着一个Observable对象的变化,当Observable对象发生变化时 ...

  6. 4.Subject - Observable和Observer的复合体,也是二者的桥梁

    Subject Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色.因为它是一个Observer,它可 ...

  7. [译] Subject 和 Observable + Observer 的混淆指北[ Android RxJava2 ] ( 这什么鬼系列 ) 第八话...

    原文地址:Confusion between Subject and Observable + Observer [ Android RxJava2 ] ( What the hell is this ...

  8. JDK 9中不推荐使用Java的Observer和Observable

    在博客文章< 应用JDK 9 @Deprecated增强功能>中 ,我讨论了JDK 9中 @Deprecated批注中对forRemoval()和since()可选元素(方法)的添加 . ...

  9. JDK 9中已弃用Java的Observer和Observable

    在博客文章< 应用JDK 9 @Deprecated增强功能>中 ,我讨论了JDK 9中 @Deprecated批注中对forRemoval()和since()可选元素(方法)的添加 . ...

最新文章

  1. VC调试篇:减少运行时错误,中断所有异常
  2. 2017 CIO展望:新IT运营模式的5大元素
  3. WebAssembly 浏览器中运行c/c++模块
  4. 在账户确定类型KOFI没有找到总账科目
  5. Python中的AES加解密算法
  6. 配置 AEM CQ6 (author + publish + apache dispatcher + ubuntu )
  7. Centos5.5几种提高工作效率的方法
  8. 手把手教你从0开始建中台
  9. 使用JavaScript将小写金额转化为大写金额的两种常见方法
  10. 沃特世推出SELECT SERIES MRT多反射飞行时间质谱平台,树立高分辨质谱性能新标杆
  11. 微软服务器系统版本有几个,windows系统有几个版本
  12. 软件全屏使用时点击鼠标自动跳回桌面的问题
  13. 为什么要创建SRT?
  14. cad如何打开stp文件_用cad如何打开stp文件
  15. [CTF]2022美团CTF WEB WP
  16. C/C++时间字符串和时间戳的相互转化
  17. CBR的产生和Roger Schank
  18. ibm aix 抓包命令_在IBM AIX上模拟丢弃的TCP / IP数据包
  19. “燕云十六将”之Lorna(14)
  20. 遗传算法解整数规划IntCon

热门文章

  1. java备忘录模式应用场景_Java描述设计模式(24):备忘录模式
  2. quartz mysql数据源_配置quartz数据源的三种方式
  3. JAVA day27,28 线程池
  4. 第 7 章 原型模式
  5. Skywalking微服务监控分析
  6. oracle wm_concat listagg,oracle分析函数:四、listagg和wmsys.wm_concat
  7. python 除数总是提示为0_Python错误的处理方法
  8. html canvas绘制网格,canvas(七)绘制网格和坐标轴
  9. 小微型计算机期刊,小型微型计算机系统杂志
  10. java main生命周期_java的生命周期