代码调用

Observable.just(1)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

}

});

直接进入主题,先看subscribe中调用了哪些方法

//Observable.java

public final Disposable subscribe(Consumer super T> onNext) {

return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());

}

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,

Action onComplete, Consumer super Disposable> onSubscribe) {

ObjectHelper.requireNonNull(onNext, "onNext is null");

ObjectHelper.requireNonNull(onError, "onError is null");

ObjectHelper.requireNonNull(onComplete, "onComplete is null");

ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;

}

public final void subscribe(Observer super T> observer) {

ObjectHelper.requireNonNull(observer, "observer is null");

try {

observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

subscribeActual(observer);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

// can't call onError because no way to know if a Disposable has been set or not

// can't call onSubscribe because the call might have set a Subscription already

RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}

}

//最终调用了Observable的subscribeActual方法

protected abstract void subscribeActual(Observer super T> observer);

接下来我们看下subscribeOn方法中进行了什么操作

//Observable.java

public final Observable subscribeOn(Scheduler scheduler) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

//这里返回了一个ObservableSubscribeOn对象,参考RxJavaPlugins.onAssembly方法,

//返回的值就是传入的值,再根据流式调用,

//即上面分析调用的subscribeActual方法,即是ObservableSubscribeOn的subscribeActual方法

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));

}

接下来我们看ObservableSubscribeOn的subscribeActual方法

//ObservableSubscribeOn.java

public void subscribeActual(final Observer super T> observer) {

final SubscribeOnObserver parent = new SubscribeOnObserver(observer);

observer.onSubscribe(parent);

//这里生成了一个SubscribeTask,查看源码可知实现了Runnable接口

//这里调用了scheduler.scheduleDirect

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

}

看下scheduler.scheduleDirect,再次之前,我们先看下传入的Scheduler.io

查看传入的Schedule

public static Scheduler io() {

// 这里查看下IO

return RxJavaPlugins.onIoScheduler(IO);

}

//new IOTask

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

//由此可见,最后Schedulers.io就是IoScheduler

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

//scheduler

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

//这里生成一个Worker,但是createWorker是一个虚方法,有上可知

//这里调用了IoScheduler.createWorker,生成EventLoopWorker对象

final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

//调用了EventLoopWorker.schedule

w.schedule(task, delay, unit);

return task;

}

接下来看EventLoopWorker

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {

//取消注册

if (tasks.isDisposed()) {

// don't schedule, we are unsubscribed

return EmptyDisposable.INSTANCE;

}

//NewThreadWorker.scheduleActual

return threadWorker.scheduleActual(action, delayTime, unit, tasks);

}

真正进入线程调度的代码,在NewThreadWorker中

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {

Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {

if (!parent.add(sr)) {

return sr;

}

}

Future> f;

try {

if (delayTime <= 0) {

//executor是一个线程池

f = executor.submit((Callable)sr);

} else {

//存在延迟的

f = executor.schedule((Callable)sr, delayTime, unit);

}

sr.setFuture(f);

} catch (RejectedExecutionException ex) {

if (parent != null) {

parent.remove(sr);

}

RxJavaPlugins.onError(ex);

}

return sr;

}

所以到最后,真正进行线程调度的,其实是一个线程池,看完了subscribeOn,我们再来看看observeOn,首先看下AndroidSchedulers.mainThread()到底是哪个线程

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(

new Callable() {

@Override public Scheduler call() throws Exception {

return MainHolder.DEFAULT;

}

});

private static final class MainHolder {

static final Scheduler DEFAULT

//从Looper.getMainLooper()可以看出,这里是获取了主线程的Looper

= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);

}

好确定了这个问题,我们再继续往下看

public final Observable observeOn(Scheduler scheduler) {

return observeOn(scheduler, false, bufferSize());

}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

ObjectHelper.verifyPositive(bufferSize, "bufferSize");

//生成一个新的ObservableObserverOn对象

return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));

}

接下去看ObservableObserveOn对象

protected void subscribeActual(Observer super T> observer) {

if (scheduler instanceof TrampolineScheduler) {

source.subscribe(observer);

} else {

//跟之前一样还是调用createWorker,从上面代码可知调用了HandlerScheduler.createWorker返回HandlerWorker

Scheduler.Worker w = scheduler.createWorker();

//这里有一个内部类对象ObserveOnObserver

source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));

}

}

//内部类ObserveOnObserver,以下是回调方法

public void onSubscribe(Disposable d) {

if (DisposableHelper.validate(this.upstream, d)) {

this.upstream = d;

if (d instanceof QueueDisposable) {

@SuppressWarnings("unchecked")

QueueDisposable qd = (QueueDisposable) d;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

if (m == QueueDisposable.SYNC) {

sourceMode = m;

queue = qd;

done = true;

downstream.onSubscribe(this);

//调用schedule

schedule();

return;

}

if (m == QueueDisposable.ASYNC) {

sourceMode = m;

queue = qd;

downstream.onSubscribe(this);

return;

}

}

queue = new SpscLinkedArrayQueue(bufferSize);

downstream.onSubscribe(this);

}

}

@Override

public void onNext(T t) {

if (done) {

return;

}

if (sourceMode != QueueDisposable.ASYNC) {

queue.offer(t);

}

//调用schedule

schedule();

}

@Override

public void onError(Throwable t) {

if (done) {

RxJavaPlugins.onError(t);

return;

}

error = t;

done = true;

//调用schedule

schedule();

}

@Override

public void onComplete() {

if (done) {

return;

}

done = true;

//调用schedule

schedule();

}

void schedule() {

if (getAndIncrement() == 0) {

//所以当回调的时候,最终是调用了worker.schedule

worker.schedule(this);

}

}

//最终看一下HandlerWorker的schedule方法,一看源码,老朋友了,Handler就不解释了

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

if (run == null) throw new NullPointerException("run == null");

if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {

return Disposables.disposed();

}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);

message.obj = this; // Used as token for batch disposal of this worker's runnables.

if (async) {

message.setAsynchronous(true);

}

handler.sendMessageDelayed(message, unit.toMillis(delay));

// Re-check disposed state for removing in case we were racing a call to dispose().

if (disposed) {

handler.removeCallbacks(scheduled);

return Disposables.disposed();

}

return scheduled;

}

生产调度java程序原码_Rxjava的线程调度源码解析相关推荐

  1. java中web错误返回码,关于在java程序里调用webservice报500返回码的有关问题

    关于在java程序里调用webservice报500返回码的问题 我现在写了个程序,是调用webservice的,执行后我打印返回码是500  错误信息是 java.io.IOException: S ...

  2. java计算机毕业设计培训机构运营系统源码+程序+lw文档+mysql数据库

    java计算机毕业设计培训机构运营系统源码+程序+lw文档+mysql数据库 java计算机毕业设计培训机构运营系统源码+程序+lw文档+mysql数据库 本源码技术栈: 项目架构:B/S架构 开发语 ...

  3. 使用Java实现发送微信消息(附源码)_此程序在手再也不怕对象跟你闹了

    使用Java实现发送微信消息(附源码)_此程序在手再也不怕对象跟你闹了 此程序在手再也不怕女朋友跟你闹了!!!!自从有了女朋友比如:早安.晚安之类的问候语可不能断,但是也难免有时候会忘记那么该咋么办呢 ...

  4. java计算机毕业设计汽车技术资料管理系统源码+程序+lw文档+mysql数据库

    java计算机毕业设计汽车技术资料管理系统源码+程序+lw文档+mysql数据库 java计算机毕业设计汽车技术资料管理系统源码+程序+lw文档+mysql数据库 本源码技术栈: 项目架构:B/S架构 ...

  5. java计算机毕业设计盘山县智慧项目管理系统源码+程序+lw文档+mysql数据库

    java计算机毕业设计盘山县智慧项目管理系统源码+程序+lw文档+mysql数据库 java计算机毕业设计盘山县智慧项目管理系统源码+程序+lw文档+mysql数据库 本源码技术栈: 项目架构:B/S ...

  6. php+mysql原仿素材火素材源码平台程序

    php+mysql原仿素材火素材源码平台程序 原素材码素材平台程序,程序为php+mysql开发.仿素材火模板程序源码. 基于codeIgniter框架,功能包含会员积分,码币充值,每日签到,推广获取 ...

  7. java计算机毕业设计宁夏红色旅游管理系统源码+程序+lw文档+mysql数据库

    java计算机毕业设计宁夏红色旅游管理系统源码+程序+lw文档+mysql数据库 java计算机毕业设计宁夏红色旅游管理系统源码+程序+lw文档+mysql数据库 本源码技术栈: 项目架构:B/S架构 ...

  8. java计算机毕业设计皮皮狗宠物用品商城源码+程序+lw文档+mysql数据库

    java计算机毕业设计皮皮狗宠物用品商城源码+程序+lw文档+mysql数据库 java计算机毕业设计皮皮狗宠物用品商城源码+程序+lw文档+mysql数据库 本源码技术栈: 项目架构:B/S架构 开 ...

  9. java计算机毕业设计期刊在线投稿系统源码+程序+lw文档+mysql数据库

    java计算机毕业设计期刊在线投稿系统源码+程序+lw文档+mysql数据库 java计算机毕业设计期刊在线投稿系统源码+程序+lw文档+mysql数据库 本源码技术栈: 项目架构:B/S架构 开发语 ...

最新文章

  1. Java设计模式-七大设计原则
  2. 《数字视频和高清:算法和接口》一第2章 图像的采样和显示
  3. g460 bios 白名单_深睛高清车牌识别相机白名单,智能管理车辆进出_深睛车牌识别...
  4. opencv imwrite()函数中 ImwriteFlags 的 cv.IMWRITE_JPEG_RST_INTERVAL(JPEG restart interval 重启间隔)是什么?
  5. org.apache.hadoop.hive.metastore.api.InvalidObjectException: Role public already exists.
  6. 西华大学计算机学院陈鹏,中国计算机学会CCF服务计算专委会走进西华大学
  7. VS2019 WPF制作OTA上位机(二)获取bin文件路径
  8. Docker使用Dockerfile构建简单镜像
  9. springboot 实现接口灰度发布
  10. 人工智能技术的三大学派_什么是人工智能?它离我们有多远
  11. “鱼渔合作”在IT运维中的启示
  12. OFFICE技术讲座:连续内容分断的规则
  13. 贴片电阻、贴片电容规格、封装、尺寸
  14. 【Python】自动抠图换背景
  15. 从原型图到成品:步步深入 CSS 布局
  16. 一个完整的MSI包的配置文件XML的内容形式和查看方法ORCA
  17. Python_第六篇 第三方安装包(1)_fancyimpute介绍及使用
  18. 3D游戏建模真的好找工作吗?
  19. where和which的区别【定于从句】
  20. 尚学堂JAVA第四章课后答案

热门文章

  1. java abs前缀变量_JAVA工具例大全--cn.hutool.setting.AbsSetting读取配置文件例子
  2. c语言调用createthread线程的头文件_易语言API多线程总汇
  3. 如何查看Win11系统的版本号
  4. 腾讯视频app怎么允许腾讯视频访问位置信息
  5. Win7系统电脑休眠后无法唤醒的解决方法
  6. Linux 命令解压缩
  7. 接口interface修饰符相关问题总结
  8. Java核心类库篇6——IO
  9. 大数据时代的3V3高
  10. 长春理工大学c语言实验题库,长春理工大学首届趣味心理知识竞赛初赛题库.doc...