Observable Utility Operators(辅助操作符)

delay

顾名思义,Delay操作符就是让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射。

        log("start subscrib:" + System.currentTimeMillis()/1000);Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {@Overridepublic void call(Subscriber<? super Long> subscriber) {for (int i = 1; i <= 2; i++) {Long currentTime=System.currentTimeMillis()/1000;log("subscrib:" + currentTime);subscriber.onNext(currentTime);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}).subscribeOn(Schedulers.newThread());observable.delay(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {@Overridepublic void call(Long aLong) {log("delay:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));}});

结果:

start subscrib:1462519228
subscrib:1462519228
subscrib:1462519229
delay:1462519230---2
delay:1462519231---2

delaySubscription

不同之处在于Delay是延时数据的发射,而DelaySubscription是延时注册Subscriber。
dealy是延迟发射,delaySubscription则是延迟收到。

        log("start subscrib:" + System.currentTimeMillis()/1000);Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {@Overridepublic void call(Subscriber<? super Long> subscriber) {for (int i = 1; i <= 2; i++) {Long currentTime=System.currentTimeMillis()/1000;log("subscrib:" + currentTime);subscriber.onNext(currentTime);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}).subscribeOn(Schedulers.newThread());observable.delaySubscription(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {@Overridepublic void call(Long aLong) {log("delaySubscription:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));}});

结果:

start subscrib:1462519279
subscrib:1462519281
delaySubscription:1462519281---0
subscrib:1462519282
delaySubscription:1462519282---0

do

do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。在Rxjava实现了很多的doxxx操作符。

doOnEach

doOnEach可以给Observable加上这样的样一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。

        Observable observable=Observable.just(1,2,3);observable.doOnEach(new Action1<Notification>() {@Overridepublic void call(Notification notification) {log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());}}).subscribe(new Action1() {@Overridepublic void call(Object o) {log(o.toString());}});Subject<Integer, Integer> values = ReplaySubject.create();values.doOnEach(new Action1<Notification<? super Integer>>() {@Overridepublic void call(Notification<? super Integer> notification) {log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());}}).subscribe(new Action1() {@Overridepublic void call(Object o) {log(o.toString());}});values.onNext(4);values.onNext(5);values.onNext(6);values.onError(new Exception("Oops"));

结果:

doOnEach send 1 type:OnNext
1
doOnEach send 2 type:OnNext
2
doOnEach send 3 type:OnNext
3
doOnEach send null type:OnCompleteddoOnEach send 4 type:OnNext
4
doOnEach send 5 type:OnNext
5
doOnEach send 6 type:OnNext
6
doOnEach send null type:OnError

doOnNext

doOnNext则只有onNext的时候才会被触发。

        Subject<Integer, Integer> values = ReplaySubject.create();values.doOnNext(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log("doOnNext send :"+integer.toString());}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log(integer.toString());}});values.onNext(4);values.onError(new Exception("Oops"));

结果:

doOnNext send :4
4

doOnSubscribe

doOnSubscribe会在Subscriber进行订阅的时候触发回调。

        Observable observable=Observable.just(1,2);observable.subscribe(new Action1() {@Overridepublic void call(Object o) {log("first:"+o.toString());}});observable.subscribe(new Action1() {@Overridepublic void call(Object o) {log("second:"+o.toString());}});

结果:

I'm be subscribed!
first:1
first:2
I'm be subscribed!
second:1
second:2

doOnUnSubscribe

doOnUnSubscribe则会在Subscriber进行反订阅的时候触发回调。
当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。

Observable observable = Observable.just(1, 2).doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("I'm be unSubscribed!");}});Subscription subscribe1 = observable.subscribe();Subscription subscribe2 = observable.subscribe();subscribe1.unsubscribe();subscribe2.unsubscribe();

结果:

I'm be unSubscribed!
I'm be unSubscribed!

doOnError

doOnError会在OnError发生的时候触发回调,并将Throwable对象作为参数传进回调函数里;

         try {Observable observable = Observable.error(new Throwable("呵呵哒")).doOnError(new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {log(throwable.getMessage().toString());}});observable.subscribe();}catch (Exception e){log("catch the exception");}

结果:

呵呵哒
catch the exception

doOnComplete

doOnComplete会在OnCompleted发生的时候触发回调。

        Observable observable = Observable.empty().doOnCompleted(new Action0() {@Overridepublic void call() {log("Complete!");}});observable.subscribe();

结果:

Complete!

doOnTerminate

DoOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止;

        Subject<Integer, Integer> values = ReplaySubject.create();values.doOnTerminate(new Action0() {@Overridepublic void call() {log("order to terminate");}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log(integer.toString());}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {log(throwable.getMessage().toString());}});values.onNext(4);values.onError(new Exception("Oops"));

结果:

4
order to terminate
Oops

finallyDo

finallyDo会在Observable结束后触发回调,无论是正常还是异常终止。

        Observable observable = Observable.empty().finallyDo(new Action0() {@Overridepublic void call() {log("already terminate");}});observable.subscribe(new Action1() {@Overridepublic void call(Object o) {}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {}}, new Action0() {@Overridepublic void call() {log("Complete!");}});

结果:

Complete!
already terminate

materialize

materialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来。

public final Observable<Notification<T>> materialize()


元数据中包含了源 Observable 所发射的动作,是调用 onNext 还是 onComplete。注意上图中,源 Observable 结束的时候, materialize 还会发射一个 onComplete 数据,然后才发射一个结束事件。

        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).materialize().subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {log(o.toString());}});

结果:

meterialize:0--type:OnNext
meterialize:1--type:OnNext
meterialize:2--type:OnNext
meterialize:null--type:OnCompleted

Notification 类包含了一些判断每个数据发射类型的方法,如果出错了还可以获取错误信息 Throwable 对象。

dematerialize

deMeterialize则是与materialize 执行相反的过程。

        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).materialize().dematerialize().subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {log(o.toString());}});

结果:

0
1
2

注意:在调用dematerialize()之前必须先调用materialize(),否则会报错。

serialize

强制Observable按次序发射数据并且功能是有效的

如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。

下面创建一个不符合约定的 Observable,并且订阅到该 Observable上:

        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {subscriber.onNext(1);subscriber.onNext(2);subscriber.onCompleted();subscriber.onNext(3);subscriber.onCompleted();}});observable.doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("Unsubscribed");}}) .subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {}}, new Action0() {@Overridepublic void call() {log("Complete!");}});

结果:

1
2
Complete!
Unsubscribed

先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但实际使用中我们可能并不想直接结束这个Subscription。还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。

        observable.doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("Unsubscribed");}}).unsafeSubscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log("Complete!");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(Integer integer) {}});

结果:

1
2
Complete!
3
Complete!

上面的示例最后就没有打印 Unsubscribed 字符串。
unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数:

        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {subscriber.onNext(1);subscriber.onNext(2);subscriber.onCompleted();subscriber.onNext(3);subscriber.onCompleted();}}).cast(Integer.class).serialize();                    observable.doOnUnsubscribe(new Action0() {@Overridepublic void call() {log("Unsubscribed");}}).unsafeSubscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log("Complete!");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(Integer integer) {}});

结果:

1
2
Complete!

尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定。最后也收到了完成事件。

timeout

添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知

  • 我们可以认为timeout()为一个Observable的限时的副本。
  • 如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发onError()函数。
        Observable<Long> values = Observable.interval(200, TimeUnit.MILLISECONDS);Subscription subscription = values.timeout(300,TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {@Overridepublic void onCompleted() {log("Complete!");}@Overridepublic void onError(Throwable e) {log(e.getMessage().toString());}@Overridepublic void onNext(Long aLong) {log(aLong+"");}});

结果:

0
1
2
...

Rxjava将Timeout实现为很多不同功能的操作符,比如说超时后用一个备用的Observable继续发射数据等。

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i <= 3; i++) {try {Thread.sleep(i * 100);} catch (InterruptedException e) {e.printStackTrace();}subscriber.onNext(i);}subscriber.onCompleted();}}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6)).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {log(integer.toString());}});

结果:

0
1
2
5
6

timestamp

给Observable发射的每个数据项添加一个时间戳

timestamp 把数据转换为 Timestamped 类型,里面包含了原始的数据和一个原始数据是何时发射的时间戳。

public final Observable<Timestamped<T>> timestamp()
        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).timestamp().subscribe(new Action1<Timestamped>() {@Overridepublic void call(Timestamped mTimestamped) {log(mTimestamped.toString());}});

结果:

Timestamped(timestampMillis = 1461758360570, value = 0)
Timestamped(timestampMillis = 1461758360670, value = 1)
Timestamped(timestampMillis = 1461758360771, value = 2)

从结果可以看到,上面的数据大概每隔100毫秒发射一个。

timeInterval

将一个Observable转换为发射两个数据之间所耗费时间的Observable

如果你想知道前一个数据和当前数据发射直接的时间间隔,则可以使用 timeInterval 函数。

public final Observable<TimeInterval<T>> timeInterval()
         Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);values.take(3).timeInterval().subscribe(new Action1<TimeInterval>() {@Overridepublic void call(TimeInterval mTimeInterval) {log(mTimeInterval.toString());}});

结果:

TimeInterval [intervalInMilliseconds=101, value=0]
TimeInterval [intervalInMilliseconds=99, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]

using

创建一个只在Observable的生命周期内存在的一次性资源

Using操作符创建一个在Observable生命周期内存活的资源,也可以这样理解:我们创建一个资源并使用它,用一个Observable来限制这个资源的使用时间,当这个Observable终止的时候,这个资源就会被销毁。

public static final <T,Resource> Observable<T> using(Func0<Resource> resourceFactory,Func1<? super Resource,? extends Observable<? extends T>> observableFactory,Action1<? super Resource> disposeAction)

using 有三个参数,分别是:

  • 1.创建这个一次性资源的函数
  • 2.创建Observable的函数
  • 3.释放资源的函数

当 Observable 被订阅的时候,resourceFactory 用来获取到需要的资源;observableFactory 用这个资源来发射数据;当 Observable 完成的时候,disposeAction 来释放资源。

        Observable observable = Observable.using(new Func0<Animal>() {@Overridepublic Animal call() {return new Animal();}}, new Func1<Animal, Observable<?>>() {@Overridepublic Observable<?> call(Animal animal) {return Observable.timer(3, TimeUnit.SECONDS);//三秒后发射一次就completed
//                return Observable.timer(4, 2, TimeUnit.SECONDS);//没有completed,不停的发射数据
//                return Observable.range(1,3);//一次发射三个数据,马上结束
//                return Observable.just(1,2,3);//一次发射三个数据,马上结束}}, new Action1<Animal>() {@Overridepublic void call(Animal animal) {animal.relase();}});Subscriber subscriber = new Subscriber() {@Overridepublic void onCompleted() {log("subscriber---onCompleted");}@Overridepublic void onError(Throwable e) {log("subscriber---onError");}@Overridepublic void onNext(Object o) {log("subscriber---onNext"+o.toString());//o是发射的次数统计,可以用timer(4, 2, TimeUnit.SECONDS)测试}};observable.count().subscribe(subscriber);

结果:

create animal
animal eat
animal eat
animal eat
subscriber---onNext1
subscriber---onCompleted
animal released

项目源码 GitHub求赞,谢谢!
引用:
RxJava操作符(六)Utility-云少嘎嘎嘎-ChinaUnix博客
RxJava 教程第三部分:驯服数据流之自定义操作函数 - 云在千峰

RxJava----操作符:辅助操作符相关推荐

  1. RxJava 之创建操作符

    RxJava 的创建操作符主要包括如下内容: just():将一个或多个对象转换成发射这个或这些对象的一个 Observable from():将一个 Iterable.一个 Future 或者一个数 ...

  2. 解剖 RxJava 之过滤操作符

    介绍 此文章结合 Github AnalyseRxJava 项目,给 Android 开发者带来 RxJava 详细的解说.参考自 RxJava Essential 及书中的例子 关于 RxJava ...

  3. 【C语言】算数操作符 移位操作符 以及 sizeof单目操作符讲解

    目录 1.算术操作符 2. 移位操作符 2.1 左移操作符 2.2 右移操作符 3.sizeof 单目操作符介绍 操作符介绍: C语言操作符分为: 算术操作符 移位操作符 位操作符 赋值操作符 单目操 ...

  4. 全面讲解Python列表数组(二),列表分区/片,列表操作符,比较操作符,逻辑操作符,连接操作符,重复操作符,成员关系操作符;

    一 列表分片 简单概括来说就是可以从一个列表中一次性取出来多个元素等操作; 这里有一个列表 member=[1,2,3,4,5] member[1:3] [2,3] 还可以 member[:3] [1 ...

  5. oracle连接操作符,Oracle操作符,函数

    SQL 操作符 Oracle 支持的 SQL 操作符分类如下: 操作符介绍(一) 算术操作符 用于执行数值计算 可以在SQL语句中使用算术表达式,算术表达式由数值数据类型的列名.数值常量和连接它们的算 ...

  6. 操作符 算数操作符

    操作符 算数操作符 + - * / % ++ -- /**任意单元的长度超过int,那么结果就按照最长的长度计算*/public class year {//类对应的块public static vo ...

  7. c++ 操作符大全-算术操作符、关系操作符、逻辑操作符、位操作符、自增自减操作符、赋值操作符、条件操作符、逗号操作符、操作符优先级

    文章目录 操作符 1.算术操作符 2.关系操作符 3.逻辑操作符 4.位操作符 5.自增自减操作符 6.赋值操作符 7.条件操作符 8.逗号操作符 9.操作符优先级 操作符 计算机程序可以看作一串运算 ...

  8. Rxjava(2.操作符)

    参考地址 英文版 中文版 Rxjava(1.基础篇) lambda表达式 ReactiveX中文翻译文档 本文主要分为: 准备工作 改进 还可以更好 丰富的操作符 其他的操作符(归纳几十个) 写下本文 ...

  9. RxJava之过滤操作符

    涉及到列表的数据时,总是会想到一个过滤这个词语.比如,在1-100的整数中,筛选出偶数或者奇数相加,或者将前49个数相加,又或者后36个数相加,等等.在这样的场景中,不由想到将需要的数据筛选出来.在发 ...

最新文章

  1. 聊天机器人之环境准备
  2. SBB:pH主导土壤中固氮群落的共存与装配
  3. Android 模拟Uart 串口通信
  4. C++学习笔记----3.2 C++引用在本质上是什么,它和指针到底有什么区别
  5. ai的预览模式切换_绝对高级!AI打造超酷矩阵纬度文字效果!
  6. python监控程序编写_05-python进阶-简单监控程序开发
  7. C#:导入Excel通用类(CSV格式)
  8. 苹果加入AOM联盟 AV1获全主流生态平台支持
  9. php获取当前周得周一_PHP怎样获得最近一个周一和上周一的日期?
  10. maven实现多模块热部署
  11. ajax原生为什么else会执行2次,关于Ajax,明明传过去的值是1,可不知道为什么就是else起作用?...
  12. Hystrix熔断机制原理剖析
  13. Java编程必备软件
  14. 空间不足以提取VMware Tools解决方法
  15. Google学术的使用指南
  16. 软考初级程序员--学习
  17. Mybatis动态sql是做什么的?都有哪些动态sql?简述一下动态sql的执行原理?
  18. 9个适合上班族晚上在家就能赚钱的副业推荐(建议收藏)
  19. java没错泄露_记一次尴尬的Java应用内存泄露排查
  20. SecureCRT快速连接服务器

热门文章

  1. 前后端交互——Ajax
  2. 计算机应用技术个人研修总结,信息技术个人研修总结(精选7篇)
  3. POI + PDFbox将PPT有图表页转换成图片
  4. 怎样在虚拟主机上安装商城系统
  5. 计算机工程与科学不是CSCD吗,计算机工程与科学核心电子期刊发表要求
  6. SM2国密算法公钥解压缩
  7. 军事标图计算机软件,基于AreEngine的军事标图系统
  8. DIY多快充协议太阳能充电器!----硬件框图
  9. 帮我写基于matlab的风光互补发电系统代码
  10. [置顶]谷歌大牛 Jeff Dean 是如何成为互联网战神的