rxbus 源码_关于RxBus实现方式的思考
我们都知道当有了RxBus这种通信方式后,我们Activity A跳转到Activity B,然后B带值传到A中就很方便了,再也不需要之前的startActivityForResualt.
可是假如我们A带值到B去怎么做呢,RxBus是做不到的,传统的方式我们还在使用Intent 传过去,虽然这样做没有什么毛病,但是无疑当业务复杂后,key会特别的多.
那我就想能不能改进RxBus方式可以达到这种效果呢?
首先我们简单看一下网上一个RxBus实现源码,并不复杂
public class RxBus {
// 主题
private final FlowableProcessor bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Flowable的数据发射给观察者
private RxBus() {
bus = PublishProcessor.create().toSerialized();
}
public static RxBus getDefault() {
return RxBusHolder.sInstance;
}
private static class RxBusHolder {
private static final RxBus sInstance = new RxBus();
}
// 提供了一个新的事件
public void post(Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public Flowable toFlowable(Class eventType) {
return bus.ofType(eventType);
}
// 封装默认订阅
public Disposable toDefaultFlowable(Class eventType, Consumer act) {
return bus.ofType(eventType).compose(RxUtil.rxSchedulerHelper()).subscribe(act);
}
}
我之前用的RxBus实现方式是这样的当然还是1.x+版本
public class RxBus {
private static final String TAG = RxBus.class.getSimpleName();
private static Object async = new Object();
private static RxBus instance;
public static boolean DEBUG = false;
private ConcurrentHashMap> subjectMapper = new ConcurrentHashMap<>();
private RxBus() {}
public static RxBus get() {
synchronized (async){
if (null == instance)
instance = new RxBus();
return instance;
}
}
/**
* 注册事件
* @param tag
* @param clazz
* @param
* @return
*/
@SuppressWarnings("unchecked")
public Observable register(@NonNull Object tag, @NonNull Class clazz) {
List subjectList = subjectMapper.get(tag);
if (null == subjectList) {
subjectList = new ArrayList<>();
subjectMapper.put(tag, subjectList);
}
Subject subject;
subjectList.add(subject = PublishSubject.create());
if (DEBUG) Log.d(TAG, "[register]subjectMapper: " + subjectMapper);
return subject;
}
/**
* 反注册事件
* @param tag
* @param observable
*/
public void unregister(@NonNull Object tag, @NonNull Observable observable) {
List subjects = subjectMapper.get(tag);
if (null != subjects) {
subjects.remove(observable);
if (isEmpty(subjects)) {
subjectMapper.remove(tag);
}
}
if (DEBUG) Log.d(TAG, "[unregister]subjectMapper: " + subjectMapper);
}
/**
* 清空所有
*/
public void clear(){
if (subjectMapper != null)
subjectMapper.clear();
}
/**
* 发送事件
* @param content
*/
public void post(@NonNull Object content) {
post(content.getClass().getName(), content);
}
/**
* 发送事件
* @param tag
* @param content
*/
@SuppressWarnings("unchecked")
public void post(@NonNull Object tag, @NonNull Object content) {
List subjectList = subjectMapper.get(tag);
if (!isEmpty(subjectList)) {
for (Subject subject : subjectList) {
subject.onNext(content);
}
}
if (DEBUG) Log.d(TAG, "[send]subjectMapper: " + subjectMapper);
}
private boolean isEmpty(List subjectList) {
return (subjectList == null || subjectList.size() == 0);
}
}
对比一下可以看出来,之前的方式是创建一个map,map的key会存我们RxBus注册的事件也就是我们传进去的Key,而value却是一个List集合,
这种实现方式是这样的: 我们创建一个Map,然后在register中我们为map中的每一个value都创建一个List,当注册的时候根据Key拿到里面List,List如果不为空的话我们就create一个PublishSubject然后把它存进List中,这样每个Key对应的value(List)可能会有 0~N个PublishSubject,在post发送事件的时候根据key拿到map中的List然后遍历List调用每一个PublishSubject的
onNext()把事件发送出去. 这样实现是没什么问题,但是不仅有点麻烦,而且我们每次注册都需要create去创建一个PublishSubject,并且由于我们的RxBus是static静态的导致我们List中强引用每个对象所以我们调用结束后还需要unregister去反注册一下,感觉有点麻烦啊。
这样相比之下第一种方式比较简洁一些,第一种方式我们只创建一个PublishProcessor,所有注册RxBus的地方公用这一个PublishProcessor,
而我们post发送事件的时候也会把事件发送给每一个使用这个PublishProcessor注册的地方
我们看看PublishProcessor onNext()源码
@Override
public void onNext(T t) {
if (subscribers.get() == TERMINATED) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (PublishSubscription s : subscribers.get()) {
s.onNext(t);
}
}
我大胆猜测,我们在调用PublishProcessor 的subscribe()方法的时候就会把里面的参数传到类似一个集合中,然后在调用onNext()时候会统一调用集合中所有值的onNext(),我们看看subscribers.get()中subscribers是啥???
/** The array of currently subscribed subscribers. */
final AtomicReference[]> subscribers;
关于AtomicReference我们看一个解释这里暂时不做深究,只需要知道它只是一个原子性对象引用就好
AtomicReference
AtomicReference类提供了一种读和写都是原子性的对象引用变量。原子意味着多个线程试图改变同一个AtomicReference(例如比较和交换操作)将不会使得AtomicReference处于不一致的状态。AtomicReferenc的compareAndSet()方法可以使得它与期望的一个值进行比较,如果他们是相等的,AtomicReference里的对象会被设置成一个新的引用。
可以看到这里引用到的是一个PublishSubscription数组,现在来搜一下什么地方赋值这个数组了看看能不能搜到,
/**
* Tries to add the given subscriber to the subscribers array atomically
* or returns false if the subject has terminated.
* @param ps the subscriber to add
* @return true if successful, false if the subject has terminated
*/
boolean add(PublishSubscription ps) {
for (;;) {
PublishSubscription[] a = subscribers.get();
if (a == TERMINATED) {
return false;
}
int n = a.length;
@SuppressWarnings("unchecked")
PublishSubscription[] b = new PublishSubscription[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = ps;
if (subscribers.compareAndSet(a, b)) {
return true;
}
}
}
咦貌似在这个地方传递PublishSubscription进去然后正好赋值给我们上面的PublishSubscription数组
看看add()方法在哪被添加
@Override
public void subscribeActual(Subscriber super T> t) {
PublishSubscription ps = new PublishSubscription(t, this);
t.onSubscribe(ps);
if (add(ps)) {
// if cancellation happened while a successful add, the remove() didn't work
// so we need to do it again
if (ps.isCancelled()) {
remove(ps);
}
} else {
Throwable ex = error;
if (ex != null) {
t.onError(ex);
} else {
t.onComplete();
}
}
}
在这个方法中Subscriber参数和this被组装成一个PublishSubscription对象然后调用上面的add()方法传递进去
ok我们接下来回头看一看PublishProcessor 的subscribe()方法里做了些什么
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
点进去看看
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,
Action onComplete, Consumer super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaSubscriber ls = new LambdaSubscriber(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
LambdaSubscriber ls = new LambdaSubscriber(onNext, onError, onComplete, onSubscribe);
这里面是把onNext, onError, onComplete, onSubscribe上面的参数封装进LambdaSubscriber对象里
public LambdaSubscriber(Consumer super T> onNext, Consumer super Throwable> onError,
Action onComplete,
Consumer super Subscription> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
好上面貌似就一个subscribe(ls)方法,不是很复杂点进去看一看
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final void subscribe(FlowableSubscriber super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
Subscriber super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
擦这里正好调用了上面分析的subscribeActual(z)方法,然后调用add()方法把参数组装成PublishSubscription然后加入到数组引用中,根据源码分析结果跟我们之前的猜测完全一致!
分析完毕,总结一下,第一种写法弊端在于create出大量的Subject对象比较耗用内存。
优点是我们使用hashmap存我们每个事件的key,我们可以制定只发送给注册某一个Key的List中的每一个事件。
第二种写法有点在于公用同一个Subject,占用内存小.
缺点是我们是根据事件的类型来存储发送事件的,如果几个界面注册了相同类型事件,那么发送这个事件所有的界面都会收到。
现在写法在于我们知道了Rx源码中就已经帮我们把注册的对象添加到数组中并且在我们调用onNext()的时候会帮我们遍历进行发送所以我们是不需要自己再创建数组去引用的,这样就不需要手动去移除了.
rxbus 源码_关于RxBus实现方式的思考相关推荐
- Spark源码分析:多种部署方式之间的区别与联系
作者:过往记忆 从官方的文档我们可以知道, Spark 的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来 ...
- 面试有没有看过spring源码_如何看Spring源码、Java每日六道面试分享,打卡第二天...
原标题:如何看Spring源码.Java每日六道面试分享,打卡第二天 想要深入的熟悉了解Spring源码,我觉得第一步就是要有一个能跑起来的极尽简单的框架,下面我就教大家搭建一个最简单的Spring框 ...
- 超级签名源码_企业签名和超级签名有哪些区别?
我们知道iOS系统对于非App Store中的应用是有安装限制的,而App Store严格的审核机制又将许多APP拒之门外,这令不少开发者们郁闷不已. 所以很多开发者们会选择苹果签名的方式,让自己的i ...
- 宝宝起名神器小程序源码_支持多种流量主模式
2022年马上到了,还不知道怎么给虎宝宝取名字么? 那么这款小程序源码就可以帮到你了,这款小程序支持输入姓氏自动起名. 不满意还可以点击换一换来找到满意的,支持起两个字或者三个字的名字. 另外也给该款 ...
- 新动态视频壁纸微信小程序源码_支持多种分类短视频-也有静态壁纸
这是一款主打动态视频壁纸的一款微信小程序源码,当然啦,里面也是有静态壁纸的. 其实这款小程序也可以说是短视频小程序都可以,该款小程序全采集,另外支持多种流量主!! 下载链接: 新动态视频壁纸微信小程序 ...
- 图片拼图微信小程序源码_支持多模板制作和流量主
介绍: 该款小程序支持多种流量主: 另外支持多种图形模板制作切割: 另外也支持长图合成等功能: 安装简单,新手容易上手,具体就不多说了大家自行研究吧!!!! 图片拼图微信小程序源码_支持多模板制作和流 ...
- 内核源码包打包成rpm方式
内核源码包打包成rpm方式 文章目录 内核源码包打包成rpm方式 第一部分,rpm包简单定制 一.rpm 制作前的环境准备 二.准备内核的源代码组件 1.下载地址 https://www.kernel ...
- 赚多多V10自动抢单系统源码_派单连单管理新增设置订单佣金
收到用户反馈需要连单设置时需要单独设置佣金,于是针对派单连单管理这一块做了个调整,新增了设置佣金栏目. 功能说明:派单时有设置佣金会按照设置的佣金进行计算,设置佣金为单商品价格的百分比,比如设置价格为 ...
- QQ会员抽奖系统引流源码_适合引流,营销,推广
简介: 今天分享一款qq会员抽奖系统源码,客户抽中QQ会员,提示需要分享到6个群后才能领取, 分享群后直接跳到自己想让加的群,纯暴力引流,适合引流,营销,推广:本程序无需后台. 安装步骤: 1.准备好 ...
- 新款趣味测试小程序源码_测试可用
如图,测试功能正常,免服务器免域名,设置几个安全域名即可. 安全域名及广告位替换位置已打包,有需要的自行下载. 新款趣味测试小程序源码_测试正常-PHP文档类资源-CSDN下载
最新文章
- 这些Java代码优化细节,你需要注意!
- 关于python:为什么我不能在打开的文件上两次调用read()?
- element 日期控件 限制开始日期和结束日期
- pdf转换成可编辑的word转换器
- python tornado对接权限中心的sdk封装
- php 输出mysql查询结果_php如何输出mysql查询结果
- 【优化算法】亨利气体溶解度优化算法(HGSO)【含Matlab源码 127期】
- 一起谈.NET技术,NET下RabbitMQ实践 [配置篇]
- 第六届全国大学生GIS应用技能大赛开发题答案(非官方)
- 后端向前端返回图片URL,并向后端传递base64格式URL
- 微软Windows聚焦锁屏壁纸存放目录
- 总结几个Linux系统中拷贝文件内容的方法
- Python数据分析第四课:数据的处理(数据合并、数据筛选、数据排序)
- defaults(default是什么职位)
- 【784. 字母大小写全排列】
- nvenc vs x264 对比(2)
- 一篇对于了解我自己,挖掘我自己,从而成长的文章
- poj解题报告——poj 1528 Perfection
- linux服务器看门狗服务,服务器watchdog看门狗的理解
- 你想知道你的计算机一秒能做多少次运算吗?
热门文章
- 网络编程入门(代码很详细)
- 计算机如何通过手机连接网络打印机,手机连接电脑打印机怎么设置
- win7计算机扫描仪,win7系统怎么用打印机扫描仪功能|win7系统扫描仪功能的使用方法...
- 操作系统——实验一(Linux基本操作)
- js实现xml转json和json转xml
- 分区混乱,C盘不是系统盘怎么办?
- SSM毕设项目国有资产管理系统3c938(java+VUE+Mybatis+Maven+Mysql)
- ESP8266连接天猫精灵(一)
- cobar mysql 性能,Cobar + MySQL 技術驗證(li)
- 济南oracle 认证费用,济南ORACLE管理培训价格