前言

我从去年開始使用 RxJava 。到如今一年多了。

今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava 。并且使用的场景越来越多 。

而近期这几个月。我也发现国内越来越多的人開始提及 RxJava 。

有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外很多其它的人表示:我真的百度了也谷歌了。但我还是想问: RxJava 究竟是什么?

鉴于 RxJava 眼下这种既火爆又神奇的现状,而我又在一年的使用过程中对 RxJava 有了一些理解,我决定写下这篇文章来对 RxJava 做一个相对详细的、针对 Android 开发人员的介绍。

这篇文章的目的有两个:1. 给对 RxJava 感兴趣的人一些入门的指引2. 给正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析

  • RxJava 究竟是什么
  • RxJava 好在哪
  • API 介绍和原理简析
    • 1. 概念:扩展的观察者模式

      • 观察者模式
      • RxJava 的观察者模式
    • 2. 基本实现
      • 1) 创建 Observer
      • 2) 创建 Observable
      • 3) Subscribe (订阅)
      • 4) 场景演示样例
        • a. 打印字符串数组
        • b. 由 id 取得图片并显示
    • 3. 线程控制 —— Scheduler (一)
      • 1) Scheduler 的 API (一)
      • 2) Scheduler 的原理 (一)
    • 4. 变换
      • 1) API
      • 2) 变换的原理:lift()
      • 3) compose: 对 Observable 总体的变换
    • 5. 线程控制:Scheduler (二)
      • 1) Scheduler 的 API (二)
      • 2) Scheduler 的原理(二)
      • 3) 延伸:doOnSubscribe()
  • RxJava 的适用场景和使用方式
    • 1. 与 Retrofit 的结合
    • 2. RxBinding
    • 3. 各种异步操作
    • 4. RxBus
  • 最后
    • 关于作者:
    • 为什么写这个?

在正文開始之前的最后。放上 GitHub 链接和引入依赖的 gradle 代码: Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依赖:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
(版本号号是文章公布时的最新稳定版)

另外,感谢 RxJava 核心成员流火枫林的技术支持和内測读者代码家、鲍永章、drakeet、马琳、有时放纵、程序亦非猿、大头鬼、XZoomEye、席德雨、TCahead、Tiiime、Ailurus、宅学长、妖孽、大大大大大臣哥、NicodeLee的帮助,以及周伯通招聘的赞助。

RxJava 究竟是什么

一个词:异步

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观測的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

然而,对于刚開始学习的人来说,这太难看懂了。由于它是一个『总结』,而刚開始学习的人更须要一个『引言』。

事实上, RxJava 的本质能够压缩为异步这一个词。

说到根上。它就是一个实现异步操作的库,而别的定语都是基于这之上的。

RxJava 好在哪

换句话说,『相同是做异步,为什么人们用它,而不用现成的 AsyncTask / Handler / XXX / ... ?』

一个词:简洁

异步操作非常关键的一点是程序的简洁性,由于在调度过程比較复杂的情况下。异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTaskHandler ,事实上都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于。随着程序逻辑变得越来越复杂,它依旧能够保持简洁。

假设有这样一个需求:界面上有一个自己定义的视图 imageCollectorView 。它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来随意添加显示的图片。如今须要程序将一个给出的文件夹数组 File[] folders 中每一个文件夹下的 png 图片都载入出来并显示在 imageCollectorView 中。须要注意的是,由于读取图片的这一过程较为耗时,须要放在后台运行。而图片的显示则必须在 UI 线程运行。

经常使用的实现方式有多种,我这里贴出当中一种:

new Thread() {@Overridepublic void run() {super.run();for (File folder : folders) {File[] files = folder.listFiles();for (File file : files) {if (file.getName().endsWith(".png")) {final Bitmap bitmap = getBitmapFromFile(file);getActivity().runOnUiThread(new Runnable() {@Overridepublic void run() {imageCollectorView.addImage(bitmap);}});}}}}
}.start();

而假设使用 RxJava 。实现方式是这种:

Observable.from(folders).flatMap(new Func1<File, Observable<File>>() {@Overridepublic Observable<File> call(File file) {return Observable.from(file.listFiles());}}).filter(new Func1<File, Boolean>() {@Overridepublic Boolean call(File file) {return file.getName().endsWith(".png");}}).map(new Func1<File, Bitmap>() {@Overridepublic Bitmap call(File file) {return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {imageCollectorView.addImage(bitmap);}});

那位说话了:『你这代码明明变多了啊!

简洁个毛啊!』大兄弟你消消气。我说的是逻辑的简洁。不是单纯的代码量少(逻辑简洁才是提升读写代码速度的必杀技对不?)。

观察一下你会发现, RxJava 的这个实现,是一条从上到下的链式调用,没有不论什么嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显(试想假设还要求仅仅选取前 10 张图片,常规方式要怎么办?假设有很多其它这样那样的要求呢?再试想,在这一大堆需求实现完两个月之后须要改功能,当你翻回这里看到自己当初写下的那一片迷之缩进,你能保证自己将迅速看懂。而不是对着代码又一次捋一遍思路?)。

另外。假设你的 IDE 是 Android Studio 。事实上每次打开某个 Java 文件的时候,你会看到被自己主动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

Observable.from(folders).flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }).filter((Func1) (file) -> { file.getName().endsWith(".png") }).map((Func1) (file) -> { getBitmapFromFile(file) }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

假设你习惯使用 Retrolambda ,你也能够直接把代码写成上面这种简洁的形式。

而假设你看到这里还不知道什么是 Retrolambda ,我不建议你如今就去学习它。原因有两点:1. Lambda 是把双刃剑,它让你的代码简洁的同一时候。减少了代码的可读性,因此同一时候学习 RxJava 和 Retrolambda 可能会让你忽略 RxJava 的一些技术细节。2. Retrolambda 是 Java 6/7 对 Lambda 表达式的非官方兼容方案,它的向后兼容性和稳定性是无法保障的,因此对于企业项目,使用 Retrolambda 是有风险的。所以。与非常多 RxJava 的推广者不同,我并不推荐在学习 RxJava 的同一时候一起学习 Retrolambda。

事实上,我个人尽管非常赞赏 Retrolambda,但我从来不用它。

在Flipboard 的 Android 代码中。有一段逻辑非常复杂。包括了多次内存操作、本地文件操作和网络操作,对象分分合合。线程间相互配合相互等待,一会儿排成人字,一会儿排成一字。

假设使用常规的方法来实现,肯定是要写得欲仙欲死,然而在使用 RxJava 的情况下。依旧仅仅是一条链式调用就完毕了。它非常长。但非常清晰。

所以, RxJava 好在哪?就好在简洁,好在那把什么复杂逻辑都能穿成一条线的简洁。

API 介绍和原理简析

这个我就做不到一个词说明了……由于这一节的主要内容就是一步步地说明 RxJava 究竟如何做到了异步。如何做到了简洁。

1. 概念:扩展的观察者模式

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。

观察者模式

先简述一下观察者模式,已经熟悉的能够跳过这一段。

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,须要在 B 变化的一瞬间做出反应。举个样例。新闻里喜闻乐见的警察抓小偷。警察须要在小偷伸手作案的时候实施抓捕。

在这个样例里,警察是观察者,小偷是被观察者,警察须要时刻盯着小偷的一举一动,才干保证不会漏过不论什么瞬间。

程序的观察者模式和这种真正的『观察』略有不同,观察者不须要时刻盯着被观察者(比如 A 不须要每过 2ms 就检查一次 B 的状态),而是採用注冊(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我须要你的某某状态,你要在它变化的时候通知我。

Android 开发中一个比較典型的样例是点击监听器 OnClickListener 。对设置 OnClickListener 来说。 View 是被观察者, OnClickListener 是观察者。二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间。Android Framework 就会将点击事件发送给已经注冊的 OnClickListener 。採取这样被动的观察方式,既省去了重复检索状态的资源消耗。也能够得到最高的反馈速度。

当然,这也得益于我们能够随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

OnClickListener 的模式大致例如以下图:

如图所看到的,通过 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(这一过程没有在图上画出)。当用户点击时,Button 自己主动调用 OnClickListeneronClick() 方法。

另外。假设把这张图中的概念抽象出来(Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件)。就由专用的观察者模式(比如仅仅用于监听控件点击)转变成了通用的观察者模式。

例如以下图:

而 RxJava 作为一个工具库,使用的就是通用形式的观察者模式。

RxJava 的观察者模式

RxJava 有四个基本概念:Observable (可观察者。即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。

ObservableObserver 通过 subscribe() 方法实现订阅关系。从而 Observable 能够在须要的时候发出事件来通知 Observer

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外。还定义了两个特殊的事件:onCompleted()onError()

  • onCompleted(): 事件队列完结。

    RxJava 不仅把每一个事件单独处理,还会把它们看做一个队列。RxJava 规定。当不会再有新的 onNext() 发出时,须要触发 onCompleted() 方法作为标志。

  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同一时候队列自己主动终止。不同意再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted()onError() 有且仅仅有一个。并且是事件序列中的最后一个。须要注意的是,onCompleted()onError() 二者也是相互排斥的,即在队列中调用了当中一个。就不应该再调用还有一个。

RxJava 的观察者模式大致例如以下图:

2. 基本实现

基于以上的概念。 RxJava 的基本实现主要有三点:

1) 创建 Observer

Observer 即观察者,它决定事件触发的时候将有如何的行为。 RxJava 中的 Observer 接口的实现方式:

Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");}
};

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:SubscriberSubscriberObserver 接口进行了一些扩展。但他们的基本使用方式是全然一样的:

Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");}
};

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。

所以假设你仅仅想使用基本功能,选择 ObserverSubscriber 是全然一样的。它们的差别对于使用者来说主要有两点:

  1. onStart(): 这是 Subscriber 添加的方法。

    它会在 subscribe 刚開始,而事件还未发送之前被调用,能够用于做一些准备工作,比如数据的清零或重置。

    这是一个可选方法。默认情况下它的实现为空。

    须要注意的是,假设对准备工作的线程有要求(比如弹出一个显示运行进度的对话框。这必须在主线程运行), onStart() 就不适用了。由于它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,能够使用 doOnSubscribe() 方法,详细能够在后面的文中看到。

  2. unsubscribe(): 这是 Subscriber 所实现的还有一个接口 Subscription 的方法,用于取消订阅。

    在这种方法被调用后。Subscriber 将不再接收事件。一般在这种方法调用前。能够使用 isUnsubscribed() 先推断一下状态。

    unsubscribe() 这种方法非常重要。由于在 subscribe() 之后。 Observable 会持有 Subscriber 的引用,这个引用假设不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(比如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

2) 创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发如何的事件。 RxJava 使用 create() 方法来创建一个 Observable 。并为它定义事件触发规则:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("Hi");subscriber.onNext("Aloha");subscriber.onCompleted();}
});

能够看到。这里传入了一个 OnSubscribe 对象作为參数。

OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候。OnSubscribecall() 方法会自己主动被调用,事件序列就会按照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。

这样,由被观察者调用了观察者的回调方法。就实现了由被观察者向观察者的事件传递,即观察者模式。

这个样例非常easy:事件的内容是字符串。而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(比如网络请求的结果在请求返回之前是未知的);全部事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的。总之。这个样例看起来毫无实用价值。

但这是为了便于说明,实质上仅仅要你想,各种各样的事件发送规则你都能够自己来写。至于详细怎么做。后面都会讲到。但如今不行。

仅仅有把基础原理先说明确了,上层的运用才干更easy说清晰。

create() 方法是 RxJava 最主要的创造事件序列的方法。

基于这种方法。 RxJava 还提供了一些方法用来快捷创建事件队列,比如:

  • just(T...): 将传入的參数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
  • from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成详细对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

上面 just(T...) 的样例和 from(T[]) 的样例,都和之前的 create(OnSubscribe) 的样例是等价的。

3) Subscribe (订阅)

创建了 ObservableObserver 之后,再用 subscribe() 方法将它们联结起来,整条链子就能够工作了。代码形式非常easy:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

有人可能会注意到。 subscribe() 这种方法有点怪:它看起来是『observalbe 订阅了 observer / subscriber』而不是『observer / subscriber 订阅了 observalbe』。这看起来就像『杂志订阅了读者』一样颠倒了对象关系。

这让人读起来有点别扭。只是假设把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable) ,尽管更加符合思维逻辑,但对流式 API 的设计就造成影响了,比較起来明显是得不偿失的。

Observable.subscribe(Subscriber) 的内部实现是这种(仅核心代码):

// 注意:这不是 subscribe() 的源代码。而是将源代码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。

// 假设须要看源代码,能够去 RxJava 的 GitHub 仓库下载。 public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; }

能够看到,subscriber() 做了3件事:

  1. 调用 Subscriber.onStart() 。这种方法在前面已经介绍过,是一个可选的准备方法。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber)

    在这里。事件发送的逻辑開始运行。从这也能够看出。在 RxJava 中, Observable 并非在创建的时候就立即開始发送事件,而是在它被订阅的时候,即当 subscribe() 方法运行的时候。

  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

整个过程中对象间的关系例如以下图:

或者能够看动图:

除了 subscribe(Observer)subscribe(Subscriber)subscribe() 还支持不完整定义的回调。RxJava 会自己主动依据定义创建出 Subscriber 。形式例如以下:

Action1<String> onNextAction = new Action1<String>() {// onNext()@Overridepublic void call(String s) {Log.d(tag, s);}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {// onError()@Overridepublic void call(Throwable throwable) {// Error handling}
};
Action0 onCompletedAction = new Action0() {// onCompleted()@Overridepublic void call() {Log.d(tag, "completed");}
};// 自己主动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自己主动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自己主动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1Action0Action0 是 RxJava 的一个接口,它仅仅有一个方法 call(),这种方法是无參无返回值的。由于 onCompleted() 方法也是无參无返回值的,因此 Action0 能够被当成一个包装对象。将 onCompleted() 的内容打包起来将自己作为一个參数传入 subscribe() 以实现不完整定义的回调。这样事实上也能够看做将 onCompleted() 方法作为參数传进了 subscribe(),相当于其它某些语言中的『闭包』。

Action1 也是一个接口,它相同仅仅有一个方法 call(T param),这种方法也无返回值,但有一个參数;与 Action0 同理。由于 onNext(T obj)onError(Throwable error) 也是单參数无返回值的。因此 Action1 能够将 onNext(obj)onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,尽管 Action0Action1 在 API 中使用最广泛。但 RxJava 是提供了多个 ActionX 形式的接口 (比如 Action2, Action3) 的,它们能够被用以包装不同的无返回值的方法。

注:正如前面所提到的。ObserverSubscriber 具有相同的角色,并且 Observersubscribe() 过程中最终会被转换成 Subscriber 对象。因此,从这里開始,后面的描写叙述我将用 Subscriber 来取代 Observer 。这样更加严谨。

4) 场景演示样例

以下举两个样例:

为了把原理用更清晰的方式表述出来。本文中挑选的都是功能尽可能简单的样例。以至于有些演示样例代码看起来会有『画蛇添足』『明明不用 RxJava 能够更简便地解决这个问题』的感觉。当你看到这种情况,不要认为是由于 RxJava 太啰嗦,而是由于在过早的时候举出真实场景的样例并不利于原理的解析,因此我刻意挑选了简单的情景。

a. 打印字符串数组

将字符串数组 names 中的全部字符串依次打印出来:

String[] names = ...;
Observable.from(names).subscribe(new Action1<String>() {@Overridepublic void call(String name) {Log.d(tag, name);}});
b. 由 id 取得图片并显示

由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中。并在出现异常的时候打印 Toast 报错:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {@Overridepublic void call(Subscriber<?

super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } });

正如上面两个样例这样。创建出 ObservableSubscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完毕了。非常easy。

然而,

在 RxJava 的默认规则中。事件的发出和消费都是在同一个线程的。也就是说。假设仅仅用上面的方法,实现出来的仅仅是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步。则须要用到 RxJava 的还有一个概念: Scheduler

3. 线程控制 —— Scheduler (一)

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe()。就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。

假设须要切换线程,就须要用到 Scheduler (调度器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。

RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
  • Schedulers.newThread(): 总是启用新线程。并在新线程运行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler

    行为模式和 newThread() 几乎相同。差别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空暇的线程,因此多数情况下 io()newThread() 更有效率。

    不要把计算工作放在 io() 中,能够避免创建不必要的线程。

  • Schedulers.computation(): 计算所使用的 Scheduler

    这个计算指的是 CPU 密集型计算。即不会被 I/O 等操作限制性能的操作,比如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。

    不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • 另外。 Android 还有一个专用的 AndroidSchedulers.mainThread()。它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就能够使用 subscribeOn()observeOn() 两个方法来对线程进行控制了。* subscribeOn(): 指定 subscribe() 所发生的线程。即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

* observeOn(): 指定 Subscriber 所运行在的线程。

或者叫做事件消费的线程。

文字叙述总归难理解,上代码:

Observable.just(1, 2, 3, 4).subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer number) {Log.d(tag, "number:" + number);}});

上面这段代码中。由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1234 将会在 IO 线程发出。而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。

事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io())observeOn(AndroidSchedulers.mainThread()) 的使用方式非经常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

而前面提到的由图片 id 取得图片并显示的样例。假设也加上这两句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {@Overridepublic void call(Subscriber<?

super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } });

那么,载入图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使载入图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

2) Scheduler 的原理 (一)

RxJava 的 Scheduler API 非常方便,也非常奇妙(加了一句话就把线程切换了,怎么做到的?并且 subscribe() 不是最外层直接调用的方法吗,它居然也能被指定线程?)。然而 Scheduler 的原理须要放在后面讲。由于它的原理是以下一节《变换》的原理作为基础的。

好吧这一节事实上我屁也没说,仅仅是为了让你安心,让你知道我不是忘了讲原理,而是把它放在了更合适的地方。

4. 变换

最终要到牛逼的地方了,无论你激动不激动,反正我是激动了。

RxJava 提供了对事件序列进行变换的支持。这是它的核心功能之中的一个,也是大多数人说『RxJava 真是太好用了』的最大原因。

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。

1) API

首先看一个 map() 的样例:

Observable.just("images/logo.png") // 输入类型 String.map(new Func1<String, Bitmap>() {@Overridepublic Bitmap call(String filePath) { // 參数类型 Stringreturn getBitmapFromPath(filePath); // 返回类型 Bitmap}}).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) { // 參数类型 BitmapshowBitmap(bitmap);}});

这里出现了一个叫做 Func1 的类。它和 Action1 非常类似,也是 RxJava 的一个接口,用于包装含有一个參数的方法。 Func1Action 的差别在于, Func1 包装的是有返回值的方法。

另外,和 ActionX 一样。 FuncX 也有多个。用于不同參数个数的方法。FuncXActionX 的差别在 FuncX 包装的是有返回值的方法。

能够看到,map() 方法将參数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的參数类型也由 String 转为了 Bitmap

这种直接变换对象并返回的。是最常见的也最easy理解的变换。只是 RxJava 的变换远不止这样。它不仅能够针对事件对象,还能够针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个经常使用的变换:

  • map(): 事件对象的直接变换,详细功能上面已经介绍过。它是 RxJava 最经常使用的变换。

    map() 的示意图:

  • flatMap(): 这是一个非常实用但非常难理解的变换,因此我决定花多些篇幅来介绍它。

    首先假设这么一种需求:假设有一个数据结构『学生』。如今须要打印出一组学生的名字。实现方式非常easy:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onNext(String name) {Log.d(tag, name);}...
};
Observable.from(students).map(new Func1<Student, String>() {@Overridepublic String call(Student student) {return student.getName();}}).subscribe(subscriber);

非常easy。那么再假设:假设要打印出每一个学生所须要修的全部课程的名称呢?(需求的差别在于。每一个学生仅仅有一个名字,但却有多个课程。)首先能够这样实现:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {@Overridepublic void onNext(Student student) {List<Course> courses = student.getCourses();for (int i = 0; i < courses.size(); i++) {Course course = courses.get(i);Log.d(tag, course.getName());}}...
};
Observable.from(students).subscribe(subscriber);

依旧非常easy。

那么假设我不想在 Subscriber 中使用 for 循环。而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用非常重要)?用 map() 显然是不行的,由于 map() 是一对一的转化,而我如今的要求是一对多的转化。那怎么才干把一个 Student 转化成多个 Course 呢?

这个时候,就须要用 flatMap() 了:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {@Overridepublic void onNext(Course course) {Log.d(tag, course.getName());}...
};
Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() {@Overridepublic Observable<Course> call(Student student) {return Observable.from(student.getCourses());}}).subscribe(subscriber);

从上面的代码能够看出。 flatMap()map() 有一个相同点:它也是把传入的參数转化之后返回还有一个对象。但须要注意,和 map() 不同的是。 flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并非被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这种:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它開始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

flatMap() 示意图:

扩展:由于能够在嵌套的 Observable 中加入异步代码。 flatMap() 也经常使用于嵌套的异步操作,比如嵌套的网络请求。演示样例代码(Retrofit + RxJava):

networkClient.token() // 返回 Observable<String>,在订阅时请求 token。并在响应后发送 token.flatMap(new Func1<String, Observable<Messages>>() {@Overridepublic Observable<Messages> call(String token) {// 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表return networkClient.messages();}}).subscribe(new Action1<Messages>() {@Overridepublic void call(Messages messages) {// 处理显示消息列表showMessages(messages);}});

传统的嵌套请求须要使用嵌套的 Callback 来实现。而通过 flatMap() ,能够把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

  • throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。经常使用作去抖动过滤。比如按钮的点击监听器:RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释.throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms.subscribe(subscriber);妈妈再也不怕我的用户手抖点开两个重复的界面啦。

此外。 RxJava 还提供非常多便捷的方法来实现事件序列的变换。这里就不一一举例了。

2) 变换的原理:lift()

这些变换尽管功能各有不同,但实质上都是针对事件序列的处理和再发送

而在 RxJava 的内部。它们是基于同一个基础的变换方法: lift(Operator)

首先看一下 lift() 的内部实现(仅核心代码):

// 注意:这不是 lift() 的源代码。而是将源代码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。

// 假设须要看源代码,能够去 RxJava 的 GitHub 仓库下载。 public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); }

这段代码非常有意思:它生成了一个新的 Observable 并返回,并且创建新 Observable 所用的參数 OnSubscribe 的回调方法 call() 中的实现居然看起来和前面讲过的 Observable.subscribe() 一样。然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——

  • subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题。但是 lift() 之后的情况就复杂了点。
  • 当含有 lift() 时:
    1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
    2.而相同地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe
    3.当用户调用经过 lift() 后的 Observablesubscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber)。也是用的新 Observable 中的新 OnSubscribe。即在 lift() 中生成的那个 OnSubscribe
    4.而这个新 OnSubscribecall() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里。新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 SubscriberOperator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联。并插入自己的『变换』代码以实现变换)。然后利用这个新 Subscriber 向原始 Observable 进行订阅。

    这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

精简掉细节的话,也能够这么说:在 Observable 运行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样。负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber

假设你更喜欢具象思维,能够看图:

或者能够看动图:

两次和多次的 lift() 同理,例如以下图:

举一个详细的 Operator 的实现。以下这是一个将事件中的 Integer 对象转换成 String 的样例,仅供參考:

observable.lift(new Observable.Operator<String, Integer>() {@Overridepublic Subscriber<?

super Integer> call(final Subscriber<? super String> subscriber) { // 将事件序列中的 Integer 对象转换为 String 对象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } });

讲述 lift() 的原理仅仅是为了让你更好地了解 RxJava 。从而能够更好地使用它。然而无论你是否理解了 lift() 的原理,RxJava 都不建议开发人员自己定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,由于直接使用 lift() 非常easy发生一些难以发现的错误。

3) compose: 对 Observable 总体的变换

除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的差别在于。 lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。

举个样例,假设在程序中有多个 Observable ,并且他们都须要应用一组相同的 lift() 变换。你能够这么写:

observable1.lift1().lift2().lift3().lift4().subscribe(subscriber1);
observable2.lift1().lift2().lift3().lift4().subscribe(subscriber2);
observable3.lift1().lift2().lift3().lift4().subscribe(subscriber3);
observable4.lift1().lift2().lift3().lift4().subscribe(subscriber1);

你认为这样太不软件project了,于是你改成了这样:

private Observable liftAll(Observable observable) {return observable.lift1().lift2().lift3().lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

可读性、可维护性都提高了。但是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性似乎还是增添了那么点限制。

怎么办?这个时候。就应该用 compose() 来攻克了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {@Overridepublic Observable<String> call(Observable<Integer> observable) {return observable.lift1().lift2().lift3().lift4();}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面这样,使用 compose() 方法,Observable 能够利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。

compose() 的原理比較简单,不附图喽。

5. 线程控制:Scheduler (二)

除了灵活的变换,RxJava 还有一个牛逼的地方,就是线程的自由控制。

1) Scheduler 的 API (二)

前面讲到了,能够利用 subscribeOn() 结合 observeOn() 来实现线程控制。让事件的产生和消费发生在不同的线程。

但是在了解了 map() flatMap() 等变换方法后。有些好事的(事实上就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?

答案是:能。

由于 observeOn() 指定的是 Subscriber 的线程。而这个 Subscriber 并非(严格说应该为『不一定是』。但这里最好还是理解为『不是』)subscribe() 參数中的 Subscriber 。而是 observeOn() 运行时的当前 Observable 所相应的 Subscriber 。即它的直接下级 Subscriber 。换句话说。observeOn() 指定的是它之后的操作所在的线程。

因此假设有多次切换线程的需求,仅仅要在每一个想要切换线程的位置调用一次 observeOn() 就可以。上代码:

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定.subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(mapOperator) // 新线程。由 observeOn() 指定.observeOn(Schedulers.io()).map(mapOperator2) // IO 线程,由 observeOn() 指定.observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

只是,不同于 observeOn()subscribeOn() 的位置放在哪里都能够,但它是仅仅能调用一次的。

又有好事的(事实上还是当初的我)问了:假设我非要调用多次 subscribeOn() 呢?会有什么效果?

这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。

2) Scheduler 的原理(二)

事实上。 subscribeOn()observeOn() 的内部实现,也是用的 lift()

详细看图(不同颜色的箭头表示不同的线程):

subscribeOn() 原理图:

observeOn() 原理图:

从图中能够看出,subscribeOn()observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)。

不同的是。 subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有開始发送。因此 subscribeOn() 的线程控制能够从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

最后,我用一张图来解释当多个 subscribeOn()observeOn() 混合使用时。线程调度是怎么发生的(由于图中对象较多。相对于上面的图对结构做了一些简化调整):

图中共同拥有 5 处含有对事件的操作。由图中能够看出。①和②两处受第一个 subscribeOn() 影响。运行在红色线程;③和④处受第一个 observeOn() 的影响。运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程。而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有不论什么影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,仅仅有第一个 subscribeOn() 起作用。

3) 延伸:doOnSubscribe()

然而,尽管超过一个的 subscribeOn() 对事件处理的流程没有影响。但在流程之前却是能够利用的。

在前面讲 Subscriber 的时候。提到过 SubscriberonStart() 能够用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了。因此不能指定线程。而是仅仅能运行在 subscribe() 被调用时的线程。

这就导致假设 onStart() 中含有对线程有要求的代码(比如在界面上显示一个 ProgressBar,这必须在主线程运行)。将会有线程非法的风险。由于有时你无法预測 subscribe() 将会在什么线程运行。

而与 Subscriber.onStart() 相相应的。有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 相同是在 subscribe() 调用后并且在事件发送前运行,但差别在于它能够指定线程。

默认情况下。 doOnSubscribe() 运行在 subscribe() 发生的线程;而假设在 doOnSubscribe() 之后有 subscribeOn() 的话,它将运行在离它近期的 subscribeOn() 所指定的线程。

演示样例代码:

Observable.create(onSubscribe).subscribeOn(Schedulers.io()).doOnSubscribe(new Action0() {@Overridepublic void call() {progressBar.setVisibility(View.VISIBLE); // 须要在主线程运行}}).subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn() 。就能指定准备工作的线程了。

RxJava 的适用场景和使用方式

1. 与 Retrofit 的结合

Retrofit 是 Square 的一个著名的网络请求库。

没实用过 Retrofit 的能够选择跳过这一小节也没关系,我举的每种场景都仅仅是个样例。并且样例之间并无前后关联,仅仅是个抛砖引玉的作用,所以你跳过这里看别的场景也能够的。

Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本号的 Observable 形式 API。以下我用对照的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本号的差别。

以获取一个 User 对象的接口作为样例。使用Retrofit 的传统 API,你能够用这种方式来定义请求:

@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);

在程序的构建过程中。 Retrofit 会把自己主动把方法实现并生成代码。然后开发人员就能够利用以下的方法来获取特定用户并处理响应:

getUser(userId, new Callback<User>() {@Overridepublic void success(User user) {userView.setUser(user);}@Overridepublic void failure(RetrofitError error) {// Error handling...}
};

而使用 RxJava 形式的 API。定义相同的请求是这种:

@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);

使用的时候是这种:

getUser(userId).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<User>() {@Overridepublic void onNext(User user) {userView.setUser(user);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable error) {// Error handling...}});

看到差别了吗?

当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()

对照来看, Callback 形式和 Observable 形式长得不太一样,但本质都几乎相同。并且在细节上 Observable 形式似乎还比 Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢?

由于它好用啊!

从这个样例看不出来是由于这仅仅是最简单的情况。

而一旦情景复杂起来, Callback 形式立即就会開始让人头疼。比方:

假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是须要先与数据库中的数据进行比对和修正后再显示。

使用 Callback 方式大概能够这么写:

getUser(userId, new Callback<User>() {@Overridepublic void success(User user) {processUser(user); // 尝试修正 User 数据userView.setUser(user);}@Overridepublic void failure(RetrofitError error) {// Error handling...}
};

有问题吗?

非常简便。但不要这样做。为什么?由于这样做会影响性能。数据库的操作非常重。一次读写操作花费 10~20ms 是非经常见的。这种耗时非常easy造成界面的卡顿。

所以通常情况下,假设能够的话一定要避免在主线程中处理数据库。

所以为了提升性能。这段代码能够优化一下:

getUser(userId, new Callback<User>() {@Overridepublic void success(User user) {new Thread() {@Overridepublic void run() {processUser(user); // 尝试修正 User 数据runOnUiThread(new Runnable() { // 切回 UI 线程@Overridepublic void run() {userView.setUser(user);}});}).start();}@Overridepublic void failure(RetrofitError error) {// Error handling...}
};

性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,由于代码越乱往往就越难读懂,而假设项目中充斥着杂乱的代码,无疑会减少代码的可读性,造成团队开发效率的减少和出错率的升高。

这时候,假设用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这种:

getUser(userId).doOnNext(new Action1<User>() {@Overridepublic void call(User user) {processUser(user);}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<User>() {@Overridepublic void onNext(User user) {userView.setUser(user);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable error) {// Error handling...}});

后台代码和前台代码全都写在一条链中,明显清晰了非常多。

再举一个样例:假设 /user 接口并不能直接訪问,而须要填入一个在线获取的 token ,代码应该怎么写?

Callback 方式。能够使用嵌套的 Callback

@GET("/token")
public void getToken(Callback<String> callback);@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);...getToken(new Callback<String>() {@Overridepublic void success(String token) {getUser(token, userId, new Callback<User>() {@Overridepublic void success(User user) {userView.setUser(user);}@Overridepublic void failure(RetrofitError error) {// Error handling...}};}@Overridepublic void failure(RetrofitError error) {// Error handling...}
});

倒是没有什么性能问题,但是迷之缩进毁一生,你懂我也懂。做过大项目的人应该更懂。

而使用 RxJava 的话。代码是这种:

@GET("/token")
public Observable<String> getToken();@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);...getToken().flatMap(new Func1<String, Observable<User>>() {@Overridepublic Observable<User> onNext(String token) {return getUser(token, userId);}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<User>() {@Overridepublic void onNext(User user) {userView.setUser(user);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable error) {// Error handling...}});

用一个 flatMap() 就搞定了逻辑,依旧是一条链。

看着就非常爽。是吧?

2016/03/31 更新,加上我写的一个 Sample 项目:
rengwuxian RxJava Samples

好。Retrofit 部分就到这里。

2. RxBinding

RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。

所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这种注冊绑定对象的 API。

举个设置点击监听的样例。

使用 RxBinding 。能够把事件监听用这种方法来设置:

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件.subscribe(new Action1<ViewClickEvent>() {@Overridepublic void call(ViewClickEvent event) {// Click handling}});

看起来除了形式变了没什么差别,实质上也是这样。甚至假设你看一下它的源代码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而。仅仅这一个形式的改变。却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。

扩展的方式有非常多,依据需求而定。

一个样例是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的高速连环点击:

RxView.clickEvents(button).throttleFirst(500, TimeUnit.MILLISECONDS).subscribe(clickAction);

假设想对 RxBinding 有很多其它了解。能够去它的 GitHub 项目 以下看看。

3. 各种异步操作

前面举的 RetrofitRxBinding 的样例,是两个能够提供现成的 Observable 的库。而假设你有某些异步操作无法用这些库来自己主动生成 Observable。也全然能够自己写。比如数据库的读写、大图片的载入、文件压缩/解压等各种须要放在后台工作的耗时操作。都能够用 RxJava 来实现。有了之前几章的样例。这里应该不用再举例了。

4. RxBus

RxBus 名字看起来像一个库,但它并非一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再须要使用 Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,能够看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto ,眼下为止没有不良反应。

最后

对于 Android 开发人员来说。 RxJava 是一个非常难上手的库,由于它对于 Android 开发人员来说有太多陌生的概念了。

但是它真的非常牛逼。因此,我写了这篇《给 Android 开发人员的 RxJava 详细解释》。希望能给始终搞不明确什么是 RxJava 的人一些入门的指引,或者能让正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。无论如何。仅仅要能给各位同为 Android project师的你们提供一些帮助。这篇文章的目的就达到了。

再次感谢对这篇文章的产出提供支持的各位:
技术支持:流火枫林
内測读者:代码家、鲍永章、drakeet、马琳、有时放纵、程序亦非猿、大头鬼、XZoomEye、席德雨、TCahead、Tiiime、Ailurus、宅学长、妖孽、大大大大大臣哥、NicodeLee
赞助方:周伯通招聘是他们让我的文章能够以不那么丑陋的样子出如今大家面前。

关于作者

朱凯(扔物线),Flipboard 北京 Android project师。

微博:扔物线

GitHub:rengwuxian

为什么写这个?

与两三年前的境况不同,中国如今已经不缺0基础 Android project师。但中级和高级project师严重供不应求。

因此我决定从今天開始不定期地公布我的技术分享。仅仅希望能够和大家共同提升,通过我们的成长来解决一点点国内互联网公司人才稀缺的困境。也提升各位技术党的收入。

所以,不仅要写这篇,我还会写很多其它。

至于内容的定位,我计划仅仅定位真正的干货,一些边边角角的小技巧和炫酷的黑科技应该都不会写,总之希望每篇文章都能帮读者提升真正的实力。

转载于:https://www.cnblogs.com/llguanli/p/8327435.html

给 Android 开发人员的 RxJava 具体解释相关推荐

  1. Java vs Kotlin,Android开发人员应该选择哪种语言?

    自 Google 于 2017 年宣布 Kotlin 成为 Google IO 的 Android 开发官方语言以来,想要成为Android开发人员的程序员正陷入两难境地. 在讨论这个问题前,我首先要 ...

  2. mega_[MEGA DEAL]完整的Android开发人员课程–构建14个应用程序(91%折扣)

    mega 顶级教练Rob Percival将带您经过31个小时的培训,从编码新手到忍者 嘿,怪胎, 本周,在我们的JCG Deals商店中,我们提供了一个极端的报价. 我们提供的"完整的An ...

  3. Android开发人员的10大抱怨

    2019独角兽企业重金招聘Python工程师标准>>> Android受到追捧,但也有人抱怨,正所谓"萝卜白菜,各有所爱",本文就Android开发人员常挂在嘴边 ...

  4. Android开发人员不得不收集的工具类集合

    一.Android开发人员不得不收集的工具类集合  ----收藏 https://github.com/RobertCow/RxTools 二.android 开发之listview工具集合框架 ht ...

  5. Android开发人员不得不收集的代码(持续更新中)(http://www.jianshu.com/p/72494773aace,原链接)

    Android开发人员不得不收集的代码(持续更新中) Blankj 关注 2016.07.31 04:22* 字数 370 阅读 102644评论 479喜欢 3033赞赏 14 utilcode D ...

  6. (转)解决android开发人员,手机app图标显示不正确问题

    (转)解决android开发人员,手机app图标显示不正确问题 参考文章: (1)(转)解决android开发人员,手机app图标显示不正确问题 (2)https://www.cnblogs.com/ ...

  7. android开发人员要求_如何成为一名Android开发人员

    android开发人员要求 An Android Developer is somebody who creates our lives simple by creating a correspond ...

  8. Android 开发人员不得不收集的工具类集合

    RxTools 项目地址:vondear/RxTools  简介:Android 开发人员不得不收集的工具类集合 | 支付宝支付 | 微信支付(统一下单) | 微信分享 | Zip4j 压缩(支持分卷 ...

  9. [干货]Android开发人员不得不收集的代码(不断更新)

    代码地址链接:[干货]Android开发人员不得不收集的代码 为方便查找,已进行大致归类,其目录如下所示: 尺寸相关→SizeUtils.java dp与px转换 dp2px.px2dp sp与px转 ...

最新文章

  1. 重磅!公开课|四旋翼飞行器:算法与实战
  2. Flex+fluorineFx +ASP.NET开发的IIS部署
  3. 漫谈ERP实施服务的三种境界
  4. CString初始化时提示字符串太大
  5. 阿里为什么禁用Executors创建线程池?
  6. Java快速开发框架LML简介
  7. 浙江工业大学计算机应用基础,浙江工业大学期终考试命题稿-浙江大学人文学院.doc...
  8. 【kubernetes系列之安装Dashboard】
  9. Jquery的jqzoom插件的使用(图片放大镜)
  10. enable pen pressure in ps
  11. No boot device avalible,Press any key to reboot the system
  12. python正则匹配_Python中的正则表达式(re)
  13. 抽象代数 Abstract Algebra 学习笔记
  14. 电动自动吞吐式IC卡RFID读写器EMV模块HX150进卡命令
  15. 高级计算机器,高级计算器最新版
  16. 自学网站大全(值得收藏)
  17. BugKu (杂项篇MISC)—隐写
  18. python中函数的定义通常会从关键字_4.7. 深入 Python 函数定义
  19. 【matlab】matlab相关系数计算公式(Pearson和Spearman,以及Kendall Rank)
  20. 物流matlab,物流配送线路优化Matlab算法研究

热门文章

  1. 猴子选大王c语言课程设计,【C/C++】猴子选大王
  2. android 两足机器人,基于Android的双足机器人语音控制系统研究
  3. 开启mybatis日志_Mybatis源码分析之Cache二级缓存原理 (五)
  4. python gui测试框架_八款常用的 Python GUI 开发框架推荐
  5. ae抠像插件_AE抠像背景残留去除
  6. 验证MYSQL安装成功
  7. 判断回文(Python)
  8. 使用sys无法通过sqlplus或者sqldeveloper连接数据库
  9. ElasticSearch Java Api(二) -检索索引库
  10. 软件设计原则(三)里氏替换原则 -Liskov Substitution Principle