RxJava 的创建操作符主要包括如下内容:

  • just():将一个或多个对象转换成发射这个或这些对象的一个 Observable
  • from():将一个 Iterable、一个 Future 或者一个数组转换成一个 Observable
  • create():使用一个函数从头创建一个 Observable
  • defer():只有当订阅者订阅才创建 Observable,为每个订阅创建一个新的 Observable
  • range():创建一个发射指定范围的整数序列的 Observable
  • interval():创建一个按照给定的时间间隔发射整数序列的 Observable
  • timer():创建一个在给定的延时之后发射单个数据的 Observable
  • empty():创建一个什么都不做直接通知完成的 Observable
  • error():创建一个什么都不做直接通知错误的 Observable
  • never():创建一个不发射任何数据的 Observable

1. create、just 和 from

  1. create 使用一个从头开始创建一个 Observable,可以使用 create 操作符从头开始创建一个 Observable,给这个操作符传递一个接受观察者作为参数,编写这个函数让它的行为表现为一个 Observable——恰当地调用观察者的 onNext、onError 和 onComplete 方法。一个形式正确的有限 Observable 必须尝试调用观察者的 onComplete 或者 onError 一次,而且次后不能再调用观察者的任何其他方法。 RxJava 建议我们在传递给 create 方法的函数时,先检查一下观察者的 isDisposed 状态,以便在没有观察者的时候,让 Observable 停止发射数据,防止运行昂贵的运算。
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {if (!emitter.isDisposed()) {for (int i = 0; i < 10; i++) {emitter.onNext(i);}emitter.onComplete();}
}).subscribe(integer -> System.out.println("next: " + integer),throwable -> System.out.println("error: " + throwable.getMessage()),() -> System.out.println("Sequence complete."));
复制代码
  1. just 创建一个发射指定值的 Observable,just 将单个数据转换为发射这个单个数据的 Observable。
Observable.just("hello just").subscribe(System.out::println);
复制代码

just 类似 from,但是 from 回见数组或 Iterable 的数据取出然后逐个发射,而 just 只是简单的原样发射,将数组或 Iterable当做单个数据。它可以接受一至十个参数,返回一个按参数列表顺序发射这些数据的 Observable。 在 RxJava 2.0 中,如果在 just() 中传入 null,则会抛出一个空指针异常。

  1. from from 可以将其它种类的对象和数据类型转换为 Observable,当使用 Observable 时,如果要处理的数据都可以转换为 Observables,而不是需要混合使用 Observables 和其他类型的数据,会非常方便。 例如,Iterable 可以看成同步的 Observable;Future 可以看成总是只发射单个数据的 Observable。通过显式地将那些数据转换为 Observables,可以像使用 Observable 一样与它们交互。 在 RxJava 中,from 操作符可以将 Future、Iterable 和数组转换成 Observable。对于 Iterable 和数组,产生的 Observable 会发射 Iterable 或数组的每一项。 from 方法有一个可接受两个可选参数的版本,分别制定超时时长和时间单位。如果过了指定的时长,Future 还没有返回一个值,那么这个 Observable 就会发射错误通知并终止。

2. repeat

创建一个发射特定数据重复多次的 Observable,repeat 会重复地发射数据。某些实现允许重复发射某个数据序列,还有一些允许限制重复的次数。repeat 不是创建一个 Observable,而是重复发射原始 Observable 的数据序列,这个序列或许是无限的,或许是通过 repeat(n) 指定重复的次数。

Observable.just("hello world").repeat(3).subscribe(System.out::println);
复制代码

在 RxJava 2.x 中还有两个跟 repeat 相关的操作符:repeatWhen 和 repeatUtil。

repeatWhen

repeatWhen 不是缓存和重放原始的 Observable 的数据序列,而是有条件地重新订阅和发射原来的 Observable。 将原始 Observable 的终止通知(完成或错误)当做一个 void 数据传递给一个通知处理器,以此来决定是否要重新订阅和发射原来的 Observable。这个通知处理器就像一个 Observable 操作符,接受一个发射 void 通知的 Observable 作为输入,返回一个发射 void 数据(重新订阅和发射 Observable) 或者直接终止(使用 repeatWhen 终止发射数据) 的 Observable。

Observable.range(0, 9).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {return Observable.timer(10, TimeUnit.SECONDS);}
}).subscribe(System.out::println);try {Thread.sleep(12000);
} catch (InterruptedException e) {e.printStackTrace();
}
复制代码

这里会发射 0~9 这几个数据,由于使用了 repeatWhen 操作符,因此在 10s 之后还会再发射一次这些数据。

repeatUntil

repeatUntil 是 RxJava 2.x 新增的操作符,表示直到某个条件就不再重复发射数据。当 BooleanSuppiler 的 getAsBoolean() 返回 false 时,表示重复发射上游的 Observable;当返回 true 时,表示中止重复发射上游的 Observable。

final long startTimeMillis = System.currentTimeMillis();
Observable.interval(500, TimeUnit.MILLISECONDS).take(5).repeatUntil(() -> {System.out.println(System.currentTimeMillis() - startTimeMillis > 5000);return System.currentTimeMillis() - startTimeMillis > 5000;}).subscribe(System.out::println);
try {Thread.sleep(6000);
} catch (InterruptedException e) {e.printStackTrace();
}
复制代码

3. defer、interval 和 timer

1. defer

直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个全新的 Observable。defer 操作符会一直等待直到观察者订阅它,然后使用 Observable 工厂方法生成一个 Observable。对每个观察者都是这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable,但实际上每个订阅者获取的是他们自己单独的数据序列。

Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {@Overridepublic ObservableSource<? extends String> call() throws Exception {return Observable.just("hello world");}
});
observable.subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println(s);}
});
复制代码

2. interval

创建一个按固定时间间隔发射整数序列的 Observable。interval 操作符返回一个 Observable,它按固定的时间间隔发射一个无线递增的整数序列。interval 接受一个表示时间间隔的参数和一个表示时间单位的参数。interval 默认在 computation 调度器上执行。

Observable.interval(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println(aLong);}});
复制代码

3. timer

创建一个 Observable,在一个给定的延迟后发射一个特殊的值。timer 返回一个 Observable,在延迟一段时间后发射一个简单的数字 0。timer 操作符默认在 computation 调度上执行。

Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {// 2 秒后打印System.out.println("hello timer");}});
try {Thread.sleep(10000);
} catch (InterruptedException e) {e.printStackTrace();
}
复制代码

4. 小结

上述所有的创建操作符,不仅 Observable 可以使用,Flowable 等也可以使用。

转载于:https://juejin.im/post/5b8a6af8e51d4538bf55dbb2

RxJava 之创建操作符相关推荐

  1. Android RxJava操作符的学习---创建操作符

    RxJava如此受欢迎的原因,在于其提供了丰富 & 功能强大的操作符,几乎能完成所有的功能需求 1. 简介 RxJava 操作符的具体简介如下: 2. 类型 RxJava功能强大,所以其对应的 ...

  2. 解剖 RxJava 之过滤操作符

    介绍 此文章结合 Github AnalyseRxJava 项目,给 Android 开发者带来 RxJava 详细的解说.参考自 RxJava Essential 及书中的例子 关于 RxJava ...

  3. Android RxJava操作符的学习---创建操作符--(无条件)网络请求轮询

    1. 需求场景 2. 功能说明 本文将采用Get方法对 金山词霸API 按规定时间 重复发送网络请求,从而模拟 轮询 需求实现 采用 Gson 进行数据解析 3. 具体实现 下面结合 Retrofit ...

  4. Rxjava(2.操作符)

    参考地址 英文版 中文版 Rxjava(1.基础篇) lambda表达式 ReactiveX中文翻译文档 本文主要分为: 准备工作 改进 还可以更好 丰富的操作符 其他的操作符(归纳几十个) 写下本文 ...

  5. RxJava之过滤操作符

    涉及到列表的数据时,总是会想到一个过滤这个词语.比如,在1-100的整数中,筛选出偶数或者奇数相加,或者将前49个数相加,又或者后36个数相加,等等.在这样的场景中,不由想到将需要的数据筛选出来.在发 ...

  6. android rxjava 过滤,解剖 RxJava 之过滤操作符

    介绍 此文章结合 Github AnalyseRxJava 项目,给 Android 开发者带来 RxJava 详细的解说.参考自 RxJava Essential 及书中的例子 关于 RxJava ...

  7. Kotlin中使用RxJAVA的map()操作符遇到的问题

    2019独角兽企业重金招聘Python工程师标准>>> 在使用Kotlin开发Android项目时遇到Rxjava 的map()类型转换出错,如下 解决方法如下: private f ...

  8. RxJava重复创建Disposable导致在销毁的时候销毁不全面,出现内存泄漏

    泄漏的引用栈: In com.eebbk.askhomework.content:1.10.0.0:1100000. bfc-leakcanary:5.0.12-bugfix. * com.eebbk ...

  9. 使用RxJava的retryWhen操作符实现token过期自动刷新

    1.问题描述 我们的项目中请求与登录相关接口时需要带上sessionId这个参数,当发现token过期的时候就需要走刷新token的接口,获取最新的token,然后再重新进行请求. 如果项目中是用Ok ...

最新文章

  1. [JVM 相关] Java 新型垃圾回收器(Garbage First,G1)
  2. sysfs: cannot create duplicate filename '/class/spi_master/spi1'
  3. OPenCV膨胀函数dilate()的使用
  4. 清除ubuntu下缓存、软件安装包和多余内核
  5. AdonisUI - 用于 WPF 应用程序的轻量级 UI 工具包,提供经典但增强的 Windows 视觉效果...
  6. 我的职场战争--一年来的开发组内战实录
  7. 第十二届蓝桥杯青少年python组 第1-3题 C++实现
  8. mysql memory inno_如何定位RDS MySQL内存使用率高?-阿里云开发者社区
  9. 通过ODBC实现ACCESS与SQL数据互导(临安人才网 )
  10. Logistic逻辑回归用初等数学解读逻辑回归
  11. sap linux系统安装教程,SAP系统安装步骤 --- 46C OR 4.7
  12. 女生考华为IE会容易一点吗?女生做网工一般是什么岗位呢?找工作容易吗?需要出差吗?以后怎么发展呢?
  13. PTC Onshape 的两个核心弱点
  14. 编写C\Cpp程序中函数声明定义中的常见错误
  15. android 原生控件,抽离Android原生控件的方法
  16. Pygame推箱子2021:经典版推箱子小游戏,一起回味童年经典掌机游戏~
  17. 机器人开源项目KDL源码学习整理
  18. 获取天气预报ajax,Ajax 通过城市名获取数据(全国天气预报API)
  19. 关于用GetDIBits代替GetPixel效率低的问题
  20. 42岁大厂高管,给30岁-39岁人提个醒:这6个让你变强的习惯,要尽快养成

热门文章

  1. python程序设计与应用第4章实验
  2. python代码块使用缩进来表示_Python代码需要缩进吗
  3. ROS入门笔记(十):编写与测试简单的消息发布器和订阅器(C++)
  4. 小度智能音响拆解 芯片_不拆不快:小度音箱拆解测评
  5. 处于停机等非正常状态_关于消防栓稳压泵不停机故障的思考
  6. 广告冷启动_超级推荐如何缩短冷启动时间,让流量快速注入店铺
  7. Java内部类 Inner Class
  8. Python编程基础05:运算符与表达式
  9. 【BZOJ1434】【codevs2348】染色游戏,博弈
  10. 18.外部相机校准——旋转(Rotation),R是什么样子的,绕Z轴旋转的例子,齐次坐标旋转_2