本博客大部分内容来自:http://www.cherylgood.cn,本文稍有改动。

本文内容大致如下:

  1. 初步了解RxJava2.X的使用流程 ;
  2. 探索Observable发送数据的流程 ;
  3. 明白Observer是如何接收数据的 ;
  4. 解析Observable与Observer的勾搭(如何关联)过程 ;
  5. 探索RxJava线程切换的奥秘 ;
  6. 了解RxJava操作符的实现原理。

探索RxJava2分发订阅流程

从Demo到原理

//1、观察者创建一个Observer
Observer observer = new Observer() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext data is :" + s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError data is :" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};Observable observable = Observable.create(new ObservableOnSubscribe() {@Overridepublic void subscribe(@NonNull ObservableEmitter e) throws Exception {e.onNext("hello");e.onNext("world");e.onComplete();}});observable.subscribe(observer);

结果输出:

onSubscribe
onNext data is :hello
onNext data is :world
onComplete

可以看到,Observer的onSubscribe是最先被调用的,这个回调会有什么用呢?我们后面会讲到。

由于整个流程是从create开始的,我们就从源头开始分析。create方法返回的是一个observable对象,也就是被观察的对象。create方法需要传入一个ObservableOnSubscribe来创建,我们看下ObservableOnSubscribe是什么:

/*** A functional interface that has a {@code subscribe()} method that receives* an instance of an {@link ObservableEmitter} instance that allows pushing* events in a cancellation-safe manner.** @param <T> the value type pushed*/
public interface ObservableOnSubscribe<T> {/*** Called for each Observer that subscribes.* @param e the safe emitter instance, never null* @throws Exception on error*/void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

该接口会接收一个ObservableEmitter的一个对象,然后通过该对象我们可以发送消息也可以安全地取消消息,我们继续看ObservableEmitter这个接口类。

public interface ObservableEmitter<T> extends Emitter<T> {/*** Sets a Disposable on this emitter; any previous Disposable* or Cancellation will be unsubscribed/cancelled.* @param d the disposable, null is allowed*/void setDisposable(@Nullable Disposable d);/*** Sets a Cancellable on this emitter; any previous Disposable* or Cancellation will be unsubscribed/cancelled.* @param c the cancellable resource, null is allowed*/void setCancellable(@Nullable Cancellable c);/*** Returns true if the downstream disposed the sequence or the* emitter was terminated via {@link #onError(Throwable)}, {@link #onComplete} or a* successful {@link #tryOnError(Throwable)}.* <p>This method is thread-safe.* @return true if the downstream disposed the sequence or the emitter was terminated*/boolean isDisposed();/*** Ensures that calls to onNext, onError and onComplete are properly serialized.* @return the serialized ObservableEmitter*/@NonNullObservableEmitter<T> serialize();/*** Attempts to emit the specified {@code Throwable} error if the downstream* hasn't cancelled the sequence or is otherwise terminated, returning false* if the emission is not allowed to happen due to lifecycle restrictions.* <p>* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called* if the error could not be delivered.* @param t the throwable error to signal if possible* @return true if successful, false if the downstream is not able to accept further* events* @since 2.1.1 - experimental*/@Experimentalboolean tryOnError(@NonNull Throwable t);
}

ObservableEmitter是对Emitter的扩展,而扩展的方法证实RxJava2.0之后引入的,提供了可中途取消等新能力,我们继续看Emitter。

/*** Base interface for emitting signals in a push-fashion in various generator-like source* operators (create, generate).** @param <T> the value type emitted*/
public interface Emitter<T> {/*** Signal a normal value.* @param value the value to signal, not null*/void onNext(@NonNull T value);/*** Signal a Throwable exception.* @param error the Throwable to signal, not null*/void onError(@NonNull Throwable error);/*** Signal a completion.*/void onComplete();
}

里面的三个方法使用过rx的应该非常眼熟了。看到这里,我们只是了解了传递参数的数据结构,了解到的信息还是比较少的。我们继续看下create内部做了什么操作。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins或许你会很陌生,其实我也很陌生,不过没关系,我觉得后面会经常遇到RxJavaPlugins,熟悉它是必然的;

可以看到我们传入ObservableOnSubscribe被用来创建ObservableCreate,其实ObservableCreate就是Observable的一个实现类哦。

思路梳理

OK,到这里我们先梳理一下思路:
1、Observable通过调用create创建一个Observable
2、调用create时需要传入一个ObservableOnSubscribe类型的实例参数
3、最终传入的ObservableOnSubscribe类型的实例参数作为ObservableCreate构造函数的参数传入,一个Observable就此诞生了

ObservableCreate又是个什么东东呢?我们分步来,先看ObservableCreate的两个方法。

public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}......}

source:Observable.createc传入的ObservableOnSubscribe实例;

subscribeActual回调方法,它在调用Observable.subscribe时被调用,即与观察者或则订阅者发生联系时触发。subscribeActual也是实现我们主要逻辑的地方,我们来仔细分析下subscribeActual方法:
1. 首先subscribeActual传入的参数为Observer类型,也就是我们subscribe时传入的观察者,到底是不是呢?后面会分析到。
2. 传入的Observer会被包装成一个CreateEmitter,CreateEmitter继承了AtomicReference提供了原子级的控制能力。RxJava2.0提供的新特性与之息息相关哦,这个我们先给它来个关键标签,后面再详细分析。
3. 观察者(observer)调用自己的onSubscribe(parent);将包装后的observer传入。这个也是RxJava2.0的变化,真正的订阅在source.subscribe(parent);这句代码被执行后开始,而在此之前先调用了onSubscribe方法来提供RxJava2.0后引入的新能力(如中断能力)。从这里我们也就知道了为何观察者的onSubscribe最先被调用了。(被订阅者说:我也很无辜,他自己调用了自己,我也控制不了╮(╯_╰)╭)
4. 被订阅者或者说被观察者(source)调用subscribe订阅方法与观察者发生联系。这里进行了异常捕获,如果subscribe抛出了未被捕获的异常,则调用 parent.onError(ex);
5. 在执行subscribe时也就对应了我们demo中的

public void subscribe(@NonNull ObservableEmitter e) throws Exception {e.onNext("hello");e.onNext("world");e.onComplete();}

Ok,看来subscribeActual这个回调确实很重要,前面我们也说了subscribeActual回调方法在Observable.subscribe被调用时执行的,真的像我说的一样么?万一我看走眼了

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}
}

OK,代码不多,可以看到RxJavaPlugins.onSubscribe(this, observer);,我们RxJava2.0中的Hook能力就是来自这里了。然后继续看下面subscribeActual(observer);被调用了。

思路梳理
  1. 传入的ObservableOnSubscribe最终被用来创建成ObservableCreate
  2. ObservableCreate持有我们的被观察者对象以及订阅时所触发的回调subscribeActual
  3. 在subscribeActual实现了我们的主要逻辑,包括observer.onSubscribe(parent);,source.subscribe(parent);,parent.onError(ex);的调用
  4. 在Observable的subscribe被调用时开始执行事件分发流程。

探索RxJava2神秘的随意取消订阅流程的原理

前面初步分析了RxJava从创建到执行的流程。
接着我们将探索RxJava2.x提供给我们的Disposable能力的来源。

先看一个demo。

//1、观察者创建一个Observer
Observer observer = new Observer() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");disposable = d;}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext data is :" + s);if (s.equals("hello")) {//执行了hello之后终止disposable.dispose();}}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError data is :" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}
};Observable observable = Observable.create(new ObservableOnSubscribe() {@Overridepublic void subscribe(@NonNull ObservableEmitter e) throws Exception {e.onNext("hello");Log.i(TAG, "发送 hello");e.onNext("world");Log.i(TAG, "发送 world");e.onComplete();Log.i(TAG, "调用 onComplete");}});
observable.subscribe(observer);

输出结果如下:

  onSubscribeonNext data is :hello发送 hello发送 world调用 onComplete

在发送玩hello之后,成功终止了后面的Reactive流。从结果我们还发现,后面的Reactive流被终止了,也就是订阅者或者观察者收不到后面的信息了,但是生产者或者说被订阅者、被观察者的代码还是会继续执行的。

Ok,我们从哪开始入手呢?我们发现,在我们执行了 disposable.dispose();后,触发了该事件,我们通过源码看下 disposable.dispose();到底做了什么。

/*** Represents a disposable resource.*/
public interface Disposable {/*** Dispose the resource, the operation should be idempotent.*/void dispose();/*** Returns true if this resource has been disposed.* @return true if this resource has been disposed*/boolean isDisposed();
}

此时我们要回忆一下前面的一段代码

public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Override
protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}
  • 我们之前分析到在执行source.subscribe(parent);触发数据分发事件之前先执行了observer.onSubscribe(parent);这句代码,所传入的parent也就对应了我们的Disposable
  • parent是CreateEmitter类型的,但是CreateEmitter是实现了Disposable接口的一个类。而parent又是我们的observer的一个包装后的对象。
  • OK,分析到这里我们来整理下前面的环节,根据Demo来解释下:首先在执行下面代码之前
 e.onNext("hello");Log.i(TAG, "发送 hello");e.onNext("world");Log.i(TAG, "发送 world");e.onComplete();Log.i(TAG, "调用 onComplete");
  • 先执行了observer.onSubscribe(parent);,我们在demo中也是通过传入的parent调用其dispose方法来终止Reactive流,而执行分发hello等数据的e也是我们的parent,也就是他们都是同一个对象。而执行e.onNext(“hello”);的e对象也是observer的一个包装后的ObservableEmitter类型的对象。

总结:Observer自己来控制了Reactive流状态。

Ok,此时如果我说关键点应该在ObservableEmitter这个类上面,你觉得可能性有多少呢?( ̄∇ ̄)

关键点就是CreateEmitter parent = new CreateEmitter(observer);这个包装的过程,我们来看下其源码。

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;}@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}@Overridepublic String toString() {return String.format("%s{%s}", getClass().getSimpleName(), super.toString());}
}

CreateEmitter是ObservableCreate类的静态内部类。CreateEmitter实现了ObservableEmitter, Disposable接口类,所以需实现其方法。这里其实是使用了装饰者模式,其魅力所在一会就会看到了。

我们可以看到在ObservableEmitter内部通过Observer< ? super T> observer存储了我们的observer,这样有什么用呢?看Demo,我们在调用e.onNext(“hello”);时,调用的时ObservableEmitter对象的onNext方法,然后ObservableEmitter对象的onNext方法内部再通过observer调用onNext方法,但是从源码我们可以发现,其并不是简单的调用哦。

@Override
public void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}
}

1、先判断传入的数据是否为null
2、判断isDisposed(),如果isDisposed()返回false则不执行onNext。

isDisposed()什么时候会返回false呢?按照demo,也就是我们调用了disposable.dispose();后,disposable前面分析了就是CreateEmitter parent,我们看CreateEmitter.dispose()

@Override
public void dispose() {DisposableHelper.dispose(this);
}

里面调用了DisposableHelper.dispose(this);,我们看isDisposed()

@Override
public boolean isDisposed() {return DisposableHelper.isDisposed(get());
}

RxJava的onComplete();与onError(t);只有一个会被执行的秘密原来是它?
再看另外两个方法的调用

@Override
public void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}
}@Override
public boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;
}@Override
public void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}
}

其内部也基本做了同样的操作,先判断!isDisposed()后再决定是否执行。

但是再这里还有一点哦,我们应该知道onComplete();和onError(t)只有一个会发生,其实现原理也是通过isDisposed这个方法哦,我们可以看到,不关是先执行onComplete();还是先执行onError(t),最终都会调用dispose();,而调用了dispose();后,isDisposed()为false,也就不会再执行另外一个了。而且如果人为先调用onComplete再调用onError,onComplete不会被触发,而且会抛出NullPointerException异常。

小结:

此时我们的目的基本达到了,我们知道了Reactive流是如何被终止的以及RxJava的onComplete();与onError(t);只有一个会被执行的原因。

我们虽然知道了原因,但是秉着刨根问底的态度,抵挡不住内心的好奇,我还是决定挖一挖DisposableHelper这个类,当然如果不想了解DisposableHelper的话,看到这里也就可以了;

Ok,前面分析到,代码里调用了DisposableHelper类的静态方法,我们看下其调用的两个静态方法分别做了什么?

public enum DisposableHelper implements Disposable {DISPOSED;public static boolean isDisposed(Disposable d) {// 判断上次记录的终点标识的是否是 当前执行的Observer,如果是返回truereturn d == DISPOSED;
}
....
public static boolean dispose(AtomicReference field) {//1、current为我们当前的observer的Disposable的值,第一次调用时current是nullDisposable current = field.get();//2、终止标识Disposable d = DISPOSED;//3、两次不相同,说明observer未调用过dispose,if (current != d) {//4、将终止标识的值设置給当前的observer的Disposable,并返回设置前的observer的Disposable的值,此时如果调用isDisposed(Disposable d)返回的就是ture了current = field.getAndSet(d);if (current != d) {//第一次调用时会走到这里,此时current==null,返回true,//current不为null时说明当前的observer调用了多次dispose(),而如果多次调用了Disposable的值还不是DISPOSED,说明之前设置失败,所以再次调用dispose();if (current != null) {current.dispose();}return true;}}return false;
}
....
}    

1、DISPOSED:作为是否要终止的枚举类型的标识
2、isDisposed:判断上次记录的终点标识的是否是 当前执行的Observer,如果是返回true
3、dispose:采用了原子性引用类AtomicReference,目的是防止多线程操作出现的错误。

总结:
  • 我们了解了RxJava的随意终止Reactive流的能力的来源;
  • 过程中也明白了RxJava的onComplete();与onError(t);只有一个会被执行的秘密。
  • 实现该能力的主要方式还是利用了装饰者模式
  • 从中体会了设计模式的魅力所在,当然我们还接触了AtomicReference这个类,在平时估计很少接触到。

探索RxJava2之订阅线程切换原理

本次我们将探索RxJava2.x线程切换的实现原理。

先看一个demo。

 //1、观察者创建一个ObserverObserver observer = new Observer() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");Log.d(TAG,"work thread is"+Thread.currentThread().getName());disposable = d;}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext data is :" + s);Log.d(TAG,"work thread is"+Thread.currentThread().getName());if (s.equals("hello")) {//执行了hello之后终止disposable.dispose();disposable.dispose();}CompositeDisposable compositeDisposable = new CompositeDisposable();compositeDisposable.dispose();}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError data is :" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};Observable observable = Observable.create(new ObservableOnSubscribe() {@Overridepublic void subscribe(@NonNull ObservableEmitter e) throws Exception {Log.d(TAG,"work thread is"+Thread.currentThread().getName());e.onNext("hello");Log.i(TAG, "发送 hello");e.onNext("world");Log.i(TAG, "发送 world");e.onComplete();Log.i(TAG, "调用 onComplete");}});

版本1:不存在线程切换

observable.subscribe(observer);

输出结果:

07-13 14:58:13.173 818-865/? D/RxJavaDemo2: onSubscribe
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: onNext data is :hello
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 发送 hello
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 发送 world
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 调用 onComplete
07-13 14:58:13.173 818-865/? I/TestRunner: finished: testRx(com.guanaj.rxdemo.RxJavaDemo2)

版本2:切换线程(切换操作是如此的潇洒)

observable.subscribeOn(Schedulers.io())//切换到io线程.observeOn(AndroidSchedulers.mainThread())//切换到主线程.subscribe(observer);

输出结果:

07-13 14:43:56.699 29727-29749/? D/RxJavaDemo2: onSubscribe
07-13 14:43:56.699 29727-29749/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:43:56.699 29727-29749/? I/TestRunner: finished: testRx(com.guanaj.rxdemo.RxJavaDemo2)
07-13 14:43:56.699 29727-29753/? D/RxJavaDemo2: work thread isRxCachedThreadScheduler-1
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 发送 hello
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 发送 world
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 调用 onComplete
07-13 14:43:56.699 29727-29727/? D/RxJavaDemo2: onNext data is :hello
07-13 14:43:56.699 29727-29727/? D/RxJavaDemo2: work thread ismain

结果分析(因为我用的是@RunWith(AndroidJUnit4.class)执行代码,所以在工作线程是AndroidJUnitRunner):
现在我们现象,后面根据现象分析原因。

没线程切换的版本:

无论在哪里调用subscribe,都在当前线程执行。

存在版本切换的版本:

1、被观察者的onSubscribe在调用subscribe的线程中执行,
2、被观察者的subscribe在RxJava2的RxCachedThreadScheduler-1中运行。
3、onNext工作在主线程

OK,现象看完了,我们开始看本质吧!但是,从哪入手呢?还是老办法,哪里触发的行为就哪里下手( ̄∇ ̄)

我们先来探索切换Observable工作线程的subscribeOn方法入手。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

看到了RxJavaPlugins.onAssembly,前面分析过,为hook服务,现在看成是返回传入的Obserable即可。这里的this为我们的observable,scheduler就是我们传入的Schedulers.io();我们继续看ObservableSubscribeOn;

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {....}

其继承AbstractObservableWithUpstream

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {/** The source consumable Observable. */protected final ObservableSource<T> source;/*** Constructs the ObservableSource with the given consumable.* @param source the consumable Observable*/AbstractObservableWithUpstream(ObservableSource<T> source) {this.source = source;}@Overridepublic final ObservableSource<T> source() {return source;}}

AbstractObservableWithUpstream继承自Observable,其作用是通过source字段保存上游的Observable,因为Observable是ObservableSource接口的实现类,所以我们可以认为Observable和ObservableSource在本文中是相等的:,

也就是说ObservableSubscribeOn是对Observble进行了一次wrapper操作

我们继续回来看ObservableSubscribeOn的源码

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {//1、线程调度器final Scheduler scheduler;public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {//2、存储上游的obserblesuper(source);this.scheduler = scheduler;}@Override
public void subscribeActual(final Observer<? super T> s) {//以下为关键部分//3、对我们下游的observer进行一次wrapperfinal SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);//4、同样,先自己调用自己的onSubscribes.onSubscribe(parent);//5、(高能量聚集了)将调度的线程的Disposable赋值给当前的Disposable。scheduler可以看成是某个线程上的调度器。new SubscribeTask(parent)工作在其指定的线程里面。SubscribeTask是一个Runnable,也就是说调度器触发Runnable的run()运行,//***是不是恍然大悟,那么run()里面的代码就是运行在scheduler的线程上了。这样也就实现了线程的切换了。parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}static final class SubscribeOnObserver<T> extends AtomicReference implements Observer<T>, Disposable {....}...}

我们开看下SubscribeTask

final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}@Overridepublic void run() {source.subscribe(parent);}
}

1、parent就是我们包装后的observe,其内部保存了下游的observer
2、source即通过ObservableSubscribeOnwrapper后存储我们上游的observable

所以run里面的source.subscribe(parent);即为wrapper的observer订阅了上游的observable,触发了上游observable的subscribeActual,开始执行数据的分发
上层obserable -》wrapper产生的observer -》真实的observser

思路梳理(重要)

Ok,分析到这里思路基本清晰了
1、在执行subscribeOn时,在Observable和observer中插入了一个(wrapper)obserabler和(wrapper)Observer

原observable->【(Wrapper)observable||(Wrapper)Observer】->(原)Observer

2、observable.subscribe触发->(Wrapper)Observable.subscribeActual()内部调用->parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));,->scheduler在指定线程调用(完成线程切换)->SubscribeTask.run,run内部调用->原Observable.subscribe((Wrapper)Observer)触发->(原)Observable.subscribeActual()开始数据分发

此时分发给的是(Wrapper)Observer,那应该是(Wrapper)Observer分发给了(原)Observer,我们看下是不是。

OK,(Wrapper)Observer对(原)Observer进行了wrapper,我们看下源码:

static final class SubscribeOnObserver<T> extends AtomicReference implements Observer<T>, Disposable {//6、存储下游的observerfinal Observer<? super T> actual;//7、保存下游observer的Disposable,下游的Disposable交由其管理final AtomicReference<Disposable> s;SubscribeOnObserver(Observersuper T> actual) {this.actual = actual;this.s = new AtomicReference<Disposable>();}@Overridepublic void onSubscribe(Disposable s) {//8、onSubscribe()方法在observer调用subscribe时触发,Observer传入自己的Disposable,赋值给this.s,交由当前的包装的Observer管理。同样是装饰者模式的魅力所在。DisposableHelper.setOnce(this.s, s);}//当前Observer可以理解为下游observer和上游obserable的一个中间observer。//9、这里直接调用下游observer的对应方法。@Overridepublic void onNext(T t) {actual.onNext(t);}@Overridepublic void onError(Throwable t) {actual.onError(t);}@Overridepublic void onComplete() {actual.onComplete();}//10、取消订阅时,要同时取消下游的observer和当前的observer,因为上游obserable分发订阅数据判断是否需要派发时判断的是与之最近的observer。//上层obserable-》wrapper产生的observer》真实的observser@Overridepublic void dispose() {DisposableHelper.dispose(s);DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}//11、subscribeActual()中被调用,目的是将Schedulers返回的Worker加入管理void setDisposable(Disposable d) {DisposableHelper.setOnce(this, d);}}

确实是(Wrapper)Observer分发给了(原)Observer。
到这里的时候,整个流程基本OK了,但是,我们在5和11处说了,调度Worker也会加入Disposable进行管理,我还是要一探究竟( ̄∇ ̄)。

有了对SubscribeOnObserver分析的铺垫,我们现在可以分析第5处parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));的代码了,我们先看scheduler.scheduleDirect()这句

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {//12、以毫秒为单位,无延迟调度return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

其返回一个Disposable,我们看下这个Disposable是否真的是调度的线程的。

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {//13、Worker实现了Disposable的一个调度工作者类final Worker w = createWorker();//14、hook,排除hook干扰,可以理解为decoratedRun==runfinal Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//15、DisposeTask同样是实现了Disposable的TaskDisposeTask task = new DisposeTask(decoratedRun, w);//16、开始执行w.schedule(task, delay, unit);//17、确实是返回了管理run的workerreturn task;
}

现在重点转移到DisposeTask,我们把run给了DisposeTask,然后worker调度task开始执行run

那么我们可以猜测w.schedule(task, delay, unit)执行后应该是调度了task的某个方法,然后task开始执行Runnable的run

是不是真的呢?我们来看下new DisposeTask(decoratedRun, w)做了什么

static final class DisposeTask implements Runnable, Disposable {//18、我们外部传入的runnablefinal Runnable decoratedRun;//19、调度工作者final Worker w;//20、当前线程Thread runner;DisposeTask(Runnable decoratedRun, Worker w) {this.decoratedRun = decoratedRun;this.w = w;}@Overridepublic void run() {//21、获取执decoratedRun的线程runner = Thread.currentThread();try {//22、OK,高能来了。decoratedRun的run被执行decoratedRun.run();} finally {dispose();runner = null;}}@Overridepublic void dispose() {if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {((NewThreadWorker)w).shutdown();} else {//在DisposeTask被取消时,告诉Worker取消,因为DisposeTask是Worker进行管理的w.dispose();}}@Overridepublic boolean isDisposed() {return w.isDisposed();}}

结论:

  • scheduler.scheduleDirect无延迟调用->worker->worker调度->DisposeTask->DisposeTask.run执行->decoratedRun.run();
  • decoratedRun即我们外部的SubscribeTask

总结

  • 我们从subscribeOn入手分析了Observable线程切换的流程。其基本是通过中间插入包装类,也就是装饰者模式的体现,巧妙的实现了线程的切换。
  • 其内部也对Disposed做了处理,保证Disposed的传递。
  • 装饰者模式的使用贯穿了RxJava2的各处(个人理解),再次体会了设计模式的魅力。

探索RxJava2之观察者线程切换原理

接着我们将继续探索RxJava2.x切换观察者的原理,分析observeOn与subscribeOn的不同之处。

按照套路,我们从observeOn方法入手。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {//false为默认无延迟发送错误,bufferSize为缓冲区大小return observeOn(scheduler, false, bufferSize());
}

我们继续往下看,我猜套路跟subscribeOn的差不多,也是采用装饰者模式,wrapper我们的Observable和Observer产生一个中间被观察者和观察中,通过中间被观察者订阅上游被观察者,通过中间观察者接收上游被观察者下发的数据,然后通过线程切换将数据传递给下游观察者。

我们来验证下才想。我觉得就是没完全猜对,也能猜对其中的大部分。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");ObjectHelper.verifyPositive(bufferSize, "bufferSize");return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

熟悉的RxJavaPlugins.onAssemblyhook处理,略过,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)这句。

 public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {final Scheduler scheduler;final boolean delayError;final int bufferSize;public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {super(source);this.scheduler = scheduler;this.delayError = delayError;this.bufferSize = bufferSize;}@Overrideprotected void subscribeActual(Observersuper T> observer) {//1、在当前线程调度,但不是立即执行,放入队列中if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {//2、本次走的是这里Scheduler.Worker w = scheduler.createWorker();//3source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}}

果然,熟悉的模式,对我们上游的Observable,下游的Observerwrapper一次。
1、ObservableObserveOn继承了AbstractObservableWithUpstream
2、source保存上游的Observable
3、scheduler为本次的调度器
4、在下游调用subscribe订阅时触发->subscribeActual->Wrapper了下游的Observer观察者

3处:source为游Observable,下游Observer被wrapper到ObserveOnObserver,发生订阅数件,上游Observable开始执行subscribeActual,调用ObserveOnObserver的onSubscribe以及onNext、onError、onComplete等

我们接着看Observer被包装进 ObserveOnObserver的样子,代码有点多,我们分段讲解

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>implements Observer<T>, Runnable {private static final long serialVersionUID = 6576896619930983584L;//下游的Observerfinal Observer<? super T> actual;//调度工作者final Scheduler.Worker worker;//是否延迟错误,默认falsefinal boolean delayError;//队列大小final int bufferSize;//存储上游Observable下发的数据队列SimpleQueue<T> queue;//存储下游Observer的DisposableDisposable s;//存储错误信息Throwable error;//校验是否完毕volatile boolean done;//是否被取消volatile boolean cancelled;//存储执行模式,同步或者异步 同步int sourceMode;boolean outputFused;ObserveOnObserver(Observersuper T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {this.actual = actual;this.worker = worker;this.delayError = delayError;this.bufferSize = bufferSize;}@Overridepublic void onSubscribe(Disposable s) {if (DisposableHelper.validate(this.s, s)) {this.s = s;if (s instanceof QueueDisposable) {@SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) s;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);//1、判断执行模式并调用onSubscribe传递给下游Observerif (m == QueueDisposable.SYNC) {sourceMode = m;queue = qd;//true 后面的onXX方法都不会被调用done = true;actual.onSubscribe(this);//2、同步模式下,直接调用scheduleschedule();return;}if (m == QueueDisposable.ASYNC) {sourceMode = m;queue = qd;actual.onSubscribe(this);//2、异步模式下,等待schedulereturn;}}queue = new SpscLinkedArrayQueue<T>(bufferSize);//判断执行模式并调用onSubscribe传递给下游Observeractual.onSubscribe(this);}}

执行玩这里之后,就到我们的onXX方法了
首先可无限调用的onNext

@Override
public void onNext(T t) {//3、数据源是同步模式或者执行过error / complete 会是trueif (done) {return;}//如果数据源不是异步类型,if (sourceMode != QueueDisposable.ASYNC) {//4、上游Observable下发的数据压入queuequeue.offer(t);}//5、开始调度schedule();
}

其次只能触发一次的onError,基本差不多

@Override
public void onError(Throwable t) {if (done) {//6、已完成再执行会抛一场RxJavaPlugins.onError(t);return;}//7、记录错误信息error = t;//8、标识已完成done = true;//9、开始调度schedule();
}

同样是只能触发一次的onComplete,同样的套路,就不说了

@Override
public void onComplete() {if (done) {return;}done = true;schedule();
}

然后就是我们的关键点schedule();

//关键点就是直接、简单、里面线程调度工作者调用schedule(this),传入了this
void schedule() {//getAndIncrement很关键,他原子性的保证了worker.schedule(this);在调度完之前不会被再次调度if (getAndIncrement() == 0) {worker.schedule(this);}
}

这里传入了this,那么说明什么呢?( ̄∇ ̄)

嗯?this是个runnable,没错,我们的ObserveOnObserver实现了Runnable接口

那么,接下来自然是调用run方法

@Override
public void run() {//outputFused一般是falseif (outputFused) {drainFused();} else {drainNormal();}

好吧,在看drainNormal前,我们先看一个函数

//从名字看是检测是否已终止boolean checkTerminated(boolean d, boolean empty, Observersuper T> a) {//1、订阅已取消if (cancelled) {//清空队列queue.clear();return true;}//2、d其实是done,if (d) {//done==ture可能的情况onNext刚被调度完,onError或者onCompele被调用,Throwable e = error;if (delayError) {//delayError==true时等到队列为空才调用if (empty) {if (e != null) {a.onError(e);} else {a.onComplete();}worker.dispose();return true;}} else {//否则直接调用if (e != null) {queue.clear();a.onError(e);worker.dispose();return true;} elseif (empty) {a.onComplete();worker.dispose();return true;}}}//否则未终结return false;}

true:1、订阅被取消cancelled==true,2、done==true onNext刚被调度完,onError或者onCompele被调用

继续看drainNormal

void drainNormal() {int missed = 1;final SimpleQueue<T> q = queue;final Observersuper T> a = actual;//Ok,死循环,我们来看下有哪些出口for (;;) {//Ok,出口,该方法前面分析的if (checkTerminated(done, q.isEmpty(), a)) {return;}//在此死循环for (;;) {boolean d = done;T v;try {//分发数据出队列v = q.poll();} catch (Throwable ex) {//有异常时终止退出Exceptions.throwIfFatal(ex);s.dispose();q.clear();a.onError(ex);//停止worker(线程)worker.dispose();return;}boolean empty = v == null;//判断队列是否为空if (checkTerminated(d, empty, a)) {return;}//没数据退出if (empty) {break;}//数据下发给下游Obsever,这里支付者onNext,onComplete和onError主要放在了checkTerminated里面回调a.onNext(v);}//保证此时确实有一个 worker.schedule(this);正在被执行,missed = addAndGet(-missed);//为何要这样做呢?我的理解是保证drainNormal方法被原子性调用,如果执行了addAndGet之后getAndIncrement() == 0就成立了,此时又一个worker.schedule(this);被调用了,那么就不能执行break了if (missed == 0) {break;}}}

总结

看到这里我们基本了解了observeOn的实现流程,同样是老套路,使用装饰者模式,中间Wrapper了我们的Observable和Observer,通过中间增加一个Observable和Observer来实现线程的切换。

下篇:RxJava2.X 源码分析 二

RxJava2.X 源码分析 一相关推荐

  1. Java代码怎么取消订阅功能,RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )...

    Github 相关代码: Github地址 一直感觉 RxJava2 的取消订阅有点混乱, 这样也能取消, 那样也能取消, 没能系统起来的感觉就像掉进了盘丝洞, 迷乱... 下面说说这几种情况 几种取 ...

  2. 10章 RxJava源码分析

    本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处 CSDN学院课程地址 RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10 ...

  3. Rxjava源码分析之IO.Reactivex.CompositeDisposable

    Rxjava 源码系列目录 Rxjava源码分析之IO.Reactivex.Observer Rxjava源码分析之IO.Reactivex.CompositeDisposable Rxjava源码分 ...

  4. Retrofit跟OkHttp源码分析

    网上已经有了相等多的分析博客,但终归是别人的知识点,倒不如自己走一遍流程,如果你看到了这篇博客,最好自己跟着思路对照源码过一遍哦! Retrofit源码分析 Retrofit的构建 在我们开发工作中使 ...

  5. java中的 dispose_RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

    Github 相关代码: Github地址 一直感觉 RxJava2 的取消订阅有点混乱, 这样也能取消, 那样也能取消, 没能系统起来的感觉就像掉进了盘丝洞, 迷乱- 下面说说这几种情况 几种取消的 ...

  6. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  7. SpringBoot-web开发(四): SpringMVC的拓展、接管(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) SpringBoot-web开发(二): 页面和图标定制(源码分析) SpringBo ...

  8. SpringBoot-web开发(二): 页面和图标定制(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) 目录 一.首页 1. 源码分析 2. 访问首页测试 二.动态页面 1. 动态资源目录t ...

  9. SpringBoot-web开发(一): 静态资源的导入(源码分析)

    目录 方式一:通过WebJars 1. 什么是webjars? 2. webjars的使用 3. webjars结构 4. 解析源码 5. 测试访问 方式二:放入静态资源目录 1. 源码分析 2. 测 ...

最新文章

  1. AttributeError: ‘NoneType‘ object has no attribute ‘span‘
  2. 2022年人才引进,哪些高校待遇高?
  3. torch.nn.module API
  4. python — 列表与元组
  5. linux内核4.0,新闻|Linux内核4.0功能:实时内核补丁,支持PS3
  6. 百练OJ:2728:第一个C++程序
  7. 那年大一在图书馆作死的大学高数笔记 | 导数和微分
  8. 你可能从未听过的 Linux 发行版
  9. centos8配置本地光盘yum源_CentOS8 配置本地yum源的详细教程
  10. TensorFlow2.0 —— 模型保存与加载
  11. 《Go语言程序设计》读书笔记 (九) 命令工具集
  12. [debug] 命令行窗口运行.py文件,报错No module named XXX
  13. 实用小程序,快速求A类不确定度(物理实验),保留六位
  14. Ubuntu系统配置Java环境
  15. 树莓派linux虚拟键盘,树莓派raspbian安装matchbox-keyboard虚拟键盘
  16. 如何看待阿里云成立新零售事业部?
  17. 轻松学会正则表达式(标题狗)
  18. vs X64汇编 LNK2001: unresolved external symbol mainCRTStartup
  19. Unix/Linux编程:getcontext、setcontext
  20. 新做的网络电视播放器,欢迎试用

热门文章

  1. 天池龙珠计划SQL训练营 Task 2
  2. 2018-8-9-win-消息
  3. 瑞吉外卖--套餐的添加修改等功能,短信验证登录原理操作及用户地址管理功能
  4. jq linux获取数组长度,数组长度用size还是length
  5. 循环改变数组长度时的对策
  6. 这次漂亮!2016斯巴鲁跨界挑战赛大造赛事IP
  7. 线性回归模型及R语言代码
  8. Centos 开机取消进度条
  9. deliphi 字符串分割_Delphi中 分割字符串(两种方法)
  10. 机器学习笔记 - 基于JavaScript的顶级机器学习框架