示例如下:

    final Disposable disposable = Observable.create(new ObservableOnSubscribe<String>() {public void subscribe(@NonNull ObservableEmitter<String> e) {for (int i=0; i<100; i++) {if (e.isDisposed())break;Thread.sleep(1000);e.onNext(String.valueOf(i));}e.onComplete();}}).map(new Function<Integer, String>() {public String apply(Integer number) {return number.toString();}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();new Handler().postDelayed(new Runnable() {public void run() {disposable.dispose();}}, 3000);

问题:

为什么调用disposable.dispose之后,ObservableEmitter.isDisposed为true?

分析:

经过调试发现,disposable.dispose这一句执行后最终会调用ObservableCreate.CreateEmitter.dispose。

【connect】在observer.onSubscribe时连接成一条线

  1. Disposable是什么?
  2. 怎样连接起来?
  3. 怎样跨越线程?
  4. 怎样跨越操作符?

1,Disposable是一个interface

/*** Represents a disposable resource.*/
public interface Disposable {/*** Dispose the resource, the operation should be idempotent.*/void dispose();/*** Returns true if this resource has been disposed.* @return true if this resource has been disposed*/boolean isDisposed();
}

2,当subscribe调用后,会构造一个LambdaObserver,最终返回的Dispose就是这个。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe) {ObjectHelper.requireNonNull(onNext, "onNext is null");ObjectHelper.requireNonNull(onError, "onError is null");ObjectHelper.requireNonNull(onComplete, "onComplete is null");ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);subscribe(ls);return ls;
}

假设只有Observable.create与subscribe,那么,将会进入ObservableCreate.subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {// observer即传入的LambdaObserverCreateEmitter<T> parent = new CreateEmitter<T>(observer);// 重点在此,看看里面做了什么observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}

LambdaObserver.onSubscribe

@Override
public void onSubscribe(Disposable d) {// 注意,继承了AtomicReference<Disposable>的类内部都有一个// volatile Disposable value,// 此处就是保存传入的Disposable(CreateEmitter),如此,// 当LambdaObserver的dispose调用时,会调用到CreateEmitter.disposeif (DisposableHelper.setOnce(this, d)) {try {onSubscribe.accept(this);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);d.dispose();onError(ex);}}
}

3,先看subscribeOn,ObservableSubscribeOn

@Override
public void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);// downstream保存SubscribeOnObserver对象(Disposable)observer.onSubscribe(parent);// 任务抛到线程,并且将其Disposable对象保存parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

当source.subscribe调用后,SubscribeOnObserver内onSubscribe如下

@Override
public void onSubscribe(Disposable d) {// 将上游observable保存到upstream中DisposableHelper.setOnce(this.upstream, d);
}

当LambdaObserver.dispose调用后,SubscribeOnObserver内dispose被调用

@Override
public void dispose() {// 触发上游disposeDisposableHelper.dispose(upstream);// dispose任务,任务在线程中执行会先判断isDisposed再决定是否执行DisposableHelper.dispose(this);
}

再看observeOn,ObservableObserveOn

@Override
protected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker();// ObserveOnObserver.onNext时,会在worker中执行observer.onNextsource.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}
}

ObservableObserveOn.onSubscribe

@Override
public void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {// 将CreateEmitter保存起来this.upstream = d;if (d instanceof QueueDisposable) {@SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) d;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);if (m == QueueDisposable.SYNC) {sourceMode = m;queue = qd;done = true;downstream.onSubscribe(this);schedule();return;}if (m == QueueDisposable.ASYNC) {sourceMode = m;queue = qd;downstream.onSubscribe(this);return;}}queue = new SpscLinkedArrayQueue<T>(bufferSize);// downstream保存此Disposable对象downstream.onSubscribe(this);}
}

LambdaObserver.dispose调用后,会执行ObservableObserveOn.dispose

@Override
public void dispose() {if (!disposed) {disposed = true;// 中止上游任务upstream.dispose();// 中止线程中的任务worker.dispose();if (getAndIncrement() == 0) {queue.clear();}}
}

4,以map为例

Observable.create().map().subscribe();

内部保存upstream,并把自己保存在downstream的Disposable对象中。

从map.subscribeActual开始

@Override
public void subscribeActual(Observer<? super U> t) {// source为ObservableCreate// t为LambdaObserversource.subscribe(new MapObserver<T, U>(t, function));
}

source.subscribe调用后会进入MapObserver.onSubscribe,在基类中BasicFuseableObserver

@Override
public final void onSubscribe(Disposable d) {// d为CreateEmitter,保存为upstreamif (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;if (d instanceof QueueDisposable) {this.qd = (QueueDisposable<T>)d;}if (beforeDownstream()) {// downstream为LambdaObserver,在构造函数中赋值的// 该句调用后,LambdaObserver保存此对象Disposabledownstream.onSubscribe(this);afterDownstream();}}
}

因此,LambdaObserver.dispose调用后,会调用MapObserver.dispose

@Override
public void dispose() {// upstream即上面的CreateEmitterupstream.dispose();
}

总结:

下游会保存上游的Disposable对象,从而,在下游开始调用dispose后,实际上是去调用上游的dispose,如此,直到第一个Observable对象调用dispose停止,从而达到每个Disposable对象都为DISPOSED。

RxAndroid dispose实现原理相关推荐

  1. RxJava的Disposable及其工作原理

    一.关于 Disposable 任何订阅者模式的代码,都需要注意注册与注销的配对出现,否则会出现内存泄漏.RxJava2 提供了 Disposable( RxJava1 中是 Subscription ...

  2. 响应式编程简介之:Reactor

    文章目录 简介 Reactor简介 reactive programming的发展史 Iterable-Iterator 和Publisher-Subscriber的区别 为什么要使用异步reacti ...

  3. 那些年收藏的技术文章(一) CSDN篇

    #Android ##Android基础及相关机制 Android Context 上下文 你必须知道的一切 Android中子线程真的不能更新UI吗? Android基础和运行机制 Android任 ...

  4. Android复习系列④之《Android进阶》

    Android进阶 1 Okhttp OkHttpClient相当于配置中心, 所有的请求都会共享这些配置(例如出错是否重试.共享的连接池) . 1.OkHttpCLient中的配置主要有: Disp ...

  5. 那些年收藏的技术文章(一)-CSDN篇

    Android Android基础及相关机制 Android View体系 Android坐标相关 Android事件机制及相关问题 Android官方组件 Android Service Andro ...

  6. RxSwift之深入解析dispose源码的实现原理

    一.前言 任何对象都有生命周期,有创建就要销毁.OC 中有 init 和 dealloc,swift 有 init 和 deinit,RxSwift 也不例外,有 create 和 dispose. ...

  7. 行动力决定了一个人的成败,有想法,就去做! C#的内存管理原理解析+标准Dispose模式的实现

    尽管.NET运行库负责处理大部分内存管理工作,但C#程序员仍然必须理解内存管理的工作原理,了解如何高效地处理非托管的资源,才能在非常注重性能的系统中高效地处理内存. C#编程的一个优点就是程序员不必担 ...

  8. RxJava和RxAndroid

    现在RxJava和RxAndroid越来越火爆,自己在业余时间也学习了一下,感觉确实很好用,之前 为了完成页面刷新,数据请求,组件信息传递的时候,要使用handler,真的是逻辑思路很强,稍微不注意, ...

  9. 重拾Android之路(五)RxJava和RxAndroid

    现在RxJava和RxAndroid越来越火爆,自己在业余时间也学习了一下,感觉确实很好用,之前 为了完成页面刷新,数据请求,组件信息传递的时候,要使用handler,真的是逻辑思路很强,稍微不注意, ...

最新文章

  1. skynet 报错 skynet 服务缺陷 Lua死循环
  2. springboot集成kaptcha 2.3.2
  3. MobileNet、GhostNet理解及测试
  4. WPF实现背景透明磨砂,并通过HandyControl组件实现弹出等待框
  5. linux awk 时间范围,linux下使用awk命令按时间段筛选日志
  6. 先睹为快!第十一届数据技术嘉年华七大看点全揭秘
  7. NodeJs 多核多进程并行框架实作 - CNode
  8. 最速下降法求解步骤及例题
  9. linux查看磁带大小命令,16条实用的Linux/Unix 磁带管理命令
  10. LabVIEW与西门子1200,1500系列Sanp7协议通讯案例+QMH标准框架编写,非常实用,长期测试通讯无误。
  11. 一片外文的计算机网络方面的文献,计算机网络专科外文文献 计算机网络专科核心期刊参考文献有哪些...
  12. PS for Mac 破解版下载
  13. 淘宝天猫自动领取喵币
  14. 平安京s9服务器维护,决战平安京S9赛季段位怎么继承_决战平安京S9赛季段位继承详情_素材吧...
  15. cmd循环调用native2ascii.exe
  16. 软件企业双软认证的条件和好处是什么
  17. JavaApplet运行
  18. 《Python深度学习》第一部分读书笔记
  19. 【深度学习图像识别课程】毕业项目:狗狗种类识别(1)环境准备
  20. 浅谈自动化测试中的验证码处理方法小总结

热门文章

  1. java替换图片内容
  2. 《人人都在说谎:赤裸裸的数据真相》读书笔记2
  3. Docker初级应用
  4. Linux将用户加入sudo组,ubuntu新建一个账户并将其加入sudo用户组
  5. Win8.1部署 .NET Framework 3.5 靠谱安装方式_Andy_Issta_新浪博客
  6. 转:学会“Asking ”,而非“Telling”
  7. MAPIFolder.UserDefinedProperties 的作用
  8. 不得不服!windows10docker效率
  9. 数字先锋 | 主机、硬盘、CPU统统没有? 这个电教室有点“潮”!
  10. html中模糊背景的设置方法