前言

使用了RxJava有一段时间了,深深感受到了其“牛逼”之处。下面,就从RxJava的基础开始,一步一步与大家分享一下这个强大的异步库的用法!

RxJava 概念初步

RxJava 在Github Repo上给的解释是:

“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.”

大概就是说RxJava是Java VM上一个灵活的、使用可观测序列来组成的一个异步的、基于事件的库。咋一看好像不知道是啥东西… … 没事,往下看~

作用 - 异步

上面 这段解释,重点就在于异步!但是它又不像 AsyncTask 这样用法简单,所以刚接触RxJava的童鞋,可能会觉得特别难,无从下手,没事,相信通过这篇文章,大伙儿可以有一个比较深刻的理解!

RxJava精华可以浓缩为异步两个字,其核心的东西不外乎两个:

1.  Observable(被观察者) 2.  Observer/Subscriber(观察者)

Observables可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer/Subscriber 的回调处理。

模式 - 观察者模式

观察者模式是一种对象的行为模式,是 Java 设计模式中很常用的一个模式。观察者模式也常称为:

发布-订阅模式(Publish/Subscribe)模型-视图模式(Model/View)源-监听器模式(Source/Listener)从属者模式(Dependents)

例如用过事件总线 EventBus 库的童鞋就知道,EventBus 属于发布-订阅模式(Publish/Subscribe)。

// 事件订阅@Subscribe(threadMode = ThreadMode.MAIN)public void showDownProgress(MyEvent event) {     // TODO}// 事件发布EventBus.getDefault().post(new MyEvent());

实际上,使用 RxJava 也可以设计出一套事件总线的库,这个称为 RxBus。有兴趣的话可以在学完 RxJava 之后,可以尝试写一个。这里就不细说了~

为啥说这个呢?因为,RxJava 也是一种扩展的观察者模式!

举个栗子,Android 中 View 的点击监听器的实现,View 是被观察者,OnClickListener 对象是观察者,Activity 要如何知道 View 被点击了?那就是构造一个 OnClickListener 对象,通过 setOnClickListener 与View达成一个订阅关系,一旦 View 被点击了,就通过OnClickListener对象的 OnClick 方法传达给 Activity 。采用观察者模式可以避免去轮询检查,节约有限的cpu资源。

结构 - 响应式编程

响应式?顾名思义,就是“你变化,我响应”。举个栗子,a = b + c; 这句代码将b+c的值赋给a,而之后如果b和c的值改变了不会影响到a,然而,对于响应式编程,之后b和c的值的改变也动态影响着a,意味着a会随着b和c的变化而变化。

响应式编程的组成为Observable/Operator/Subscriber,RxJava在响应式编程中的基本流程如下:

Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber

这个流程,可以简单的理解为:

  1. Observable 发出一系列事件,他是事件的产生者;

  2. Subscriber 负责处理事件,他是事件的消费者;

  3. Operator 是对 Observable 发出的事件进行修改和变换;

  4. 若事件从产生到消费不需要其他处理,则可以省略掉中间的 Operator,从而流程变为 Obsevable -> Subscriber

  5. Subscriber 通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的事务处理则交给 Operator;

优势 - 逻辑简洁

Rx 优势可以概括为四个字,那就是 逻辑简洁。然而,逻辑简洁并不意味着代码简洁,但是,由于链式结构,一条龙,你可以从头到尾,从上到下,很清楚的看到这个连式结构的执行顺序。对于开发人员来说,代码质量并不在于代码量,而在于逻辑是否清晰简洁,可维护性如何,代码是否健壮!

另外,熟悉lambda的,还可以进一步提高代码的简洁性。举个简单栗子对比一下,暂时不需要过多理解,后面会一一道来:

// 不使用lambdaObservable.just("Hello World!").map(new Func1<String, String>() {         @Overridepublic String call(String s) {             return s + "I am kyrie!";}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {         @Overridepublic void call(String s) {Log.i(TAG, s);}});// 使用lambdaObservable.just("Hello World!").map(s -> s + "I am kyrie!").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(s -> {Log.i(TAG, s);});

RxJava 依赖

在 Android Studio 项目下,为 module 增加 Gradle 依赖。

// Android 平台下须引入的一个依赖,主要用于线程控制compile 'io.reactivex:rxandroid:1.1.0'// RxJavacompile 'io.reactivex:rxjava:1.1.5'

这是我项目里面用的版本,也可以到Maven/RxJava下获取最新版本。

RxJava 入门

前面讲了那么多,大家在概念上对RxJava有一个初步的认识就好,接下来,将为您解开RxJava神秘的面纱~~

无需过分纠结于“事件”这个词,暂时可以简单的把“事件”看成是一个值,或者一个对象。

  1. 事件产生,就是构造要传递的对象;

  2. 事件处理变换,就是改变传递的对象,可以改变对象的值,或是干脆创建个新对象,新对象类型也可以与源对象不一样;

  3. 事件处理,就是接收到对象后要做的事;

事件产生

RxJava创建一个事件比较简单,由 Observable 通过 create 操作符来创建。举个栗子,还是经典的 HelloWorld~~

// 创建一个ObservableObservable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {    @Overridepublic void call(Subscriber<? super String> subscriber) {        // 发送一个 Hello World 事件subscriber.onNext("Hello World!");        // 事件发送完成subscriber.onCompleted();}
});

这段代码可以理解为, Observable 发出了一个类型为 String ,值为 “Hello World!” 的事件,仅此而已。

对于 Subscriber 来说,通常onNext()可以多次调用,最后调用onCompleted()表示事件发送完成。

上面这段代码,也可以通过just操作符进行简化。RxJava常用操作符后面会详细介绍,这里先有个了解。

// 创建对象,just里面的每一个参数,相当于调用一次Subscriber#OnNext()Observable<String> observable = Observable.just("Hello World!");

这样,是不是简单了许多?

事件消费

有事件产生,自然也要有事件消费。RxJava 可以通过 subscribe 操作符,对上述事件进行消费。首先,先创建一个观察者。

// 创建一个ObserverObserver<String> observer = new Observer<String>() {    @Overridepublic void onCompleted() {Log.i(TAG, "complete");}    @Overridepublic void onError(Throwable e) {}    @Overridepublic void onNext(String s) {Log.i(TAG, s);}
};

或者

// 创建一个SubscriberSubscriber<String> subscriber = new Subscriber<String>() {    @Overridepublic void onCompleted() {Log.i(TAG, "complete");}    @Overridepublic void onError(Throwable e) {}    @Overridepublic void onNext(String s) {Log.i(TAG, s);}
};
  1. Observer 是观察者, Subscriber 也是观察者,Subscriber 是一个实现了Observer接口的抽象类,对 Observer 进行了部分扩展,在使用上基本没有区别;

  2. Subscriber 多了发送之前调用的 onStart() 和解除订阅关系的 unsubscribe() 方法。

  3. 并且,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以在这之后的示例代码,都使用 Subscriber 来作为观察者。

事件订阅

最后,我们可以调用 subscribe 操作符, 进行事件订阅。

// 订阅事件observable.subscribe(subscriber);

在 Subscriber 实现的三个方法中,顾名思义,对应三种不同状态: 
1. onComplete(): 事件全部处理完成后回调 
2. onError(Throwable t): 事件处理异常回调 
3. onNext(T t): 每接收到一个事件,回调一次

区分回调动作

对于事件消费与事件订阅来说,好像为了打印一个“Hello World!”要费好大的劲… 其实,RxJava 自身提供了精简回调方式,我们可以为 Subscriber 中的三种状态根据自身需要分别创建一个回调动作 Action

// onComplete()Action0 onCompleteAction = new Action0() {    @Overridepublic void call() {Log.i(TAG, "complete");}
};// onNext(T t)Action1<String> onNextAction = new Action1<String>() {    @Overridepublic void call(String s) {Log.i(TAG, s);}
};// onError(Throwable t)Action1<Throwable> onErrorAction = new Action1<Throwable>() {    @Overridepublic void call(Throwable throwable) {}
};

那么,RxJava 的事件订阅支持以下三种不完整定义的回调。

observable.subscribe(onNextAction);observable.subscribe(onNextAction, onErrorAction);observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

我们可以根据当前需要,传入对应的 Action, RxJava 会相应的自动创建 Subscriber。

  1. Action0 表示一个无回调参数的Action;

  2. Action1 表示一个含有一个回调参数的Action;

  3. 当然,还有Action2 ~ Action9,分别对应2~9个参数的Action;

  4. 每个Action,都有一个 call() 方法,通过泛型T,来指定对应参数的类型;

入门示例

前面讲解了事件的产生到消费、订阅的过程,下面就举个完整的例子。从res/mipmap中取出一张图片,显示在ImageView上。

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);Observable.create(new Observable.OnSubscribe<Drawable>() {        @Overridepublic void call(Subscriber<? super Drawable> subscriber) {            // 从mipmap取出一张图片作为Drawable对象Drawable drawable = ContextCompat.getDrawable(mContext, R.mipmap.ic_launcher);            // 把Drawable对象发送出去subscriber.onNext(drawable);subscriber.onCompleted();}}).subscribe(new Subscriber<Drawable>() {        @Overridepublic void onCompleted() {}        @Overridepublic void onError(Throwable e) {Log.i(TAG, e.toString());}        @Overridepublic void onNext(Drawable drawable) {            // 接收到Drawable对象,显示在ImageView上ivLogo.setImageDrawable(drawable);}});

上面示例是RxJava最基本的一个用法。稍微消化一下,继续~~

RxJava 进阶

Scheduler线程控制

默认情况下,RxJava事件产生和消费均在同一个线程中,例如在主线程中调用,那么事件的产生和消费都在主线程。

那么问题来了,假如事件产生的过程是耗时操作,比如网络请求,结果显示在UI中,这个时候在主线程执行对于网络请求就不合适了,而在子线程执行,显示结果需要进行UI操作,同样不合适~~

所以,RxJava 的第一个牛逼之处在于可以自由切换线程!那么,如何做?

在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:

  1. Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;

  2. Schedulers.newThread(): 开启新线程操作;

  3. Schedulers.immediate(): 默认指定的线程,也就是当前线程;

  4. Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;

  5. AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;

我们可以通过 subscribeOn() 和 observeOn() 这两个方法来进行线程调度。举个栗子:

依然还是显示一张图片,不同的是,这次是从网络上加载图片

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);Observable.create(new Observable.OnSubscribe<Drawable>() {    @Overridepublic void call(Subscriber<? super Drawable> subscriber) {        try {Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(), "src");subscriber.onNext(drawable);} catch (IOException e) {subscriber.onError(e);}}
})        // 指定 subscribe() 所在的线程,也就是上面call()方法调用的线程.subscribeOn(Schedulers.io())        // 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Drawable>() {            @Overridepublic void onCompleted() {}            @Overridepublic void onError(Throwable e) {Log.e(TAG, e.toString());}            @Overridepublic void onNext(Drawable drawable) {ivLogo.setImageDrawable(drawable);}});

所以,这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。

变换

RxJava的又一牛逼之处,在于 变换。啥意思呢? 就是将发送的事件或事件序列,加工后转换成不同的事件或事件序列。

map操作符

变换的概念不好理解吧?举个简单的栗子,我们对上述示例 进行改写。

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);Observable.create(new Observable.OnSubscribe<String>() {    @Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");}
}).map(new Func1<String, Drawable>() {    @Overridepublic Drawable call(String url) {        try {Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");            return drawable;} catch (IOException e) {}        return null;}
})        // 指定 subscribe() 所在的线程,也就是call()方法调用的线程.subscribeOn(Schedulers.io())        // 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Drawable>() {            @Overridepublic void onCompleted() {}            @Overridepublic void onError(Throwable e) {Log.e(TAG, e.toString());}            @Overridepublic void onNext(Drawable drawable) {                if (drawable != null) {ivLogo.setImageDrawable(drawable);}}});

经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是:

Observable<String> --> map变换 --> Observable<Drawable>

那么,Func1 是什么呢?与 Action1 类似,不同的是 FuncX 有返回值,而 ActionX 没有。为什么需要返回值呢?目的就在于对象的变换,由String对象转换为Drawable对象。同样,也有Func0 ~ Func9,对应不同的参数个数。

当然了,RxJava 的变换,可不止于map这么简单,继续往下!

flatMap操作符

不难发现,上述的 map 操作符,是一对一的变换,并且返回的是变换后的对象。而 flatMap 操作符可以适应一对多,并且返回的是一个 Observable 。应用场景举例:例如一个员工负责多个任务,现在要打印所有员工的所有任务。

final List<Employee> list = new ArrayList<Employee>() {{add(new Employee("jackson", mission_list1));add(new Employee("sunny", mission_list2));}
};
Observable.from(list).flatMap(new Func1<Employee, Observable<Employee.Mission>>() {            @Overridepublic Observable<Employee.Mission> call(Employee employee) {                return Observable.from(employee.missions);}}).subscribe(new Subscriber<Employee.Mission>() {            @Overridepublic void onCompleted() {}            @Overridepublic void onError(Throwable e) {}            @Overridepublic void onNext(Employee.Mission mission) {Log.i(TAG, mission.desc);}});

执行结果为顺序打印出两位员工的所有任务列表。

通过上面的代码可以看出,map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

  1. flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;

  2. flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;

  3. flatMap 变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;

  4. map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;

  5. 可以对一个Observable多次使用 map 和 flatMap

鉴于 flatMap 自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess() 回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:

Github上的 README.md 文件,通常是 MarkDown 语法。我们要获取 README.md 内容并按 MarkDown 风格显示在UI上,就可以通过以下方式(Retrofit2 + RxJava,稍后会介绍):

new ReadmeContentClient()    // 获取md语法的Readme内容, 返回的是一个Observable<String>对象.getReadme().flatMap(new Func1<String, Observable<String>>() {        @Overridepublic Observable<String> call(String md) {            // 由于Readme的内容是md语法,需要转成html字符串通过WebView显示到UI// 返回的也是Observable<String>对象return new MarkDownStyleClient(md).formatMarkStyle();}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {        @Overridepublic void onCompleted() {}        @Overridepublic void onError(Throwable e) {Log.e(TAG, "readme:" + e.toString());}        @Overridepublic void onNext(String html) {            // html就是根据readme md格式内容,生成的html代码view.showReadme(html);}});

RxJava 其他常用操作符

  1. from 
    接收一个集合作为输入,然后每次输出一个元素给subscriber。

    // Observable.from(T[] params)Observable.from(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Action1<Integer>() {        @Overridepublic void call(Integer number) {Log.i(TAG, "number:" + number);}});

    注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…);

  2. just 
    接收一个可变参数作为输入,最终也是生成数组,调用from(),然后每次输出一个元素给subscriber。

    // Observable.just(T... params),params的个数为1 ~ 10Observable.just(1, 2, 3, 4, 5).subscribe(new Action1<Integer>() {        @Overridepublic void call(Integer number) {Log.i(TAG, "number:" + number);}});
  3. filter 
    条件过滤,去除不符合某些条件的事件。举个栗子:

    Observable.from(new Integer[]{1, 2, 3, 4, 5}).filter(new Func1<Integer, Boolean>() {        @Overridepublic Boolean call(Integer number) {            // 偶数返回true,则表示剔除奇数,留下偶数return number % 2 == 0;}}).subscribe(new Action1<Integer>() {        @Overridepublic void call(Integer number) {Log.i(TAG, "number:" + number);}});
  4. take 
    最多保留的事件数。

  5. doOnNext 
    在处理下一个事件之前要做的事。

    Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Func1<Integer, Boolean>() {        @Overridepublic Boolean call(Integer number) {            // 偶数返回true,则表示剔除奇数return number % 2 == 0;}})    // 最多保留三个,也就是最后剩三个偶数.take(3).doOnNext(new Action1<Integer>() {        @Overridepublic void call(Integer number) {            // 在输出偶数之前输出它的hashCodeLog.i(TAG, "hahcode = " + number.hashCode() + "");}}).subscribe(new Action1<Integer>() {        @Overridepublic void call(Integer number) {Log.i(TAG, "number = " + number);}});

    输出如下:

    hahcode = 2number = 2hahcode = 4number = 4hahcode = 6number = 6
  6. debounce 
    通俗点讲,就是N个事件发生的时间间隔太近,就过滤掉前N-1个事件,保留最后一个事件。debounce可以指定这个时间间隔!可以用在SearchEditText请求关键词的地方,SearchEditText的内容变化太快,可以抵制频繁请求关键词,后面第15条15.Subject会介绍这个。为了演示效果,先举个简单栗子:

    Observable.create(new Observable.OnSubscribe<Integer>() {        @Overridepublic void call(Subscriber<? super Integer> subscriber) {            int i = 0;            int[] times = new int[]{100, 1000};            while (true) {i++;                if (i >= 100)                    break;subscriber.onNext(i);                try {                    // 注意!!!!// 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉// 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉Thread.sleep(times[i % 2]);} catch (InterruptedException e) {e.printStackTrace();}}subscriber.onCompleted();}})    // 间隔400ms以内的事件将被丢弃.debounce(400, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {        @Overridepublic void onCompleted() {Log.i(TAG, "complete");}        @Overridepublic void onError(Throwable e) {Log.e(TAG, e.toString());}        @Overridepublic void onNext(Integer integer) {Log.i(TAG, "integer = " + integer);}});

    输出结果:

    11-23 10:44:45.167 MainActivity: integer = 111-23 10:44:46.270 MainActivity: integer = 311-23 10:44:47.373 MainActivity: integer = 511-23 10:44:48.470 MainActivity: integer = 711-23 10:44:49.570 MainActivity: integer = 911-23 10:44:50.671 MainActivity: integer = 1111-23 10:44:51.772 MainActivity: integer = 1311-23 10:44:52.872 MainActivity: integer = 1511-23 10:44:53.973 MainActivity: integer = 17...

    我们设置过滤条件为400ms,可以发现,奇数正常输出,因为在它的下一个事件事件隔了1000ms,所以它不会被过滤掉;偶数被过滤掉,是因为它距离下一个事件(奇数)只隔了100ms。并且,输出的两个事件相隔大约为 100ms + 1000ms = 1100ms

  7. merge 
    用于合并两个Observable为一个Observable。较为简单。

    Observable.merge(Observable1, Observable2).subscribe(subscriber);
  8. concat 
    顺序执行多个Observable,个数为1 ~ 9。例子稍后与first操作符一起~~

  9. compose 
    与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。

    1. compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在 flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。

    2. compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。

    3. flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。

    4. 建议使用 compose 代替 flatMap

  10. first 
    只发送符合条件的第一个事件。可以与前面的contact操作符,做网络缓存。举个栗子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。

    // 从缓存获取Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {    @Overridepublic void call(Subscriber<? super BookList> subscriber) {BookList list = getFromDisk();        if (list != null) {subscriber.onNext(list);} else {subscriber.onCompleted();}}
    });// 从网络获取Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();Observable.concat(fromDisk, fromNetWork)        // 如果缓存不为null,则不再进行网络请求。反之.first().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<BookList>() {            @Overridepublic void onCompleted() {}            @Overridepublic void onError(Throwable e) {}            @Overridepublic void onNext(BookList discussionList) {}});

    网络缓存用法,具体可参见我的项目:https://github.com/JustWayward/BookReader

  11. timer 
    可以做定时操作,换句话讲,就是延迟执行。事件间隔由timer控制。举个栗子:两秒后输出“Hello World!”

    Observable.timer(2, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {        @Overridepublic void onCompleted() {}        @Overridepublic void onError(Throwable e) {}        @Overridepublic void onNext(Long aLong) {Log.i(TAG, "Hello World!");}});
  12. interval 
    定时的周期性操作,与timer的区别就在于它可以重复操作。事件间隔由interval控制。举个栗子:每隔两秒输出“Hello World!”

    Observable.interval(2, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {        @Overridepublic void onCompleted() {}        @Overridepublic void onError(Throwable e) {}        @Overridepublic void onNext(Long aLong) {Log.i(TAG, "Hello World!");}});
  13. throttleFirst 
    与debounce类似,也是时间间隔太短,就丢弃事件。可以用于防抖操作,比如防止双击。

    RxView.clicks(button).throttleFirst(1, TimeUnit.SECONDS).subscribe(new Observer<Object>() {      @Overridepublic void onCompleted() {}      @Overridepublic void onError(Throwable e) {}      @Overridepublic void onNext(Object o) {Log.i(TAG, "do clicked!");}});

    上面这个RxView详见:https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定,JakeWharton大神的项目,厉害。

  14. Single 
    Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。

    Single.create(new Single.OnSubscribe<Object>() {    @Overridepublic void call(SingleSubscriber<? super Object> subscriber) {subscriber.onSuccess("Hello");}
    }).subscribe(new SingleSubscriber<Object>() {    @Overridepublic void onSuccess(Object value) {Log.i(TAG, value.toString());}    @Overridepublic void onError(Throwable error) {}
    });
  15. Subject 
    Subject这个类,既是Observable又是Observer,啥意思呢?就是它自身既是事件的生产者,又是事件的消费者,相当于自身是一条管道,从一端进,又从另一端出。举个栗子:PublishSubject

    Subject subject = PublishSubject.create();// 1.由于Subject是Observable,所以进行订阅subject.subscribe(new Subscriber<Object>() {    @Overridepublic void onCompleted() {}    @Overridepublic void onError(Throwable e) {}    @Overridepublic void onNext(Object o) {Log.i(TAG, o.toString());}
    });// 2.由于Subject同时也是Observer,所以可以调用onNext发送数据subject.onNext("world");

    这个好像有点厉害的样子,哈哈。可以配合debounce,避免SearchEditText频繁请求。

    Subject subject = PublishSubject.create();subject.debounce(400, TimeUnit.MILLISECONDS).subscribe(new Subscriber<Object>() {        @Overridepublic void onCompleted() {}        @Overridepublic void onError(Throwable e) {}        @Overridepublic void onNext(Object o) {            // request}});edittext.addTextChangedListener(new TextWatcher() {    @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) { }    @Override public void onTextChanged(CharSequence s, int start, int before, int count) {subject.onNext(s.toString());}    @Override public void afterTextChanged(Editable s) { }
    });

RxJava 应用

RxJava+Retrofit 的网络请求方式

Retrofit是一个非常适合RestAPI的网络请求库。没用过的童鞋,还是推荐学一学的。

使用Callback的请求方式:

// 1. 定义一个请求接口@GET("/match/stat")
Call<String> getMatchStat(@Query("mid") String mid, @Query("tabType") String tabType);// 2. 创建Service对象Retrofit retrofit = new Retrofit.Builder().baseUrl(BuildConfig.TENCENT_SERVER)// 加入RxJava支持.addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .addConverterFactory(ScalarsConverterFactory.create()).client(OkHttpHelper.getTecentClient()).build();TencentApi api = retrofit.create(TencentApi.class);// 3. 调用Call<String> call = api.getMatchStat(mid, tabType);
call.enqueue(new Callback<String>() {    @Overridepublic void onResponse(Call<String> call, Response<String> response) {        if(response != null && response.body()!=null)            // 成功} else {            // 无数据}}    @Overridepublic void onFailure(Call<String> call, Throwable t) {        // 失败}
});

与 RxJava 结合的方式,则是

// 1. 定义请求接口,返回的是Observable对象@GET("/user/followers")
Observable<List<User>> followers();// 2. 同样是创建api对象...// 3. 请求api.followers().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<User>>() {        @Overridepublic void onCompleted() {}        @Overridepublic void onError(Throwable e) {            // 请求出错。可能发生网络异常、Json解析异常等等}        @Overridepublic void onNext(List<User> list) {            // 请求成功view.showMyFollowers(list);}});

若需嵌套请求,比如先获取Token再进行才能进行登录,可参考flatMap操作符最后的获取Readme内容显示在WebView上的例子。

Retrofit2 + RxJava + Dagger2: 具体可参见我的项目,里面有比较详细的用法。 
https://github.com/JustWayward/BookReader

不难发现,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 以及 OnCompleted() 或在请求失败后调用 onError()

:RxJava形式的请求,并不能减少代码量,但是逻辑非常清晰。假如请求到数据之后需要对数据进行处理,并且是耗时操作,难道要再开一个线程,或者用AsyncTask再做一次异步?很显然,RxJava的变换很好的解决了这个问题,依然会使逻辑结构清晰。

RxBus

准确的来说,是一种基于RxJava实现事件总线的一种思想。可以替代EventBus/Otto,因为他们都依赖于观察者模式。可以参考https://github.com/AndroidKnife/RxBus这个库。

RxBinding

前面介绍过了,JakeWharton大神的项目,https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定。

RxJava 的一些坑

未取消订阅而引起的内存泄漏

举个栗子,对于前面常用操作符12.interval做周期性操作的例子,并没有使之停下来的,没有去控制订阅的生命周期,这样,就有可能引发内存泄漏。所以,在Activity#onDestroy()的时候或者不需要继续执行的时候应该取消订阅。

Subscription subscription = Observable.interval(2, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {        @Overridepublic void onCompleted() {}        @Overridepublic void onError(Throwable e) {}        @Overridepublic void onNext(Long aLong) {Log.i(TAG, "Hello World!");}});// 调用unsubscribe();方法进行取消订阅subscription.unsubscribe();

但是,如果有很多个数据源,那岂不是要取消很多次?当然不是的,可以利用 CompositeSubscription, 相当于一个 Subscription 集合。

CompositeSubscription list = new CompositeSubscription();
list.add(subscription1);
list.add(subscription2);
list.add(subscription3);// 统一调用一次unsubscribe,就可以把所有的订阅都取消list.unsubscribe();

总结

相信到了这里,大家对RxJava应该有了一个比较清晰的理解。当然,实践出真知,还是要去尝试,才能更深层次的体会到其强大之处。

最后,总结一下RxJava的基本使用过程。

  1. 首先是创建事件源源,也就是被观察者,可以用Observable的create/just/from等方法来创建;

  2. 通过filter/debounce等操作符,进行自定义事件过滤;

  3. 通过Schedules进行事件发送和订阅的线程控制,也就是subscribeOn() 和 observeOn();

  4. 通过map/flatMap/compose等操作符,进行事件的变换;

  5. 调用subscribe进行事件订阅;

  6. 最后,不要忘了对订阅者生命周期的控制,不用的时候,记得调用unsubscribe(),以免引发内存泄漏。

作者:哈士奇WWW
链接:https://www.imooc.com/article/68834
来源:慕课网

RxJava 从入门到全解析相关推荐

  1. DoTween全解析(入门篇)

    DoTween全解析(入门篇) 概述: DoTween,Itween,这些名字作为一个Unity开发人员听起来并不陌生,它们在动画方面表现出了令人折服的能力,今天我带着大家来一起认识一下这款插件. 首 ...

  2. ps裁剪和裁切的区别_【后期修图】ps新手入门:裁剪工具全解析!

    原标题:[后期修图]ps新手入门:裁剪工具全解析! 想要掌握ps,裁剪工具的学习是必不可少的,几乎角角落落都能看到它的身影,比如去掉照片边缘的杂物.重新规划照片比例.调整构图等等. 一.认识工具 剪裁 ...

  3. js解析二维码_最新最全阿里巴巴,今日头条,腾讯Flutter面试真题全解析(狂虐不止)...

    阿里巴巴,今日头条,腾讯Flutter面试真题全解析.你只有去过大厂,才知道大厂的面试有多难,这个难度不是你能够想象得到的.所以说如果想去做这方面的工作,建议把以下内容好好准备一下(其实也就是多看一些 ...

  4. RxJava学习入门

    RxJava是什么 一个词:异步. RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-bas ...

  5. vuex 源码分析_前端入门之(vuex-router-sync解析)

    前端入门之(vuex-router-sync解析) 发布时间:2018-11-14 13:31, 浏览次数:513 , 标签: vuex router sync 前言:vue全家桶的内容我们已经研究过 ...

  6. 【持续更新】Leetcode SQL题目全解析(附建表sql)

    Leetcode SQL题目全解析 越前须知(雾) 题目Q & A 175 组合两个表 181 超过经理收入的员工 182 查找重复电子邮箱 183 从不订购的用户 197 上升的温度 511 ...

  7. 【比特熊故事汇】X Microsoft Build 2022——微软专家+MVP,技术亮点全解析

    大家好!我是爱吃.爱玩.更爱学习技术,IT届的新晋小网红,开发者的好朋友比特熊! 比特熊:特别联名Microsoft Build 2022,本期[比特熊故事汇]请来重量级嘉宾,突破直播间嘉宾数新高!由 ...

  8. Android图片加载框架最全解析(八),带你全面了解Glide 4的用法

    本文转载自郭神的Glide分析系列:http://blog.csdn.net/guolin_blog/article/details/78582548 本文同步发表于我的微信公众号,扫一扫文章底部的二 ...

  9. 网络爬虫全解析(JAVA)--目录

    第1章技术基础1 1.1第一个程序1 1.2准备开发环境2 1.2.1JDK2 1.2.2Eclipse3 1.3类和对象4 1.4常量5 1.5命名规范6 1.6基本语法6 1.7条件判断7 1.8 ...

最新文章

  1. 32位计算机能玩什么游戏,系统32位操作能玩什么游戏?
  2. BeautifulSoup解析库详解
  3. 数据库 'SqlPersistenceService' 的版本为 655,无法打开。此服务器支持 611 版及更低版...
  4. 区块链,供应链金融的新机遇
  5. 动画函数requestAnimationFrame
  6. Redis主从配置和集群配置
  7. C# 运行时通过鼠标拖动改变控件的大小
  8. spring学习(42):属性注入注入数组和列表的说明
  9. 【Elasticsearch】解除索引只读限制 read-only
  10. python手机销售系统结论于心得_python实现手机销售管理系统
  11. Python终端输出中文
  12. ubuntu18.04编译Open Pose 1.7
  13. Rtmp协议看一篇就够了
  14. Windows10怎么下载MySQL,详解Windows10下载mysql的教程图解
  15. 软件以人为本5 - 敏捷3 - 拯救每日立会2
  16. git官网下载太慢解决方法
  17. Java 实现 图片OCR文字识别
  18. 受保护的Word文档如何编辑?
  19. [含lw+源码等]微信小程序在线考试管理系统+后台管理系统[包运行成功]
  20. linux的ioctl函数实现

热门文章

  1. 通过Freedgo画方框图
  2. python 几何计算_【理解黎曼几何】6. 曲率的计数与计算(Python)
  3. 用JAVA编写简易计算器界面
  4. VMware运用Intel I350网卡异常处理
  5. 原生js实现元素拖拽onmousedown/onmousemove/onmouseup
  6. vs2015安装qt插件提示QT in the given path was built using minGW
  7. Macbook翻转鼠标滑轮
  8. 安卓手机卡顿怎么解决_安卓手机卡是通病?大这样设置一招轻松解决
  9. 3.Mac安装Vue出现的问题解决方案:Error: EACCES: permission denied
  10. 滚潮信息中标移动服务器,电信联通25万座5G基站集采结果出炉 华为、中兴、爱立信、大唐移动中标...