很早之前有看过别人实现的 RxBus , 当初也只是随意用用而已,没有想过去研究。今天看到 brucezz 天哥在群里分享了一把,自己也加入了讨论,下来还实践了一把,所以想借此篇进入到源码层,深刻体验下 RxBus 这辆 “兰博基尼” 的设计美感和独特魅力。

本篇文章已授权微信公众号 guolin_blog (郭霖)独家发布

RxBus

准备

关于简单的实现和用法,这篇文章已经很好的说明了。

推荐先看看 RxBus 的简单实现和用法。

解剖

-

让我们看看这辆车到底用了些什么?

Subject

SerializedSubject

PublishSubject

CompositeSubscription

从 Subject 开始发车

官方解释

这是 Subject 的中文解释:

Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

由于一个Subject订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个"冷"的Observable变成"热"的。

Subject 源码

源码:

public abstract class Subject extends Observable implements Observer {

protected Subject(OnSubscribe onSubscribe) {

super(onSubscribe);

}

public abstract boolean hasObservers();

public final SerializedSubject toSerialized() {

if (getClass() == SerializedSubject.class) {

return (SerializedSubject)this;

}

return new SerializedSubject(this);

}

Subject 只有两个方法。

hasObservers()方法的解释是:

Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.

判断 Subject 是否已经有 observers 订阅了 有则返回 ture

toSerialized() 方法的解释是:

Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.

包装 Subject 后让它可以安全的在不同线程中调用各种方法

为什么这个方法后就可以是线程安全了呢?

我们看到 toSerialized() 返回了 SerializedSubject 。我们先到这里打住,稍后我们再看看该类做了什么。

PublishSubject 解释

在 RxJava 里有一个抽象类 Subject,既是 Observable 又是 Observer,可以把 Subject 理解成一个管道或者转发器,数据从一端输入,然后从另一端输出。

Subject 有好几种,这里可以使用最简单的 PublishSubject。订阅之后,一旦数据从一端传入,结果会里立刻从另一端输出。

源码里给了用法例子:

PublishSubject subject = PublishSubject.create();

// observer1 will receive all onNext and onCompleted events

subject.subscribe(observer1);

subject.onNext("one");

subject.onNext("two");

// observer2 will only receive "three" and onCompleted

subject.subscribe(observer2);

subject.onNext("three");

subject.onCompleted();

串行化

官方文档推荐我们:

如果你把 Subject 当作一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。

要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

所以我们可以看到在 RxBus 初始化的时候我们做了这样一件事情:

private final Subject BUS;

private RxBus() {

BUS = new SerializedSubject<>(PublishSubject.create());

}

为了保证多线程的调用中结果的确定性,我们按照官方推荐将 Subject 转换成了一个 SerializedSubject 。

SerializedSubject

该类同样是 Subject 的子类,这里贴出该类的构造方法。

private final SerializedObserver observer;

private final Subject actual;

public SerializedSubject(final Subject actual) {

super(new OnSubscribe() {

@Override

public void call(Subscriber super R> child) {

actual.unsafeSubscribe(child);

}

});

this.actual = actual;

this.observer = new SerializedObserver(actual);

}

我们发现,Subject 最后转化成了 SerializedObserver.

SerializedObserver

When multiple threads are emitting and/or notifying they will be serialized by:

Allowing only one thread at a time to emit

Adding notifications to a queue if another thread is already emitting

Not holding any locks or blocking any threads while emitting

一次只会允许一个线程进行发送事物

如果其他线程已经准备就绪,会通知给队列

在发送事物中,不会持有任何锁和阻塞任何线程

通过介绍可以知道是通过 notifications 来进行并发处理的。

SerializedObserver 类中

private final NotificationLite nl = NotificationLite.instance();

重点看看 nl 在 onNext() 方法里的使用:

@Override

public void onNext(T t) {

// 省略一些代码

for (;;) {

for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {

FastList list;

synchronized (this) {

list = queue;

if (list == null) {

emitting = false;

return;

}

queue = null;

}

for (Object o : list.array) {

if (o == null) {

break;

}

// 这里的 accept() 方法

try {

if (nl.accept(actual, o)) {

terminated = true;

return;

}

} catch (Throwable e) {

terminated = true;

Exceptions.throwIfFatal(e);

actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));

return;

}

}

}

}

}

NotificationLite

知道哪里具体调用了之后,我们再仔细看看 NotificationLite 。

先来了解它到底是什么:

For use in internal operators that need something like materialize and dematerialize wholly within the implementation of the operator but don't want to incur the allocation cost of actually creating {@link rx.Notification} objects for every {@link Observer#onNext onNext} and {@link Observer#onCompleted onCompleted}.

It's implemented as a singleton to maintain some semblance of type safety that is completely non-existent.

大致意思是:作为一个单例类保持这种完全不存在的安全类型的表象。

刚我们在 SerializedObserver 的 onNext() 方法中看到 nl.accept(actual, o)

所以我们再深入到 accept() 方法中:

public boolean accept(Observer super T> o, Object n) {

if (n == ON_COMPLETED_SENTINEL) {

o.onCompleted();

return true;

} else if (n == ON_NEXT_NULL_SENTINEL) {

o.onNext(null);

return false;

} else if (n != null) {

if (n.getClass() == OnErrorSentinel.class) {

o.onError(((OnErrorSentinel) n).e);

return true;

}

o.onNext((T) n);

return false;

} else {

throw new IllegalArgumentException("The lite notification can not be null");

}

}

Unwraps the lite notification and calls the appropriate method on the {@link Observer}.

判断 lite 通知类别,通知 observer 执行适当方法。

通过 NotificationLite 类图可以看到有三个标识

ON_NEXT_NULL_SENTINEL (onNext 标识)

ON_COMPLETED_SENTINEL (onCompleted 标识)

OnErrorSentinel (onError 标识)

与 Observer 回调一致。通过分析得知 accept() 就是通过标识来判断,然后调用 Observer 相对应的方法。

CompositeSubscription

RxBus 这辆"兰博基尼"与 CompositeSubscription 车间搭配更好。

-

构造函数:

private Set subscriptions;

private volatile boolean unsubscribed;

public CompositeSubscription() {

}

public CompositeSubscription(final Subscription... subscriptions) {

this.subscriptions = new HashSet(Arrays.asList(subscriptions));

}

内部是初始化了一个 HashSet ,按照哈希算法来存取集合中的对象,存取速度比较快,并且没有重复对象。

所以我们推荐在基类里实例化一个 CompositeSubscription 对象,使用 CompositeSubscription 来持有所有的 Subscriptions ,然后在 onDestroy()或者 onDestroyView()里取消所有的订阅。

参考文章

熄火休息

能力有限,文章错误还望指出,有任何问题都欢迎讨论 :)

转载请注明出处。

最后送上我女神 Gakki , 开心最好 ( ´͈v `͈ )◞。

rxbus 源码_从 RxBus 这辆兰博基尼深入进去相关推荐

  1. 宝宝起名神器小程序源码_支持多种流量主模式

    2022年马上到了,还不知道怎么给虎宝宝取名字么? 那么这款小程序源码就可以帮到你了,这款小程序支持输入姓氏自动起名. 不满意还可以点击换一换来找到满意的,支持起两个字或者三个字的名字. 另外也给该款 ...

  2. 新动态视频壁纸微信小程序源码_支持多种分类短视频-也有静态壁纸

    这是一款主打动态视频壁纸的一款微信小程序源码,当然啦,里面也是有静态壁纸的. 其实这款小程序也可以说是短视频小程序都可以,该款小程序全采集,另外支持多种流量主!! 下载链接: 新动态视频壁纸微信小程序 ...

  3. 图片拼图微信小程序源码_支持多模板制作和流量主

    介绍: 该款小程序支持多种流量主: 另外支持多种图形模板制作切割: 另外也支持长图合成等功能: 安装简单,新手容易上手,具体就不多说了大家自行研究吧!!!! 图片拼图微信小程序源码_支持多模板制作和流 ...

  4. 赚多多V10自动抢单系统源码_派单连单管理新增设置订单佣金

    收到用户反馈需要连单设置时需要单独设置佣金,于是针对派单连单管理这一块做了个调整,新增了设置佣金栏目. 功能说明:派单时有设置佣金会按照设置的佣金进行计算,设置佣金为单商品价格的百分比,比如设置价格为 ...

  5. QQ会员抽奖系统引流源码_适合引流,营销,推广

    简介: 今天分享一款qq会员抽奖系统源码,客户抽中QQ会员,提示需要分享到6个群后才能领取, 分享群后直接跳到自己想让加的群,纯暴力引流,适合引流,营销,推广:本程序无需后台. 安装步骤: 1.准备好 ...

  6. 新款趣味测试小程序源码_测试可用

    如图,测试功能正常,免服务器免域名,设置几个安全域名即可. 安全域名及广告位替换位置已打包,有需要的自行下载. 新款趣味测试小程序源码_测试正常-PHP文档类资源-CSDN下载

  7. 虚拟商品帐号交易平台源码_支持个人二维码收款

    精仿淘手游马上有号账号交易平台源码支持个人二维码收款,安装非常简单,支持个人二维码收款,可以运营精仿马上有号账号交易平台源码 支持个人二维码收款 安装教程: PHP版本一定要选择5.2 1.先修改配置 ...

  8. 最新酒桌小游戏喝酒小程序源码_带流量主源码下载

    2022最新酒桌小游戏喝酒小程序源码_带流量主 喝酒神器3.6,我修改增加了广告位,根据文档直接替换即可,原版本没有广告位 直接上传源码到开发者端即可 通过后改广告代码,然后关闭广告展示提交,通过后打 ...

  9. 2022最新酒桌小游戏喝酒小程序源码_带流量主

    2022最新酒桌小游戏喝酒小程序源码_带流量主 喝酒神器3.6,我修改增加了广告位,根据文档直接替换即可,原版本没有广告位 直接上传源码到开发者端即可 通过后改广告代码,然后关闭广告展示提交,通过后打 ...

  10. 分享一款超多功能工具箱组合微信小程序源码_支持流量主,无需服务器和域名!适合小白

    分享一款超多功能工具箱组合微信小程序源码_支持流量主,无需服务器和域名!适合小白操作! 简介: 超多功能工具箱组合微信小程序功能实用性质特别的高,用户还能覆盖的广一些具体功能列表如下: 1.证件照制作 ...

最新文章

  1. Maven报错解决:Element 'dependency' cannot have character [children], because the type's content type is
  2. DataFrame(8):DataFrame运算——逻辑运算(用于筛选数据) 含有~
  3. Linux运行jmeter
  4. 成功跳槽百度工资从15K涨到28K,已整理成文档
  5. 不懂电脑如何买电脑_买电脑交智商税?5分钟看懂笔记本电脑配置
  6. SQL数值计算函数之round(X,D)
  7. visio网络拓扑图 下载_Visio2019软件下载及安装教程
  8. java按年月季度统计折线图_拆线图按年、按月,按天统计,前端传时间只要起始时间与结束时间...
  9. 安装 | MATLAB2020a (64位) 安装教程及安装包下载链接
  10. zepto部分报错及解决方案
  11. bin文件的安装方法
  12. (Research)肝肿瘤免疫微环境亚型和中性粒细胞异质性
  13. R语言建立Cox回归模型(包含所有协变量)比较不同治疗方法生存率的差异、predict函数对cox模型进行新数据的预测、计算不同样本的风险比HR(hazard ratio)
  14. 20155322 2016-2017-2 《Java程序设计》第8周学习总结
  15. 竖屏java转横屏_android设置横屏和竖屏的方法
  16. 网易云音乐导出歌单-速食版
  17. java程序设计实验报告代写_代写file I/O作业、代写java Scanner I/O程序、代写java编程作业、代做java实验报告...
  18. RxSwift序列—Subject
  19. Unbuntu搭建pjsua实现自动拨号与自动播放语音
  20. 泛微OA系统多版本存在命令执行漏洞

热门文章

  1. 进化计算(九)——MOEA/D代码实现及中文详解(Matlab)
  2. Linux内核学习(三)应用层和内核
  3. 指针数组和二维数组指针
  4. 华为鸿蒙操作系统国美通讯,国美通讯(600898)03月14日14:30大单揭秘
  5. C++ 虚函数语义学
  6. Photoshop学习笔记
  7. 《软件工程导论》考试复习题集锦
  8. 心理测试软件需求分析报告,大学生心理测试软件心理测评档案管理系统
  9. 远程控制软件TeamViewer轻松解决企业运维难题
  10. 基于软件仿真的PLC系统测试技术