RxJ ava Create 方法

为什么会有这个

RxJava框架现在出现已经有些年头了,如果有人问你你会不会用,可能大多数人都会说会。但是我被人问过一个我没有考虑过的问题,你知道Rxjava是怎么实现的吗?我。。。。。
所以就有了这一篇文章。


如果你想通过这篇文章学会Rxjava怎么用,这可能不会是一篇很好的文章,这里面有很多干扰你阅读的东西,和一些我现在还不懂的知识点。但是如果你想通过这篇文章找到我,然后对我说你这样理解不对,这将是一篇完美的文章,因为而且你还会得到一个陌生人的崇拜。

Create 方法


Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {// 这里是观察者发出声音的地方emitter.onNext("就是一段话");emitter.onComplete();}}).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {OutputUtil.printLn("绑定");}@Overridepublic void onNext(String s) {OutputUtil.printLn(s);}@Overridepublic void onError(Throwable e) {OutputUtil.printLn("错误");}@Overridepublic void onComplete() {OutputUtil.printLn("完成");}});复制代码

以上的代码非常的简单,就是创建一个被观察者,然后发出一段话,,然后结束。观察者就是接收到一段话,然后就完成了
运行的结果是:

绑定
就是一段话
完成
复制代码

这个结果里甚至都没有调用错误的办法,但是这个用法是Rxjava框架最基本的用法。我们就跟着源码的步骤开始阅读。

为什么

在开始之前我们先定义几个名字

  • Observable (可被观察者)
  • Emitter (发射器)
  • Observer (观察者)

从 可被观察者 的 create 方法中 进入源码是

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {// 在这里创建一个源头(可被观察者)ObjectHelper.requireNonNull(source, "source is null");// 装配一个可被观察者,这里传入的是发射器的操作方法return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}
复制代码

在这里还记得我们 create 方法中做了什么吗? 我们实现了一个 ObservaleOnSubscribe 类

public interface ObservableOnSubscribe<T> {void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
复制代码

我们在 subscribe 方法中用调用了 emitter 对象 的 onNext() 方法,然后 观察者就收到了内容,是不是说emitter持有了观察者接口,然后去调用方法呢?

那我们重点看看 RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)) 方法中做了什么

ObservableCreatepublic final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}
...
复制代码

你看 ObservableCreate 继承自 Observable ,所以在实例化的时候 返回的是一个 可被观察者,同时这里持有了我们传入的 ObservableOnSubscribe 对象。那么我们在 ObservableOnSubscribe 中使用的 subscribe(ObservableEmitter emitter) 方法 哪里来的呢? 一步一步的走。

我们这里岔开一下,先介绍下 ObservableOnSubscribe 对象

public interface ObservableEmitter<T> extends Emitter<T> {void setDisposable(Disposable d);void setCancellable(Cancellable c);boolean isDisposed();ObservableEmitter<T> serialize();
}public interface Emitter<T> {void onNext(@NonNull T value);void onError(@NonNull Throwable error);void onComplete();
}
复制代码

这里面定义了我们在被观察者中最常用的三个方法,onNext onError onComplete

看完了 可被观察者 ,我们接下来就要通过订阅的方式,将观察者与被观察者结合。在这里是使用的 subscribe(Observer<? super T> observer) 方法

public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {// hook 相关,略过observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");// 这里要开始绑定了subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}
复制代码
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {// 如果设定了自定义的挂钩方法,可以在这里执行.如果没有则为默认BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;if (f != null) {return apply(f, source, observer);}return observer;}
复制代码

一般情况下 我们不会添加 hook 部分代码,所以基本上这部分是没有操作的,.我们主要是在 subscribeActual(Observer<? super T> observer) 中进行绑定操作

public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}
/*** 这里是开始绑定的操作,所有继承自 Observable 的类都要去做这件事* @param observer the incoming Observer, never null*/@Overrideprotected void subscribeActual(Observer<? super T> observer) {// 创建一个发射器CreateEmitter<T> parent = new CreateEmitter<T>(observer);// 观察者持有发射器,实现绑定开始,同时可以解除绑定observer.onSubscribe(parent);// 到这里 观察者 和 发射器 已经互相持有. 观察者已经掌握了去观察 被观察者 的能力// 但是这里还不能观察,因为 被观察者 还没有和 发射器 结合,观察者有消息不能发出try {// 被观察者 与 发射器 结合,被观察者拿到了 发射器,就能发出消息了source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}
复制代码

这个方法存在于 ObservableCreate 类中, 里面包含了 发射器 和 观察者, 所以 可被观察者 在拿到这两个对象之后,通过 发射器 向 观察者 发射内容数据。完成之后的 onNext 和 onComplete 操作

重点是

// 被观察者 与 发射器 结合,被观察者拿到了 发射器,就能发出消息了
source.subscribe(parent);
复制代码

这里的source就是我们在实现 可被观察者 时实现的类 ObservableOnSubscribe 。这里我们在实现 可被观察者 的时候就能够调用其中传入的发射器,然后通过 发射器 调用 观察者 的 onNext 、 onComplete 等方法

/*** 创建发射机,这里发射器持有观察者,所以发射器可以将内容给到观察者*/static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}// 如果没有被 disposed,就发射给被观察者if (!isDisposed()) {observer.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}// 如果没有被 disposed 则会执行 onError 方法if (!isDisposed()) {try {observer.onError(t);} finally {// 然后一定会被 disposeddispose();}return true;}return false;}@Overridepublic void onComplete() {// 如果没有被 disposed 则会执行 onComplete 方法if (!isDisposed()) {try {observer.onComplete();} finally {// 然后一定会 disposeddispose();}}}}
复制代码

总结

  1. 只有Observable 和 Observer 没有被 disposed 时,才能够执行 发射内容的方法。其实这也就是说,当被 disposed 之后,就不会再去执行 观察者 中的方法,在 Android 生命周期中,观察者中的内容可能已经被释放。
  2. Observable和Observer订阅时,也就是执行 onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) 方法时 ,Observable才会开始发送数据。
  3. ObservableCreate将ObservableOnSubscribe(真正的源)转化为Observable,这里还可以通过ObservableCreate转化其他的操作符
  4. ObservableOnSubscribe(真正的源)需要的是发射器ObservableEmitter,其实是发射器来讲 可被观察者 的内容发送给观察者
  5. CreateEmitter将Observer->ObservableEmitter,同时它也是Disposable

参考阅读 : http://blog.csdn.net/zxt0601/article/details/61614799

转载于:https://juejin.im/post/5accd2cc6fb9a028da7cdf21

RxJava -- 从 create 开始 (一)相关推荐

  1. android ------- 开发者的 RxJava 详解

    在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:  https://github.com/ReactiveX/RxJava  https://githu ...

  2. 给 Android 开发人员的 RxJava 具体解释

    前言 我从去年開始使用 RxJava .到如今一年多了. 今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava .并且使用的场景越来越多 . 而 ...

  3. Android—RxJava库知识

    RXJAVA:一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库. 优点:异步,逻辑简洁易懂. 程序要求:将一个给出的目录数组 File[] folders 中每个目录下的 p ...

  4. RxJava学习入门

    RxJava是什么 一个词:异步. RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-bas ...

  5. RxJava菜鸟驿站(一)

    前言 终究没有经受住RxJava的诱惑,只恨自己来的比较晚,走起~ RxJava 是什么? 一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库~ Rx Java 有什么优势? ...

  6. Android响应式编程(一)RxJava前篇[入门基础]

    1.RxJava概述 ReactiveX与RxJava 在讲到RxJava之前我们首先要了解什么是ReactiveX,因为RxJava是ReactiveX的一种java实现. ReactiveX是Re ...

  7. 响应式编程android,Android响应式编程(一)RxJava[入门基础]

    1.RxJava概述 ReactiveX与RxJava 在讲到RxJava之前我们首先要了解什么是ReactiveX,因为RxJava是ReactiveX的一种java实现. ReactiveX是Re ...

  8. RxJava基本实现

    在使用RxJava前请先在AndroidStudio中配置gradle: //RxJavaimplementation "io.reactivex.rxjava3:rxjava:3.0.0& ...

  9. RxJava之背压策略

    转载请以链接形式标明出处: 本文出自:103style的博客 本文基于 RxJava 2.x 版本 目录 RxJava背压策略简介 Observable背压导致崩溃的原因 Flowable 使用介绍 ...

  10. EventBus、Rxjava、RxBus的定义、作用、使用方式及区别

    这篇文章是在读了别人的文章的基础上并结合工作中项目做的总结,总共分为以下几个部分,这只是简单总结,想了解更多原理的可以点击后面的相关链接: 1.什么是Evenbus?有什么用?怎么用? 2.什么是Rx ...

最新文章

  1. css游戏代码_介绍CSSBattle-第一个CSS代码搜寻游戏
  2. php javabean对象,Struts2 bean标签:创建并示例化一个JavaBean对象
  3. word2vec原理(一): 词向量、CBOW与Skip-Gram模型基础
  4. git pull 部分文件无法获取
  5. Java技术回顾之JNDI--实例
  6. android studio 导入包分不分动态静态,详解Android studio 动态fragment的用法
  7. Java中常用的6种排序算法详细分解
  8. 分布式光伏贷款欲破冰 多家银行推出相关业务
  9. python 取字符串的最后一位_python中获得一个字符串最后出现位置 | 学步园
  10. Eclipse-导入maven项目
  11. python如何处理inp文件_python 之文件操作
  12. Android SDK环境变量配置(windows和Linux)
  13. 淘宝上线独立搜索引擎一淘网
  14. 如何更好的把平时的工作记录下来?这款便签可以高效记录工作日志
  15. com.apple.Boot.plist 和SMBIOS.plist 的设置
  16. 计算机基础-路由器、光猫、交换机、电脑之间的连接关系
  17. html字体铺盖颜色,买被子也是有讲究的?这几种颜色的被子,再好看都别往卧室放!...
  18. 天天在做的数据可视化,才是企业数字化转型的关键
  19. uniapp小程序使用iconfont彩色图标
  20. vscode一键生成佛祖保佑永无bug

热门文章

  1. Pig 实现关键词匹配
  2. 如何用新安装的jdk替换掉Linux系统默认jdk
  3. vi-vim (十五):显示与设置选项
  4. 如何在DNN4下使用VS2005进行单元测试???
  5. Eclipes中AndbaseDemo 手动导入 Android studio
  6. 手把手0基础项目实战(三)——教你开发一套电商平台的安全框架
  7. [Python]小甲鱼Python视频第020课(函数:内嵌函数和闭包)课后题及参考解答
  8. 对C#中事件的简单理解
  9. vue项目下,webpack.js/package.json配置
  10. HTML DOM对象的属性和方法介绍(原生JS方法)