不忘初心 砥砺前行, Tomorrow Is Another Day !

本文概要:

  1. Observable的创建
  2. subscribe订阅过程
  3. 发送事件
  4. 线程切换过程

1. Observable的创建

对应源码

//Observable.java
@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {//参数检查ObjectHelper.requireNonNull(source, "source is null");//装配Observable,返回一个ObservableCreate对象return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}//钩子方法@NonNullpublic static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {return apply(f, source);}return source;}@NonNullstatic <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {try {return f.apply(t);} catch (Throwable ex) {throw ExceptionHelper.wrapOrThrow(ex);}}复制代码

当我们通过Create方法创建一个Observable对象时,

  • 参数检查,然后装配Observable,返回一个ObservableCreate对象,它持有一个ObservableOnSubscribe对象,ObservableCreate是Observable的子类

    • 装配流程: 执行钩子方法,对即将创建的Observale进行预处理,可以通过onObservableAssembly进行设置.

2. subscribe订阅过程

当我们创建完Observable对象时,会调用subscribe去绑定观察者Observer.所以直接看该方法.

对应源码

//Observable.java
@SchedulerSupport(SchedulerSupport.NONE)@Overridepublic final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");subscribeActual(observer);//此方法是重中之重.} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}
复制代码

同样的方式,对我们传入的Observer观察者对象进行检查以及预处理,最终调用subscribeActual方法,该方法是一个抽象方法.所以我们直接看它的实现类,也就是ObservableCreate对象下的subscribeActual方法.

对应源码

//ObservableCreate.javapublic ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {//1. 创建一个发射器对象CreateEmitter<T> parent = new CreateEmitter<T>(observer);//2. 回调onSubscribe,通知订阅成功observer.onSubscribe(parent);try {//3. 回调subscribe,开始发送事件.//source对象就是创建被观察者传入的.source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}
复制代码

上面注释已经写得很详细了这里再重复一次.整个订阅过程就是.

  1. 创建一个发射器对象,绑定观察者Observer
  2. 回调onSubscribe,通知订阅成功
  3. 回调subscribe,开始发送事件

接着我们来看发射器对象CreateEmitter是如何将事件发送给订阅者的.

3. 发送事件

对应源码

//ObservableCreate#CreateEmitter.java
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {//回调onNextobserver.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {//回调onErrorobserver.onError(t);} finally {//最后断开订阅dispose();}return true;}return false;}@Overridepublic void onComplete() {if (!isDisposed()) {try {//回调onComplete.observer.onComplete();} finally {//最后断开订阅dispose();}}}@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}}//DisposableHelper.java//断开订阅方法public static boolean dispose(AtomicReference<Disposable> field) {Disposable current = field.get();Disposable d = DISPOSED;if (current != d) {current = field.getAndSet(d);if (current != d) {if (current != null) {current.dispose();}return true;}}return false;}复制代码

从上面注释可知,发射器CreateEmitter直接回调了观察者Observer的相关方法.当调用dispose断开dispose订阅时,此时和线程中断处理一样,仅仅只是作为一个标识,标识当前发射器已经被中断.

这里最后给出一张关系图对上面流程进行归纳.

Observable的创建-订阅-发送事件过程

4. subscribeOn过程

从上面可知,RxJava的整体流程框架还是挺清晰的,但有时我们需要多它进行一些附加的操作如切线程,map,filter进行转换等.这里以切线程为例进行分析看如何工作的.

@CheckReturnValue@SchedulerSupport(SchedulerSupport.CUSTOM)public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");//新建一个ObservableSubscribeOn.return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));}复制代码

当我们切换下游线程时,也返回了一个新建的Observable-ObservableSubscribeOn.接着我们直接看ObservableSubscribeOn类.

对应源码

//ObservableSubscribeOn.java
//中间的Observable.
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {super(source);//上游的源Observablethis.scheduler = scheduler;}@Overridepublic void subscribeActual(final Observer<? super T> s) {//创建一个中间的Observer//创建一个中间的Observer//创建一个中间的Observer//重要的话说三遍.final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);//回调onSubscribe,通知订阅成功s.onSubscribe(parent);//切换线程parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}//线程任务类SubscribeTask
final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}@Overridepublic void run() {//回调subscribe,进行订阅//source是源Observable,parent则是中间Observer,不是最终目标的Observer.这里一定要清楚.source.subscribe(parent);}}
复制代码

从上面注释可知,当我们调用subscribeOn切换上游线程时

  1. 创建一个中间的Observable,然后在这个Observable里面会创建一个中间的Observer
  2. 最后在线程任务的run方法里面通过将源Observable与中间的Observer进行绑定订阅,从而进行切换了上游线程.
  3. 中间的Observer接到相应回调时则会继续往上回调源Observer.

这也就是解释了最开始文章所说的subscribeOn无论调用几次,为什么只有第一次是生效的.因为每次都创建新的Observable与Observer,线程调度器里将源Observable与中间的Observer进行绑定订阅时,源Observable仅仅是指上一个,已经不是第一个创建出来的.

通过上面流程分析可以总结出RxJava操作符一些通用的流程.对于Map等操作符都可以参考.如图

Observable附加操作通用流程

5. observeOn过程

最后我们根据上面总结出的通用流程,去分析下切换下游线程的过程.


@CheckReturnValue@SchedulerSupport(SchedulerSupport.CUSTOM)public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");ObjectHelper.verifyPositive(bufferSize, "bufferSize");//返回一个新建的Observablereturn RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));}复制代码

同样当调用observeOn过程切换下游线程时,果不其然也返回了一个中间的Observable-ObservableObserveOn.接着看ObservableObserveOn代码.

对应源码

//中间的Observable
//ObservableObserveOn.java
@Overrideprotected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {//线程调度器Scheduler.Worker w = scheduler.createWorker();//回调subscribe进行订阅(中间的Observer-ObserveOnObserver)source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}}
复制代码

同样,果不其然

  1. 创建了一个中间的Observer-ObserveOnObserver,这个中间的Observer绑定了线程调度器Scheduler.
  2. 接着将源Observable与它绑定订阅.
  3. 最后接收到事件时,则通过中间Observer的线程调度器去回调目标Observer.

之前我们知道每次执行observeOn都会切换一次下游线程,从上面源码可知,每次都会新建一个中间的Observer绑定新指定的线程调度器,所以接收事件都是在新的线程中执行啦.

至此,RxJava基本原理就差不多分析完成,最重要的是记住两张流程图,都遵循这个规律.

由于本人技术有限,如有错误的地方,麻烦大家给我提出来,本人不胜感激,大家一起学习进步.

四. RxJava之基本原理相关推荐

  1. 排序算法四:归并排序基本原理以及Python实现

    1. 基本原理 归并排序建立在归并操作上的一种算法.该算法是采用分治法(Divide and Conquer)的一个非常典型的应用.归并排序是将两 个已经有序的序列合成一个有序的序列的过程. 因此,对 ...

  2. RxJava相关 - 收藏集 - 掘金

    一篇文章带你走通 OkHttp+Retrofit+Rxjava - Android - 掘金 一篇文章带你走通 OkHttp+Retrofit+Rxjava @(Android)[android] . ...

  3. 掌握RxJava的葵花宝典

    本文授权发布公众号[刘桂林],星球[Hi Android] 各位少侠,老夫在黑木崖恭候大驾,欲练此功,必先哈哈. 今天我们分享的是RxJava的知识点,让你快速掌握,所以我们会从0开始讲带RxJava ...

  4. 第四章 程序设计语言基础知识

    一. 程序语言的基本概念 1.低级语言与高级语言 1)低级语言:汇编 2)高级语言: 常见的有Java.C.C++.PHP.Pyhton.Delphi 等. 2.编译形式:汇编.解释.编译. 3.程序 ...

  5. Zookeeper知识汇总

    文章目录 一.简介 二.CAP定理 三.Zookeeper的特性 四.Zookeeper基本原理 4.1 Zookeeper系统架构 4.2 Zookeeper Server的状态与角色 4.2.1 ...

  6. Linux安装prometheus+grafana监控

    一.在业务中遇到服务器负载过高问题,由于没有监控,一直没发现,直到业务方反馈网站打开速度慢,才发现问题.这样显得开发很被动.所以是时候搭建一套监控系统了. 由于是业余时间自己捯饬,所以神马业务层面的监 ...

  7. (附代码)数独大作业【读取数独,解数独,生成唯一解数独(随机,特定形状,不同难度生成),玩数独】

    注:未经同意不要转载. 上学期简单的做了一个数独程序,实现了一些功能,想简单的为大家提供的思路. 为了避免某些情况出现,具体代码暂时先不发了,有不太懂的地方可以评论提问啊. 下面是我的具体报告: 一, ...

  8. 尚硅谷-SpringSecurity

    一.SpringSecurity是什么 SpringSecurti基于Spring框架,提供了一套Web应用安全性的完整解决方案 一般来说,Web应用的安全性包括用户认证(Authenticataio ...

  9. 操作系统C语言模拟内存分配算法的模拟实现

    使用一个一维数组来模拟内存储空间,建立内存块来记录内存分配使用情况,通过随机产生进程及其所需要的内存来模拟真实的进程.通过给进程分配内存及回收来实现对动态不等长存储管理方法. 代码 #include ...

最新文章

  1. 读大叔深入理解javascript(2)
  2. vb链接远程mysql数据库代码_vb链接远程mysql数据库代码
  3. linq语句复杂查询和分开查询的性能对比
  4. hystrix 源码 线程池隔离_基于hystrix的线程池隔离
  5. 图文结合分析Spring的面向切面编程--AOP
  6. 在android中使用USB进行通信的4种方法
  7. java 如何去掉http debug日志_你居然还去服务器上捞日志,搭个日志收集系统难道不香么!...
  8. leetCode-数组:Remove Duplicates from Sorted Array
  9. 【手把手带你入门深度学习之150行代码的汉字识别系统】学习笔记 ·002 训练神经网络
  10. 每天一算法(一)——用链表实现加减乘运算
  11. Delphi之TClientSocket和TServerSocket使用tcp keepalive心跳机制实现“断网”、断电检测...
  12. JavaScript运动应用一
  13. JAVA判断素数法+引用方法
  14. 三菱FX PLC编程口通讯协议详解
  15. Qt 多语言切换——Qt语言家
  16. 苹果8a1660是什么版本_苹果7a1660是什么版本
  17. 30岁学前端晚不晚?别被年龄定义你的人生!
  18. 写口算用计算机作文600字,口算比赛作文600字
  19. python面向对象实验一之烤地瓜
  20. 「构建企业级推荐系统系列」推荐系统与精细化运营

热门文章

  1. Mongodb数据库的基本操作
  2. Reactive Extensions(Rx) 学习
  3. 再遭质疑:Chrome、Safari自动填信息可能会泄密
  4. 在Mac配置adb命令
  5. Linux平台-×××
  6. python-django框架中使用docker和elasticsearch配合实现搜索功能
  7. 巨杉数据库中标东莞农商银行非结构化内容管理平台项目
  8. cocos creator怎么隐藏组件(setVisible)
  9. NA-NP-IE系列实验35:标准ACL
  10. .NET Compact Framework下SQL CE的使用