1.简单介绍

1.1.发布/订阅事件主要用于网络请求的回调。

  事件总线可以使Android各组件之间的通信变得简单,而且可以解耦。

  其实RxJava实现事件总线和EventBus比较类似,他们都依据与观察者模式。

  个人比较习惯用RxJava来实现,因为非常简单而清晰。

  

1.2.当然EventBus实现总线的方式也有很多人用。

  这里给个传送门==>EventBus的github地址:https://github.com/greenrobot/EventBus

  然后Otto实现总线也不错==>Otto的github地址:https://github.com/square/otto

1.3.使用RxJava的好处以及注意点

  最明显的好处就是:项目体积缩小了。

  注意:使用RxLifecycle来解决RxJava内存泄漏的问题。

  ==>参考我的另一篇博客:RxLifecycle第三方库的使用。

  

1.4.理解一下观察者模式。

  这是一种行为模式。

  当你的类或者主对象(称为被观察者)的状态发生改变就会通知所有对此感兴趣的类或对象(称为观察者)。

  详情了解请参考这篇文章:观察者模式--千军万马穿云箭。

1.5.理解一下发布/订阅

  发布/订阅 模式的功能和观察者模式是一样的,都是完成特定事件发生后的消息通知。

  观察者模式和发布/订阅模式之间还是存在了一些差别,在发布/订阅模式中重点是发布消息,然后由调度中心

  统一调度,不需要知道具体有哪些订阅者。(这样就可以匿名)

为什么要匿名?
在计算机程序设计中有一个非常棒的思想叫“解耦”。你通常希望在你的设计中保持尽可能低的耦合度。
通常情况下,你希望消息发布商能够直接了解所有需要接收消息的订阅者,这样,一旦“事件”或消息准备好就可以及时通知每一个订阅者。但是使用事件总线,发布者可以免除这种职责并实现独立性,因为消息发布者和消息订阅者可以相互不知道对方,只关心对应的消息,从而接触两者之间的依赖关系

怎么实现匿名?
提到匿名,自然而然你就会问:你是如何真正实现发布者和订阅者之间的匿名? 很简单,只要找到一个中间人,让这个中间人负责两方的消息调度。事件总线就是一个这样的中间人。综上所述,事件总线就是这么简单。

1.6.使用RxJava实现事件总线的简单案例

  案例来源:用RxJava实现事件总线-RxBus。

  github参考案例地址:https://github.com/kaushikgopal/RxJava-Android-Samples

  如下面的例子:

  我们从顶部片段(绿色部分)发布事件,并从底部片段(蓝色部分)监听点击事件(通过事件总线)。

  

  怎么实现这个功能呢?

  第一步自定义一个事件总线 

public class RxBus {private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());public void send(Object o) {_bus.onNext(o);}public Observable<Object> toObserverable() {return _bus;}
}

  第二步将事件发布到总线中。

@OnClick(R.id.btn_demo_rxbus_tap)
public void onTapButtonClicked() {_rxBus.send(new TapEvent());
}

  第三步监听来自其他组件或服务事件

_rxBus.toObserverable().subscribe(new Action1<Object>() {@Overridepublic void call(Object event) {if(event instanceof TapEvent) {_showTapText();}else if(event instanceof SomeOtherEvent) {_doSomethingElse();}}});

1.7.本篇文章的参考文献

  Android RxJava实现RxBus。

  Android基于RxJava、RxAndroid的EventBus实现。

  用RxJava实现事件总线-RxBus。

2.封装好的总线类

2.1.RxJava1.x的总线实现方式

/*** desc   : 利用 PublishSubject的特性:与普通的Subject不同,在订阅时并不立即触发订阅事件,* 而是允许我们在任意时刻手动调用onNext(),onError(),onCompleted来触发事件。*/
public class RxBus {private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();private RxBus() {}private static class Holder {private static RxBus instance = new RxBus();}public static RxBus getInstance() {return Holder.instance;}public <T> Observable<T> register(@NonNull Class<T> clz) {return register(clz.getName());}public <T> Observable<T> register(@NonNull Object tag) {List<Subject> subjectList = subjectMapper.get(tag);if (null == subjectList) {subjectList = new ArrayList<>();subjectMapper.put(tag, subjectList);}Subject<T, T> subject = PublishSubject.create();subjectList.add(subject);//System.out.println("注册到rxbus");return subject;}public <T> void unregister(@NonNull Class<T> clz, @NonNull Observable observable) {unregister(clz.getName(), observable);}public void unregister(@NonNull Object tag, @NonNull Observable observable) {List<Subject> subjects = subjectMapper.get(tag);if (null != subjects) {subjects.remove(observable);if (subjects.isEmpty()) {subjectMapper.remove(tag);//System.out.println("从rxbus取消注册");
            }}}public void post(@NonNull Object content) {post(content.getClass().getName(), content);}public void post(@NonNull Object tag, @NonNull Object content) {List<Subject> subjects = subjectMapper.get(tag);if (!subjects.isEmpty()) {for (Subject subject: subjects) {subject.onNext(content);}}}
}

几个关键方法:
register —— 由tag,生成一个subject List,同时利用PublishSubject创建一个Subject并返回,它同时也是Observable的子类。
unregister —— 移除tag对应subject List 中的Observable。若subject List为空,也将被移除。
post —— 遍历tag对应subject List 中的Subject,执行onNext()。这里实际执行的是观察者Observer的onNext(),Subject的定义:public abstract class Subject<T, R> extends Observable<R> implements Observer<T>。

  测试代码:

/*
rxbus*/
Observable<String> observable = RxBus.getInstance().register(String.class);
observable.map(s -> {try {int v = Integer.valueOf(s);System.out.println("map变换成功, source = " + s);return v;} catch (Exception e) {System.out.println("map变换失败, source = " + s);return s;}
}).subscribe(value -> {System.out.println("订阅 " + value);
});RxBus.getInstance().post("888");
RxBus.getInstance().post("发发发");
RxBus.getInstance().unregister(String.class, observable);
//这里比较有意思的是,使用了lambda表达式。//在map变换时,如果将字符串转成Integer,没有问题就返回整型;//若报异常,就返回String型。//同样的,在最终订阅时,value参数的类型也是由map变换来决定的。

2.2.RxJava2.0总线实现类

  因为在RxJava2.0之后,io.reactivex.Observable中没有进行背压处理了。

  如果有大量消息堆积在总线中来不及处理会产生OutOfMemoryError。

  有新类io.reactivex.Flowable专门针对背压问题。

  无背压处理的Observable实现,跟RxJava1.0x中一样,使用PublishSubject来实现。

  要实现有背压的2.0x版,使用FlowableProcessor的子类PublishProcessor来产生Flowable。

  

  源代码如下:

public class RxBus {private final FlowableProcessor<Object> mBus;private RxBus() {mBus = PublishProcessor.create().toSerialized();}private static class Holder {private static RxBus instance = new RxBus();}public static RxBus getInstance() {return Holder.instance;}public void post(@NonNull Object obj) {mBus.onNext(obj);}public <T> Flowable<T> register(Class<T> clz) {return mBus.ofType(clz);}public void unregisterAll() {//会将所有由mBus 生成的 Flowable 都置  completed 状态  后续的 所有消息  都收不到了
        mBus.onComplete();}public boolean hasSubscribers() {return mBus.hasSubscribers();}}

  测试代码:

Flowable<Integer> f1 = RxBus.getInstance().register(Integer.class);
f1.subscribe(value -> System.out.println("订阅f1消息 .. " + value));
RxBus.getInstance().post(999);

3.实际项目调用方式

3.1.首先自定义一个RxBus。

  这个类感觉有点像工具类。和其他函数没有任何耦合关系。

  这个类见在上面2中封装好的RxBus类。

3.2.在BaseListFragment实现了LazyLoadFragment中的抽象函数。

  这里解释一下:

  BaseListFragment是一个可以刷新可以加载更多的一个碎片。

  LazyLoadFragment是一个懒加载的被BaseListFragmetn继承的一个基类。

  LazyLoadFragment通过判断是否可见的函数setUserVisibleHint执行了一个抽象函数fetchData()。

  adapter是页面内容的一个适配器。

  然后在BaseListFragment中重写这个抽象函数。

 @Overridepublic void fetchData() {observable = RxBus.getInstance().register(BaseListFragment.TAG);observable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {adapter.notifyDataSetChanged();}});}

  observable.subscribe(new Consumer<Integer>)返回的是一个Disposable类型。

  如下面Disposable的简单使用方式。

 Disposable disposable = observable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {//这里接收数据项
            }}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {//这里接收onError
            }}, new Action() {@Overridepublic void run() throws Exception {//这里接收onComplete。
            }});

3.3.小贴士

  RxBus的注册与反注册一定要对应出现。

  一般在活动或者Fragment中的onStart中register这个活动或者片段的TAG(也就是一个唯一标识字符串)。

  一般在活动或者Fragment中的onDestroy中ungister这个活动或者片段的TAG。

  post用于传递消息,看情况调用呗。

  

转载于:https://www.cnblogs.com/Jason-Jan/p/8017412.html

Android 使用RxJava实现一个发布/订阅事件总线相关推荐

  1. 简单的写一个发布订阅器

    发布-订阅模式在开发中的应用其实是很广泛的,比如大家都知道的 Vue,使用数据驱动模板的改变,将我们的双手从繁琐的 dom 操作中解放出来,稍微懂一些原理的同学们都知道,其双向数据绑定就是通过数据劫持 ...

  2. 在Android手机上运行OpenDDS发布订阅app

    在Android手机(我测试的是华为手机)上,运行OpenDDS的发布订阅apk程序,实现本机和相邻手机的数据传输,基本步骤如下 1)选择合适的OpenDDS和ACE版本,编译出Arm64-v8a的交 ...

  3. Android自定义事件总线,手写Android事件总线框架Eventbus(简易版)

    开篇废话 近期利用业余时间,跟着大神把Eventbus的框架学习了一下,在这里,记录一下这次学习的心得. EventBus是针一款对Android的发布/订阅事件总线.它可以让我们很轻松的实现在And ...

  4. Android事件总线设计(一)- EventBus初步学习

    Android事件总线设计(一) 我们通常在进行安卓设计的时候,不同的Activity,Fragment,Service等之间通讯一直是个不小的问题,用Intent以及Handler都觉得有一点麻烦, ...

  5. Android事件总线(一)EventBus3.0用法全解析

    相关文章 Android事件总线(一)EventBus3.0用法全解析 Android事件总线(二)EventBus3.0源码解析 Android事件总线(三)otto用法全解析 Android事件总 ...

  6. Android事件总线——EventBus的使用

    前言 首先我们来说下事件总线,它的作用:为了更简化并更高质量的在Activity,Fragment,Thread和Service等之间的通信,解决组件之间高耦合的同时仍能进行高效的通信. 什么是Eve ...

  7. 手把手教你学Dapr - 6. 发布订阅

    介绍 发布/订阅模式允许微服务使用消息相互通信.生产者或发布者在不知道哪个应用程序将接收它们的情况下向主题发送消息.这涉及将它们写入输入通道.同样,消费者或订阅者订阅该主题并接收其消息,而不知道是什么 ...

  8. redis java 发布订阅_Redis之发布订阅(Java)

    上一章节我们已经学会了Redis在Java项目里面最基本的应用,我们这一章节来讲一讲Redis里面一个非常重要的功能:发布订阅 发布订阅(Pub/Sub):目前广泛使用的通信模型,它采用事件作为基本的 ...

  9. 从东京奥运会看js设计模式之发布订阅模式

    开篇废话:本篇文章介绍发布-订阅模式,想必很多人听说过有一种观察者模式,网上既有资料说这是两种不同的设计模式,也有说这是一种模式,我倾向于认同他们是同一种设计模式.不必过于纠结 开篇楔子:东京奥运会已 ...

  10. JavaScript设计模式 -发布订阅者模式

    1. 定义 发布订阅者模式又叫观察者模式,他定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得将得到通知 案例: 小明最近看上了一套房子,到了售楼处之后才被告知,该楼 ...

最新文章

  1. [重磅] 让HTML5达到原生的体验 系列之中的一个 避免切页白屏
  2. [转]C++ 使用Makefile文件
  3. 微软的PivotViewer控件编程续
  4. Java10 新特性
  5. 你确定你会使用git commit?
  6. docker~aspnetcore2.0镜像缺少libgdiplus问题
  7. 华为机试HJ44:Sudoku(数独问题,深度优先遍历DFS解法)
  8. python代码-让你的Python代码实现类型提示功能
  9. python3的文件编码问题
  10. process_创建进程
  11. 【计量经济学】固定效应、随机效应、相关随机效应
  12. 设计模式学习之—我是一个粉刷匠(装饰模式)
  13. 设定个人发展目标和计划
  14. webassembly学习-modules
  15. 2021-2027全球与中国USB智能电源板市场现状及未来发展趋势
  16. 机器学习常用算法的优缺点总结
  17. HDU4847-Wow! Such Doge!
  18. 毕业设计工作内容和进度
  19. 论文浅尝 | DI刊发的那些有关Knowledge Graph的论文
  20. C程序|实现使用OPENSSL库 发送HTTPS请求,并接收数据|例如请求12306获取高铁、动车、火车车次信息的方法

热门文章

  1. ROS机器人程序设计(原书第2版)2.4.1 ROS文件系统导览
  2. Redhat7.2下编译rpm包的形式安装openvswitch
  3. CentOS_6.x安装VNC_Server
  4. 经典:从追MM谈Java的23种设计模式
  5. 数据结构利器之私房STL
  6. 《Two Dozen Short Lessons in Haskell》学习(三)
  7. [转] 数据挖掘 机器学习 模式识别的关系
  8. 在主函数中输入10个等长的字符串。用另一函数对他们排序。
  9. fedora 11 下分析系统性能瓶颈之(一)mpstat
  10. 4G模块使用记录SIMCOM7070