转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/106720158
本文出自【赵彦军的博客】

文章目录

  • 依赖接入
  • Flowable
  • Single
  • Maybe
  • BackpressureStrategy
  • 线程切换
  • concat
    • 例子1

依赖接入

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "io.reactivex.rxjava3:rxjava:3.0.4"

Flowable

//java 方式
Flowable.just(1).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Throwable {}});//或者用 Lambda 简写
Flowable.just(1).subscribe( it -> {}, throwable -> {});

range 一组序列数据

Flowable.range(0, 4).subscribe(it -> {//结果 0 1 2 3}, throwable -> {});

Single

Single只发射单个数据或错误事件,即使发射多个数据,后面发射的数据也不会处理。
只有 onSuccessonError事件,没有 onNextonComplete事件。

SingleEmitter

public interface SingleEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

示例1

   Single.create(new SingleOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull SingleEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);}}).subscribe(integer -> {}, throwable -> {});

示例2

  Single.just(1).subscribe(integer -> {}, throwable -> {});

Maybe

Maybe 是 RxJava2.x 之后才有的新类型,可以看成是Single和Completable的结合。
Maybe 也只能发射单个事件或错误事件,即使发射多个数据,后面发射的数据也不会处理。
只有 onSuccessonErroronComplete事件,没有 onNext 事件。

public interface MaybeEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void onComplete();void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

实例1

Maybe.create(new MaybeOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull MaybeEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);emitter.onComplete();}}).subscribe(integer -> {}, throwable -> {});

实例2

   Maybe.just(1).subscribe(integer -> {}, throwable -> {});

BackpressureStrategy

背压策略

public enum BackpressureStrategy {/*** The {@code onNext} events are written without any buffering or dropping.* Downstream has to deal with any overflow.* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.*/MISSING,/*** Signals a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}* in case the downstream can't keep up.*/ERROR,/*** Buffers <em>all</em> {@code onNext} values until the downstream consumes it.*/BUFFER,/*** Drops the most recent {@code onNext} value if the downstream can't keep up.*/DROP,/*** Keeps only the latest {@code onNext} value, overwriting any previous value if the* downstream can't keep up.*/LATEST
}
  • MISSING 策略则表示通过 Create 方法创建的 Flowable 没有指定背压策略,不会对通过 OnNext 发射的数据做缓存或丢弃处理,需要下游通过背压操作符
  • BUFFER 策略则在还有数据未下发完成时就算上游调用onComplete或onError也会等待数据下发完成
  • LATEST 策略则当产生背压时仅会缓存最新的数据
  • DROP 策略为背压时丢弃背压数据
  • ERROR 策略是背压时抛出异常调用onError
 Flowable.create(new FlowableOnSubscribe<Long>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Long> emitter) throws Throwable {emitter.onNext(1L);emitter.onNext(2L);emitter.onComplete();}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(it -> {}, throwable -> {});

线程切换

RxUtil

package com.example.streamimport io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.FlowableTransformer
import io.reactivex.rxjava3.core.MaybeTransformer
import io.reactivex.rxjava3.core.ObservableTransformer
import io.reactivex.rxjava3.core.SingleTransformer
import io.reactivex.rxjava3.schedulers.Schedulers/*** @author yanjun.zhao* @time 2020/6/12 8:39 PM* @desc*/object RxUtil {/*** 线程切换*/fun <T> maybeToMain(): MaybeTransformer<T, T> {return MaybeTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 线程切换*/fun <T> singleToMain(): SingleTransformer<T, T> {return SingleTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 线程切换*/fun <T> flowableToMain(): FlowableTransformer<T, T> {return FlowableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}

具体实现

package com.example.streamimport android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Singleclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)Single.just(1).map {//运行在子线程it}.compose(RxUtil.singleToMain())  //线程转换.subscribe({//运行在主线程},{it.printStackTrace()})Maybe.just(1).map {//运行在子线程it}.compose(RxUtil.maybeToMain())  //线程转换.subscribe({//运行在主线程},{it.printStackTrace()})Flowable.just(1).map {//运行在子线程it}.compose(RxUtil.flowableToMain())  //线程转换.subscribe({//运行在主线程},{it.printStackTrace()})Observable.just(1).map {//运行在子线程it}.compose(RxUtil.observableToMain())  //线程转换.subscribe({ it ->//运行在主线程},{it.printStackTrace()})}
}

concat

Concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。

直到前面一个Observable终止,Concat才会订阅额外的一个Observable。注意:因此,如果你尝试连接一个"热"Observable(这种Observable在创建后立即开始发射数据,即使没有订阅者),Concat将不会看到也不会发射它之前发射的任何数据。

例子1

  private var ob1 = Observable.create<String> {Log.d("concat-数据源1", " ${Thread.currentThread().name} ")it.onNext("a1")it.onComplete()}private var ob2 = Observable.create<String> {Log.d("concat-数据源2", " ${Thread.currentThread().name} ")it.onNext("a2")it.onComplete()}private var ob3 = Observable.create<String> {Log.d("concat-数据源3", " ${Thread.currentThread().name} ")it.onNext("a3")it.onComplete()}Observable.concat<String>(ob1, ob2, ob3).subscribeOn(Schedulers.io()).subscribe{Log.d("concat-结果", " ${Thread.currentThread().name} " + it)}

结果是:

concat-数据源1:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1
concat-数据源2:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1
concat-数据源3:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1

结果分析:

  • concat 输出结果是有序的
  • concat 会使三个数据源都会执行

那么如果我要实现哪个数据源有数据,我就用哪个数据,一旦获取到想要的数据,后续数据源不再执行。其实很简单,用 firstElement() ,这个需求有点像图片加载流程 先从内存取,内存没有从本地文件取,本都文件没有就请求服务器。一旦哪个环节获取到了数据,立刻停止后面的流程

Observable.concat<String>(ob1, ob2, ob3).firstElement().subscribeOn(Schedulers.io()).subscribe {Log.d("concat-结果", " ${Thread.currentThread().name} ")}}

运行结果为:

concat-数据源1:  RxCachedThreadScheduler-1
concat-结果:  RxCachedThreadScheduler-1

Android RxJava 3.x 使用总结相关推荐

  1. android 多个绑定事件,Android RxJava 实际应用讲解:联合判断多个事件

    前言 Rxjava,由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. Github截图 RxJava如此受欢迎的原因,在于其提供了丰富 & ...

  2. android novate乱码,Android RxJava+Retrofit2+RxBinding

    Android RxJava+Retrofit2+RxBinding 本文原创,转载请注明出处.欢迎关注我的 简书. 安利一波我写的开发框架:MyScFrame喜欢的话就给个Star 前言: 之前写了 ...

  3. android RxJava(RxAndroid)的简单使用

    今天,简单讲讲android里如何使用RxJava(RxAndroid). Android框架系列: 一.android EventBus的简单使用 二.android Glide简单使用 三.and ...

  4. android如何获取网络的状态码,Android RxJava+Retrofit网络异常、状态码统一处理

    Android RxJava+Retrofit 网络异常捕获.状态码统一处理 前言 近来使用RxJava+Retrofit进行开发,在项目中遇到这样一个需求,联网请求获得数据异常时,需要将对应的Mes ...

  5. Android RxJava 基本用法

    Android RxJava 基本用法 RxJava 使用函数响应式编程方式,它可以简化项目,处理嵌套回调的异步事件. RxJava 依赖 这里以 RxJava 2.2.1 为例.在 build.gr ...

  6. Android RxJava应用:优雅实现网络请求轮询(无条件)

    前言 Rxjava,由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. 今天,我将为大家带来 Rxjava创建操作符的常见开发应用场景:轮询需求 ...

  7. Android RxJava操作符的学习---组合合并操作符---联合判断多个事件

    1. 需求场景 需要同时对多个事件进行联合判断 如,填写表单时,需要表单里所有信息(姓名.年龄.职业等)都被填写后,才允许点击 "提交" 按钮 2. 功能说明 此处采用 填写表单 ...

  8. Android RxJava 2.0中backpressure(背压)概念的理解

    英文原文:https://github.com/ReactiveX/RxJava/wiki/Backpressure Backpressure(背压.反压力) 在rxjava中会经常遇到一种情况就是被 ...

  9. Android—RxJava库知识

    RXJAVA:一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库. 优点:异步,逻辑简洁易懂. 程序要求:将一个给出的目录数组 File[] folders 中每个目录下的 p ...

最新文章

  1. 思科:全球近75%的物联网项目失败
  2. R语言dplyr包mutate_if函数修改所有满足条件的数据列的内容实战
  3. 联手马斯克创立神秘组织,他正改写人类与AI的未来
  4. Redis configuration
  5. [Python]网络爬虫(三):异常的处理和HTTP状态码的分类
  6. zabbix2.4详细安装过程
  7. c语言推箱子给上颜色,本人的C语言大作业——推箱子
  8. Java 接口 新特性(Java8)
  9. matlab figure被图像填充
  10. SQLAlchemy engine.Engine
  11. 如何对SNP设计引物: CAPS, dCAPS
  12. 制造业ERP系统具体操作流程是什么?
  13. js laypage mysql_laypage 物理分页与逻辑分页实例
  14. 图片放大后很模糊怎么办?
  15. 这应该是史上最强的物理学科普(雄文)
  16. SPSS基础操作(一):用幂指数型的权函数建立加权最小二乘回归方程
  17. 什么是模拟信号?什么是数字信号
  18. 计算机人工智能论文参考文献格式,人工智能论文参考文献范例借鉴
  19. 《中国人工智能系列白皮书——智能驾驶》精编
  20. 思维导图软件 XMind 2022

热门文章

  1. Java SE有几个代码_JavaSE常用类及方法的介绍(附代码)
  2. php视频生成指定帧图片,python3.5 cv2 获取视频特定帧生成jpg图片
  3. 深度学习和目标检测系列教程 7-300:先进的目标检测Faster R-CNN架构
  4. 四十三、Scrapy 爬取前程无忧51jobs
  5. 八十三、Eureka实现相互注册
  6. 深度学习篇| keras入门(一)
  7. Matlab中画图以及plot函数及legend函数详解
  8. 两概率分布交叉熵的最小值是多少?
  9. 报名 | CCKS 2021评测任务:生活服务领域知识图谱问答
  10. CVPR 2019 | 人脸照片秒变艺术肖像画:清华大学提出APDrawingGAN