Rxjava 源码系列目录

  1. Rxjava源码分析之IO.Reactivex.Observer
  2. Rxjava源码分析之IO.Reactivex.CompositeDisposable
  3. Rxjava源码分析之IO.Reactivex.Observable

CompositeDisposable源码分析

  • 前言
  • 主要方法
  • 注意事项
  • 源码
  • 总结

博客创建时间:2020.04.26
博客更新时间:2021.04.12

以Android studio build=4.1.3,gradle=6.5,SdkVersion 30来分析讲解。如图文和网上其他资料不一致,可能是别的资料版本较低而已


前言

该处源码分析是基于Rxjava3而非Rxjava2,两者有细微的差别。

RxJava容易造成内存泄漏,在某些情况下没有及时的取消订阅会导致内存泄漏。CompositeDisposable就是专用于Disposable管理的。

CompositeDisposable是一个Disposable容器,它提供了多种算法时间复杂度为O(1)的方法,如add、remove、delete等方法。实质是内部有一个OpenHashSet实例resources专用于添加移除管理Disposable。


主要方法

1. dispose()
resources置空,并且为逐个Dispose在resources中的每一个Disposable

2.add(@NonNull Disposable disposable)和addAll(@NonNull Disposable disposable)
添加Disposable或其集合到resources中,如果CompositeDisposable已经disposed,则添加的Disposable会直接dispose取消订阅,无法执行Disposable中的任务。

3. remove(@NonNull Disposable disposable)
remove方法实质调用了delete,且多了一步 disposable.dispose()。remove即从容器resources中不但remove,且执行完毕后该 disposable取消与订阅者的连接关系dispose()

4. delete(@NonNull Disposable disposable)
删除resources中的该disposable

5. clear()
dispose容器resources中所包含的所有Disposable


注意事项

  1. 该类中所有需要传Disposable的参数都不能为null,否则抛出NullPointerException
  2. 一般在OnCreate中创建CompositeDisposable实例,在onDestory方法中进行clear()操作
  3. 为了方便disposable 的管理,可以在Observer.onSubscribe(Disposable d)中将Disposable的实例加入CompositeDisposable中
     .subscribe(new Observer() {@Overridepublic void onSubscribe(Disposable d) {if(d!=null){compositeDisposable.add(d);}}

源码

public class IO_Reactivex_CompositeDisposable implements Disposable, DisposableContainer {/*** 存放 disposable 的 set*/private OpenHashSet<Disposable> resources;/*** 是否已经处理过*/private volatile boolean disposed;/*** Creates an empty {@code CompositeDisposable}.*/public IO_Reactivex_CompositeDisposable() {}/*** Creates a {@code CompositeDisposable} with the given array of initial {@link Disposable} elements.* 创建 一个CompositeDisposable 通过给定的 初始元素数组。** @param disposables the array of {@code Disposable}s to start with    起始的Disposables* @throws NullPointerException if {@code disposables} or any of its array items is {@code null} 如果存在一个disposable,则抛空异常*/public IO_Reactivex_CompositeDisposable(@NonNull Disposable... disposables) {Objects.requireNonNull(disposables, "disposables is null");this.resources = new OpenHashSet<>(disposables.length + 1);for (Disposable d : disposables) {Objects.requireNonNull(d, "A Disposable in the disposables array is null");this.resources.add(d);}}/*** 创建 一个CompositeDisposable 通过给定的初始化元素 Iterable 序列* Creates a {@code CompositeDisposable} with the given {@link Iterable} sequence of initial {@link Disposable} elements.** @param disposables the {@code Iterable} sequence of {@code Disposable} to start with   初始参数* @throws NullPointerException if {@code disposables} or any of its items is {@code null}   存在null 抛异常*/public IO_Reactivex_CompositeDisposable(@NonNull Iterable<? extends Disposable> disposables) {Objects.requireNonNull(disposables, "disposables is null");this.resources = new OpenHashSet<>();for (Disposable d : disposables) {Objects.requireNonNull(d, "A Disposable item in the disposables sequence is null");this.resources.add(d);}}@Overridepublic void dispose() {//已经处理过,不在调用if (disposed) {return;}OpenHashSet<Disposable> set;// 前面添加的OpenHashSet<Disposable> 清空,置nullsynchronized (this) {if (disposed) {return;}disposed = true;set = resources;resources = null;}dispose(set);}@Overridepublic boolean isDisposed() {return disposed;}//TODO CompositeDisposable 的初始化,add 等方法中传入参数Disposable 或其数组都不可为null,否则抛空异常/*** TODO 添加一个Disposable 到 container容器中 ,如果container已经disposed,此时返回false且disposable.dispose();* Adds a {@link Disposable} to this container or disposes it if the* container has been disposed.** @param disposable the {@code Disposable} to add, not {@code null}* @return {@code true} if successful, {@code false} if this container has been disposed* @throws NullPointerException if {@code disposable} is {@code null}*/@Overridepublic boolean add(@NonNull Disposable disposable) {Objects.requireNonNull(disposable, "disposable is null");if (!disposed) {synchronized (this) {if (!disposed) {OpenHashSet<Disposable> set = resources;// 刚刚初始化的CompositeDisposable,此时resources=nullif (set == null) {set = new OpenHashSet<>();resources = set;}set.add(disposable);return true;}}}disposable.dispose();return false;}/*** 一次添加一个Disposable数组成功则返回true,如果容器disposed,则Disposable数组批量dispose此时返回false* Atomically adds the given array of {@link Disposable}s to the container or* disposes them all if the container has been disposed.** @param disposables the array of {@code Disposable}s* @return {@code true} if the operation was successful, {@code false} if the container has been disposed* @throws NullPointerException if {@code disposables} or any of its array items is {@code null}*/public boolean addAll(@NonNull Disposable... disposables) {Objects.requireNonNull(disposables, "disposables is null");if (!disposed) {synchronized (this) {if (!disposed) {OpenHashSet<Disposable> set = resources;if (set == null) {set = new OpenHashSet<>(disposables.length + 1);resources = set;}for (Disposable d : disposables) {Objects.requireNonNull(d, "A Disposable in the disposables array is null");set.add(d);}return true;}}}for (Disposable d : disposables) {d.dispose();}return false;}/*** TODO 如果disposable是resources中的一个,则从resources中移除,并且将该Disposable.dispose* Removes and disposes the given {@link Disposable} if it is part of this container.** @param disposable the disposable to remove and dispose, not {@code null}* @return {@code true} if the operation was successful* @throws NullPointerException if {@code disposable} is {@code null}*/@Overridepublic boolean remove(@NonNull Disposable disposable) {if (delete(disposable)) {disposable.dispose();return true;}return false;}/*** TODO 从resources中移除Disposable,但不dispose。 如果resources=null,* TODO 或者resources.remove返回false,则该方法返回false* Removes (but does not dispose) the given {@link Disposable} if it is part of this container.** @param disposable the disposable to remove, not {@code null}* @return {@code true} if the operation was successful* @throws NullPointerException if {@code disposable} is {@code null}*/@Overridepublic boolean delete(@NonNull Disposable disposable) {Objects.requireNonNull(disposable, "disposable is null");if (disposed) {return false;}synchronized (this) {if (disposed) {return false;}OpenHashSet<Disposable> set = resources;if (set == null || !set.remove(disposable)) {return false;}}return true;}/*** 原子性的清理container, 然后dispose 前面所有包含的Disposables* Atomically clears the container, then disposes all the previously contained {@link Disposable}s.*/public void clear() {if (disposed) {return;}OpenHashSet<Disposable> set;synchronized (this) {if (disposed) {return;}set = resources;resources = null;}dispose(set);}/*** 返回 resources中的count* Returns the number of currently held {@link Disposable}s.** @return the number of currently held {@code Disposable}s*/public int size() {if (disposed) {return 0;}synchronized (this) {if (disposed) {return 0;}OpenHashSet<Disposable> set = resources;return set != null ? set.size() : 0;}}/*** TODO Dispose OpenHashSet中的元素 通过非致命的方式最后。  如果该过程中产生了异常,则抛出异常。* Dispose the contents of the {@link OpenHashSet} by suppressing non-fatal* {@link Throwable}s till the end.** @param set the {@code OpenHashSet} to dispose elements of*/private void dispose(@Nullable OpenHashSet<Disposable> set) {if (set == null) {return;}List<Throwable> errors = null;Object[] array = set.keys();for (Object o : array) {if (o instanceof Disposable) {try {((Disposable) o).dispose();} catch (Throwable ex) {Exceptions.throwIfFatal(ex);if (errors == null) {errors = new ArrayList<>();}errors.add(ex);}}}if (errors != null) {if (errors.size() == 1) {throw ExceptionHelper.wrapOrThrow(errors.get(0));}throw new CompositeException(errors);}}
}

总结

CompositeDisposable的存在是为了解决Rxjava耗时操作的内存泄漏,用好它非常重要。

本测试Demo源码
gitee:https://gitee.com/luofaxin/RxJava3Analysis.git
github:https://github.com/l424533553/RxJava3Analysis.git


相关链接

  1. Rxjava源码分析之IO.Reactivex.Observer
  2. Rxjava源码分析之IO.Reactivex.CompositeDisposable
  3. Rxjava源码分析之IO.Reactivex.Observable

扩展链接:

  1. 最通俗易懂的教你使用RxJava3(一)
  2. 最通俗易懂的教你使用RxJava3(二)
  3. 最通俗易懂的教你使用RxJava3(三)

扩展训练:

  1. CompositeDisposable几个常用方法的使用
  2. CompositeDisposable存在的意义

博客书写不易,您的点赞收藏是我前进的动力,千万别忘记点赞、 收藏 ^ _ ^ !

Rxjava源码分析之IO.Reactivex.CompositeDisposable相关推荐

  1. Rxjava源码分析之IO.Reactivex.Observable

    Rxjava 源码系列目录 Rxjava源码分析之IO.Reactivex.Observer Rxjava源码分析之IO.Reactivex.CompositeDisposable Rxjava源码分 ...

  2. Rxjava源码分析之IO.Reactivex.Observer

    Android 中的观察者模式,Rxjava中有两个重要的类Observable和Observer,函数响应式编程具体表现为一个观察者(Observer)订阅一个可观察对象(Observable).通 ...

  3. 10章 RxJava源码分析

    本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处 CSDN学院课程地址 RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10 ...

  4. spring源码分析-core.io包里面的类

    前些日子看<深入理解javaweb开发>时,看到第一章java的io流,发觉自己对io流真的不是很熟悉.然后看了下JDK1.7中io包的一点点代码,又看了org.springframewo ...

  5. nginx源码分析之IO多路复用流程

    一.             主流程 几乎所有的服务器程序的工作模式都是: (1)      初始化一些参数: (2)      开启监听socket: (3)      在主线程的死循环(一般都是死 ...

  6. caffe源码分析-layer

    本文主要分析caffe layer层,主要内容如下: 从整体上说明下caffe的layer层的类别,以及作用 通过proto定义与类Layer简要说明下Layer的核心成员变量; Layer类的核心成 ...

  7. RxJava从入门到精通:RxJava源码初步分析

    Rxjava 源码学习(一):基本流程分析 - 知乎Rxjava 源码版本:Rxjava2.2.8 1. Rxjava 的基本实现首先看一下最简单的例子,具体查看其内部实现: 通过以下代码查看 Rxj ...

  8. Thrift异步IO服务器源码分析

    http://yanyiwu.com/work/2014/12/06/thrift-tnonblockingserver-analysis.html 最近在使用 libevent 开发项目,想起之前写 ...

  9. 多路复用IO: select、sys_select、do_select源码分析

    <srsLTE源码学习:逻辑信道多路复用与MAC CE分解pdu_queue.h,pdu_queue.cc> <select用法> <从select函数谈及系统调用原理& ...

最新文章

  1. php使用NuSoap产生webservice结合WSDL让asp.net调用
  2. 有空间感的图片环形滚动代码
  3. vagrant --- vagrant部署环境
  4. Java刷题知识点之方法覆盖(方法重写)和方法重载的区别
  5. 【ARM】Tiny4412裸板编程之MMU(页 4K)
  6. JPA多条件复杂SQL动态分页查询
  7. JAVA对时间的几个处理小方法
  8. 一加7T Pro最新渲染图曝光:背部有小改动
  9. python把list的所有元素生成排列和组合
  10. XCode插件因为升级不能用了怎么办?几个步骤教你搞定
  11. 谈谈以前那位研发总监错在哪里
  12. julia的几种画图方法
  13. 独家首发强大的个性生成工具箱微信小程序源码,超多功能的合成
  14. 没有了耳机接口,怎么让手机同时支持充电、听歌?USB-C音频方案了解一下
  15. 【我想对策划说的事】-- 入职dy一年后被邀请召开的扯淡分享会讲稿
  16. 利用Kmeans聚类进行用户分层分析
  17. 平板电脑android系统,平板电脑是什么系统
  18. 计算机配置太低,安装Win10系统电脑配置太低怎么办
  19. 剑指offer第62题 圆圈中最后剩下的数字(约瑟夫问题)
  20. 实用 | Mybatis事务管理

热门文章

  1. 背记不如实战系列-javaGUI实例-计算器制作
  2. 机车出入库相关、调车转线、、后期杂谈
  3. 使用R语言自带的茑尾花(iris)数据集,绘制鸢尾花的萼片的长度和宽度的散点图并添加不同品种花萼长度与花萼宽度的回归直线。不同的品种用不同的颜色,x轴为花萼长度
  4. 饼状图环形图数据信息PR图形模板MOGRT
  5. java软件工程师就业招聘信息_Java软件工程师就业前景为什么这么好呢?
  6. 中国联通研究院发力开源 取得互联网化核心技术能力新突破
  7. 识别计算机硬件实训,计算机硬件及组装实训报告工作报告_1
  8. 【笔记】python的传递实参:位置实参、关键字实参、默认值、等效的函数调用、避免实参错误
  9. android gc由QQ空间团队奉献
  10. arcgis server10.5将https改为http,6443改为6080默认端口