1、RxJava 2.0

1.1、什么是RxJava和响应式编程(reactive programming)

在响应编程中,消费者对数据进行反应,这就是为什么异步编程也被称为响应式编程的原因。 响应式编程允许将事件更改传播到已注册的观察者。

RxJava是从Netflix的反向扩展(Rx)到Java的端口。 RxJava是2014年开源的,托管于http://reactivex.io/。

“观察者模式做的正确。 ReactiveX是来自Observer模式,Iterator模式和功能编程的最佳创意的组合。“

--activex.io

这个概念的Java版本叫做RxJava,它托管在https://github.com/ReactiveX/RxJava下。 RxJava根据Apache 2.0许可证发布。
       RxJava将自己描述为用于具有可观察流的异步编程的API。

1.2、定义与RxJava 2.0的依赖关系

在撰写本文时,2.0.4版本目前是发布版本。 将g.a.v替换为2.0.6或更高版本。

对于Gradle构建,您可以通过以下依赖关系语句添加RxJava。

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'
      对于Maven,您可以添加以下代码段的依赖关系

<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId><version>g.a.v</version>
</dependency>

对于OSGi环境,例如Eclipse RCP开发,https://dl.bintray.com/simon-scholz/RxJava-OSGi/可用作p2更新站点。

1.3、 异步编程

现在的编程以一种迫切的单向线程的方式进行编程通常会导致奇怪的行为,阻塞不响应的用户界面,从而导致糟糕的用户体验。
       例如,如果网络不响应,则主动等待数据库查询或Web服务调用可能导致应用程序冻结。
       这可以通过异步处理不可预测的事情来避免。

一个例子是:

public List<Todo> getTodos() {List<Todo> todosFromWeb = // query a webservice (with bad network latency)return todosFromDb;
}

从主线程或UI线程调用getTodos()方法将导致一个非响应的应用程序,直到todosFromWeb到达。
      为了改进这个查询,这需要不可预测的时间量,这个代码应该运行在不同的线程中,并在结果进入时通知主线程。

public void getTodos(Consumer<List<Todo>> todosCallback) {Thread thread = new Thread(()-> {List<Todo> todosFromWeb = // query a webservicetodosCallback.accept(todosFromWeb);});thread.start();
}

现在调用getTodos(Consumer <List <Todo >> todosConsumer)后,主线程可以继续工作,一旦调用了给定的Consumer的accept方法,就不会被阻塞,并且可以做出反应。
       现在的代码真正是异步的。
       但是如果发生Web服务查询中的错误怎么办?

public void getTodos(FailableCallback<List<Todo>> todosCallback) {Thread thread = new Thread(()-> {try {List<Todo> todosFromWeb = // query a web servicetodosCallback.accept(todosFromWeb);} catch(Exception ex) {todosCallback.error(ex);}});thread.start();
}

使用自定义FailableCallback界面可以工作,但也增加了复杂性。
      还有更多的问题可以发生:

  • 与UI同步(SWT和Android中的小部件必须从UI线程更新)
  • 如果FailableCallback的消费者不再存在,该怎么办?
  • 如果这样的FailableCallback取决于另一个FailableCallback怎么办?
public void getUserPermission(FailableCallback<UserPermission> permissionCallback) {Thread thread = new Thread(()-> {try {UserPermission permission = // query a web servicepermissionCallback.accept(permission);} catch(Exception ex) {permission.error(ex);}});thread.start();
}public void getTodos(FailableCallback<List<Todo>> todosCallback) {Thread thread = new Thread(()-> {getUserPermission(new FailableCallback() {public void accept(UserPermission permission) {if(permission.isValid()) {try {List<Todo> todosFromWeb = // query a web serviceif(!todosCallbackInstance.isDisposed()) {if(syncWithUIThread()) {todosCallback.accept(todosFromWeb);}}} catch(Exception ex) {if(!todosCallbackInstance.isDisposed()) {if(syncWithUIThread()) {todosCallback.error(ex);}}}}}public void error(Exception ex) {// Oh no!}});});thread.start();
}

这是非常糟糕的编码,它可能会变得更糟,应该只显示一个可以用ReactiveX解决的例子。 这些问题通常被认为是回调地狱/大坑。

2、RxJava可观察类型

为了存档这个RxJava,它附带了作为数据源的可观察类型,用于订阅这些可观察类型的类以及用于修改,组合和转换正在观察者和订阅者之间交换的数据的许多方法。

其中一些方法与Java 8提供的Stream API非常相似,例如filter(),map()等等。     表1.表可观察类型 


     可以重复或甚至无限发射数据的类型Flowable<T>Obervable<T>.
Observable<Todo> todoObservable = Observable.create(emitter -> {try {List<Todo> todos = getTodos();for (Todo todo : todos) {emitter.onNext(todo);}emitter.onComplete();} catch (Exception e) {emitter.onError(e);}
});

典型的Observable可能会发出无限数据,就像一个点击监听器一样,UI监听器是不可预测的,通常用户可能会点击按钮或其他UI小部件。通常终止成功或失败的类型Maybe<T>Single<T>和 Completable。
      Maybe<T>
对象是一种异步java.util.Optional从Java 8。

Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {try {List<Todo> todos = getTodos();if(todos != null && !todos.isEmpty()) {emitter.onSuccess(todos); (1)}else {emitter.onComplete();   (2)}} catch (Exception e) {emitter.onError(e);   (3)}
});
(1) java.util.Optional 与一个值
(2) java.util.Optional 不包含值→null
(3) 发生错误

Single<T>对象也可以被认为是承诺,在异步框架中也很受欢迎,并且类似于Maybe<T>对象,但只有没有onComplete()方法。
Completable对象与Single<T>对象非常相似,但没有返回值,因此也不具有类似其他类型的泛型。Completable对象也可以看作是反应式java.lang.Runnable对象。
      除了这些可观察类型的最流行的create()方法之外,还有更多的方便方法来创建这些类型之一。

  • Observable.just() - 允许在其他数据类型周围创建一个可观察的包装
  • Observable.fromIterable() - 接受一个java.lang.Iterable <T>,并在数据结构中按顺序排列它们的值
  • Observable.fromArray() - 获取一个数组,并在数据结构中按顺序排列它们的值
  • Observable.fromCallable() - 允许为java.util.concurrent.Callable <V>创建一个observable
  • Observable.fromFuture() - 允许为java.util.concurrent.Future创建一个observable
  • Observable.interval() - 在给定间隔内发出长对象的可观察值
  • ...

3、在RxJava中订阅

一个可观察的实例是可用的监听器/用户可以附加。
    所有可观察类型都提供了各种各样的订阅方法。

Observable<Todo> todoObservable = Observable.create(emitter -> { ... });// Simply subscribe with a io.reactivex.functions.Consumer<T>, which will be informed onNext()
Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));// Dispose the subscription when not interested in the emitted data any more
disposable.dispose();// Also handle the error case with a second io.reactivex.functions.Consumer<T>
Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());// ...

  • 有更多的io.reactivex.functions.Consumer <T> onNext,onSuccess,onFailure,onComplete等符合可观察类型。
  • io.reactivex.functions.Consumer <T>几乎等于java 8中的java.util.function.Consumer,除了它的accept方法可以抛出异常。 此外,RxJava也不依赖Java 8,但与Java 6兼容。
在可观察的实例上还有一个subscribeWith方法,可以像这样使用:
DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new         DisposableObserver<Todo>() {@Overridepublic void onNext(Todo t) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}
});

4、处理订阅并使用CompositeDisposable

当注册者或订阅者被附加时,他们通常不应该永久地聆听。所以可能会发生这样的情况:由于某些状态的改变,一个可观察的发射的事件可能不再是有趣的。

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;Single<List<Todo>> todosSingle = getTodos();Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {@Overridepublic void onSuccess(List<Todo> todos) {// work with the resulting todos}@Overridepublic void onError(Throwable e) {// handle the error case}
});// continue working and dispose when value of the Single is not interesting any more
disposable.dispose();
  • 单一类和其他可观察类提供不同的订阅方法,返回一个Disposable对象。

当使用多个订阅(由于使用CompositeDisposable进行相同状态更改可能会过时)可能会很方便地处理订阅集合。

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;CompositeDisposable compositeDisposable = new CompositeDisposable();Single<List<Todo>> todosSingle = getTodos();Single<Happiness> happiness = getHappiness();compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {@Overridepublic void onSuccess(List<Todo> todos) {// work with the resulting todos}@Overridepublic void onError(Throwable e) {// handle the error case}
}));compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {@Overridepublic void onSuccess(Happiness happiness) {// celebrate the happiness :-D}@Overridepublic void onError(Throwable e) {System.err.println("Don't worry, be happy! :-P");}
}));// continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
compositeDisposable.dispose();

5.缓存已完成的可观察值的值

当使用可观察器对可观察器上的每个订阅进行异步调用时,通常不是必需的。可能会发生在应用程序中传递观察器,而不需要在添加订阅的同时进行这样一个昂贵的调用。以下代码执行昂贵的网页查询4次,即使这样做一次会很好,因为应该显示相同的Todo对象,而只能以不同的方式显示。

Single<List<Todo>> todosSingle = Single.create(emitter -> {Thread thread = new Thread(() -> {try {List<Todo> todosFromWeb = // query a webserviceSystem.out.println("Called 4 times!");emitter.onSuccess(todosFromWeb);} catch (Exception e) {emitter.onError(e);}});thread.start();
});todosSingle.subscribe(... " Show todos times in a bar chart " ...);showTodosInATable(todosSingle);todosSingle.subscribe(... " Show todos in gant diagram " ...);anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);

下一个代码片段使用缓存方法,以便在第一次成功之后,Single实例保持其结果。

Single<List<Todo>> todosSingle = Single.create(emitter -> {Thread thread = new Thread(() -> {try {List<Todo> todosFromWeb = // query a webserviceSystem.out.println("I am only called once!");emitter.onSuccess(todosFromWeb);} catch (Exception e) {emitter.onError(e);}});thread.start();
});// cache the result of the single, so that the web query is only done once
Single<List<Todo>> cachedSingle = todosSingle.cache();cachedSingle.subscribe(... " Show todos times in a bar chart " ...);showTodosInATable(cachedSingle);cachedSingle.subscribe(... " Show todos in gant diagram " ...);anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);

6. Flowable<T> 和 Backpressure

RxJava 2.0引入了一种新型的Flowable <T>,它与API相当于Observable <T>,但Flowable <T>支持Backpressure,而Observable <T>则不支持。回到RxJava 1.0Backpressure的概念太晚了,被添加到了Observable的类型,但有些则抛出了一个MissingBackpressureException,所以区分Flowable <T>Observable <T>是一件好事。除了Observable <T>也可能<T>,Single <T>Completable没有Backpressure

7.类型之间的转换

很容易在不同的RxJava类型之间进行转换。

From / To Flowable Observable Maybe Single Completable

Flowable

 

toObservable()

reduce()
elementAt()
firstElement()
lastElement()
singleElement()

scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…​)

ignoreElements()

Observable

toFlowable()

 

reduce()
elementAt()
firstElement()
lastElement()
singleElement()

scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…​)

ignoreElements()

Maybe

toFlowable()

toObservable()

 

toSingle()
sequenceEqual()

toCompletable()

Single

toFlowable()

toObservable()

toMaybe()

 

toCompletable()

Completable

toFlowable()

toObservable()

toMaybe()

toSingle()
toSingleDefault()

 

8、测试RxJava可观察和订阅

8.1、 测试可观察量

您可以通过RxJava库提供的TestSubscriber类来测试可观察值。

Observable<String> obs = ...// assume creation code here
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
obs.subscribe(testSubscriber);testSubscriber.assertNoErrors();
List<String> chickens = testSubscriber.getOnNextEvents();// TODO assert your string integrity...

8.2、 测试可观察量

RxJava提供了一种覆盖显示的调度程序的方法,以便可观察器被同步调用。 请参阅http://fedepaol.github.io/blog/2015/09/13/testing-rxjava-observables-subscriptions/作为示例。

9、RxJava的相关资源

Introduction to RxJava

RxJava for Android

Dan Lew’s Grokking with RxJava series

Crunching RxAndroid blog series from Roberto Orgiu

GOTO 2016 • Exploring RxJava 2 for Android • Jake Wharton

Droidcon NYC 2016 - Looking Ahead to RxJava 2

RxJava 2.0 Examples for Android

10、原文链接

Using RxJava for Android

在Android中运用RxJava相关推荐

  1. android项目集成okgo,Android中MVP+RXJAVA+OKGO框架

    [实例简介] Android中MVP+RXJAVA+OKGO框架 Glide的封装 沉浸式状态栏 butterknife 和recyclerview的使用 [实例截图] [核心代码] 882096ee ...

  2. Android中的RxJava

    最近准备梳理一下Kotlin,先复习一遍RxJava思想,做个学习笔记+伪代码,整个脉络分为三个部分. (一)使用场景 RxJava是重量级.最复杂的框架(没有之一),JakeWharton 的巅峰之 ...

  3. RxJava操作符在android中的使用场景详解(一)

    转载请注明出处:http://www.wangxinarhat.com/2016/04/19/2016-04-19-rxjava-android-operate1/ 最近学习了RxJava在andro ...

  4. RxJava 在Android中的应用(一)

    RxJava 在Android中的应用 RxJava 是什么, 能解决什么问题 对比其他技术理解为什么要选择RxJava RxJava的相关API RxJava在Android中的典型使用场景 RxJ ...

  5. [Android]在Dagger 2中使用RxJava来进行异步注入(翻译)

    以下内容为原创,欢迎转载,转载请注明 来自天天博客:http://www.cnblogs.com/tiantianbyconan/p/6236646.html 在Dagger 2中使用RxJava来进 ...

  6. 一个整合OkHttp 、Retrofit 、Volley 、RxJava、Novate多种开源网络框架的项目,高度的封装和集成,Android中Web网络请求一行代码解决

    一个整合OkHttp .Retrofit .Volley .RxJava.Novate多种开源网络框架的项目,高度的封装和集成,Android中Web网络请求一行代码解决 AndroidHttp 一个 ...

  7. rxjava在Android中的使用

    前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近 ...

  8. android:catation=quot;90quot;,Android中的AlarmManager的使用.htm

    var protocol = window.location.protocol; document.write(' Android中的AlarmManager的使用 - wangxingwu_314的 ...

  9. android ------- 开发者的 RxJava 详解

    在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:  https://github.com/ReactiveX/RxJava  https://githu ...

最新文章

  1. php-cgi 重启,自动监测和重启 FastCGI 服务
  2. vue---进行post和get请求
  3. 西南石油大学计算机科学校区,任冬梅 - 西南石油大学 - 计算机科学学院
  4. python爬虫机器_Python常用的机器学习库|python爬虫|python入门|python教程
  5. Linux 系统下显示文件内容(查看文件内容)的命令 more/less/cat/head/tail 比较
  6. 收集系统性能数据并通过gnuplot绘图
  7. java 反复器_java集合类中的枚举器(反复器)
  8. VisualSVN-Server 安装以及使用教程
  9. 微信小程序的页面布局(1)
  10. 计算机专业面试 英文,计算机专业面试英文自我介绍
  11. OBS_Classic经典版框架
  12. MongoDB 学习笔记八 复制、分片、备份与恢复、监控
  13. [ 习题 ] 句子简化题 细节题 排除题
  14. 创世卓越高清晰PDF图书下载 40本
  15. 按键精灵安卓版去除重复数组然后排序排序
  16. 改变世界的17个方程式,你认识几个?
  17. 问题 H: 活字印刷
  18. 从满腹经纶到入行小白:理论学习与实际应用的差距
  19. error: (-5:Bad argument) image is empty or has incorrect depth (!=CV_8U) in function ‘cv::SIFT_Impl:
  20. Python - 文件基础操作

热门文章

  1. 【软件项目管理】知识点整理
  2. python内置函数下划线_python(内置函数)
  3. fastunfolding算法_社区发现算法综述—part1
  4. 我的世界java骷髅马_我的世界:骷髅马材质更新,老MC教你获得骷髅马技巧,萌新:真好...
  5. linux环境下python 库模块安装
  6. Prometheus一条告警是怎么触发的
  7. oracle下创建id自增长
  8. RabbitMQ消息队列-VirtualHost与权限管理
  9. 《OpenGL编程指南》一第3章 OpenGL绘制方式
  10. 第七讲:tapestry可预览的模板页