文章目录
依赖接入
Flowable
Single
Maybe
BackpressureStrategy
线程切换
concat
例子1
依赖接入

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "io.reactivex.rxjava3:rxjava:3.0.4"

Flowable

//java 方式
Flowable.just(1).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Throwable {}});//或者用 Lambda 简写
Flowable.just(1).subscribe( it -> {}, throwable -> {});

range 一组序列数据

Flowable.range(0, 4).subscribe(it -> {//结果 0 1 2 3}, throwable -> {});

Single
Single只发射单个数据或错误事件,即使发射多个数据,后面发射的数据也不会处理。
只有 onSuccess 和 onError事件,没有 onNext 、onComplete事件。

SingleEmitter

public interface SingleEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

示例1

Single.create(new SingleOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull SingleEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);}}).subscribe(integer -> {}, throwable -> {});

示例2

  Single.just(1).subscribe(integer -> {}, throwable -> {});

Maybe
Maybe 是 RxJava2.x 之后才有的新类型,可以看成是Single和Completable的结合。
Maybe 也只能发射单个事件或错误事件,即使发射多个数据,后面发射的数据也不会处理。
只有 onSuccess 、 onError 、onComplete事件,没有 onNext 事件。

public interface MaybeEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void onComplete();void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

实例1

Maybe.create(new MaybeOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull MaybeEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);emitter.onComplete();}}).subscribe(integer -> {}, throwable -> {});

实例2

Maybe.just(1).subscribe(integer -> {}, throwable -> {});

BackpressureStrategy
背压策略

public enum BackpressureStrategy {/*** The {@code onNext} events are written without any buffering or dropping.* Downstream has to deal with any overflow.* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.*/MISSING,/*** Signals a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}* in case the downstream can't keep up.*/ERROR,/*** Buffers <em>all</em> {@code onNext} values until the downstream consumes it.*/BUFFER,/*** Drops the most recent {@code onNext} value if the downstream can't keep up.*/DROP,/*** Keeps only the latest {@code onNext} value, overwriting any previous value if the* downstream can't keep up.*/LATEST
}

MISSING 策略则表示通过 Create 方法创建的 Flowable 没有指定背压策略,不会对通过 OnNext 发射的数据做缓存或丢弃处理,需要下游通过背压操作符
BUFFER 策略则在还有数据未下发完成时就算上游调用onComplete或onError也会等待数据下发完成
LATEST 策略则当产生背压时仅会缓存最新的数据
DROP 策略为背压时丢弃背压数据
ERROR 策略是背压时抛出异常调用onError

 Flowable.create(new FlowableOnSubscribe<Long>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Long> emitter) throws Throwable {emitter.onNext(1L);emitter.onNext(2L);emitter.onComplete();}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(it -> {}, throwable -> {});

线程切换

RxUtil

package com.example.streamimport io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.FlowableTransformer
import io.reactivex.rxjava3.core.MaybeTransformer
import io.reactivex.rxjava3.core.ObservableTransformer
import io.reactivex.rxjava3.core.SingleTransformer
import io.reactivex.rxjava3.schedulers.Schedulers/*** @author yanjun.zhao* @time 2020/6/12 8:39 PM* @desc*/object RxUtil {/*** 线程切换*/fun <T> maybeToMain(): MaybeTransformer<T, T> {return MaybeTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 线程切换*/fun <T> singleToMain(): SingleTransformer<T, T> {return SingleTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 线程切换*/fun <T> flowableToMain(): FlowableTransformer<T, T> {return FlowableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}

具体实现

package com.example.streamimport android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Singleclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)Single.just(1).map {//运行在子线程it}.compose(RxUtil.singleToMain())  //线程转换.subscribe({//运行在主线程},{it.printStackTrace()})Maybe.just(1).map {//运行在子线程it}.compose(RxUtil.maybeToMain())  //线程转换.subscribe({//运行在主线程},{it.printStackTrace()})Flowable.just(1).map {//运行在子线程it}.compose(RxUtil.flowableToMain())  //线程转换.subscribe({//运行在主线程},{it.printStackTrace()})Observable.just(1).map {//运行在子线程it}.compose(RxUtil.observableToMain())  //线程转换.subscribe({ it ->//运行在主线程},{it.printStackTrace()})}
}

concat

Concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。

直到前面一个Observable终止,Concat才会订阅额外的一个Observable。注意:因此,如果你尝试连接一个"热"Observable(这种Observable在创建后立即开始发射数据,即使没有订阅者),Concat将不会看到也不会发射它之前发射的任何数据。例子1

private var ob1 = Observable.create<String> {Log.d("concat-数据源1", " ${Thread.currentThread().name} ")it.onNext("a1")it.onComplete()}private var ob2 = Observable.create<String> {Log.d("concat-数据源2", " ${Thread.currentThread().name} ")it.onNext("a2")it.onComplete()}private var ob3 = Observable.create<String> {Log.d("concat-数据源3", " ${Thread.currentThread().name} ")it.onNext("a3")it.onComplete()}Observable.concat<String>(ob1, ob2, ob3).subscribeOn(Schedulers.io()).subscribe{Log.d("concat-结果", " ${Thread.currentThread().name} " + it)}

结果是:

concat-数据源1:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1
concat-数据源2:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1
concat-数据源3:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1 

结果分析:

concat 输出结果是有序的
concat 会使三个数据源都会执行
那么如果我要实现哪个数据源有数据,我就用哪个数据,一旦获取到想要的数据,后续数据源不再执行。其实很简单,用 firstElement() ,这个需求有点像图片加载流程 先从内存取,内存没有从本地文件取,本都文件没有就请求服务器。一旦哪个环节获取到了数据,立刻停止后面的流程

Observable.concat<String>(ob1, ob2, ob3).firstElement().subscribeOn(Schedulers.io()).subscribe {Log.d("concat-结果", " ${Thread.currentThread().name} ")}}

运行结果为:

concat-数据源1:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1 

————————————————
版权声明:本文为CSDN博主「赵彦军」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/zhaoyanjun6/article/details/106720158

RxJava 3.x 使用总结相关推荐

  1. RxJava 实现模糊搜索

    实现的效果图如下 下面说下实现的具体方法 1 引入库 implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC5"implement ...

  2. RxJava firstElement 与 lastElement 以及 elementAt

    1 firstElement 文档如下 2 lastElement 文档如下 3 elementAt 文档如下 下面写一个下代码 firstElement Observable.just(1,2,3, ...

  3. RxJava 过滤操作符 throttleFirst 与 throttleLast 以及 sample

    看文档发现 throttleFirst 与 throttleLast 以及 Sample 都跳到同一个界面Sample throttleFirst :在某段时间内,只发送该段时间内第1次事件(假如一个 ...

  4. RxJava 过滤操作符 distinct 和 distinctUntilChanged

    distinct  看下文档 distinct  : 过滤掉重复的元素 distinctUntilChanged: 过滤掉连续重复的元素,不连续重复的是不过滤 看下代码 1 distinct Obse ...

  5. RxJava 过滤操作符 take 与 takeLast

    take 看下官方文档 take : 指定 观察者正序接受指定的items数量 takeLast 指定观察者正序接受最后指定的items的数量 看下demo take的代码 Observable.ju ...

  6. RxJava 过滤操作符skip 与 skipLast

    skip 看下文档 skip 是正序跳过指定的items skipLast 是正序跳过指定最后几个items 下面看下代码 Observable.just(1,2,3,4,5,6).skip(1)// ...

  7. RxJava 变换操作符Map

    看下文档如下 通过对每个项目应用函数来转换Observable发出的项目 个人理解为转换类型 下面写一个把int 类型转换为String 类型的demo Observable.create(new O ...

  8. RxJava 操作符 do

    看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...

  9. RxJava debounce()和throttleWithTimeout()

    官方地址:http://reactivex.io/documentation/operators/debounce.html debounce :防抖动 throttleWithTimeout:节流超 ...

  10. RxJava 解除订阅---------Disposable.dispose()方法

    有时候我们需要解绑订阅,或者取消订阅, 这个时候就使用到了 Disposable.dispose()方法下面以一个案例说下使用方法 //Disposable.dispose()切断观察者 与 被观察者 ...

最新文章

  1. c语言 内存搜索,怎么在一段内存中搜索一个十六进制串
  2. 1-2、算法设计常用思想之贪婪法
  3. 《Go语言圣经》学习笔记 第五章函数
  4. 教你如何在Python中读,写和解析CSV文
  5. SpringBoot 集成 Jsp、Thymeleaf 模板引擎 + Thymeleaf 基本使用
  6. 中英文对照 —— 宗教
  7. 自己做的一个水印生成类
  8. JQValidate使用说明
  9. mysql查看enum和set值_mysql中的enum和set类型_MySQL
  10. 简要增量式PI控制器
  11. 循环链表解决约瑟夫问题
  12. Vue 记录一次安装插件引起的项目崩溃(This is probably not a problem with npm,there is likely additional logging outp)
  13. 好用的录音软件排行 有哪些电脑录音工具
  14. 华为无线portal服务器,portal服务器配置
  15. 合同变换为什么是一个行变换再跟一个相应的列变换?
  16. 武汉交通职业学院计算机协会,2018年中国计算机学会(CCF)职业教育大会在苏州顺利落幕...
  17. c语言实验3报告及答案,C语言编程实验报告3
  18. input文本框实现输入英文时自动触发事件,输入中文时要输入完成后才触发事件
  19. RemiLore:少女与异世界与魔导书 简体中文免安装版
  20. VLAN和交换机接口模式:access、trunk、hybrid

热门文章

  1. 在 Solaris 系统上安装 PHP
  2. 高中计算机会考vb试题,信息技术高中会考VB模块操作题试题库
  3. 键盘起源于那一带计算机,键盘排序的由来竟是因为这个原因……
  4. 能骗我跟你开房的方法只有一个
  5. 炫酷登录界面(html+css)
  6. VMware-Ubuntu安装bochs
  7. [1032]spark-3.0安装和入门
  8. 2021最新壁纸小程序源码 壁纸小程序已去授权
  9. Halcon2019软件安装教程
  10. HTML矢量图标的应用