RxJava----操作符:辅助操作符
Observable Utility Operators(辅助操作符)
delay
顾名思义,Delay操作符就是让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射。
log("start subscrib:" + System.currentTimeMillis()/1000);Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {@Overridepublic void call(Subscriber<? super Long> subscriber) {for (int i = 1; i <= 2; i++) {Long currentTime=System.currentTimeMillis()/1000;log("subscrib:" + currentTime);subscriber.onNext(currentTime);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}).subscribeOn(Schedulers.newThread());observable.delay(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {@Overridepublic void call(Long aLong) {log("delay:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));}});
结果:
start subscrib:1462519228
subscrib:1462519228
subscrib:1462519229
delay:1462519230---2
delay:1462519231---2
delaySubscription
不同之处在于Delay是延时数据的发射,而DelaySubscription是延时注册Subscriber。
dealy是延迟发射,delaySubscription则是延迟收到。
log("start subscrib:" + System.currentTimeMillis()/1000);Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {@Overridepublic void call(Subscriber<? super Long> subscriber) {for (int i = 1; i <= 2; i++) {Long currentTime=System.currentTimeMillis()/1000;log("subscrib:" + currentTime);subscriber.onNext(currentTime);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}).subscribeOn(Schedulers.newThread());observable.delaySubscription(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {@Overridepublic void call(Long aLong) {log("delaySubscription:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));}});
结果:
start subscrib:1462519279
subscrib:1462519281
delaySubscription:1462519281---0
subscrib:1462519282
delaySubscription:1462519282---0
do
do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。在Rxjava实现了很多的doxxx操作符。
doOnEach
doOnEach可以给Observable加上这样的样一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。
Observable observable=Observable.just(1,2,3);observable.doOnEach(new Action1<Notification>() {@Overridepublic void call(Notification notification) {log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());}}).subscribe(new Action1() {@Overridepublic void call(Object o) {log(o.toString());}});Subject<Integer, Integer> values = ReplaySubject.create();values.doOnEach(new Action1<Notification<? super Integer>>() {@Overridepublic void call(Notification<? super Integer> notification) {log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());}}).subscribe(new Action1() {@Overridepublic void call(Object o) {log(o.toString());}});values.onNext(4);values.onNext(5);values.onNext(6);values.onError(new Exception("Oops"));
结果:
doOnEach send 1 type:OnNext
1
doOnEach send 2 type:OnNext
2
doOnEach send 3 type:OnNext
3
doOnEach send null type:OnCompleteddoOnEach send 4 type:OnNext
4
doOnEach send 5 type:OnNext
5
doOnEach send 6 type:OnNext
6
doOnEach send null type:OnError
doOnNext
doOnNext则只有onNext的时候才会被触发。
Subject<Integer, Integer> values = ReplaySubject.create();values.doOnNext(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log("doOnNext send :"+integer.toString());}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log(integer.toString());}});values.onNext(4);values.onError(new Exception("Oops"));
结果:
doOnNext send :4
4
doOnSubscribe
doOnSubscribe会在Subscriber进行订阅的时候触发回调。
Observable observable=Observable.just(1,2);observable.subscribe(new Action1() {@Overridepublic void call(Object o) {log("first:"+o.toString());}});observable.subscribe(new Action1() {@Overridepublic void call(Object o) {log("second:"+o.toString());}});
结果:
I'm be subscribed!
first:1
first:2
I'm be subscribed!
second:1
second:2
doOnUnSubscribe
doOnUnSubscribe则会在Subscriber进行反订阅的时候触发回调。
当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。
Observable observable = Observable.just(1, 2).doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("I'm be unSubscribed!");}});Subscription subscribe1 = observable.subscribe();Subscription subscribe2 = observable.subscribe();subscribe1.unsubscribe();subscribe2.unsubscribe();
结果:
I'm be unSubscribed!
I'm be unSubscribed!
doOnError
doOnError会在OnError发生的时候触发回调,并将Throwable对象作为参数传进回调函数里;
try {Observable observable = Observable.error(new Throwable("呵呵哒")).doOnError(new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {log(throwable.getMessage().toString());}});observable.subscribe();}catch (Exception e){log("catch the exception");}
结果:
呵呵哒
catch the exception
doOnComplete
doOnComplete会在OnCompleted发生的时候触发回调。
Observable observable = Observable.empty().doOnCompleted(new Action0() {@Overridepublic void call() {log("Complete!");}});observable.subscribe();
结果:
Complete!
doOnTerminate
DoOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止;
Subject<Integer, Integer> values = ReplaySubject.create();values.doOnTerminate(new Action0() {@Overridepublic void call() {log("order to terminate");}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log(integer.toString());}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {log(throwable.getMessage().toString());}});values.onNext(4);values.onError(new Exception("Oops"));
结果:
4
order to terminate
Oops
finallyDo
finallyDo会在Observable结束后触发回调,无论是正常还是异常终止。
Observable observable = Observable.empty().finallyDo(new Action0() {@Overridepublic void call() {log("already terminate");}});observable.subscribe(new Action1() {@Overridepublic void call(Object o) {}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {}}, new Action0() {@Overridepublic void call() {log("Complete!");}});
结果:
Complete!
already terminate
materialize
materialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来。
public final Observable<Notification<T>> materialize()
元数据中包含了源 Observable 所发射的动作,是调用 onNext 还是 onComplete。注意上图中,源 Observable 结束的时候, materialize 还会发射一个 onComplete 数据,然后才发射一个结束事件。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).materialize().subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {log(o.toString());}});
结果:
meterialize:0--type:OnNext
meterialize:1--type:OnNext
meterialize:2--type:OnNext
meterialize:null--type:OnCompleted
Notification 类包含了一些判断每个数据发射类型的方法,如果出错了还可以获取错误信息 Throwable 对象。
dematerialize
deMeterialize则是与materialize 执行相反的过程。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).materialize().dematerialize().subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {log(o.toString());}});
结果:
0
1
2
注意:在调用dematerialize()
之前必须先调用materialize()
,否则会报错。
serialize
强制Observable按次序发射数据并且功能是有效的
如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。
下面创建一个不符合约定的 Observable,并且订阅到该 Observable上:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {subscriber.onNext(1);subscriber.onNext(2);subscriber.onCompleted();subscriber.onNext(3);subscriber.onCompleted();}});observable.doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("Unsubscribed");}}) .subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {}}, new Action0() {@Overridepublic void call() {log("Complete!");}});
结果:
1
2
Complete!
Unsubscribed
先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但实际使用中我们可能并不想直接结束这个Subscription。还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。
observable.doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("Unsubscribed");}}).unsafeSubscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log("Complete!");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(Integer integer) {}});
结果:
1
2
Complete!
3
Complete!
上面的示例最后就没有打印 Unsubscribed 字符串。
unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {subscriber.onNext(1);subscriber.onNext(2);subscriber.onCompleted();subscriber.onNext(3);subscriber.onCompleted();}}).cast(Integer.class).serialize(); observable.doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("Unsubscribed");}}).unsafeSubscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log("Complete!");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(Integer integer) {}});
结果:
1
2
Complete!
尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定。最后也收到了完成事件。
timeout
添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
- 我们可以认为timeout()为一个Observable的限时的副本。
- 如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发onError()函数。
Observable<Long> values = Observable.interval(200, TimeUnit.MILLISECONDS);Subscription subscription = values.timeout(300,TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {@Overridepublic void onCompleted() {log("Complete!");}@Overridepublic void onError(Throwable e) {log(e.getMessage().toString());}@Overridepublic void onNext(Long aLong) {log(aLong+"");}});
结果:
0
1
2
...
Rxjava将Timeout实现为很多不同功能的操作符,比如说超时后用一个备用的Observable继续发射数据等。
Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i <= 3; i++) {try {Thread.sleep(i * 100);} catch (InterruptedException e) {e.printStackTrace();}subscriber.onNext(i);}subscriber.onCompleted();}}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6)).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log(integer.toString());}});
结果:
0
1
2
5
6
timestamp
给Observable发射的每个数据项添加一个时间戳
timestamp 把数据转换为 Timestamped 类型,里面包含了原始的数据和一个原始数据是何时发射的时间戳。
public final Observable<Timestamped<T>> timestamp()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).timestamp().subscribe(new Action1<Timestamped>() {@Overridepublic void call(Timestamped mTimestamped) {log(mTimestamped.toString());}});
结果:
Timestamped(timestampMillis = 1461758360570, value = 0)
Timestamped(timestampMillis = 1461758360670, value = 1)
Timestamped(timestampMillis = 1461758360771, value = 2)
从结果可以看到,上面的数据大概每隔100毫秒发射一个。
timeInterval
将一个Observable转换为发射两个数据之间所耗费时间的Observable
如果你想知道前一个数据和当前数据发射直接的时间间隔,则可以使用 timeInterval 函数。
public final Observable<TimeInterval<T>> timeInterval()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).timeInterval().subscribe(new Action1<TimeInterval>() {@Overridepublic void call(TimeInterval mTimeInterval) {log(mTimeInterval.toString());}});
结果:
TimeInterval [intervalInMilliseconds=101, value=0]
TimeInterval [intervalInMilliseconds=99, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]
using
创建一个只在Observable的生命周期内存在的一次性资源
Using操作符创建一个在Observable生命周期内存活的资源,也可以这样理解:我们创建一个资源并使用它,用一个Observable来限制这个资源的使用时间,当这个Observable终止的时候,这个资源就会被销毁。
public static final <T,Resource> Observable<T> using(Func0<Resource> resourceFactory,Func1<? super Resource,? extends Observable<? extends T>> observableFactory,Action1<? super Resource> disposeAction)
using 有三个参数,分别是:
- 1.创建这个一次性资源的函数
- 2.创建Observable的函数
- 3.释放资源的函数
当 Observable 被订阅的时候,resourceFactory 用来获取到需要的资源;observableFactory 用这个资源来发射数据;当 Observable 完成的时候,disposeAction 来释放资源。
Observable observable = Observable.using(new Func0<Animal>() {@Overridepublic Animal call() {return new Animal();}}, new Func1<Animal, Observable<?>>() {@Overridepublic Observable<?> call(Animal animal) {return Observable.timer(3, TimeUnit.SECONDS);//三秒后发射一次就completed
// return Observable.timer(4, 2, TimeUnit.SECONDS);//没有completed,不停的发射数据
// return Observable.range(1,3);//一次发射三个数据,马上结束
// return Observable.just(1,2,3);//一次发射三个数据,马上结束}}, new Action1<Animal>() {@Overridepublic void call(Animal animal) {animal.relase();}});Subscriber subscriber = new Subscriber() {@Overridepublic void onCompleted() {log("subscriber---onCompleted");}@Overridepublic void onError(Throwable e) {log("subscriber---onError");}@Overridepublic void onNext(Object o) {log("subscriber---onNext"+o.toString());//o是发射的次数统计,可以用timer(4, 2, TimeUnit.SECONDS)测试}};observable.count().subscribe(subscriber);
结果:
create animal
animal eat
animal eat
animal eat
subscriber---onNext1
subscriber---onCompleted
animal released
项目源码 GitHub求赞,谢谢!
引用:
RxJava操作符(六)Utility-云少嘎嘎嘎-ChinaUnix博客
RxJava 教程第三部分:驯服数据流之自定义操作函数 - 云在千峰
RxJava----操作符:辅助操作符相关推荐
- RxJava 之创建操作符
RxJava 的创建操作符主要包括如下内容: just():将一个或多个对象转换成发射这个或这些对象的一个 Observable from():将一个 Iterable.一个 Future 或者一个数 ...
- 解剖 RxJava 之过滤操作符
介绍 此文章结合 Github AnalyseRxJava 项目,给 Android 开发者带来 RxJava 详细的解说.参考自 RxJava Essential 及书中的例子 关于 RxJava ...
- 【C语言】算数操作符 移位操作符 以及 sizeof单目操作符讲解
目录 1.算术操作符 2. 移位操作符 2.1 左移操作符 2.2 右移操作符 3.sizeof 单目操作符介绍 操作符介绍: C语言操作符分为: 算术操作符 移位操作符 位操作符 赋值操作符 单目操 ...
- 全面讲解Python列表数组(二),列表分区/片,列表操作符,比较操作符,逻辑操作符,连接操作符,重复操作符,成员关系操作符;
一 列表分片 简单概括来说就是可以从一个列表中一次性取出来多个元素等操作; 这里有一个列表 member=[1,2,3,4,5] member[1:3] [2,3] 还可以 member[:3] [1 ...
- oracle连接操作符,Oracle操作符,函数
SQL 操作符 Oracle 支持的 SQL 操作符分类如下: 操作符介绍(一) 算术操作符 用于执行数值计算 可以在SQL语句中使用算术表达式,算术表达式由数值数据类型的列名.数值常量和连接它们的算 ...
- 操作符 算数操作符
操作符 算数操作符 + - * / % ++ -- /**任意单元的长度超过int,那么结果就按照最长的长度计算*/public class year {//类对应的块public static vo ...
- c++ 操作符大全-算术操作符、关系操作符、逻辑操作符、位操作符、自增自减操作符、赋值操作符、条件操作符、逗号操作符、操作符优先级
文章目录 操作符 1.算术操作符 2.关系操作符 3.逻辑操作符 4.位操作符 5.自增自减操作符 6.赋值操作符 7.条件操作符 8.逗号操作符 9.操作符优先级 操作符 计算机程序可以看作一串运算 ...
- Rxjava(2.操作符)
参考地址 英文版 中文版 Rxjava(1.基础篇) lambda表达式 ReactiveX中文翻译文档 本文主要分为: 准备工作 改进 还可以更好 丰富的操作符 其他的操作符(归纳几十个) 写下本文 ...
- RxJava之过滤操作符
涉及到列表的数据时,总是会想到一个过滤这个词语.比如,在1-100的整数中,筛选出偶数或者奇数相加,或者将前49个数相加,又或者后36个数相加,等等.在这样的场景中,不由想到将需要的数据筛选出来.在发 ...
最新文章
- 聊天机器人之环境准备
- SBB:pH主导土壤中固氮群落的共存与装配
- Android 模拟Uart 串口通信
- C++学习笔记----3.2 C++引用在本质上是什么,它和指针到底有什么区别
- ai的预览模式切换_绝对高级!AI打造超酷矩阵纬度文字效果!
- python监控程序编写_05-python进阶-简单监控程序开发
- C#:导入Excel通用类(CSV格式)
- 苹果加入AOM联盟 AV1获全主流生态平台支持
- php获取当前周得周一_PHP怎样获得最近一个周一和上周一的日期?
- maven实现多模块热部署
- ajax原生为什么else会执行2次,关于Ajax,明明传过去的值是1,可不知道为什么就是else起作用?...
- Hystrix熔断机制原理剖析
- Java编程必备软件
- 空间不足以提取VMware Tools解决方法
- Google学术的使用指南
- 软考初级程序员--学习
- Mybatis动态sql是做什么的?都有哪些动态sql?简述一下动态sql的执行原理?
- 9个适合上班族晚上在家就能赚钱的副业推荐(建议收藏)
- java没错泄露_记一次尴尬的Java应用内存泄露排查
- SecureCRT快速连接服务器