【RxJava】使用

虽说Rxjava显然已经有些过时了,但是有些公司还在使用,为了能适应更多的业务代码,并提高自己的开发效率,所以这里仅做个Rxjava使用的总结,不涉及Rxjava内部的实现原理。

RxJava的核心就是异步数据流和响应式编程。

我们平时开发过程中的网络请求、数据库读写、文件读写、定时任务等各种耗时操作,都可以使用RxJava来完成。

在平时的开发中,我们可以把所有的事件(数据)我们都可以看成是一条河流,它可以被观察,被过滤等操作,也可以将多条河流汇合成一条新河流。

引入RxJava

只需要引入如下两个依赖即可使用rxjava:

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

RxJava的几个重要概念

  • 观察者Observer:观察事件变化并处理的主要角色,消费者也可以理解为一种特殊的观察者
  • 被观察者:触发事件并决定什么时候发送事件的主要角色
  • 订阅Subscribe:观察者和被观察者建立关联的操作(代码中的体现经常是被观察者去订阅观察者

Observable、Flowable、Single、Completable、Maybe都是被观察者,这几种被观察者可以通过 toObservable、toFlowable、toSingle、toCompletable、toMaybe 相互转化。

常用操作符及其使用

创建操作符

创建被观察者的各种操作符:create()、just() 等等。

create操作符演示:

private fun testRxjava() {Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到emitter.onNext("发送一个事件1")emitter.onNext("发送一个事件2")// onComplete方法表示事件发送结束emitter.onComplete()}}).subscribe(object : Observer<String> {override fun onSubscribe(d: Disposable) {// 被观察者和观察者建立订阅之后回调的方法Log.i("testLog", "onSubscribe..")}override fun onNext(t: String) {Log.i("testLog", "onNext.. t = $t")}override fun onError(e: Throwable) {// 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))// 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的Log.i("testLog", "onError.. e = ${e.message}")}override fun onComplete() {Log.i("testLog", "onComplete..")}})
}

日志打印如下:

I/testLog: onSubscribe..
I/testLog: onNext.. t = 发送一个事件1
I/testLog: onNext.. t = 发送一个事件2
I/testLog: onComplete..

当然,我们也可以使用Consumer(消费者)充当观察者,Consumer只有一个方法即accept,比起Observer需要实现四个方法来说,显然更加简洁了,

而当我们想要处理异常时,只需要再多传入一个专门接收ThrowableConsumer即可。使用如下:

Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到emitter.onNext("发送一个事件1")emitter.onNext("发送一个事件2")emitter.onError(Throwable("test error!"))// onComplete方法表示事件发送结束emitter.onComplete()}
}).subscribe(// 第一个Consumer,用来处理事件object : Consumer<String> {override fun accept(t: String) {Log.i("testLog", "accept.. t = $t")}},// 异常将会在这里的Consumer中处理object : Consumer<Throwable> {override fun accept(t: Throwable?) {Log.i("testLog", "accept onError.. e = ${t?.message}")}})日志打印如下:
I/testLog: accept.. t = 发送一个事件1
I/testLog: accept.. t = 发送一个事件2
I/testLog: accept onError.. e = test error!

just操作符

通过just操作符,可以非常简单的完成create操作符实现的事情。

just 方法传参限制最多为10个,另外,just 内部实际上是调用了 fromArray 操作符方法,而fromArray方法是不限制传参数量的。

just操作符的使用如下所示:

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("a", 2, "3").subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = a
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3

转换操作符

map() / flatMap() / concatMap() 等。

  • map()操作符可以将被观察者发送的数据类型转变成其他的类型
  • flatMap()可以将事件序列中的元素进行整合加工,返回一个新的被观察者,在网络请求场景中比较常用。
  • concatMap()flatMap()基本一样,只不过concatMap()转化出来的事件是有序的,而flatMap()转化出来的事件是无序的

map()操作符的使用如下:

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("2").map(object : Function<String, Int> {override fun apply(t: String): Int {// 事件发送出来后进入apply方法// 这里将这个事件(字符串2)转化为int类型并加1,然后返回return t.toString().toInt() + 1}}).subscribe(consumer)
}// 日志打印如下:
I/testLog: accept.. t = 3

flatMap()操作符的使用如下:

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("1", "2", "3", "4", "5").flatMap(object : Function<String, ObservableSource<Any>> {override fun apply(t: String): ObservableSource<Any> {// ObservableSource是被观察者的顶层父类,所以其实就是生成一个新的被观察者并返回// 这里拿到的 t 是无序的,如果需要有序,则使用concatMap即可// 这种场景类比于当前需要请求基于上一次请求的结果return Observable.just(t + "3")}}).subscribe(consumer)
}

组合操作符

  • concat():将多个被观察者进行整合,得到一个新的被观察者
  • merge():和concat()作用基本一样,只是concat()串行的,而merge()并行
private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}// 将两个被观察者进行整合,得到一个新的被观察者Observable.concat(Observable.just("1"),Observable.just("2")).subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2

功能操作符

subscribeOn():用来决定执行subscribe()方法所处的线程,也就是发射事件所在的线程,该方法需要传入一个Scheduler对象Schedulers.io()Schedulers.newThread()都可以拿到一个Scheduler对象,它们都可以开启一个子线程,只是Schedulers.io()的底层实现是线程池的形式。

observeOn():用来决定下游事件被处理所处的线程,该方法同样需要传入一个Scheduler对象,一般是该方法来切换回到主线程。

下面是它们的用法:

private fun testRxjava() {Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到Thread.sleep(2000)emitter.onNext("发送一个事件1,当前线程为:" + Thread.currentThread().name)emitter.onNext("发送一个事件2,当前线程为:" + Thread.currentThread().name)// onComplete方法表示事件发送结束emitter.onComplete()}}).subscribeOn(Schedulers.newThread()) // 决定上游事件被处理所处的线程.observeOn(AndroidSchedulers.mainThread()) // 决定下游事件被处理所处的线程.subscribe(object : Observer<String> {override fun onSubscribe(d: Disposable) {// 被观察者和观察者建立订阅之后回调的方法Log.i("testLog", "onSubscribe.." + Thread.currentThread().name)}override fun onNext(t: String) {Log.i("testLog", "onNext.. t = $t " + Thread.currentThread().name)}override fun onError(e: Throwable) {// 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))// 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的Log.i("testLog", "onError.. e = ${e.message + Thread.currentThread().name}")}override fun onComplete() {Log.i("testLog", "onComplete.." + Thread.currentThread().name)}})
}日志打印如下:
I/testLog: onSubscribe..main
I/testLog: onNext.. t = 发送一个事件1,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onNext.. t = 发送一个事件2,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onComplete..main

过滤操作符

filter():过滤掉某些事件

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}// 发送[1,10]范围内的数值Observable.range(1, 10).filter(object : Predicate<Int> {override fun test(t: Int): Boolean {if (t < 5) {// 如果 t 小于5,就将 t 过滤掉return true} else {// 把 >= 5的t保留return false}}}).subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3
I/testLog: accept.. t = 4

【RxJava】使用相关推荐

  1. RxJava 实现模糊搜索

    实现的效果图如下 下面说下实现的具体方法 1 引入库 implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC5"implement ...

  2. RxJava firstElement 与 lastElement 以及 elementAt

    1 firstElement 文档如下 2 lastElement 文档如下 3 elementAt 文档如下 下面写一个下代码 firstElement Observable.just(1,2,3, ...

  3. RxJava 过滤操作符 throttleFirst 与 throttleLast 以及 sample

    看文档发现 throttleFirst 与 throttleLast 以及 Sample 都跳到同一个界面Sample throttleFirst :在某段时间内,只发送该段时间内第1次事件(假如一个 ...

  4. RxJava 过滤操作符 distinct 和 distinctUntilChanged

    distinct  看下文档 distinct  : 过滤掉重复的元素 distinctUntilChanged: 过滤掉连续重复的元素,不连续重复的是不过滤 看下代码 1 distinct Obse ...

  5. RxJava 过滤操作符 take 与 takeLast

    take 看下官方文档 take : 指定 观察者正序接受指定的items数量 takeLast 指定观察者正序接受最后指定的items的数量 看下demo take的代码 Observable.ju ...

  6. RxJava 过滤操作符skip 与 skipLast

    skip 看下文档 skip 是正序跳过指定的items skipLast 是正序跳过指定最后几个items 下面看下代码 Observable.just(1,2,3,4,5,6).skip(1)// ...

  7. RxJava 变换操作符Map

    看下文档如下 通过对每个项目应用函数来转换Observable发出的项目 个人理解为转换类型 下面写一个把int 类型转换为String 类型的demo Observable.create(new O ...

  8. RxJava 操作符 do

    看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...

  9. RxJava debounce()和throttleWithTimeout()

    官方地址:http://reactivex.io/documentation/operators/debounce.html debounce :防抖动 throttleWithTimeout:节流超 ...

  10. RxJava 解除订阅---------Disposable.dispose()方法

    有时候我们需要解绑订阅,或者取消订阅, 这个时候就使用到了 Disposable.dispose()方法下面以一个案例说下使用方法 //Disposable.dispose()切断观察者 与 被观察者 ...

最新文章

  1. 基于 NodeGit 的周报生成工具
  2. 阿里云mysql5.7 窗口函数_关于阿里云centos版本,mysql5.7的一些注意事项
  3. 新无限天空服务器,天空魔域3782版最新服务端(含网站程序)
  4. 通常每个套接字地址只允许使用一次
  5. python升级pip在哪儿打开_Linux下升级python和安装pip的详解
  6. Tiray.SMSTiray.SMSTiray.SMSTiray.SMSTiray.SMSTiray.SMS
  7. oracle dbms_crypto,DBMS_CRYPTO包对Oracle加密
  8. 孙悟空谈即时通讯有多神通广大
  9. 每个程序员都该学习的5种开发语言,不可错过!
  10. 训练时发生的错误:Couldn‘t open shared file mapping: <000001910A228862>, error code: <1455>
  11. Java学习之路 之 容易混淆篇
  12. iOS底层探索之多线程(五)—GCD不同队列源码分析
  13. VS2019+CUDA编程(流程)
  14. drm是什么_DRM:它是什么,为什么不起作用
  15. 7-27 冒泡法排序 (20分) Kotlin
  16. 高数篇:03罗尔定理
  17. Java反射之Filed(类中的属性对象)
  18. nginx的location匹配字段后斜杠的作用
  19. 2022国内网络安全事件大盘点
  20. swi 指令能用在C语言吗,ARM的SWI异常中断处理程序设计

热门文章

  1. python实现画图工具
  2. Excel转TXT怎么转?介绍两个办法
  3. 西门子s1200教程_如何进行西门子的S-1200的S7单边通讯图文详解
  4. 微信小程序地图组件错误 显示非洲问题
  5. android 记住多个账号,Android实现记住账号密码功能
  6. 全新小龟双端影视1.6壳+反编译视频教程
  7. 解决处理虚拟主机中有文件触发了安全防护报警规则,可能存在webshell网页木马...
  8. day24/MyIE.java
  9. 基于Spring Boot+Shiro+Thymeleaf+MyBatis支付系统+微信商城 源码
  10. Java精品项目源码第117期超市收银管理系统