良好的编码习惯告诉我们,任何基于订阅者模式代码,都要注意为注册与注销的配对出现,以避免泄露都问题

Disposable


RxJava通过Disposable(RxJava1中是Subscription)在适当的时机取消订阅、停止数据流的发射。这在Android等具有Lifecycle概念的场景中非常重要,避免造成一些不必要bug以及对象泄露。

private CompositeDisposable compositeDisposable =new CompositeDisposable();
@Override public void onCreate() {compositeDisposable.add(backendApi.loadUser().subscribe(this::displayUser, this::handleError));
}
@Override public void onDestroy() {compositeDisposable.clear();
}

上例是Android中的常见写法:使用CompositeDisposable收集所有的Disposable,而后在onDestroy中统一释放。

Disposable作为一个接口,clear最终调用的是各个Disposable的dispose方法:

public interface Disposable {void dispose();boolean isDisposed();
}

当然,也出现了一些可以帮助我们自动dispose的框架以较少模板代码。例如RxLifecycle、uber的AutoDispose等。本文旨在介绍Disposable的基本工作原理,对这些三方库有兴趣的同学请去github自行学习。

dispose方法实现


Disposable disposable = Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {for (int i = 1; i <= 3; i++) {observableEmitter.onNext(i);}}).takeUntil(integer -> integer < 3).subscribe();

对于以上代码,当调用disposable.dispose();时,代码是如何运行的呢?

Disposable是一个Observer

当调用Observable.subscribe() / subscribe(...)后返回的Disposable对象,是本质是一个LambdaObserver对象

public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,@NonNull Action onComplete) {LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());subscribe(ls);return ls; //作为Disposable返回
}

LambdaObserver及众多接口于一身

public final class LambdaObserver<T> extends AtomicReference<Disposable>implements Observer<T>, Disposable
  • 首先,作为一个Observer,被subscribe()后,通过onNext等向下游发射数据;
  • 其次,作为Disposable对外提供dispose方法;
  • 最后,作为AtomicReference接口,持有一个Disposable的value
@Override
public void dispose() {DisposableHelper.dispose(this);
}public static boolean dispose(AtomicReference<Disposable> field) {Disposable current = field.get();Disposable d = DISPOSED;if (current != d) {current = field.getAndSet(d);if (current != d) {if (current != null) {current.dispose();}return true;}}return false;
}

如上,调用dispose后,做两件事:

  • 获取AtomicReference当前value,调用其dispose
  • 将当前value设为DISPOSED

onSubscribe中传递Disposable

LambdaObserver中AtomicReference的value是在Observer.onSubscribe中设置的:

@Override
public void onSubscribe(Disposable d) {if (DisposableHelper.setOnce(this, d)) { //设置valuetry {onSubscribe.accept(this);} catch (Throwable ex) {...}}
}

那么Observer.onSubscribe是什么时候被调用的呢?

RxJava的各种操作符都是Observable的实现。操作符链式调用的本质就是创建Observable和Observer,并通过subscribe关联。
subscribe内部最终都会调用到subscribeActual,这是每个操作符都必须实现的方法。

create在subscribeActual中,调用Observer.onSubscrie(),将当前的Disposable(前文说过其实就是当前Observer)作为parent传递给下游

protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<>(observer); // CreateEmitter是一个Diposableobserver.onSubscribe(parent); // Observer.onSubscrie()try {source.subscribe(parent);} catch (Throwable ex) {...}
}

Observer中关联上下游

除create、subscribe这样的终端操作符以外,大部分的操作符的Observer结构如下:

@Override/** The downstream subscriber. */
protected final Observer<? super R> downstream;/** The upstream subscription. */
protected Disposable upstream;public final void onSubscribe(Disposable d) {...this.upstream = d;downstream.onSubscribe(this);...
}public void dispose() {upstream.dispose();
}
  • Observer会持有上下游对象:upstream和downstream
  • onSubscribe向下递归调用
  • dipose向上递归调用

在链式订阅中,向下游订阅Observer的同时,也关联了上游的Disposable(Observer)

我们在最下端调用subscribe时,各个Observer会建立上下游关联,当我们在下游调用dispose时,最终会递归调用到顶端(create)的dispose

再看takeUntil的例子

根据以上分析,我们会回到最初takeUntil的例子。前面说过所有的操作符都是Observable的实现:

  • takeUntil 对应 ObservableTakeUntilPredicate;
  • create 对应 ObservableCreate
ObservableCreate.subscribeActual() // Send 1, 2, 3 down↑
ObservableCreate.subscribe(TakeUntilPredicateObserver)↑
ObservableTakeUntilPredicate.subscribeActual()↑
ObservableTakeUntilPredicate.subscribe()

通过subscribeActual -> onSubscribe的调用,Disposable也有了如下引用链:

CreateEmitter↑
TakeUntilPredicateObserver↑
LambdaObserver

当我们针对终端的LambdaObserver调用dispose方法时,通过引用链递会最终调用到CreateEmitter的dispose。

CreateEmitter跟LambdaObserver一样,也会将AtomicReference的value设为DISPOSED

public void dispose() {DisposableHelper.dispose(this);
}

之后,在onNext中判断状态,当为DISPOSED时,不再发射数据流

@Override
public void onNext(T t) {if (!isDisposed()) { //是否为DISPOSEDobserver.onNext(t);}
}

关于onComplete


通过如下test,可以发现当onComplete调用后会,会自动调用dispose。

@Test
public void testDisposed(){boolean isDisposed = false;TestObserver<Integer> to = Observable.<Integer>create(subscriber -> {subscriber.setDisposable(new Disposable() {@Overridepublic boolean isDisposed() {return isDisposed;}@Overridepublic void dispose() {isDisposed = true;}});subscriber.onComplete();}).test();to.assertComplete();assertTrue(isDisposed);
}

ObservableEmitter的onComplete中果然也调用了dispose:

 public void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}

关于内存泄漏


调用dipose确实可以结束流的发射,但是不等于没有内存泄露。

查看ObservableCreate的源码可以知道,dispose只是只是简单的设置了DISPOSED状态,Observe中关联的上下游对象并没有释放。所以当订阅了一个静态的Observable时,无法避免内存泄漏。

但是当订阅一个Subject时,dispose确实可以有效释放对象,避免内存泄漏:

public void dispose() {if (super.tryDispose()) {parent.remove(this); //对象删除}
}

关于实时性


前面分析知道,对于终端操作符create、subscribe等,其Observer在dispose时会标记当前状态为DISPOSED。但对于其他操作符,其Observer的dispose只是递归向上调用dispose,对正在处理中的数据不会拦截。

调用dispose后,RxJava数据流不一定会立即停止,大部分操作符在调用dispose后,数据依然会发射给下游

关于dispose的实时性测试,下文可供参考
https://medium.com/stepstone-tech/the-curious-case-of-rxjava-disposables-e64ff8a06879

深入分析RxJava的Disposable及其工作原理相关推荐

  1. RxJava的Disposable及其工作原理

    一.关于 Disposable 任何订阅者模式的代码,都需要注意注册与注销的配对出现,否则会出现内存泄漏.RxJava2 提供了 Disposable( RxJava1 中是 Subscription ...

  2. java servlet原理_Java Web 深入分析(8) Servlet工作原理解析

    Servlet Servlet(Server Applet)是Java Servlet的简称,称为小服务程序或服务连接器,用Java编写的服务器端程序,主要功能在于交互式地浏览和修改数据,生成动态We ...

  3. 分布式系统熔断机制的工作原理

    目录 前言 1. 问题产生 2. 形成过程 2. 服务熔断和服务降级区别 2.1 服务降级:系统有限的资源的合理协调 2.2 服务熔断:应对雪崩效应的链路自我保护机制.可看作降级的特殊情况 2.3 服 ...

  4. 深入分析 Java I/O 的工作机制

    深入分析 Java I/O 的工作机制 I/O 问题是任何编程语言都无法回避的问题,可以说 I/O 问题是整个人机交互的核心问题,因为 I/O 是机器获取和交换信息的主要渠道.在当今这个数据大爆炸时代 ...

  5. RxJava RxPermissions 动态权限 简介 原理 案例 MD

    Markdown版本笔记 我的GitHub首页 我的博客 我的微信 我的邮箱 MyAndroidBlogs baiqiantao baiqiantao bqt20094 baiqiantao@sina ...

  6. 深入分析 Java I/O 的工作机制--转载

    Java 的 I/O 类库的基本架构 I/O 问题是任何编程语言都无法回避的问题,可以说 I/O 问题是整个人机交互的核心问题,因为 I/O 是机器获取和交换信息的主要渠道.在当今这个数据大爆炸时代, ...

  7. 面试:Handler 的工作原理是怎样的?

    面试场景 平时开发用到其他线程吗?都是如何处理的? 基本都用 RxJava 的线程调度切换,嗯对,就是那个 observeOn 和 subscribeOn 可以直接处理,比如网络操作,RxJava 提 ...

  8. ARP协议在同网段及跨网段下的工作原理

    一.ARP在同个网段下的工作原理 首先,每台主机都会在自己的ARP缓冲区中建立一个 ARP列表,以表示IP地址和MAC地址的对应关系.当源主机需要将一个数据包要发送到目的主机时,会首先检查自己 ARP ...

  9. gns3中两个路由器分别连接主机然后分析ip数据转发报文arp协议_ARP协议在同网段及跨网段下的工作原理...

    前言:ARP协议是在链路层通讯中,已知目标IP地址,但是,仅有IP 地址是不够的,因为IP数据报必须封装成帧才能通过数据链路进行发送,而数据帧必须要有目的MAC地址,每一个网络设备在数据封装前都需要获 ...

最新文章

  1. Java进阶之自动拆箱与自动装箱
  2. 2017.5.5-afternoon
  3. WinCE开机Logo的实现(USB下载图片到nandflash)
  4. 《系统集成项目管理工程师》必背100个知识点-74CMO的具体工作
  5. 【机器学习】机器学习必知概念
  6. 如何检测ARP病毒,arp病毒怎么解决?
  7. HTTP/2 流量调试
  8. base——JRE和JDK的区别【转】
  9. finder怎么才能找到library
  10. Qt——P10 自定义的信号和槽
  11. mysql批量更新报错_Mysql批量更新的三种方式
  12. 基于matlab的光伏电池通用数学模型,基于MATLAB的光伏电池通用数学模型.doc
  13. linux 复制文件_使用 rsync 复制大文件的一些误解 | Linux 中国
  14. mysql测评作业指导书_测评作业指导书
  15. 想考华为HCIA,但不知道选择什么方向,点进来~
  16. 【重磅整理】180篇NeurIPS2020顶会《强化学习领域》Accept论文大全
  17. TensorFlow 之 slim(TF-Slim)介绍
  18. 如何用计算机接收光纤无线电视,无线路由器怎么连接智能电视机攻略
  19. django3 分布式路由、应用以及模型
  20. 大学计算机基础第四章ppt,大学计算机基础第四章.ppt

热门文章

  1. 【印象笔记】绕开bug的小技巧
  2. Spring 注解 hibernate 实体方法 property name=packagesToScan value=com.sise.domain/
  3. python判断n是否为完全数_判断一个数是否为完全数
  4. 【c语言】和【Java】版本的猜数字小游戏
  5. 小韦老师@神犇营-my1088-麻将游戏
  6. 什么原因导致Chrome又被批评
  7. OpenVINO系列02-深度学习部署工具包(DLDT)介绍
  8. nginx配置域名启用http2协议
  9. jquery网页在线流程图
  10. 从事游戏美术行业,游戏建模和游戏原画间怎么选择?