前言

  • Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。


本文主要讲解的是RxJava中的 背压控制策略,希望你们会喜欢。

Carson带你学RxJava系列文章,包括 原理、操作符、应用场景、背压等等,请看文章:Android:这是一份全面 & 详细的RxJava学习指南

本文所有代码 Demo均存放在Carson_Ho的Github地址


目录


1. 引言

1.1 背景

  • 观察者 & 被观察者 之间存在2种订阅关系:同步 & 异步。具体如下:

  • 对于异步订阅关系,存在 被观察者发送事件速度 与观察者接收事件速度 不匹配的情况
  1. 发送 & 接收事件速度 = 单位时间内 发送&接收事件的数量
  2. 大多数情况,主要是 被观察者发送事件速度 > 观察者接收事件速度

1.2 问题

  • 被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应 / 处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM
  1. 如,点击按钮事件:连续过快的点击按钮10次,则只会造成点击2次的效果;
  2. 解释:因为点击速度太快了,所以按钮来不及响应

下面再举个例子:

  • 被观察者的发送事件速度 = 10ms / 个
  • 观察者的接收事件速度 = 5s / 个

即出现发送 & 接收事件严重不匹配的问题

 Observable.create(new ObservableOnSubscribe<Integer>() {// 1. 创建被观察者 & 生产事件@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; ; i++) {Log.d(TAG, "发送了事件"+ i );Thread.sleep(10);// 发送事件速度:10ms / 个 emitter.onNext(i);}}}).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Observer<Integer>() {// 2. 通过通过订阅(subscribe)连接观察者和被观察者@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "开始采用subscribe连接");}@Overridepublic void onNext(Integer value) {try {// 接收事件速度:5s / 个 Thread.sleep(5000);Log.d(TAG, "接收到了事件"+ value  );} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void onError(Throwable e) {Log.d(TAG, "对Error事件作出响应");}@Overridepublic void onComplete() {Log.d(TAG, "对Complete事件作出响应");}});
  • 结果
    由于被观察者发送事件速度 > 观察者接收事件速度,所以出现流速不匹配问题,从而导致OOM

1.3 解决方案

采用 背压策略。

下面,我将开始介绍背压策略。


2. 背压策略简介

2.1 定义

一种 控制事件流速 的策略

2.2 作用

异步订阅关系 中,控制事件发送 & 接收的速度

注:背压的作用域 = 异步订阅关系,即 被观察者 & 观察者处在不同线程中

2.3 解决的问题

解决了 因被观察者发送事件速度 与 观察者接收事件速度 不匹配(一般是前者 快于 后者),从而导致观察者无法及时响应 / 处理所有 被观察者发送事件 的问题

2.4 应用场景

  • 被观察者发送事件速度 与 观察者接收事件速度 不匹配的场景
  • 具体场景就取决于 该事件的类型,如:网络请求,那么具体场景:有很多网络请求需要执行,但执行者的执行速度没那么快,此时就需要使用背压策略来进行控制。

3. 背压策略的原理

  • 那么,RxJava实现背压策略(Backpressure)的原理是什么呢?
  • 解决方案 & 思想主要如下:

  • 示意图如下

  • RxJava1.0 中被观察者的旧实现 Observable 对比

  • 好了,那么上图中在RxJava 2.0观察者模型中,Flowable到底是什么呢?它其实是RxJava 2.0中被观察者的一种新实现,同时也是背压策略实现的承载者
  • 请继续看下一节的介绍:背压策略的具体实现 - Flowable

4. 背压策略的具体实现:Flowable

RxJava2.0中,采用 Flowable 实现 背压策略

正确来说,应该是 “非阻塞式背压” 策略

4.1 Flowable 介绍

  • 定义:在 RxJava2.0中,被观察者(Observable)的一种新实现

同时,RxJava1.0 中被观察者(Observable)的旧实现: Observable依然保留

  • 作用:实现 非阻塞式背压 策略

4.2 Flowable 特点

  • Flowable的特点 具体如下

  • 下面再贴出一张RxJava2.0RxJava1.0的观察者模型的对比图

实际上,RxJava2.0 也有保留(被观察者)Observerble - Observer(观察者)的观察者模型,此处只是为了做出对比让读者了解

4.3 与 RxJava1.0 中被观察者的旧实现 Observable 的关系

  • 具体如下图

  • 那么,为什么要采用新实现Flowable实现背压,而不采用旧的Observable呢?
  • 主要原因:旧实现Observable无法很好解决背压问题。

4.4 Flowable的基础使用

  • Flowable的基础使用非常类似于 Observable
  • 具体如下
/*** 步骤1:创建被观察者 =  Flowable*/Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();}}, BackpressureStrategy.ERROR);// 需要传入背压参数BackpressureStrategy,下面会详细讲解/*** 步骤2:创建观察者 =  Subscriber*/Subscriber<Integer> downstream = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {// 对比Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription// 相同点:Subscription具备Disposable参数的作用,即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接// 不同点:Subscription增加了void request(long n)Log.d(TAG, "onSubscribe");s.request(Long.MAX_VALUE);// 关于request()下面会继续详细说明}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};/*** 步骤3:建立订阅关系*/upstream.subscribe(downstream);

  • 更加优雅的链式调用
        // 步骤1:创建被观察者 =  FlowableFlowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "发送事件 1");emitter.onNext(1);Log.d(TAG, "发送事件 2");emitter.onNext(2);Log.d(TAG, "发送事件 3");emitter.onNext(3);Log.d(TAG, "发送完成");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {// 步骤2:创建观察者 =  Subscriber & 建立订阅关系@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(3);}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});
  • 至此,Flowable的基础使用讲解完
  • 关于更深层次的使用会结合 背压策略的实现 来讲解

5. 背压策略的使用

  • 在本节中,我将结合 背压策略的原理 & Flowable的使用,为大家介绍在RxJava 2.0 中该如何使用Flowable来实现背压策略功能,即背压策略的使用
  • FlowableObservable在功能上的区别主要是 多了背压的功能
  • 下面,我将顺着第3节中讲解背压策略实现原理 & 解决方案(如下图),来讲解Flowable在背压策略功能上的使用

注:

  1. 由于第2节中提到,使用背压的场景 = 异步订阅关系,所以下文中讲解的主要是异步订阅关系场景,即 被观察者 & 观察者 工作在不同线程中
  2. 但由于在同步订阅关系的场景也可能出现流速不匹配的问题,所以在讲解异步情况后,会稍微讲解一下同步情况,以方便对比

5.1 控制 观察者接收事件 的速度

5.1.1 异步订阅情况
  • 简介

  • 具体原理图

  • 具体使用
// 1. 创建被观察者FlowableFlowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 一共发送4个事件Log.d(TAG, "发送事件 1");emitter.onNext(1);Log.d(TAG, "发送事件 2");emitter.onNext(2);Log.d(TAG, "发送事件 3");emitter.onNext(3);Log.d(TAG, "发送事件 4");emitter.onNext(4);Log.d(TAG, "发送完成");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {// 对比Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription// 相同点:Subscription参数具备Disposable参数的作用,即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接// 不同点:Subscription增加了void request(long n)s.request(3);// 作用:决定观察者能够接收多少个事件// 如设置了s.request(3),这就说明观察者能够接收3个事件(多出的事件存放在缓存区)// 官方默认推荐使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});
  • 效果图

  • 有2个结论是需要大家注意的

下图 = 当缓存区存满时(128个事件)溢出报错的原理图

  • 代码演示1:观察者不接收事件的情况下,被观察者继续发送事件 & 存放到缓存区;再按需取出
 /*** 步骤1:设置变量*/private static final String TAG = "Rxjava";private Button btn; // 该按钮用于调用Subscription.request(long n )private Subscription mSubscription; // 用于保存Subscription对象/*** 步骤2:设置点击事件 = 调用Subscription.request(long n )*/btn = (Button) findViewById(R.id.btn);btn.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View view) {mSubscription.request(2);}});/*** 步骤3:异步调用*/Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "发送事件 1");emitter.onNext(1);Log.d(TAG, "发送事件 2");emitter.onNext(2);Log.d(TAG, "发送事件 3");emitter.onNext(3);Log.d(TAG, "发送事件 4");emitter.onNext(4);Log.d(TAG, "发送完成");emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;// 保存Subscription对象,等待点击按钮时(调用request(2))观察者再接收事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

  • 代码演示2:观察者不接收事件的情况下,被观察者继续发送事件至超出缓存区大小(128)
Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 一共发送129个事件,即超出了缓存区的大小for (int i = 0;i< 129; i++) {Log.d(TAG, "发送了事件" + i);emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");// 默认不设置可接收事件大小}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

5.1.2 同步订阅情况

同步订阅 & 异步订阅 的区别在于:

  • 同步订阅中,被观察者 & 观察者工作于同1线程
  • 同步订阅关系中没有缓存区

  • 被观察者在发送1个事件后,必须等待观察者接收后,才能继续发下1个事件
/*** 步骤1:创建被观察者 =  Flowable*/Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 发送3个事件Log.d(TAG, "发送了事件1");emitter.onNext(1);Log.d(TAG, "发送了事件2");emitter.onNext(2);Log.d(TAG, "发送了事件3");emitter.onNext(3);emitter.onComplete();}}, BackpressureStrategy.ERROR);/*** 步骤2:创建观察者 =  Subscriber*/Subscriber<Integer> downstream = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(3);// 每次可接收事件 = 3 二次匹配}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件 " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};/*** 步骤3:建立订阅关系*/upstream.subscribe(downstream);

所以,实际上并不会出现被观察者发送事件速度 > 观察者接收事件速度的情况。可是,却会出现被观察者发送事件数量 > 观察者接收事件数量的问题。

  • 如:观察者只能接受3个事件,但被观察者却发送了4个事件,所以出现了不匹配情况
/*** 步骤1:创建被观察者 =  Flowable*/Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 被观察者发送事件数量 = 4个Log.d(TAG, "发送了事件1");emitter.onNext(1);Log.d(TAG, "发送了事件2");emitter.onNext(2);Log.d(TAG, "发送了事件3");emitter.onNext(3);Log.d(TAG, "发送了事件4");emitter.onNext(4);emitter.onComplete();}}, BackpressureStrategy.ERROR);/*** 步骤2:创建观察者 =  Subscriber*/Subscriber<Integer> downstream = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(3);// 观察者接收事件 = 3个 ,即不匹配}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件 " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};/*** 步骤3:建立订阅关系*/upstream.subscribe(downstream);

所以,对于没有缓存区概念的同步订阅关系来说,单纯采用控制观察者的接收事件数量(响应式拉取)实际上就等于 “单相思”,虽然观察者控制了要接收3个事件,但假设被观察者需要发送4个事件,还是会出现问题。

在下面讲解 5.2 控制被观察者发送事件速度 时会解决这个问题。

  • 有1个特殊情况需要注意

  • 代码演示
/*** 同步情况*//*** 步骤1:创建被观察者 =  Flowable*/Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "发送了事件1");emitter.onNext(1);Log.d(TAG, "发送了事件2");emitter.onNext(2);Log.d(TAG, "发送了事件3");emitter.onNext(3);emitter.onComplete();}}, BackpressureStrategy.ERROR);/*** 步骤2:创建观察者 =  Subscriber*/Subscriber<Integer> downstream = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");// 不设置request(long n)// s.request(Long.MAX_VALUE);}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};/*** 步骤3:建立订阅关系*/upstream.subscribe(downstream);

在被观察者发送第1个事件后, 就抛出MissingBackpressureException异常 & 观察者没有收到任何事件


5.2 控制 被观察者发送事件 的速度

  • 简介

  • FlowableEmitter类的requested()介绍

public interface FlowableEmitter<T> extends Emitter<T> {
// FlowableEmitter = 1个接口,继承自Emitter
// Emitter接口方法包括:onNext(),onComplete() & onErrorlong requested();// 作用:返回当前线程中request(a)中的a值// 该request(a)则是措施1中讲解的方法,作用  = 设置....// 仅贴出关键代码}
  • 每个线程中的requested()的返回值 = 该线程中的request(a)的a值

  • 对应于同步 & 异步订阅情况 的原理图

为了方便大家理解该策略中的requested()使用,该节会先讲解同步订阅情况,再讲解异步订阅情况


5.2.1 同步订阅情况

  • 原理说明

即在同步订阅情况中,被观察者 通过 FlowableEmitter.requested()获得了观察者自身接收事件能力,从而根据该信息控制事件发送速度,从而达到了观察者反向控制被观察者的效果

  • 具体使用
    下面的例子 = 被观察者根据观察者自身接收事件能力(10个事件),从而仅发送10个事件
Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 调用emitter.requested()获取当前观察者需要接收的事件数量long n = emitter.requested();Log.d(TAG, "观察者可接收事件" + n);// 根据emitter.requested()的值,即当前观察者需要接收的事件数量来发送事件for (int i = 0; i < n; i++) {Log.d(TAG, "发送了事件" + i);emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");// 设置观察者每次能接受10个事件s.request(10);}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

  • 特别注意
    在同步订阅情况中使用FlowableEmitter.requested()时,有以下几种使用特性需要注意的:

情况1:可叠加性

  • 即:观察者可连续要求接收事件,被观察者会进行叠加并一起发送
Subscription.request(a1);
Subscription.request(a2);FlowableEmitter.requested()的返回值 = a1 + a2
  • 代码演示
Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 调用emitter.requested()获取当前观察者需要接收的事件数量Log.d(TAG, "观察者可接收事件" + emitter.requested());}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(10); // 第1次设置观察者每次能接受10个事件s.request(20); // 第2次设置观察者每次能接受20个事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

情况2:实时更新性

  • 即,每次发送事件后,emitter.requested()会实时更新观察者能接受的事件
  1. 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个
  2. 仅计算Next事件,complete & error事件不算。

Subscription.request(10);
// FlowableEmitter.requested()的返回值 = 10FlowableEmitter.onNext(1); // 发送了1个事件
// FlowableEmitter.requested()的返回值 = 9
  • 代码演示
Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 1. 调用emitter.requested()获取当前观察者需要接收的事件数量Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());// 2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件// 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个Log.d(TAG, "发送了事件 1");emitter.onNext(1);Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());Log.d(TAG, "发送了事件 2");emitter.onNext(2);Log.d(TAG, "发送事件2后, 还需要发送事件数量 = " + emitter.requested());Log.d(TAG, "发送了事件 3");emitter.onNext(3);Log.d(TAG, "发送事件3后, 还需要发送事件数量 = " + emitter.requested());emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(10); // 设置观察者每次能接受10个事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

情况3:异常

  • FlowableEmitter.requested()减到0时,则代表观察者已经不可接收事件
  • 此时被观察者若继续发送事件,则会抛出MissingBackpressureException异常

如观察者可接收事件数量 = 1,当被观察者发送第2个事件时,就会抛出异常

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 1. 调用emitter.requested()获取当前观察者需要接收的事件数量Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());// 2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件// 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个Log.d(TAG, "发送了事件 1");emitter.onNext(1);Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());Log.d(TAG, "发送了事件 2");emitter.onNext(2);Log.d(TAG, "发送事件2后, 还需要发送事件数量 = " + emitter.requested());emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(1); // 设置观察者每次能接受1个事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

额外

  • 若观察者没有设置可接收事件数量,即无调用Subscription.request()
  • 那么被观察者默认观察者可接收事件数量 = 0,即FlowableEmitter.requested()的返回值 = 0

5.2.2 异步订阅情况

  • 原理说明

从上面可以看出,由于二者处于不同线程,所以被观察者 无法通过 FlowableEmitter.requested()知道观察者自身接收事件能力,即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度。具体请看下面例子

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 调用emitter.requested()获取当前观察者需要接收的事件数量Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(150);// 该设置仅影响观察者线程中的requested,却不会影响的被观察者中的FlowableEmitter.requested()的返回值// 因为FlowableEmitter.requested()的返回值 取决于RxJava内部调用request(n),而该内部调用会在一开始就调用request(128)// 为什么是调用request(128)下面再讲解}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

而在异步订阅关系中,反向控制的原理是:通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度

那么该什么时候调用被观察者线程中的request(n) & n 的值该是多少呢?请继续往下看。

  • 具体使用

关于RxJava内部调用request(n)(n = 128、96、0)的逻辑如下:

至于为什么是调用request(128) & request(96) & request(0),感兴趣的读者可自己阅读 Flowable的源码

  • 代码演示

下面我将用一个例子来演示该原理的逻辑

// 被观察者:一共需要发送500个事件,但真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
// 观察者:每次接收事件数量 = 48(点击按钮)Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());boolean flag; //设置标记位控制// 被观察者一共需要发送500个事件for (int i = 0; i < 500; i++) {flag = false;// 若requested() == 0则不发送while (emitter.requested() == 0) {if (!flag) {Log.d(TAG, "不再发送");flag = true;}}// requested() ≠ 0 才发送Log.d(TAG, "发送了事件" + i + ",观察者可接收事件数量 = " + emitter.requested());emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;// 初始状态 = 不接收事件;通过点击按钮接收事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});// 点击按钮才会接收事件 = 48 / 次
btn = (Button) findViewById(R.id.btn);btn.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View view) {mSubscription.request(48);// 点击按钮 则 接收48个事件}});

整个流程 & 测试结果 请看下图


5.3 采用背压策略模式:BackpressureStrategy

5.3.1 背压模式介绍

在Flowable的使用中,会被要求传入背压模式参数

  • 面向对象:针对缓存区
  • 作用:当缓存区大小存满、被观察者仍然继续发送下1个事件时,该如何处理的策略方式

缓存区大小存满、溢出 = 发送事件速度 > 接收事件速度 的结果 = 发送 & 接收事件不匹配的结果

5.3.2 背压模式类型

下面我将对每种模式逐一说明。

模式1:BackpressureStrategy.ERROR

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

  • 处理方式:直接抛出异常MissingBackpressureException
 // 创建被观察者FlowableFlowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 发送 129个事件for (int i = 0;i< 129; i++) {Log.d(TAG, "发送了事件" + i);emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.ERROR) // 设置背压模式 = BackpressureStrategy.ERROR.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

模式2:BackpressureStrategy.MISSING

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

  • 处理方式:友好提示:缓存区满了
// 创建被观察者FlowableFlowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 发送 129个事件for (int i = 0;i< 129; i++) {Log.d(TAG, "发送了事件" + i);emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.MISSING) // 设置背压模式 = BackpressureStrategy.MISSING.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

模式3:BackpressureStrategy.BUFFER

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

  • 处理方式:将缓存区大小设置成无限大
  1. 即 被观察者可无限发送事件 观察者,但实际上是存放在缓存区
  2. 但要注意内存情况,防止出现OOM
// 创建被观察者FlowableFlowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 发送 129个事件for (int i = 1;i< 130; i++) {Log.d(TAG, "发送了事件" + i);emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.BUFFER) // 设置背压模式 = BackpressureStrategy.BUFFER.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

可以接收超过原先缓存区大小(128)的事件数量了

模式4: BackpressureStrategy.DROP

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

  • 处理方式:超过缓存区大小(128)的事件丢弃

如发送了150个事件,仅保存第1 - 第128个事件,第129 -第150事件将被丢弃

        Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// 发送150个事件for (int i = 0;i< 150; i++) {Log.d(TAG, "发送了事件" + i);emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.DROP)      // 设置背压模式 = BackpressureStrategy.DROP.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;// 通过按钮进行接收事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});btn = (Button) findViewById(R.id.btn);btn.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View view) {mSubscription.request(128);// 每次接收128个事件}});

被观察者一下子发送了150个事件,点击按钮接收时观察者接收了128个事件;再次点击接收时却无法接受事件,这说明超过缓存区大小的事件被丢弃了。

模式5:BackpressureStrategy.LATEST

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

  • 处理方式:只保存最新(最后)事件,超过缓存区大小(128)的事件丢弃

即如果发送了150个事件,缓存区里会保存129个事件(第1-第128 + 第150事件)

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0;i< 150; i++) {Log.d(TAG, "发送了事件" + i);emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.LATEST) // // 设置背压模式 = BackpressureStrategy.LATEST.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;// 通过按钮进行接收事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "接收到了事件" + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});btn = (Button) findViewById(R.id.btn);btn.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View view) {mSubscription.request(128);// 每次接收128个事件}});
  • 被观察者一下子发送了150个事件,点击按钮接收时观察者接收了128个事件;
  • 再次点击接收时却接收到1个事件(第150个事件),这说明超过缓存区大小的事件仅保留最后的事件(第150个事件)


5.3.3 特别注意

在使用背压策略模式的时候,有1种情况是需要注意的:

a. 背景
FLowable 可通过自己创建(如上面例子),或通过其他方式自动创建,如interval操作符

interval操作符简介

  1. 作用:每隔1段时间就产生1个数字(Long型),从0开始、1次递增1,直至无穷大
  2. 默认运行在1个新线程上
  3. 与timer操作符区别:timer操作符可结束发送

b. 冲突

  • 对于自身手动创建FLowable的情况,可通过传入背压模式参数选择背压策略
    (即上面描述的)

  • 可是对于自动创建FLowable,却无法手动传入传入背压模式参数,那么出现流速不匹配的情况下,该如何选择 背压模式呢?

// 通过interval自动创建被观察者Flowable// 每隔1ms将当前数字(从0开始)加1,并发送出去// interval操作符会默认新开1个新的工作线程Flowable.interval(1, TimeUnit.MILLISECONDS).observeOn(Schedulers.newThread()) // 观察者同样工作在一个新开线程中.subscribe(new Subscriber<Long>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;s.request(Long.MAX_VALUE); //默认可以接收Long.MAX_VALUE个事件}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "onNext: " + aLong);try {Thread.sleep(1000);// 每次延时1秒再接收事件// 因为发送事件 = 延时1ms,接收事件 = 延时1s,出现了发送速度 & 接收速度不匹配的问题// 缓存区很快就存满了128个事件,从而抛出MissingBackpressureException异常,请看下图结果} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

c. 解决方案
RxJava 2.0内部提供 封装了背压策略模式的方法

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

默认采用BackpressureStrategy.ERROR模式

具体使用如下:

Flowable.interval(1, TimeUnit.MILLISECONDS).onBackpressureBuffer() // 添加背压策略封装好的方法,此处选择Buffer模式,即缓存区大小无限制.observeOn(Schedulers.newThread()) .subscribe(new Subscriber<Long>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;s.request(Long.MAX_VALUE); }@Overridepublic void onNext(Long aLong) {Log.d(TAG, "onNext: " + aLong);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});

从而很好地解决了发送事件 & 接收事件 速度不匹配的问题。

其余方法的作用类似于上面的说背压模式参数,此处不作过多描述。

背压策略模式小结

  • 至此,对RxJava 2.0的背压模式终于讲解完毕
  • 所有代码Demo均存放在Carson_Ho的Github地址

6. 总结

  • 本文主要对 Rxjava 的背压模式知识进行讲解
  • Carson带你学RxJava系列文章:

入门
Carson带你学Android:这是一篇清晰易懂的Rxjava入门教程
Carson带你学Android:这是一份面向初学者的RxJava使用指南
Carson带你学Android:RxJava2.0到底更新了什么?(含使用建议)
原理
Carson带你学Android:图文解析RxJava原理
Carson带你学Android:手把手带你源码分析RxJava
使用教程(操作符)
Carson带你学Android:RxJava操作符教程
Carson带你学Android:RxJava创建操作符
Carson带你学Android:RxJava功能性操作符
Carson带你学Android:RxJava过滤操作符
Carson带你学Android:RxJava组合/合并操作符
Carson带你学Android:RxJava变换操作符
Carson带你学Android:RxJava条件/布尔操作符
应用
Carson带你学Android:什么时候应该使用Rxjava?(开发场景汇总)
Carson带你学Android:RxJava线程控制(含实例讲解)
Carson带你学Android:图文详解RxJava背压策略
Carson带你学Android:RxJava、Retrofit联合使用汇总(含实例教程)
Carson带你学Android:优雅实现网络请求嵌套回调
Carson带你学Android:网络请求轮询(有条件)
Carson带你学Android:网络请求轮询(无条件)
Carson带你学Android:网络请求出错重连(结合Retrofit)
Carson带你学Android:合并数据源
Carson带你学Android:联想搜索优化
Carson带你学Android:功能防抖
Carson带你学Android:从磁盘/内存缓存中获取缓存数据
Carson带你学Android:联合判断


欢迎关注Carson_Ho的CSDN博客 与 公众号!

博客链接:https://carsonho.blog.csdn.net/


请帮顶 / 评论点赞!因为你的鼓励是我写作的最大动力!

Carson带你学Android:图文详解RxJava背压策略相关推荐

  1. Carson带你学Android:RxJava过滤操作符

    前言 Rxjava由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. 今天,我将为大家详细介绍RxJava操作符中最常用的 过滤操作符,希望你们会 ...

  2. Carson带你学Android:最全面的Webview使用详解

    前言 现在很多App里都内置了Web网页(Hyprid App),比如说很多电商平台,淘宝.京东.聚划算等等,如下图 那么这种该如何实现呢?其实这是Android里一个叫WebView的组件实现的.今 ...

  3. Carson带你学Android:关于ContentProvider的知识都在这里了!

    前言 ContentProvider属于 Android的四大组件之一 本文全面解析了 ContentProvider ,包括ContentProvider 原理.使用方法 & 实例讲解,希望 ...

  4. Carson带你学Android:你要的WebView与 JS 交互方式都在这里了

    前言 现在很多App里都内置了Web网页(Hybrid App),比如说很多电商平台,淘宝.京东.聚划算等等,如下图 上述功能是由Android的WebView实现的,其中涉及到Android客户端与 ...

  5. Carson带你学Android:源码解析自定义View Draw过程

    前言 自定义View是Android开发者必须了解的基础 网上有大量关于自定义View原理的文章,但存在一些问题:内容不全.思路不清晰.无源码分析.简单问题复杂化 等 今天,我将全面总结自定义View ...

  6. Carson带你学Android:这是一份全面详细的WebView学习攻略

    前言 现在很多App里都内置了Web网页(Hybrid App),比如说很多电商平台,淘宝.京东.聚划算等等,如下图 那么这种该如何实现呢?其实这是Android里一个叫WebView组件实现 今天, ...

  7. Carson带你学Android:请收好这一份全面详细的Android学习指南

    前言 如果你也学习Android,那么你大概率会看过我的文章.经常有读者给我留言:"该怎么学习Android?"."日常学习Android的方法是什么". 今天 ...

  8. Carson带你学Android:这是一份全面详细的属性动画学习攻略!

    前言 属性动画的使用 是 Android 开发中常用的知识 本文将献上一份全面 & 详细的属性动画学习指南,将详细介绍属性动画的所有内容,包括:意义.作用.应用场景.功原理 & 具体使 ...

  9. Carson带你学Android:手把手带你入门跨平台UI开发框架Flutter

    前言 Flutter 作为Google出品的一个新兴的跨平台移动客户端UI开发框架,正在被越来越多的开发者和组织使用,包括阿里的咸鱼.腾讯的微信等. 今天,我将献上一份 <全面 & 详细 ...

最新文章

  1. Android ListView 删除 item
  2. (Asp.Net)转载-用Powershell 建立IIS web site
  3. Android EditText
  4. 使用C#进行Word 2002和Excel 2002编程
  5. cfar恒虚警matlab实现,一种用于距离副瓣抑制的自适应恒虚警方法与流程
  6. Springboot高级特性——缓存
  7. 一程序员反应职场怪现象
  8. VsCode如何设置成中文
  9. 基于Hadoop技术进行地理空间分析
  10. Blender学习笔记(4)材质配色和打光渲染|blender
  11. 直播预告:Envoy Core Maintainer 跨洋解读 Envoy 技术(中文)
  12. 学习《论文写作》课程的收获
  13. 经纬度在线查询,地名(批量)查询经纬度,经纬度(批量)查询地名
  14. 双软企业的税收优惠政策怎么样?
  15. 2021年中国频谱分析仪行业市场规模、格局及专利情况分析:行业规模不断扩大,国外企业占据主导地位,国产企业发展空间较大[图]
  16. 如何实现表格固定表头和某列
  17. Apache DolphinScheduler 诞生记
  18. 新商用密码产品认证梳理——政策法规篇
  19. latex插入图片之后图片后面的文字跑到前面来了怎么办
  20. [FineReport]实现用户自定义查询月考勤记录

热门文章

  1. 巴西龟饲养日志----提前结束冬眠
  2. 根据Excel表头的位置数转化为对应列名称
  3. 怎么设置启用远程桌面?如何让外网电脑远程本地内网?
  4. 你做过的最有效的提高你的编程水平的一件事情是什么
  5. 一:Debian安装
  6. 基于机器学习的服装搭配问题分析
  7. 计算机视觉中的多视图几何 -- 2D射影几何与变换 --无穷远直线、虚圆点及其对偶以及恢复图像的仿射性质
  8. 经典WEB项目之宠物商店(一)
  9. 淘宝api是什么19970108019
  10. linux debian u盘安装,Debian U盘安装盘,debian安装盘