深入分析RxJava的Disposable及其工作原理
良好的编码习惯告诉我们,任何基于订阅者模式代码,都要注意为注册与注销的配对出现,以避免泄露都问题
Disposable
RxJava通过Disposable
(RxJava1中是Subscription
)在适当的时机取消订阅、停止数据流的发射。这在Android等具有Lifecycle概念的场景中非常重要,避免造成一些不必要bug以及对象泄露。
private CompositeDisposable compositeDisposable =new CompositeDisposable();
@Override public void onCreate() {compositeDisposable.add(backendApi.loadUser().subscribe(this::displayUser, this::handleError));
}
@Override public void onDestroy() {compositeDisposable.clear();
}
上例是Android中的常见写法:使用CompositeDisposable
收集所有的Disposable,而后在onDestroy中统一释放。
Disposable作为一个接口,clear最终调用的是各个Disposable的dispose方法:
public interface Disposable {void dispose();boolean isDisposed();
}
当然,也出现了一些可以帮助我们自动dispose的框架以较少模板代码。例如RxLifecycle
、uber的AutoDispose
等。本文旨在介绍Disposable的基本工作原理,对这些三方库有兴趣的同学请去github自行学习。
dispose方法实现
Disposable disposable = Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {for (int i = 1; i <= 3; i++) {observableEmitter.onNext(i);}}).takeUntil(integer -> integer < 3).subscribe();
对于以上代码,当调用disposable.dispose();
时,代码是如何运行的呢?
Disposable是一个Observer
当调用Observable.subscribe() / subscribe(...)
后返回的Disposable对象,是本质是一个LambdaObserver
对象
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,@NonNull Action onComplete) {LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());subscribe(ls);return ls; //作为Disposable返回
}
LambdaObserver及众多接口于一身
public final class LambdaObserver<T> extends AtomicReference<Disposable>implements Observer<T>, Disposable
- 首先,作为一个Observer,被
subscribe()
后,通过onNext
等向下游发射数据; - 其次,作为Disposable对外提供
dispose
方法; - 最后,作为
AtomicReference
接口,持有一个Disposable的value
@Override
public void dispose() {DisposableHelper.dispose(this);
}public static boolean dispose(AtomicReference<Disposable> field) {Disposable current = field.get();Disposable d = DISPOSED;if (current != d) {current = field.getAndSet(d);if (current != d) {if (current != null) {current.dispose();}return true;}}return false;
}
如上,调用dispose后,做两件事:
- 获取
AtomicReference
当前value,调用其dispose - 将当前value设为
DISPOSED
onSubscribe中传递Disposable
LambdaObserver中AtomicReference的value是在Observer.onSubscribe
中设置的:
@Override
public void onSubscribe(Disposable d) {if (DisposableHelper.setOnce(this, d)) { //设置valuetry {onSubscribe.accept(this);} catch (Throwable ex) {...}}
}
那么Observer.onSubscribe是什么时候被调用的呢?
RxJava的各种操作符都是Observable的实现。操作符链式调用的本质就是创建Observable和Observer,并通过subscribe关联。
subscribe内部最终都会调用到subscribeActual,这是每个操作符都必须实现的方法。
create在subscribeActual中,调用Observer.onSubscrie()
,将当前的Disposable(前文说过其实就是当前Observer)作为parent传递给下游
protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<>(observer); // CreateEmitter是一个Diposableobserver.onSubscribe(parent); // Observer.onSubscrie()try {source.subscribe(parent);} catch (Throwable ex) {...}
}
Observer中关联上下游
除create、subscribe这样的终端操作符以外,大部分的操作符的Observer结构如下:
@Override/** The downstream subscriber. */
protected final Observer<? super R> downstream;/** The upstream subscription. */
protected Disposable upstream;public final void onSubscribe(Disposable d) {...this.upstream = d;downstream.onSubscribe(this);...
}public void dispose() {upstream.dispose();
}
- Observer会持有上下游对象:upstream和downstream
- onSubscribe向下递归调用
- dipose向上递归调用
在链式订阅中,向下游订阅Observer的同时,也关联了上游的Disposable(Observer)
我们在最下端调用subscribe时,各个Observer会建立上下游关联,当我们在下游调用dispose时,最终会递归调用到顶端(create)的dispose
再看takeUntil的例子
根据以上分析,我们会回到最初takeUntil的例子。前面说过所有的操作符都是Observable的实现:
- takeUntil 对应
ObservableTakeUntilPredicate
; - create 对应
ObservableCreate
ObservableCreate.subscribeActual() // Send 1, 2, 3 down↑
ObservableCreate.subscribe(TakeUntilPredicateObserver)↑
ObservableTakeUntilPredicate.subscribeActual()↑
ObservableTakeUntilPredicate.subscribe()
通过subscribeActual -> onSubscribe的调用,Disposable也有了如下引用链:
CreateEmitter↑
TakeUntilPredicateObserver↑
LambdaObserver
当我们针对终端的LambdaObserver调用dispose方法时,通过引用链递会最终调用到CreateEmitter的dispose。
CreateEmitter跟LambdaObserver一样,也会将AtomicReference的value设为DISPOSED
public void dispose() {DisposableHelper.dispose(this);
}
之后,在onNext中判断状态,当为DISPOSED时,不再发射数据流
@Override
public void onNext(T t) {if (!isDisposed()) { //是否为DISPOSEDobserver.onNext(t);}
}
关于onComplete
通过如下test,可以发现当onComplete调用后会,会自动调用dispose。
@Test
public void testDisposed(){boolean isDisposed = false;TestObserver<Integer> to = Observable.<Integer>create(subscriber -> {subscriber.setDisposable(new Disposable() {@Overridepublic boolean isDisposed() {return isDisposed;}@Overridepublic void dispose() {isDisposed = true;}});subscriber.onComplete();}).test();to.assertComplete();assertTrue(isDisposed);
}
ObservableEmitter的onComplete中果然也调用了dispose:
public void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}
关于内存泄漏
调用dipose确实可以结束流的发射,但是不等于没有内存泄露。
查看ObservableCreate的源码可以知道,dispose只是只是简单的设置了DISPOSED状态,Observe中关联的上下游对象并没有释放。所以当订阅了一个静态的Observable时,无法避免内存泄漏。
但是当订阅一个Subject时,dispose确实可以有效释放对象,避免内存泄漏:
public void dispose() {if (super.tryDispose()) {parent.remove(this); //对象删除}
}
关于实时性
前面分析知道,对于终端操作符create、subscribe等,其Observer在dispose时会标记当前状态为DISPOSED
。但对于其他操作符,其Observer的dispose只是递归向上调用dispose,对正在处理中的数据不会拦截。
调用dispose后,RxJava数据流不一定会立即停止,大部分操作符在调用dispose后,数据依然会发射给下游
关于dispose的实时性测试,下文可供参考
https://medium.com/stepstone-tech/the-curious-case-of-rxjava-disposables-e64ff8a06879
深入分析RxJava的Disposable及其工作原理相关推荐
- RxJava的Disposable及其工作原理
一.关于 Disposable 任何订阅者模式的代码,都需要注意注册与注销的配对出现,否则会出现内存泄漏.RxJava2 提供了 Disposable( RxJava1 中是 Subscription ...
- java servlet原理_Java Web 深入分析(8) Servlet工作原理解析
Servlet Servlet(Server Applet)是Java Servlet的简称,称为小服务程序或服务连接器,用Java编写的服务器端程序,主要功能在于交互式地浏览和修改数据,生成动态We ...
- 分布式系统熔断机制的工作原理
目录 前言 1. 问题产生 2. 形成过程 2. 服务熔断和服务降级区别 2.1 服务降级:系统有限的资源的合理协调 2.2 服务熔断:应对雪崩效应的链路自我保护机制.可看作降级的特殊情况 2.3 服 ...
- 深入分析 Java I/O 的工作机制
深入分析 Java I/O 的工作机制 I/O 问题是任何编程语言都无法回避的问题,可以说 I/O 问题是整个人机交互的核心问题,因为 I/O 是机器获取和交换信息的主要渠道.在当今这个数据大爆炸时代 ...
- RxJava RxPermissions 动态权限 简介 原理 案例 MD
Markdown版本笔记 我的GitHub首页 我的博客 我的微信 我的邮箱 MyAndroidBlogs baiqiantao baiqiantao bqt20094 baiqiantao@sina ...
- 深入分析 Java I/O 的工作机制--转载
Java 的 I/O 类库的基本架构 I/O 问题是任何编程语言都无法回避的问题,可以说 I/O 问题是整个人机交互的核心问题,因为 I/O 是机器获取和交换信息的主要渠道.在当今这个数据大爆炸时代, ...
- 面试:Handler 的工作原理是怎样的?
面试场景 平时开发用到其他线程吗?都是如何处理的? 基本都用 RxJava 的线程调度切换,嗯对,就是那个 observeOn 和 subscribeOn 可以直接处理,比如网络操作,RxJava 提 ...
- ARP协议在同网段及跨网段下的工作原理
一.ARP在同个网段下的工作原理 首先,每台主机都会在自己的ARP缓冲区中建立一个 ARP列表,以表示IP地址和MAC地址的对应关系.当源主机需要将一个数据包要发送到目的主机时,会首先检查自己 ARP ...
- gns3中两个路由器分别连接主机然后分析ip数据转发报文arp协议_ARP协议在同网段及跨网段下的工作原理...
前言:ARP协议是在链路层通讯中,已知目标IP地址,但是,仅有IP 地址是不够的,因为IP数据报必须封装成帧才能通过数据链路进行发送,而数据帧必须要有目的MAC地址,每一个网络设备在数据封装前都需要获 ...
最新文章
- Java进阶之自动拆箱与自动装箱
- 2017.5.5-afternoon
- WinCE开机Logo的实现(USB下载图片到nandflash)
- 《系统集成项目管理工程师》必背100个知识点-74CMO的具体工作
- 【机器学习】机器学习必知概念
- 如何检测ARP病毒,arp病毒怎么解决?
- HTTP/2 流量调试
- base——JRE和JDK的区别【转】
- finder怎么才能找到library
- Qt——P10 自定义的信号和槽
- mysql批量更新报错_Mysql批量更新的三种方式
- 基于matlab的光伏电池通用数学模型,基于MATLAB的光伏电池通用数学模型.doc
- linux 复制文件_使用 rsync 复制大文件的一些误解 | Linux 中国
- mysql测评作业指导书_测评作业指导书
- 想考华为HCIA,但不知道选择什么方向,点进来~
- 【重磅整理】180篇NeurIPS2020顶会《强化学习领域》Accept论文大全
- TensorFlow 之 slim(TF-Slim)介绍
- 如何用计算机接收光纤无线电视,无线路由器怎么连接智能电视机攻略
- django3 分布式路由、应用以及模型
- 大学计算机基础第四章ppt,大学计算机基础第四章.ppt
热门文章
- 【印象笔记】绕开bug的小技巧
- Spring 注解 hibernate 实体方法 property name=packagesToScan value=com.sise.domain/
- python判断n是否为完全数_判断一个数是否为完全数
- 【c语言】和【Java】版本的猜数字小游戏
- 小韦老师@神犇营-my1088-麻将游戏
- 什么原因导致Chrome又被批评
- OpenVINO系列02-深度学习部署工具包(DLDT)介绍
- nginx配置域名启用http2协议
- jquery网页在线流程图
- 从事游戏美术行业,游戏建模和游戏原画间怎么选择?