转载来源http://blog.csdn.net/jdsjlzx/article/details/51534504

在说Observer与Subscriber的关系之前,我们下重温下相关概念。

RxJava 的观察者模式

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava 的观察者模式大致如下图:

RxJava的实现

基于以上的概念, RxJava 的基本实现主要有三点:

1) 创建 Observer

Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:

Observer<Apps> observer = new Observer<Apps>() {@Overridepublic void onCompleted() {listView.onRefreshComplete();}@Overridepublic void onError(Throwable e) {listView.onRefreshComplete();}@Overridepublic void onNext(Apps appsList) {listView.onRefreshComplete();appLists.addAll(appsList.apps);adapter.notifyDataSetChanged();}};

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

Subscriber subscriber = new Subscriber<Apps>() {@Overridepublic void onCompleted() {listView.onRefreshComplete();}@Overridepublic void onError(Throwable e) {listView.onRefreshComplete();}@Overridepublic void onNext(Apps appsList) {listView.onRefreshComplete();appLists.addAll(appsList.apps);adapter.notifyDataSetChanged();}};

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。

Subscriber是Observer的实现类

public abstract class Subscriber<T> implements Observer<T>, Subscription 

而onStart()方法是Subscriber中的一个方法。它也属于回调级别的。

subscribe(Subscriber)方法中有如下代码:

// if not already wrapped   包裹一层if (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}

他将subscriber包装起来,这个具体什么意思有待研究,继续下看。

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);return hook.onSubscribeReturn(subscriber);

hook是什么呢?

private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

RxJavaObservableExecutionHook.Java源码:

/*** Copyright 2014 Netflix, Inc.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package rx.plugins;import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;/*** Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} execution with a* default no-op implementation.* <p>* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.* <p>* <b>Note on thread-safety and performance:</b>* <p>* A single implementation of this class will be used globally so methods on this class will be invoked* concurrently from multiple threads so all functionality must be thread-safe.* <p>* Methods are also invoked synchronously and will add to execution time of the observable so all behavior* should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate* worker threads.* */
public abstract class RxJavaObservableExecutionHook {/*** Invoked during the construction by {@link Observable#create(OnSubscribe)}* <p>* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra* logging, metrics and other such things and pass-thru the function.* * @param f*            original {@link OnSubscribe}<{@code T}> to be executed* @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just*         returned as a pass-thru*/public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {return f;}/*** Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed.* <p>* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra* logging, metrics and other such things and pass-thru the function.* * @param onSubscribe*            original {@link OnSubscribe}<{@code T}> to be executed* @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just*         returned as a pass-thru*/public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {// pass-thru by defaultreturn onSubscribe;}/*** Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with returned* {@link Subscription}.* <p>* This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging,* metrics and other such things and pass-thru the subscription.* * @param subscription*            original {@link Subscription}* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a*         pass-thru*/public <T> Subscription onSubscribeReturn(Subscription subscription) {// pass-thru by defaultreturn subscription;}/*** Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown Throwable.* <p>* This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when* attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.* * @param e*            Throwable thrown by {@link Observable#subscribe(Subscriber)}* @return Throwable that can be decorated, replaced or just returned as a pass-thru*/public <T> Throwable onSubscribeError(Throwable e) {// pass-thru by defaultreturn e;}/*** Invoked just as the operator functions is called to bind two operations together into a new* {@link Observable} and the return value is used as the lifted function* <p>* This can be used to decorate or replace the {@link Operator} instance or just perform extra* logging, metrics and other such things and pass-thru the onSubscribe.* * @param lift*            original {@link Operator}{@code <R, T>}* @return {@link Operator}{@code <R, T>} function that can be modified, decorated, replaced or just*         returned as a pass-thru*/public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {return lift;}
}

RxJavaObservableExecutionHook类的作用很特殊,似乎没有什么太大的作用,传进去什么(类型)参数,返回什么(类型)参数。

如下代码所示:

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {return f;}public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {// pass-thru by defaultreturn onSubscribe;}

至于最后关键的返回结果:

public <T> Subscription onSubscribeReturn(Subscription subscription) {// pass-thru by defaultreturn subscription;}

说白了,就是返回订阅的Observer对象。


Observer与Subscriber的区别

它们的区别对于使用者来说主要有两点:

  1. onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
  2. unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

2) 创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("John");subscriber.onCompleted();}
});

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用两次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

create() 是 RxJava 最基本的创造事件序列的操作符。基于这个操作符, RxJava 还提供了一些方法用来快捷创建事件队列,详见RxJava操作符系列文章:http://blog.csdn.net/jdsjlzx/article/details/51485861

3) Subscribe (订阅)
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

public final Subscription subscribe(Subscriber<? super T> subscriber) {return Observable.subscribe(subscriber, this);}private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {// validate and proceedif (subscriber == null) {throw new IllegalArgumentException("observer can not be null");}if (observable.onSubscribe == null) {throw new IllegalStateException("onSubscribe function can not be null.");/** the subscribe function can also be overridden but generally that's not the appropriate approach* so I won't mention that in the exception*/}// new Subscriber so onStart itsubscriber.onStart();/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}// The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks.try {// allow the hook to intercept and/or decoratehook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);return hook.onSubscribeReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(hook.onSubscribeError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.hook.onSubscribeError(r);// TODO why aren't we throwing the hook's return value.throw r;}return Subscriptions.unsubscribed();}}

可以看到,subscriber() 做了3件事:

  1. 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

整个过程中对象间的关系如下图:

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {// onNext()@Overridepublic void call(String s) {Log.d(tag, s);}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {// onError()@Overridepublic void call(Throwable throwable) {// Error handling}
};
Action0 onCompletedAction = new Action0() {// onCompleted()@Overridepublic void call() {Log.d(tag, "completed");}
};// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象,因此,从某种程度上来说用 Subscriber 来代替 Observer ,这样会更加严谨。

根据目前的经验来看,Observer与Subscriber的主要区别在于onNext方法执行完毕后是否取消了订阅。

首先,我们分别定义mSubscriber 和 mObserver 。

如下代码:

protected Subscriber<D> mSubscriber = new Subscriber<D>() {@Overridepublic void onCompleted() {executeOnLoadFinish();}@Overridepublic void onError(Throwable e) {TLog.error("onError " + e.toString());executeOnLoadDataError(null);}@Overridepublic void onNext(D d) {TLog.log("onNext " );List<T> list = d;TLog.log("entity " + list.size());executeOnLoadDataSuccess(list);TLog.log("onSuccess totalPage " + totalPage);}};protected Observer<D> mObserver = new Observer<D>() {@Overridepublic void onCompleted() {executeOnLoadFinish();}@Overridepublic void onError(Throwable e) {TLog.error("onError " + e.toString());executeOnLoadDataError(null);}@Overridepublic void onNext(D d) {TLog.log("onNext " );List<T> list = d;TLog.log("entity " + list.size());executeOnLoadDataSuccess(list);TLog.log("onSuccess totalPage " + totalPage);}};
observable.subscribeOn(Schedulers.io()).map(new Func1<Response<D>,D>() {@Overridepublic D call(Response<D> response) {if(response == null){throw new ApiException(100);}totalPage = response.total;return response.result;}}).observeOn(AndroidSchedulers.mainThread())//.subscribe(mObserver);.subscribe(mSubscriber);

subscribe(mObserver)和subscribe(mSubscriber)执行结果就会有区别:

  • subscribe(mSubscriber)这种订阅方式在第二次请求数据时就不会执行了,原因就是onCompleted后自动取消了订阅
    (详见文章:http://blog.csdn.net/jdsjlzx/article/details/51542003);
  • subscribe(mObserver)则不出现此问题。

提醒:个人以为subscribe(mObserver)这个方式更适合分页加载。

请注意,如果你每次都使用subscribe(new Subscriber< T>() {})方式实现订阅,就不会出现上面的问题。

如下代码:

private void toSubscribe(Observable<Response<D>> observable) {observable.subscribeOn(Schedulers.io()).map(new Func1<Response<D>,D>() {@Overridepublic D call(Response<D> response) {if(response == null){throw new ApiException(100);}totalPage = response.total;return response.result;}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<D>() {@Overridepublic void onCompleted() {executeOnLoadFinish();}@Overridepublic void onError(Throwable e) {TLog.error("onError " + e.toString());executeOnLoadDataError(null);}@Overridepublic void onNext(D d) {TLog.log("onNext " );List<T> list = d;TLog.log("entity " + list.size());executeOnLoadDataSuccess(list);TLog.log("onSuccess totalPage " + totalPage);}});}

当然,这个方式实现分页加载也是可以的。至于哪个更好,还需要再验证。

引用:

Subscriber是Observer接口的一个抽象实现;推荐使用Subscriber,实际上RxJava在subscibe过程中,会先将Observer转换为一个Subscriber。

了解RxJava .observeOn()与.subscribeOn():
http://blog.csdn.net/jdsjlzx/article/details/51685769

RxJava Observer与Subscriber的关系相关推荐

  1. RxJava中的doOnSubscribe默认运行线程分析

    假设你对RxJava1.x还不是了解,能够參考以下文章. 1. RxJava使用介绍 [视频教程] 2. RxJava操作符   • Creating Observables(Observable的创 ...

  2. RxJava 过滤操作符(Filtering Observables Operators)

    RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符   • Creating Observables(Observable的创建操作符) [视频教程]   • ...

  3. Rxjava源码分析之IO.Reactivex.Observer

    Android 中的观察者模式,Rxjava中有两个重要的类Observable和Observer,函数响应式编程具体表现为一个观察者(Observer)订阅一个可观察对象(Observable).通 ...

  4. RxJava 源码解析之观察者模式

    了解 RxJava 的应该都知道是一个基于事务驱动的库,响应式编程的典范.提到事务驱动和响应就不得不说说,设计模式中观察者模式,已经了解的朋友,可以直接跳过观察者模式的介绍,直接到 RxJava 源码 ...

  5. RxJava使用(一)基本使用

    前言 RxJava及RxAndroid比较详细的介绍可以参考该文档<给 Android 开发者的 RxJava 详解> 基本介绍 ReactiveX 及 RxJava使用大部分来自和参考& ...

  6. 【Rxjava】通俗说Rxjava

    如下为Rxjava最基础也是最常见的使用样例: /*** 最基础的使用*/Observable.create(new OnSubscribe<String>() {@Overridepub ...

  7. 【Android】RxJava的使用(一)基本用法

    前言 最近经常看到RxJava这个字眼,也看到很多人在博客中推荐使用RxJava.好奇的我怎么能错过,于是Google了一下,说RxJava好用的和说RxJava难用的都有,于是自己也亲自尝试了一下( ...

  8. RxJava 和 RxAndroid 一 (基础)

    1.RxJava 项目地址 https://github.com/ReactiveX/RxJava 2.RxAndroid 项目地址    https://github.com/ReactiveX/R ...

  9. android ------- 开发者的 RxJava 详解

    在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:  https://github.com/ReactiveX/RxJava  https://githu ...

  10. 给 Android 开发人员的 RxJava 具体解释

    前言 我从去年開始使用 RxJava .到如今一年多了. 今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava .并且使用的场景越来越多 . 而 ...

最新文章

  1. 民生银行马坡数据中心基础设施Uptime Mamp;O运营管理体系建立与实施
  2. ML之二分类预测:以岩石水雷数据集(RockMine)为例从0到1深入理解科学预测之分类问题的思路框架(特征工程详细步骤(特征分析与特征处理)+分类模型设计)
  3. 川崎机器人c#通讯(转)
  4. PHP操作数据库,不推荐使用mysql函数,而推荐使用mysqli和PDO函数
  5. 计算机辅助应用的缩写有什么,计算机辅助设计的英文缩写是什么
  6. tomcat的localhost_access_log日志文件
  7. 利用Diskgen找回分区破坏前的资料
  8. 除了加强风控,大数据还能为FinTech做些什么?
  9. ijcai2017赛后总结
  10. c语言入门经典18个程序,c语言入门经典必背18个程序
  11. python安装numpy库教程_Python库之numpy库的安装教程
  12. vs2010中svn使用教程_vs2010+ Ankhsvn使用详解
  13. 新西兰计算机预科学费多少钱,去新西兰读预科需要多少费用?
  14. ES6 对象数组查找某一个对象
  15. 计算机模拟圣彼得堡游戏
  16. JavaOne美国之行–Session篇
  17. 已知字符串str1 = tomorrow is sunny day,下列表达式能正确查找到子字符串is的是()(选两项)
  18. cmd 修改ip地址
  19. html 字体样式加粗,css字体怎么加粗?
  20. pip安装selenium

热门文章

  1. 性能测试监控TP50、TP99、TP999含义
  2. 你知道微服务如何拆分,能解决哪些问题?
  3. 【vue 导出功能】file-saver插件
  4. 计算机快捷键任务管理器,任务管理器经常要打开任务管理器快捷键是哪个
  5. 一款老飞飞_魅力飞飞脚本研究增加攻击与暴击几率方式探讨源码(附带易语言源码)
  6. signature=735f4378ec01919f23285d0d2557be19,OPENSSL编程 第二十章 椭圆曲线
  7. 哈罗选了个好时点上线顺风车业务,但很可能雷声大雨点小
  8. java的nexttoken_int nextToken()
  9. 关于使用Python——写模拟手机通讯录查询系统
  10. Android Floyd-Steinberg-Dithering、Stucki-dither 抖动处理