Rxjava 源码学习(一):基本流程分析 - 知乎Rxjava 源码版本:Rxjava2.2.8 1. Rxjava 的基本实现首先看一下最简单的例子,具体查看其内部实现: 通过以下代码查看 Rxjava 的典型使用: Observable.create(new ObservableOnSubscribe<String>() { @Over…https://zhuanlan.zhihu.com/p/307650066

概念共识

        1:Observable是被观察者,Observer是观察者,subsrcibe是让被观察者和观察者之间建立订阅关系。

      2:事件的发射自 上游到下游,事件的消费订阅:自下游到上游。

     3: 创建一个 ObservableCreate对象继承 Observable, 后面的发射事件,消费事件都是在这个对象中展开

1: Observable.create(new ObservableOnSubscrible())

     这也是Rxjava事件中的 :第一条线

     1. 仅仅是创建了 一个 Observerable 对象,这是一个抽象类并且实现 ObservableSource接口

public abstract class Observable<@NonNull T> implements ObservableSource<T>

2. Observerable定义了生成各种操作符函数,比如 Map操作符函数

public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {Objects.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}

3:定义了关键函数 subscribe()来让 Observable和Observer产生订阅关系

4:定义 抽象函数 subscribeActual(Observer observer) ,订阅函数subscribe()最终会调用 实现类的 subscribeActual(Observer observer)函数

protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

2: 通过subscribe 来产生订阅关系。

     这也是Rxjava事件中的 :第二条线

1: Observable 对象通过 subscribe 与 Observer 产生订阅关系

2:subscribe 订阅关系是一条很重要的流程线,通过创建各种操作符下的 Observable对象,可以实现从:上游到下游的事件发射。此时创建订阅关系是由最后一个  Observable对象开始的,最后要给Observable来观察事件,所以事件订阅的流程就是:从下游到上游。

3:事件发生器(Emitter)

这也是Rxjava事件中的 :第三条线

1:  Observer、Observable 分别和事件发生器(Emitter) 产生关联,并且通过回调来到事件发射现场Observable。

2:  根据产生的订阅链自上游到下游发布事件。

事件流可以自上而下进行下去,原因是 Observable 操作符 得到的还是 Observable,通过通过 Observable.subsribe 方法实现订阅关系。

4:基本使用

5: 代码示例

Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Object> observer) throws Throwable {Log.d(TAG, "步骤二:发射事件");observer.onNext("步骤二发射事件");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "步骤一:不改变Disposable布尔值让其消费事件");}@Overridepublic void onNext(@NonNull Object o) {Log.d(TAG, "步骤三消费:" + o.toString());}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});打印结果:步骤一:不改变Disposable布尔值让其消费事件步骤二:发射事件步骤三消费:步骤二发射事件

6:源码分析:

1 :Observable.create(new ObservableOnSubscribe<Object> ())

实际上是:构造了 ObservableCreate对象,这个就是最初的发射事件对象

Observable.javapublic static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {Objects.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

2:  Observable.subscribe(new Observer())

利用其多态的性质,此时Observable的实现类是 :步骤一的 ObservableCreate对象。随意subscribe(new Observer()) 最终会调用 ObservableCreate.subscribeAcutal() 函数

Observable.javapublic final void subscribe(@NonNull Observer<? super T> observer) {Objects.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");// 利用多态的性质,调用 ObservableCreate的 subscribeAcutal(observer)函数subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}

3: ObservableCreate 会先调用 observer.onScbscribe(createEmitter) 函数,然后回调

source.subscribe(createEmitter) 函数,(  这里的source就是 ObservableOnSubscribe)

通过createEmitter 来发射事件

ObservableCreate.java
protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<>(observer);// Observable发射事件后,先调用 Observer的 onSubscribe()函数,来决定后面是否消费事件observer.onSubscribe(parent);try {// 通过source来发射事件source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Object> observer) throws Throwable {Log.d(TAG, "步骤二:发射事件");// 通过source来发射事件observer.onNext("步骤二发射事件");}})ObservableCreate.javastatic final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}//  呼应 上面: 通过source来发射事件 流程@Overridepublic void onNext(T t) {if (t == null) {onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));return;}// 呼应上面 : 先调用 Observer的 onSubscribe()函数,来决定后面是否消费事件if (!isDisposed()) {observer.onNext(t);}}

4 :如果在 Observer的 onSubscribe(Disposable) 中没有解除订阅流程,那么就可以让 消费者Observer来消费事件了

7:Schedulers: 指定线程

RxJava从入门到精通:RxJava源码初步分析相关推荐

  1. mysql80从入门到精通配套源码

    深入浅出mysql 跟 mysql从入门到精通哪本书比较好 差不多吧,个人觉得买一本看就行了,mysql作为一个开源的数据库在网上能找到很多相关的资料,当然,先买本书系统的学习一下也是很有必要的. 本 ...

  2. 微软AJax.net源码初步分析(2)--服务执行流程

    我以一个最简单的helloworld为例,演示AJax.net源码中调用后台服务的机制,只是列出一些大体的框架,具体细节我还在研究中:) 不当之处,欢迎指正. 我先把例子中的核心代码列出,方便大家阅读 ...

  3. Picasso源码初步分析

    Picasso工作原理 我们知道使用Picasso最简单的方式来显示一张图片的方法就是如下了 Picasso.with(this).load("http://192.168.56.1:808 ...

  4. 快速开发架构Spring Boot 从入门到精通 附源码

  5. RxJava Agera 从源码简要分析基本调用流程(2)

    2019独角兽企业重金招聘Python工程师标准>>> 版权声明:本文由晋中望原创文章,转载请注明出处:  文章原文链接:https://www.qcloud.com/communi ...

  6. java中batch基础_详解Spring batch 入门学习教程(附源码)

    详解Spring batch 入门学习教程(附源码) 发布时间:2020-09-08 00:28:40 来源:脚本之家 阅读:99 作者:achuo Spring batch 是一个开源的批处理框架. ...

  7. 微信公众平台开发教程(四) 实例入门:机器人(附源码)

    微信公众平台开发教程(四) 实例入门:机器人(附源码) 上一篇文章,写了基本框架,可能很多人会觉得晕头转向,这里提供一个简单的例子来予以说明,希望能帮你解开谜团. 一.功能介绍 通过微信公众平台实现在 ...

  8. 用机器学习进行学生成绩预测的数据分析(入门向 附可用源码)

    用机器学习进行学生成绩预测的数据分析(入门向 附可用源码) 声明 思路 检查数据 图像化处理 分析 相关性分析 构建模型 代码实现 可运行代码 声明 文章代码修改于kaggle博主DIPAMVASAN ...

  9. 视频教程-React 全家桶从入门到实战到源码-其他

    React 全家桶从入门到实战到源码 上市公司前端开发工程师,专注于 React 技术栈,对 React 全家桶从 react-router 路由到 Redux 状态管理工具再到 webpack 打包 ...

最新文章

  1. word文档怎么给数字加千分符_Word中如何将文档中的金额数值设置为财务数字中的千分位格式...
  2. Android 快捷方式的创建与查询 快捷方式问题大全 获取快捷方式在Launcher数据库中的信息 Failed to find provider info for com.android.la
  3. leetcode算法题--预测赢家★
  4. pytorch手动实现梯度下降法,随机梯度法--基于logistic Regression并探索Mini batch作用
  5. 阿里云yum源安装SVN失败的问题
  6. elasticsearch httpclient认证机制
  7. OrderAnalyticsController.initializeCachedDB - jdbc
  8. osg中三维模型的位置变换
  9. rsync 配置详解
  10. 长沙理工 ACM 数位 DP 1488
  11. 反编译 破解crash html editor winform [WinHTMLEditorControl.dll][.NET Win HTML Editor Control]
  12. snap7库C++版本对PLC数据的读写
  13. WORD之文字处理之插入复合条饼图
  14. Darknet - 模型 (.weights) 重命名
  15. 【互动媒体技术】有关十二个“一”的文艺创作-拓展
  16. yum search htppd 的意思
  17. Matlab作图后的各种调整方法——线条、坐标、标题、图例
  18. Java的时间类库joda教程
  19. 关于Synergy无法启动
  20. pyqt5 日历设计 QSS

热门文章

  1. 端口扫描器(masscan)
  2. 1.人工智能早期知识表示方法
  3. php mysql中文乱码怎么解决_php读取mysql中文乱码怎么解决?
  4. 阿ken的HTML、CSS的学习笔记_文本样式属性(笔记三)
  5. fastadmin框架前台常用语句
  6. 质量保障(QA)和质量控制(QC)
  7. 以太坊智能合约之如何执行智能合约?
  8. 电阻电容相同容量不同封装的区别
  9. python解决迅雷下载限制的方法
  10. 11.0592 M晶振