RxJava2.x是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致.但是 是独立于RxJava1.x存在,本文讲解RxJava2.x的简
RxJava2.x是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致.但是 是独立于RxJava1.x存在,本文讲解RxJava2.x的简单使用
RxJava2 封装主要变化
- Transformer的变化:RxJava1.X为rx.Observable.Transformer接口, 继承自Func1<Observable<T>, Observable<R>>, RxJava2.X为io.reactivex.ObservableTransformer<Upstream, Downstream>,是一个独立的接口。
- Flowable则是FlowableTransformer,如果你使用Flowable,以下ObservableTransformer替换FlowableTransformer即可。
compile 'io.reactivex.rxjava2:rxjava:2.1.0'compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
简单使用:
//观察者模式,这里产生事件,事件产生后发送给接受者,但是一定要记得将事件的产生者和接收者捆绑在一起,否则会出现错误
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {//这里调用的方法会在产生事件之后会发送给接收者,接收者对应方法会收到e.onNext("hahaha");e.onError(new Exception("wulala"));e.onComplete();}}).subscribe(new Observer<String>() {//接受者,根据事件产生者产生的事件调用不同方法@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "onSubscribe: ");}@Overridepublic void onNext(String value) {Log.e(TAG, "onNext: " + value);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError: ", e);}@Overridepublic void onComplete() {Log.e(TAG, "onComplete: ");}
});
我们来用图解一下这其中发生了什么事:
上游朝下游发送数据,经过subscribe使上下游产生关系,即达成订阅.
解析1:
ObservableEmitter,这是个啥东西?Emitter:顾名思义,即Rxjava的发射器,通过这个发射器,即可发送事件-----通过调用onNext,onError,onComplete方法发送不同事件.
注意:
虽然RxJava可以进行事件发送,但这并不意味着你可以随便发送,这其中需要遵循一些规则.
onNext:你可以发送无数个onNext,发送的每个onNext接受者都会接收到.
onError:当发送了onError事件之后,发送者onError之后的事件依旧会继续发送,但是接收者当接收到onError之后就会停止接收事件了.
onComplete:当发送了onComplete事件之后,发送者的onComplete之后的事件依旧会继续发送,但是接收者接收到onComplete之后就停止接收事件了.
onError事件和onComplete事件是互斥的,但是这并不代表你配置了多个onError和onComplete一定会崩溃,多个onComplete是可以正常运行的,但是只会接收到第一个,之后的就不会再接收到了,多个onError时,只会接收到第一个,第二个会直接造成程序崩溃.
解析2:
Disposable又是个啥东西,翻译之后百度告诉我这东西叫做一次性的,是用来控制发送者和接受者之间的纽带的,默认为false,表示发送者和接受者直接的通信阀门关闭,可以正常通信,在调用dispose()方法之后,阀门开启,会阻断发送者和接收者之间的通信,从而断开连接.
重载方法:
subscribe(); //表示发送者随意发送数据,接受者什么都不管,什么都不接收.subscribe(Consumer<? super T> onNext) {} //只响应onNext()事件,其他的事件忽略.subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} //含义同上subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} //含义同上subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} //含义同上
解析3:
默认情况下,发送者和接收者都运行在主线程,但是这显然是不符合实际需求的,我们在日常使用中,通常用的最多的就是在子线程进行各种耗时操作,然后发送到主线程进行,难道我们就没有办法继续用这个优秀的库了?想多了你,一个优秀的库如果连这都想不到,怎么能被称为优秀呢,RxJava中有线程调度器,通过线程调度器,我们可以很简单的实现这种效果,下面放代码.
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("hahaha");e.onNext("hahaha");e.onNext("hahaha");Log.e(TAG,"运行在什么线程" + Thread.currentThread().getName());e.onComplete();}
}).subscribeOn(Schedulers.newThread()) //线程调度器,将发送者运行在子线程.observeOn(AndroidSchedulers.mainThread()) //接受者运行在主线程.subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "onSubscribe: ");Log.e(TAG,"接收在什么线程" + Thread.currentThread().getName());}@Overridepublic void onNext(String value) {Log.e(TAG, "onNext: " + value);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError: ", e);}@Overridepublic void onComplete() {Log.e(TAG, "onComplete: ");}
});
注意事项: 变换线程方法与1.x一致
subscribeOn(),只有在第一次调用的时候生效,之后不管调用多少次,只会以第一次为准.
observeOn(),可以被调用多次,每次调用都会更改线程.
RxJava线程池中的几个线程选项
- Schedulers.io() io操作的线程, 通常io操作,如文件读写.
- Schedulers.computation() 计算线程,适合高计算,数据量高的操作.
- Schedulers.newThread() 创建一个新线程,适合子线程操作.
- AndroidSchedulers.mainThread() Android的主线程,主线程
操作符之变换
Map:
首先是变换操作符- > Map,(此处引入以前看过的一篇文章的一句话:不知道Map已经统治世界了么?)那么在RxJava2中,Map究竟是个什么鬼.
map是RxJava中最简单的一个变换操作符,它的作用是将上游发送过来的事件都去应用一个函数,让每一个事件都按照该函数去变化,下游接收到事件时,就变成了变化过后的事件,多说无益,上代码.
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).map(new Function<Integer, String>() {@Overridepublic String apply(Integer integer) throws Exception {return "我是变换过后的" + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("XYK",s);}});
}
通过运行结果可以看到,我们在上游发送的数据类型为Integer,到了下游接收到的数据为String类型,中间通过map对其进行了转换,是不是感觉很强大?通过map我们可以将上游数据转化为任意类型发送到下游,就是这么6~
圆形事件1,2,3,经过Map转化之后,变成了三角形事件1,2,3,但是有童鞋要问了,这有什么用呢,我们来举一个实际需求的例子,用Map来做一下:
读取一篇英文文章,将文章中的字符全部转换为大写.
我们先来用非RxJava2来做一下:
//模拟一篇文章
String article = "fkjdsalijfofldaJFOIEjfldanlJR2OnfldajilwafkndaIUPO32,LFKjlijuJFLMA";char[] chars = article.toCharArray();StringBuffer sb = new StringBuffer();for (int i = 0; i < chars.length; i++) {Log.e(TAG,chars[i] + "");if(chars[i] >= 'a' && chars[i] <= 'z'){sb.append((chars[i] + "").toUpperCase());}else{sb.append(chars[i]);}}Log.e(TAG,sb.toString());
好像看上去没什么问题,但是这逼格显然不够高,程序员的精髓不就是要敲出一段逼格超高的代码么,我们试试用RxJava2:
//模拟一篇文章String article = "fkjdsalijfofldaJFOIEjfldanlJR2OnfldajilwafkndaIUPO32,LFKjlijuJFLMA";final char[] chars = article.toCharArray();Observable.create(new ObservableOnSubscribe<Character>() {@Overridepublic void subscribe(ObservableEmitter<Character> e) throws Exception {for (int i = 0; i < chars.length; i++) {e.onNext(chars[i]);}}//delay 延时5秒发送}).delay(5, TimeUnit.SECONDS)//事件类型转换.map(new Function<Character, String>() {@Overridepublic String apply(Character s) throws Exception {if (s >= 'a' && s <= 'z') {return s.toString().toUpperCase();} else {return s.toString();}}})//线程调度.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e(TAG, s);}});
瞬间逼格就上去了,而且还做了线程调度等操作,是不是心头顿时感觉一串666飘过,这仅仅是最基础的转化操作符,接下来我们在看一个FlatMap:
FlatMap
FlatMap,上来就看到map,这个操作符和刚才的map有什么区别呢,flatmap可以将上游发送过来的数据,变换为多个数据,然后合并为一个事件发送到下游,这么说是不是有点难懂?恩,还是直接上代码:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).flatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(Integer integer) throws Exception {List<String> list = new ArrayList<String>();for (int i = 0; i < 5; i++) {list.add("我是变换过的" + integer);}return Observable.fromIterable(list);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("XYK", s);}});
}
通过运行结果可以看到,上游发送的数据在到达flatmap的时候,经过处理,将每个事件变成了5个,而后将5个合并为1个事件发送到下游,并且我们可以注意到,发送到下游的数据是无序的,那么这时候就要说了,我要接收的事件是有序的怎么办,这就是接下来要说的concatMap.
ConcatMap:
ConcatMap和FlatMap一样,只不过一个是有序,一个是无序而已,我们直接把上边的代码做一个更改:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).concatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(Integer integer) throws Exception {List<String> list = new ArrayList<String>();for (int i = 0; i < 5; i++) {list.add("我是变换过的" + integer);}return Observable.fromIterable(list);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("XYK", s);}});
}
过滤操作符
Filter
Filter,顾名思义,过滤器,可以过滤掉一部分不符合要求的事件,当上游给我们发送的数据超多,而下游需要的只是一些特定的数据,如果全部接收上游发送的数据,很容易造成OOM,为了避免OOM的出现,我们则需要对上游数据进行过滤,具体操作如下:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {for (int i = 0; i < 10000; i++) {e.onNext(i);}}}).observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer % 7 == 0;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("XYK",integer + "");}});}
在上面的代码中,我们朝下游发送了10000个数据,而我只需要其中可以被7整除的数据,利用filter,将其他的数据过滤出去,留下需要的数据.
Filter方法使我们经常用到的一个过滤方法,基本已经可以满足大部分应用场所了,最常见的是过滤一些null对象,但是除此之外,还有一些其他的过滤方法,我们也来看下.
Sample
Sample,样品,其功能也是,sample会每隔一段时间对上游数据进行取样,发送到下游,但是这样会导致丢失了大量事件,比较适合特定场合,如对一组数中进行抽样,代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {for (int i = 0; ; i++) {e.onNext(i);}}}).sample(1,TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("XYK",integer + "");}});}
在上边的代码中,使用sample之后,每隔1秒对上游数据采样一次,发送到下游,其他事件则被过滤.
take/takeList
take和takeList方法可以将上游事件中的前N项或者最后N项发送到下游,其他事件则进行过滤,代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {for (int i = 0; ; i++) {e.onNext(i);}}}).take(3)//.takeList(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("XYK",integer + "");}});}
distinct
distinct方法,可以将重复对象去除重复对象,这里我们要用到一个方法,repeat(),产生重复事件,这里重复事件,再去除有些多余,只作为一个例子来展示.
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {for (int i = 0;i < 50; i++) {e.onNext(i);}}}).take(3)//生成重复事件.repeat(3).distinct().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e("XYK",integer + "");}});
组合操作符
zip操作符:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onNext(4);}
});Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("这是");e.onNext("这个是");e.onNext("这个则是");}
});Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("个");e.onNext("只");e.onNext("条");e.onNext("张");e.onNext("本");e.onNext("副");}
});Observable.zip(observable, observable1, observable2, new Function3<Integer, String, String, String>() {@Overridepublic String apply(Integer integer, String s, String s2) throws Exception {return s + integer + s2;}
}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("XYK",s);}
});
运行结果:
我们可以看到,3条上游中分别有4个事件,3个事件,6个事件,经过zip操作符操作之后为什么就只变成了3个事件了呢?我们来打下Log,看看其他事件去哪了.
添加Log之后的运行结果:
根据运行结果可以看到,上游逐条发送到下游,下游在接收到最后一条上游发送过来的事件之后开始组合,而多余的数据也被发送了,但是并没有被进行组合,这样是不是就看明白了呢?但是这时候有问题了,组合完成之后,多余的数据依旧在发送,如果我们不停发呢?会产生什么后果?,我们来修改一下observable
的代码:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {for (int i = 0; ; i++) {Log.e("XYK",i + "");e.onNext(i);}}
});
运行之后通过Monitors我们查看一下内存:(Sorry,这里没截下图来....可以自己试一下,不过想一下也知道,内存肯定会暴增嘛...)
在内存持续暴增的情况下,可能用不了多久就会OOM,这种情况下我们应该怎么办呢?
还记不记得上篇文章写了啥,过滤啊,我们把不需要的过滤掉不就好了
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {for (int i = 0; ; i++) {e.onNext(i);}}
}).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer % 100 == 0;}
});
当然,除此之外还有很多办法,可以根据实际情况来进行组合应用.
RxJava中的zip操作符作用是将多条上游发送的事件进行结合到一起,发送到下游,并且按照顺序来进行结合,如多条上游中发送的事件数量不一致,则以最少的那条中的事件为准,下游接收到的事件数量和其相等.
Rxjava的2.x与1.x的区别(官翻)
Nulls
RxJava 2x 不再支持 null
值,如果传入一个null
会抛出 NullPointerException
Observable.just(null);Single.just(null);Observable.fromCallable(() -> null).subscribe(System.out::println, Throwable::printStackTrace);Observable.just(1).map(v -> null).subscribe(System.out::println, Throwable::printStackTrace);
这意味着 Observable<Void>
不再发射任何值,而只是正常结束或者抛出异常。API 设计者可以定义 Observable<Object>
这样的观察者, 因为并不确定具体是什么类型的 Object
。例如,如果你需要一个 signaller-like ,你可以定义一个共享的枚举类型,它是一个单独的实例onNext
‘d:
enum Irrelevant { INSTANCE; }Observable<Object> source = Observable.create((ObservableEmitter<Object> emitter) -> {System.out.println("Side-effect 1");emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 2");emitter.onNext(Irrelevant.INSTANCE);System.out.println("Side-effect 3");emitter.onNext(Irrelevant.INSTANCE);
});source.subscribe(e -> { /* Ignored. */ }, Throwable::printStackTrace);
Observable 和 Flowable
我们试图在 2.x 中纠正这个问题。因此我们把io.reactivex.Observable
设计成非背压的,并增加一个新的io.reactivex.Flowable
去支持背压。
好消息是操作符的名字几乎没有改动。坏消息是当你执行’organize imports’时必须要格外的小心,它可能无意的给你选择一个非背压的io.reactivex.Observable
。
Single
2.x 的Single
类可以发射一个单独onSuccess
或 onError
消息。它现在按照Reactive-Streams规范被重新设计,SingleObserver
改成了如下的接口。
interface SingleObserver<T> {void onSubscribe(Disposable d);void onSuccess(T value);void onError(Throwable error);
}
并遵循协议 onSubscribe (onSuccess | onError)?
.
Completable
Completable
大部分和以前的一样。因为它在1.x的时候就是按照Reactive-Streams的规范进行设计的。
interface CompletableObserver<T> {void onSubscribe(Disposable d);void onComplete();void onError(Throwable error);
}
并且仍然遵循协议 onSubscribe (onComplete | onError)?
.
Maybe
RxJava 2.0.0-RC2 介绍了一个新的类型 Maybe
。从概念上来说,它是Single
和 Completable
的结合体。它可以发射0个或1个通知或错误的信号。
这个新的类,实际上和其他Flowable
的子类操作符一样可以发射0个或1个序列。
Maybe.just(1)
.map(v -> v + 1)
.filter(v -> v == 1)
.defaultIfEmpty(2)
.test()
.assertResult(2);
Base reactive interfaces
基础reactive接口
按照Reactive-Streams风格的Flowable
实现了 Publisher<T>
接口,其他基础类也实现了类似的基础接口
interface ObservableSource<T> {void subscribe(Observer<? super T> observer);
}interface SingleSource<T> {void subscribe(SingleObserver<? super T> observer);
}interface CompletableSource {void subscribe(CompletableObserver observer);
}interface MaybeSource<T> {void subscribe(MaybeObserver<? super T> observer);
}
因此,很多操作符需要从用户接收Publisher
和 XSource
的一些基础的类型。
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
通过Publisher
作为输入,你可以组合其他的遵从Reactive-Streams规范的库,而不需要包裹或把它们转换成Flowable
。
如果一个操作符必须要提供一个基础类,那么用户将会收到一个完整的基础类。
Flowable<Flowable<Integer>> windows = source.window(5);source.compose((Flowable<T> flowable) -> flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()));
Subjects 和 Processors
其他类
GroupedObservable
在1.x中,你可以用GroupedObservable.from()
创建一个实例。在2.x中,所有实例都直接继承了GroupedObservable
,因此这个工厂方法不再可用; 现在整个类都是抽象的。
不过你可以继承类然后添加你自定义的subscribeActual
行为来达到1.x中相似的功能。
class MyGroup<K, V> extends GroupedObservable<K, V> {final K key;final Subject<V> subject;public MyGroup(K key) {this.key = key;this.subject = PublishSubject.create();}@Overridepublic T getKey() {return key;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {subject.subscribe(observer);}
}
功能接口
Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
如果文件不存在或者不可读,结尾的consumer会直接输出IOException
。你可以直接调用Files.readLines(name)
而不需要捕获异常。
Actions
为了减少组件数量,2.x中没有定义Action3
-Action9
和ActionN
。
Functions
此外,操作符不再使用Func1<T, Boolean>
但原始返回类型为Predicate<T>
。
io.reactivex.functions.Functions
类提供了常见的转换功能Function<Object[], R>
Subscriber
Flowable.range(1, 10).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(Long.MAX_VALUE);}@Overridepublic void onNext(Integer t) {System.out.println(t);}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Done");}
});
由于命名冲突,把rx
包改成org.reactivestreams
。此外org.reactivestreams.Subscriber
不能从外面添加、取消或请求。
ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {@Overridepublic void onStart() {request(Long.MAX_VALUE);}@Overridepublic void onNext(Integer t) {System.out.println(t);}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Done");}
};Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);subscriber.dispose();
注意,由于Reactive-Streams的兼容性,方法onCompleted
被重命名为onComplete
。
因为1.x中,Observable.subscribe(Subscriber)
返回Subscription
,用户经常添加Subscription
到CompositeSubscription
中,例如:
CompositeSubscription composite = new CompositeSubscription();composite.add(Observable.range(1, 5).subscribe(new TestSubscriber<Integer>()));由于Reactive-Streams规范,
Publisher.subscribe无返回值。为了弥补这一点,我们增加了E subscribeWith(E subscriber)
方法。因为在2.x中ResourceSubscriber
直接实现了Disposable
,所以代码可以这样写。
CompositeDisposable composite2 = new CompositeDisposable();composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));
在onSubscribe/onStart中调用request
注意,在Subscriber.onSubscribe
或ResourceSubscriber.onStart
中调用request(n)
将会立即调用onNext
,实例代码如下:
Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {System.out.println("OnSubscribe start");s.request(Long.MAX_VALUE);System.out.println("OnSubscribe end");}@Overridepublic void onNext(Integer v) {System.out.println(v);}@Overridepublic void onError(Throwable e) {e.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Done");}
});
OnSubscribe start
1
2
3
Done
OnSubscribe end
这个行为不同于1.x中的 request
要经过延迟的逻辑直到上游的Producer
到达时。在2.x中,总是Subscription
先传递下来,90%的情况下没有延迟请求的必要。
Subscription
为了避免名字冲突,1.x的rx.Subscription
被改成了 io.reactivex.Disposable
。
在2.x中其他的subscribe
的重载方法返回Disposable
。
CompositeSubscription
改成CompositeDisposable
,SerialSubscription
和MultipleAssignmentSubscription
被合并到了SerialDisposable
。set()
方法取消了旧值,而replace()
方法没有。RefCountSubscription
已被删除。
背压
作为替代,在2.x中Observable
完全不支持背压,但可以被替换。
Reactive-Streams compliance
Flowable-based sources和operators是遵从Reactive-Streams 1.0.0规范的,除了一个规则§3.9和解释的规则§1.3:
§3.9: While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0. The cause message MUST include a reference to this rule and/or quote the full rule.
Rule §3.9 requires excessive overhead to handle (half-serializer on every operator dealing with request()) for a bug-case. RxJava 2 (and Reactor 3 in fact) reports the IllegalArgumentException
to RxJavaPlugins.onError
and ignores it otherwise. RxJava 2 passes the Test Compatibility Kit (TCK) by applying a custom operator that routes the IllegalArgumentException
into the Subscriber.onError
in an async-safe manner. All major Reactive-Streams libraries are free of such zero requests; Reactor 3 ignores it as we do and Akka-Stream uses a converter (to interact with other RS sources and consumers) which has (probably) a similar routing behavior as our TCK operator.
§1.3: onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled sequentially (no concurrent notifications).
TCK 允许同步但限制onSubscribe
和 onNext
之间往返。也就是说在onSubscribe
中,调用request(1)
后将会调用onNext
,在onNext
返回后request(1)
才会返回。虽然大部分操作符都是这样的,但操作符observeOn
会异步的调用onNext
,因此onSubscribe
会和onNext
同时被调用。这就是由TCK来检测,我们使用another operator来延迟下游请求直到onSubscribe
返回。再次声明,这种异步行为不是RxJava 2的一个问题,因为在Reactor 3中操作符是线程安全的执行onSubscribe
。Akka-Stream的转换类似于延迟请求。
因为这两个影响inter-library的行为,我们考虑在以后给Flowable
增加了一个标准的操作符,把这两种行为改到一个单独的方法。
Runtime hooks
2.x 中重新设计了RxJavaPlugins
类,现在支持运行时改变回调。测试需要重写schedulers,生命周期方法可以通过回调函数。
RxJavaObservableHook
和友类现在都取消了,RxJavaHooks
功能被加入到了RxJavaPlugins
。
Schedulers
io.reactivex.Scheduler
抽象类现在支持直接调度任务,不需要先创建然后通过Worker
调度。
public abstract class Scheduler {public Disposable scheduleDirect(Runnable task) { ... }public Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) { ... }public Disposable scheduleDirectPeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { ... }public long now(TimeUnit unit) { ... }// ... rest is the same: lifecycle methods, worker creation
}
主要的目的是为了避免跟踪Worker
的开销。方法有一个默认的实现,你可以直接复用 createWorker
,但如果有需要,你也可以重写它来实现更强大的功能。
这些方法返回了当前时间调度器的概念, now()
被改成接受一个用于指定单位量的TimeUnit
的方法。
进入reactive的世界
Flowable.create((FlowableEmitter<Integer> emitter) -> {emitter.onNext(1);emitter.onNext(2);emitter.onComplete();
}, BackpressureStrategy.BUFFER);
实际上,1.x中fromEmitter
已经被重命名为Flowable.create
。其他基础类型也有类似的create
方法。
离开reactive的世界
List<Integer> list = Flowable.range(1, 100).toList().blockingGet(); // toList() returns SingleInteger i = Flowable.range(100, 100).blockingLast();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(Long.MAX_VALUE);}public void onNext(Integer t) {if (t == 1) {throw new IllegalArgumentException();}}public void onError(Throwable e) {if (e instanceof IllegalArgumentException) {throw new UnsupportedOperationException();}}public void onComplete() {throw new NoSuchElementException();}
};Flowable.just(1).subscribe(subscriber);
这样的规则同样适用于Observer
, SingleObserver
, MaybeObserver
和 CompletableObserver
。
由于很多现有基于1.x的代码做了类似的事情,我们设计了safeSubscribe
方法来帮助你处理这样的代码。
当然你也可以使用subscribe(Consumer<T>, Consumer<Throwable>, Action)
方法来提供一个回调。
Flowable.just(1)
.subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete, subscriber::onSubscribe
);
Testing
test() “operator”
为了支持我们内部测试,所有的基础类都有 test()
方法,返回TestSubscriber
或 TestObserver
:
TestSubscriber<Integer> ts = Flowable.range(1, 5).test();TestObserver<Integer> to = Observable.range(1, 5).test();TestObserver<Integer> tso = Single.just(1).test();TestObserver<Integer> tmo = Maybe.just(1).test();TestObserver<Integer> tco = Completable.complete().test();
Flowable.range(1, 5)
.test()
.assertResult(1, 2, 3, 4, 5)
;
值得注意的新的断言方法
assertResult(T... items)
: 断言在onComplete
中将会按指定顺序收到给定的值,并且没有错误。assertFailure(Class<? extends Throwable> clazz, T... items)
: 断言将会收到指定的异常。assertFailureAndMessage(Class<? extends Throwable> clazz, String message, T... items)
: 和assertFailure
一样,但还会验证getMessage()
中包含的值。awaitDone(long time, TimeUnit unit)
等待一个终结事件,如果超时了,将会取消该事件。assertOf(Consumer<TestSubscriber<T>> consumer)
组成一些断言到流式链中。
其中一个好处是,把Flowable
改为Observable
,所以测试代码不需要改变,内部的已经把TestSubscriber
改成了TestObserver
。
提前取消和请求
TestObserver
中的test()
方法有一个 test(boolean cancel)
重载,它能在订阅前取消TestSubscriber
/TestObserver
:
PublishSubject<Integer> pp = PublishSubject.create();// nobody subscribed yet
assertFalse(pp.hasSubscribers());pp.test(true);// nobody remained subscribed
assertFalse(pp.hasSubscribers());
PublishProcessor<Integer> pp = PublishProcessor.create();TestSubscriber<Integer> ts = pp.test(0L);ts.request(1);pp.onNext(1);
pp.onNext(2);ts.assertFailure(MissingBackpressureException.class, 1);
测试异步代码
Flowable.just(1)
.subscribeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
Mockito & TestSubscriber
那些在1.x中正在使用Mockito和Observer
的用户需要去使用Subscriber.onSubscribe
方法去提出初始的请求,否则序列化将会挂起或者失败:
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {Subscriber<T> w = mock(Subscriber.class);Mockito.doAnswer(new Answer<Object>() {@Overridepublic Object answer(InvocationOnMock a) throws Throwable {Subscription s = a.getArgumentAt(0, Subscription.class);s.request(Long.MAX_VALUE);return null;}}).when(w).onSubscribe((Subscription)any());return w;}
操作符的差别
2.x中大部分操作符仍然被保留,实际上大部分行为和1.x一样。下面的列表中列出了每一个基础类的在1.x和2.x的区别
通常来说,很多操作符提供了重载,允许指定运行上游的内部缓冲区的大小或者预先分配的数量。
一些操作符重载已经被重命名为了后缀风格,比如 fromArray
, fromIterable
。这么做的原因是,当用Java 8编译时,javac往往不能区分功能接口类型。
在1.x中被标记为@Beta
或 @Experimental
的操作符已经成为正式操作符了。
1.x Observable 到 2.x Flowable
工厂方法:
1.x | 2.x |
---|---|
amb
|
添加 amb(ObservableSource...) 重载, 2-9 参数被删除
|
RxRingBuffer.SIZE |
bufferSize()
|
combineLatest
|
增加条目重载, 增加 带bufferSize 参数的重载, combineLatest(List) 被删除
|
concat
|
增加带 prefetch 参数的重载, 5-9 重载被删除 , 使用 concatArray 代替
|
N/A |
增加 concatArray 和 concatArrayDelayError
|
N/A |
增加 concatArrayEager 和 concatArrayEagerDelayError
|
concatDelayError
|
增加带延时的重载 |
concatEagerDelayError
|
增加带延时的重载 |
create(SyncOnSubscribe)
|
被 generate + 重载代替
|
create(AsnycOnSubscribe)
|
不存在 |
create(OnSubscribe)
|
使用安全的 create(FlowableOnSubscribe, BackpressureStrategy) , 支持unsafeCreate()
|
from
|
拆分成 fromArray , fromIterable , fromFuture
|
N/A |
增加 fromPublisher
|
fromAsync
|
重命名为 create()
|
N/A |
增加 intervalRange()
|
limit
|
被删除, 使用 take
|
merge
|
增加带 prefetch 的重载
|
mergeDelayError
|
增加带 prefetch 的重载
|
sequenceEqual
|
增加带 bufferSize 的重载
|
switchOnNext
|
增加带 prefetch 的重载
|
switchOnNextDelayError
|
增加带 prefetch 的重载
|
timer
|
被废弃 |
zip
|
增加带 bufferSize 和 delayErrors 的重载, 拆分成了 zipArray 和 zipIterable
|
实例方法:
1.x | 2.x |
---|---|
all
|
RC3 返回 Single<Boolean>
|
any
|
RC3 返回 Single<Boolean>
|
asObservable
|
重命名为 hide() , 隐藏所有的身份
|
buffer
|
重载自定义的 Collection 提供者
|
cache(int)
|
被废弃 |
collect
|
RC3 返回 Single<U>
|
collect(U, Action2<U, T>)
|
改成 collectInto 和 RC3 返回 Single<U>
|
concatMap
|
增加带 prefetch 的重载
|
concatMapDelayError
|
增加带 prefetch 的重载, 支持延时
|
concatMapEager
|
增加带 prefetch 的重载
|
concatMapEagerDelayError
|
增加带 prefetch 的重载, 支持延时
|
count
|
RC3 返回 Single<Long>
|
countLong
|
被删除, 使用 count
|
distinct
|
重载自定义的 Collection 提供者.
|
doOnCompleted
|
重命名为 doOnComplete
|
doOnUnsubscribe
|
重命名为 Flowable.doOnCancel 和 doOnDispose , additional info
|
N/A |
增加 doOnLifecylce 来处理 onSubscribe , request 和 cancel
|
elementAt(int)
|
RC3 不再发射 NoSuchElementException 如果源比索引更小 |
elementAt(Func1, int)
|
被删除, 使用 filter(predicate).elementAt(int) 代替
|
elementAtOrDefault(int, T)
|
重命名为 elementAt(int, T) 和 RC3 返回 Single<T>
|
elementAtOrDefault(Func1, int, T)
|
被删除, 使用 filter(predicate).elementAt(int, T) 代替
|
first()
|
RC3 重命名为 firstElement 返回 Maybe<T>
|
first(Func1)
|
被删除, 使用 filter(predicate).first() 代替
|
firstOrDefault(T)
|
重命名为 first(T) RC3 返回 Single<T>
|
firstOrDefault(Func1, T)
|
被删除, 使用 filter(predicate).first(T) 代替
|
flatMap
|
增加带 prefetch 的重载
|
N/A |
增加 forEachWhile(Predicate<T>, [Consumer<Throwable>, [Action]]) 用于有条件停止 consumption
|
groupBy
|
增加带 bufferSize 和 delayError 的重载, 支持 支持内部自定义map,RC1中没有
|
ignoreElements
|
RC3 返回 Completable
|
isEmpty
|
RC3 返回 Single<Boolean>
|
last()
|
RC3 重命名为 lastElement 返回 Maybe<T>
|
last(Func1)
|
被删除, 使用 filter(predicate).last() 代替
|
lastOrDefault(T)
|
重命名为 last(T) RC3 返回 Single<T>
|
lastOrDefault(Func1, T)
|
被删除, 使用 filter(predicate).last(T) 代替
|
nest
|
被删除, 使用 just 代替
|
publish(Func1)
|
增加带 prefetch 的重载
|
reduce(Func2)
|
RC3 返回 Maybe<T>
|
N/A |
增加 reduceWith(Callable, BiFunction) 为了减少自定义Subscriber, 返回 Single<T>
|
N/A |
增加 repeatUntil(BooleanSupplier)
|
repeatWhen(Func1, Scheduler)
|
删除了重载, 使用 subscribeOn(Scheduler).repeatWhen(Function) 代替
|
retry
|
增加 retry(Predicate) , retry(int, Predicate)
|
N/A |
增加 retryUntil(BooleanSupplier)
|
retryWhen(Func1, Scheduler)
|
删除了重载, 使用 subscribeOn(Scheduler).retryWhen(Function) 代替
|
N/A |
增加 sampleWith(Callable, BiFunction) 去扫描自定义的Subscriber方式
|
single()
|
RC3 重命名为 singleElement 返回 Maybe<T>
|
single(Func1)
|
被删除,使用 filter(predicate).single() 代替
|
singleOrDefault(T)
|
重命名为 single(T) RC3 返回 Single<T>
|
singleOrDefault(Func1, T)
|
被删除,使用 filter(predicate).single(T) 代替
|
skipLast
|
增加带 bufferSize 和 delayError 的重载
|
startWith
|
2-9 参数的被删除了, 使用 startWithArray 代替
|
N/A |
增加 startWithArray 来减少二义性
|
N/A |
增加 subscribeWith 返回输入的订阅对象
|
switchMap
|
增加带 prefetch 的重载
|
switchMapDelayError
|
增加带 prefetch 的重载
|
takeLastBuffer
|
被删除 |
N/A |
增加 test()
|
timeout(Func0<Observable>, ...)
|
方法签名改成了 timeout(Publisher, ...) 删除了方法, 如果有需要,使用defer(Callable<Publisher>>)
|
toBlocking().y
|
内联 blockingY() 操作符, 除了 toFuture
|
toCompletable
|
RC3 被删除, 使用 ignoreElements 代替
|
toList
|
RC3 返回 Single<List<T>>
|
toMap
|
RC3 返回 Single<Map<K, V>>
|
toMultimap
|
RC3 返回 Single<Map<K, Collection<V>>>
|
N/A |
增加 toFuture
|
N/A |
增加 toObservable
|
toSingle
|
RC3 被删除, 使用 single(T) 代替
|
toSortedList
|
RC3 增加 Single<List<T>>
|
withLatestFrom
|
5-9 个参数的重载被删除 |
zipWith
|
增加带 prefetch 和 delayErrors 的重载
|
不同的返回类型
2.x中一些的操作符产生确切的一个值或者一个错误时,返回Single
。
操作符 | 旧返回值 | 新返回值 | 备注 |
---|---|---|---|
all(Predicate)
|
Observable<Boolean>
|
Single<Boolean>
|
如果所有的元素都匹配,则发射true |
any(Predicate)
|
Observable<Boolean>
|
Single<Boolean>
|
如果所有的元素都匹配,则发射true |
count()
|
Observable<Long>
|
Single<Long>
|
计算序列中元素的数量 |
elementAt(int)
|
Observable<T>
|
Maybe<T>
|
Emits 给定位置处的元素或完成的元素 |
elementAt(int, T)
|
Observable<T>
|
Single<T>
|
发射指定位置的元素或默认元素 |
first(T)
|
Observable<T>
|
Single<T>
|
发射第一个元素或者抛出NoSuchElementException
|
firstElement()
|
Observable<T>
|
Maybe<T>
|
发射第一个元素或者结束 |
ignoreElements()
|
Observable<T>
|
Completable
|
忽略所有非终端事件 |
isEmpty()
|
Observable<Boolean>
|
Single<Boolean>
|
如果源为空,则发射true |
last(T)
|
Observable<T>
|
Single<T>
|
发射最后一个元素或默认值 |
lastElement()
|
Observable<T>
|
Maybe<T>
|
发射最后一个元素或结束 |
reduce(BiFunction)
|
Observable<T>
|
Maybe<T>
|
发射减少的值或者结束 |
reduce(Callable, BiFunction)
|
Observable<U>
|
Single<U>
|
发射减少的值或者初始的值 |
reduceWith(U, BiFunction)
|
Observable<U>
|
Single<U>
|
发射减少的值或者初始的值 |
single(T)
|
Observable<T>
|
Single<T>
|
发射唯一的元素或默认值 |
singleElement()
|
Observable<T>
|
Maybe<T>
|
发射唯一的元素或结束 |
toList()
|
Observable<List<T>>
|
Single<List<T>>
|
将所有元素放到 List
|
toMap()
|
Observable<Map<K, V>>
|
Single<Map<K, V>>
|
将所有元素放到 Map
|
toMultimap()
|
Observable<Map<K, Collection<V>>>
|
Single<Map<K, Collection<V>>>
|
将所有元素包装到Collection后放到 Map
|
toSortedList()
|
Observable<List<T>>
|
Single<List<T>>
|
将所有元素放到 List 并排序
|
移除
为了保证最终的2.0API尽可能干净,我们删除了一些候选的方法和组件。
删除时的版本 | 组件 | 备注 |
---|---|---|
RC3 |
Flowable.toCompletable()
|
使用 Flowable.ignoreElements() 代替
|
RC3 |
Flowable.toSingle()
|
使用 Flowable.single(T) 代替
|
RC3 |
Flowable.toMaybe()
|
使用 Flowable.singleElement() 代替
|
RC3 |
Observable.toCompletable()
|
使用 Observable.ignoreElements() 代替
|
RC3 |
Observable.toSingle()
|
使用 Observable.single(T) 代替
|
RC3 |
Observable.toMaybe()
|
使用 Observable.singleElement() 代替
|
其他改变
doOnCancel/doOnDispose/unsubscribeOn
由于同样的原因unsubscribeOn
也没被在终端路径上调用,但只有实际在链上调用cancel
时,才会调用unsubscribeOn
。
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.subscribe(System.out::println);
然而,下面将会调用take
操作符在传送过程中取消onNext
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);
如果你需要在终端或者取消时执行清理,考虑使用using
操作符代替。
RxJava2.x是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致.但是 是独立于RxJava1.x存在,本文讲解RxJava2.x的简相关推荐
- 20190827 On Java8 第十四章 流式编程
第十四章 流式编程 流的一个核心好处是,它使得程序更加短小并且更易理解.当 Lambda 表达式和方法引用(method references)和流一起使用的时候会让人感觉自成一体.流使得 Java ...
- ReactiveX流式编程—从xstream讲起
ReactiveX流式编程 ReactiveX来自微软,它是一种针对异步数据流的编程.简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处 ...
- Lambda表达式和Stream流式编程
写在前面 IDEA都默认是jdk11了,我这还写着jdk8的新特性呢,惭愧惭愧.其实在学校的时候,基本没咋用过Lambda表达式和Stream流式编程,但是在实习的时候,发现公司的代码好多这样写的,没 ...
- Java8新特性之Stream流式编程
特地感谢鲁班大叔的分享,原学习地址:Java8 Stream流式编程爱 撸码就是快,流式编程好 代码传家宝 以下是学习过程整理的笔记 1.简介 Stream 流处理,首先要澄清的是 java8 中的 ...
- 编程范式:函数式编程防御式编程响应式编程契约式编程流式编程
不长的编码生涯,看到无数概念和词汇:面向对象编程.过程式编程.指令式编程.函数式编程.防御式编程.流式编程.响应式编程.契约式编程.进攻式编程.声明式编程--有种生无可恋的感觉. 本文试图加以汇总和整 ...
- Flink系列之Flink 流式编程模式总结
title: Flink系列 一.Flink 流式编程模式总结 1.1 基础总结 官网: https://flink.apache.org/ Apache Flink® - Stateful Comp ...
- java流式编程(六)Collector接口
目录 一.接口定义 二.接口泛型 一.接口定义 public interface Collector<T, A, R> {Supplier<A> supplier();BiCo ...
- java中Lambda+流式编程讲解(心得篇)
首先举一个例子来说明,我们要在一群人里面刷选一些,首先自己写一个Person类: package 任务十三__流式计算.公司;import java.util.Objects;/*** @author ...
- Java Stream流式编程
流式编程Stream 一.简介 流式 API 从 Java8 开始引入,支持链式书写. 流只能消费一次,不能被两次消费(两次最终操作) 流在管道中流通,在节点被处理. 流[无存储],流不是一种数据结构 ...
最新文章
- 【硅谷牛仔】Facebook最初的CEO肖恩帕克
- 【赛道解析】针对冷热读写场景的 RocketMQ 存储系统设计思路拆解
- Detectron-MaskRCnn: 用于抠图的FCNN
- JavaSE 学习参考:变量(1)
- 福利来啦!!Python基础语法干货
- Google基本语法二,指令
- DPDK示例l3fwd性能测试
- 如何使用Excel的数据去查询数据库?
- 优秀软件测试工程师必读书籍推荐
- 关于加速器加速后进入游戏显示WiFi功能丢失的解决方法(maybe)
- php eot html,PHP eot
- react-native 启动android项目失败:A problem occurred evaluating project ‘:react-native-orientation‘.
- Android5.1浏览器证书问题
- 节假日读取接口_2018年节假日API接口,直接计算好的
- html图片实现左右滑动,css实现图片左右滑动
- 服务器迁移域名和证书要改什么用,服务器数据迁移方案介绍 怎样更换网站域名?...
- Android:有关下拉菜单导航的学习(供自己参考)
- android混淆那些坑
- 给HTML页面设置背景音乐
- 【Verilog基础】ROM IP 核基础知识