作者: 一字马胡

转载标志 【2017-12-13】

更新日志

日期

更新内容

备注

2017-12-13

RxJava学习笔记系列

系列笔记 (一)

2017-12-15

增加系列笔记(二)

2017-12-15 21:36

考虑到RxJava很大程度上用于android开发中,而我自身不是移动开发者,所以暂时将RxJava学习笔记系列挂起,在未来需要使用RxJava的时候再继续学习,并且结合实际的应用来学习会收获更多

挂起

导入

其实在很早以前就接触过RxJava,并且当时学习RxJava还有一个产出:JSwitcher,这是一个基于RxJava的实验性框架,对于该框架的介绍可以参考下面的描述:

JSwitcher is a Convenient tool to switch schedule base on RxJava, and Jswitcher also implement a sample Version Observer/Observable, you can learn how RxJava works from the sample codes. it's easy to switch to Another schedule from current schedule. you just need to care about your bussiness, using 'switchTo' Operator to switch to the longing schedlue when you want to do the work on the suitable schedule. There are some especial schedules for you, like I/O Bound Schedule, Cpu Bound Schedule, And Single Schedule, etc, if you want to create an extra schedule by yourself, it's ok for JSwitcher, and it's very easy to do this .And the most important thing is the jswitch support 'chain operator', that means you can switch to a schedule, then fit on this schedule some works, then you can do switch operator continue from current position, or you can just fit another work on current schedule, and jswitcher has terminal operator like 'waitAndShutdown', after do the operator, you can not do 'chain operator' anymore. and the jswitcher will wait some time and shutdown all of schedule.

该框架将RxJava的核心部分抽离出来并做了一些简化处理,说到这里,需要提及一下,将一个复杂框架中的某部分抽象出来看似很简单,但是实际操作起来还是有一些困难的,并且在实际操作的过程中为了不涉及过多外围的内容时常需要简化,就是将一些依赖外围的核心部分中的某些内容抛弃,但是最为主要的骨架不能丢掉,这样操作下来会对整个框架有一定的了解。如果上面的描述激起了你的兴趣,可以实际去阅读JSwitcher框架代码,也可以作为快速入门RxJava的学习材料,但是该框架存在一些不确定性以及一些待研究正确性的点,所以不宜在实际项目中应用。

JSwitcher的核心功能是实现线程池的切换,并且支持按任务性质(I/O,Compute)来划分线程池,切换到合适的线程池可以提交任务,具体的使用可以参考下面的例子:

SwitcherFitter.switcherFitter()

.switchToIoSchedule() //switch to i/o bound schedule

.switchToSingleSchedule() //switch to single schedule

.fit(normalRunner, future1, true) //do the normal runner at current schedule

.switchToComputeSchedule() // switch to cpu bound schedule

.fit(normalRunner, future2, true) // do

.fit(timeoutRunner, future3, true) // do

.switchToSingleSchedule() //switch

.switchToSingleSchedule() //switch

.fit(timeoutRunner, future4, true) //do

.awaitFuturesCompletedOrTimeout(100,

completableFutures, timeoutFutures, 10) //wait for the future

.switchToComputeSchedule() //switch

.fit(() -> {

System.out.println("i am a tester->" + Thread.currentThread().getName());

}) // do the stupid work

.waitAndShutdown(1000); //wait and shutdown !

关于JSwitcher的设计,可以参考下面的图片:

本文作为学习RxJava的学习笔记的第一篇文章,会从RxJava的一些核心概念出发,并且从实际的例子来梳理RxJava的实现原理,当然,为了阅读的流畅性,每一篇文章不会涉及太多的内容。需要说明的一点是,本文乃至本系列的所有文章都是基于RxJava2,RxJava目前有两个版本,一个是RxJava1,一个是RxJava2,据说两个版本间的差别还是很大的,介于我的学习都是基于RxJava2的,并且没有接触过RxJava1,所以本系列文章不会涉及RxJava1与RxJava2的对比内容,所有内容都是基于RxJava2的。

Observer和Observable

学习RxJava之前,你需要了解什么是Reactive,我的理解是应该要和传统的代码进行对比学习,我们一般写代码都是命令式的,我们希望做什么就做什么,比如我们想下载一张图片,然后判断图片是否下载成功,如果成功了就展示出来,如果没有下载成功则使用兜底图片进行展示,如果没有兜底图片则不展示。下面是这个功能的伪代码实现:

Image img = EntryDownloadHelper.downloadImageByUrl(url, timeout)

if img is null

then

if FALLBACK_IMG != null

then img = FALLBACK_IMG

if img != null

then

ShowEntryHelper.showImage(img, height, weight)

看起来很熟悉并且很容易理解,那什么是Reactive的呢?如果使用RxJava来重写上面的代码,则代码看起来像下面这样:

String imgUrl = "xxx";

Image img = null;

Image FALLBACK_IMG = "xxx";

int timeout = 1000;

int height = 100;

int weight = 200;

Observable.create(new ObservableOnSubscribe() {

public void subscribe(ObservableEmitter e) throws Exception {

if (imgUrl == null || imgUrl.isEmpty()) {

e.onNext(FALLBACK_IMG);

} else {

img = EntryDownloadHelper.downloadImageByUrl(imgUrl, timeout);

if (img == null) {

e.onNext(FALLBACK_IMG);

} else {

e.onNext(img);

}

}

e. onComplete();

}

}).subscribe(new Observer() {

public void onSubscribe(Disposable disposable) {

}

public void onNext(Image s) {

if (s != null) {

ShowEntryHelper.showImage(img, height, weight);

}

}

public void onError(Throwable throwable) {

System.out.println("onError:" + throwable);

}

public void onComplete() {

}

});

这只是一个简单的小例子,并没有什么使用价值,并且需要说明的一点是,RxJava更适合用于移动应用的开发,所以如果是做移动开发的话,学习RxJava的价值会更大,但是在一些其他的开发过程中也会使用到RxJava。

在上面的例子中,出现了两个比较关键的对象,ObServer和Observable,RxJava在实现Reactive的时候使用了观察者设计模式,Observable是被观察者,可以叫数据源,也可以叫做生产者,反正就是负责生产数据,并且将数据推送出去的东西,而ObServer是观察者对象,它会绑定到一个Observable上,并且观察Observable的行为,当ObServable触发事件的时候,ObServer会接收到事件,并且对相应的事件作出相应。所以可以将ObServer叫做事件的接收者,也可以叫做事件的消费者。有了观察者和被观察者,需要将两个角色联系起来,也就是上面所说到的将Observer绑定到Observable上,这个时候就需要使用Observable的subscribe方法,叫做订阅,下面会详细讲解Observable是如何将事件传递给Observer的。

学习一个新技术最开始需要做的就是写一个demo,并且运行起来,然后再继续学习下去。下面首先写一个RxJava的demo,下面的分析将会基于该demo:

Observable.create(new ObservableOnSubscribe() {

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("test");

e.onComplete();

}

}).subscribe(new Observer() {

public void onSubscribe(Disposable disposable) {

System.out.println("onSubscribe");

}

public void onNext(String s) {

System.out.println("onNext:" + s);

}

public void onError(Throwable throwable) {

System.out.println("onError:" + throwable);

}

public void onComplete() {

System.out.println("onComplete:");

}

});

首先需要创建一个Observable,可以使用Observable的静态方法create,当然可以直接new一个Observable对象,并且实现Observable的方法来实现,就像下面这样:

Observable observable = new Observable() {

@Override

protected void subscribeActual(Observer super String> observer) {

observer.onNext("ok");

observer.onComplete();

}

};

现在,Observable已经有了,下面就需要在该Observable上绑定一个Observer,就像上面的例子一样,使用Observable的subscribe方法,需要说明的一点是,可以在Observable做非常丰富的聚合操作,可以对Observable进行一系列聚合操作(比如map,filter等操作)之后再绑定Observer,但是本文不会涉及这些操作的内容,这些内容将在下一篇该系列的文章中出现。

目前Observable有六个subscribe方法供Observer选择:

public final Disposable subscribe()

public final Disposable subscribe(Consumer super T> onNext)

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError)

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,Action onComplete)

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError, Action onComplete, Consumer super Disposable> onSubscribe)

public final void subscribe(Observer super T> observer)

可以选择这六个中的任意一个来绑定Observer,本文以一个看起来较为简单的subscribe方法来分析,也就是上面例子中使用的版本:

public final void subscribe(Observer super T> observer)

下面展示了该方法的详细实现细节:

public final void subscribe(Observer super T> observer) {

ObjectHelper.requireNonNull(observer, "observer is null");

try {

observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

// can't call onError because no way to know if a Disposable has been set or not

// can't call onSubscribe because the call might have set a Subscription already

RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}

}

看起来代码很多,但是核心代码就一句:subscribeActual(observer),然后:

/**

* Operator implementations (both source and intermediate) should implement this method that

* performs the necessary business logic.

*

There is no need to call any of the plugin hooks on the current Observable instance or

* the Subscriber.

* @param observer the incoming Observer, never null

*/

protected abstract void subscribeActual(Observer super T> observer);

再看一下new一个Observable的代码:

Observable observable = new Observable() {

@Override

protected void subscribeActual(Observer super String> observer) {

// XXX

}

};

也就是说,subscribe方法中会调用Observable的subscribeActual方法,并且将subscribe的参数(也就是绑定到该Observable的Observer)传递给subscribeActual,然后,我们在subscribeActual方法里面对subscribeActual的参数observer的操作实际上就是直接调用了Observer的方法,所以Observer当然会对响应相应的事件。

这个理解起来不太困难,下面看一下使用Observable的create静态方法来创建Observable的时候是怎么讲一个Observer绑定到一个create出来的Observable上的,回头看下面的代码:

Observable.create(new ObservableOnSubscribe() {

public void subscribe(ObservableEmitter e) throws Exception {

// XXX

}

}).subscribe(new Observer() {

// XXX

});

这个看起来好像不能像上面那种情况一样理解,因为create的参数是new一个ObservableOnSubscribe对象,现在先来看一下create方法的具体实现细节:

public static Observable create(ObservableOnSubscribe source) {

ObjectHelper.requireNonNull(source, "source is null");

return RxJavaPlugins.onAssembly(new ObservableCreate(source));

}

可以看到,create方法返回的是一个ObservableCreate对象,并且将我们的Observable对象传递给了ObservableCreate,这里使用了包装模式,将Observable包装成了ObservableCreate对象。在ObservableCreate类中找到了subscribeActual的实现,而这个subscribeActual正是实现了Observable的subscribeActual。所以包装需要包装彻底啊。下面是ObservableCreate类的subscribeActual的具体实现:

@Override

protected void subscribeActual(Observer super T> observer) {

CreateEmitter parent = new CreateEmitter(observer);

observer.onSubscribe(parent);

try {

source.subscribe(parent);

} catch (Throwable ex) {

Exceptions.throwIfFatal(ex);

parent.onError(ex);

}

}

在subscribeActual内部,又对Observer做了一次包装,将Observer对象包装成了CreateEmitter对象,为什么呢?因为在create方法的参数中我们new的Observable是一个ObservableOnSubscribe类型的对象,而ObservableOnSubscribe的subscribe的参数需要是CreateEmitter类型的,那我们new出来的ObservableOnSubscribe到哪去了呢?看下面的构造函数:

final ObservableOnSubscribe source;

public ObservableCreate(ObservableOnSubscribe source) {

this.source = source;

}

可以看到,我们new出来的ObservableOnSubscribe被保存在source字段中,在来看ObservableCreate类的subscribeActual方法,其中有关键的一句话:source.subscribe(parent),source是Observable,parent是Observer,只是Observer和Observable都是被包装了一层的。如果想具体了解到底是怎么包装的,可以参考CreateEmitter类,也可以借助这个机会学习一下包装模式,还是比较有用的。

本文是对RxJava学习笔记系列的第一篇文章,内容浅显易懂,没有涉及太多的内容,主要分析了一下RxJava中的两个重要的对象,Observable和Observer,并且梳理了一下一个Observer是如何绑定到一个Observable上的,当然,这是学习RxJava的基础内容,如果对这一部分内容都不清楚的话,还需要继续学习一下,本文涉及到两个设计模式,一个是观察者模式,一个是包装模式,结合具体的例子来看还是很好理解的。本文开头还介绍了一下JSwitcher,对于学习RxJava还是比较有帮助的。下面简单做一下RxJava学习笔记系列的文章计划:

《RxJava学习笔记 (一)》 : 了解RxJava中的Observable和Observer,并且明白如何实现订阅

《RxJava学习笔记 (二)》 : RxJava中Observable丰富的聚合操作支持的学习笔记

《RxJava学习笔记 (三)》 : RxJava2中的线程切换学习笔记

《RxJava学习笔记 (四)》 : RxJava Flowable学习

暂时定这几部分内容,在总结过程中如果发现还有什么内容需要补充的时候会进行补充更新。

扫码入群

Java中expecial,RxJava 学习笔记 (一)相关推荐

  1. java中volatile关键字---学习笔记

    volatile关键字的作用 在java内存模型中,线程之间共享堆内存(对应主内存),但又各自拥有自己的本地内存--栈内存,线程的栈内存中缓存有共享变量的副本,但如果是被volatile修饰的变量,线 ...

  2. java中servlet filter_lua学习笔记(二)仿java servlet中Filter功能

    2)代码 Filter.lua--[[ 过滤器接口 ]]-- local FilterChain = require("FilterChain") local Filter = { ...

  3. 零基础学习Java开发,这些学习笔记送给你

    因为Java具备很多特点,并且在企业中被广泛应用为此很多小伙伴选择学习Java基础开发,但是零基础学习Java技术开发需要我们制定Java学习路线图对于我们之后的学习会非常有帮助. 零基础学习Java ...

  4. Java 8 函数式编程学习笔记

    Java 8 函数式编程学习笔记 @(JAVASE)[java8, 函数式编程, lambda] Java 8 函数式编程学习笔记 参考内容 Java 8中重要的函数接口 扩展函数接口 常用的流操作 ...

  5. RxJava 学习笔记(八) --- Combining 结合操作

    @(Rxjava学习笔记) RxJava 学习笔记(八) - Combining 结合操作 RxJava 学习笔记八 Combining 结合操作 StartWith 在数据序列的开头插入一条指定的项 ...

  6. 【Java】函数式编程学习笔记——Stream流

    学习视频:https://www.bilibili.com/video/BV1Gh41187uR?p=1 (1)[Java]函数式编程学习笔记--Lambda表达式 (2)[Java]函数式编程学习笔 ...

  7. java 编程思想 多线程学习笔记

    java 编程思想 多线程学习笔记 一.如何创建多线程? 1.继承 java.lang.Thread 类 2.实现 java.lang.Runnable 接口 3.Callable接口 总之,在任何线 ...

  8. JAVA基础与高级学习笔记

    JAVA基础与高级学习笔记 /记录java基础与高级,除了较简单的内容,没有必要记录的没有记录外,其余的都记录了/ java初学者看这一篇就够了,全文 6万+ 字. JAVA基础 java会出现内存溢 ...

  9. 第10课:底实战详解使用Java开发Spark程序学习笔记

    第10课:底实战详解使用Java开发Spark程序学习笔记 本期内容: 1. 为什么要使用Java? 2. 使用Java开发Spark实战 3. 使用Java开发Spark的Local和Cluster ...

最新文章

  1. Zabbix 3.0 从入门到精通(zabbix使用详解)
  2. hdu 1043 Eight 搜索,哈希
  3. 一次上机试题(面向对象)
  4. hpg8服务器系列指示灯意思,HP Proliant GEN8服务器指示灯说明
  5. 鸿蒙十系统更新机型,高歌猛进,鸿蒙系统升级机型再次确认,花粉:终等到!...
  6. 一个数独问题的算法(已更新,提供一个简单算法,欢迎拍砖)
  7. 查询HTML标签select中options的值并定位其位置
  8. maven隐式依赖引起的包冲突
  9. mysql 字符串截取_MySQL|SUBSTR() 函数用法
  10. Python接口测试之moco
  11. jest自动化测试遇到的一些报错信息及解决方案
  12. python货币转化为资本的前提_深度剖析比特币背后的技术细节
  13. web压力测试工具介绍
  14. VS 2015社区版离线下载
  15. (转载)基于Overfeat的图片分类、定位、检测
  16. ENVI_IDL:批量拼接Modis Swath的逐日数据并输出为Geotiff格式
  17. 【元胞自动机】元胞自动机地铁火灾疏散模型【含Matlab源码 246期】
  18. Win10已安装更新无法卸载怎么办?
  19. 达梦数据库,删除表重复数据
  20. android 环境一键,一键切换Android应用环境(Environment Switcher)

热门文章

  1. vue中向数组去重_「前端剑指offer第3期」来,手写一下数组去重
  2. Windows环境下MySQL 5.7的安装、配置与卸载
  3. 电脑大小写怎么切换_苹果怎么切换电脑便签?可以自由切换的电脑便签手机日历怎么备注特殊日子?苹果手机日历特殊日子提醒便签...
  4. mysql时间格式函数_MySQL时间格式函数
  5. 如果项目中如何添加或卸载某些组件
  6. 四十三、ETL工具的流程和应用
  7. 手把手教你用Python进行SSH暴力破解
  8. 分享一份60页的《Python数据分析入门PPT》
  9. 在Win上做Python开发?当然是用官方的MS Terminal和VS Code了
  10. 读博熬不住了,拿个硕士学位投身业界如何?看过来人怎么说