前言

Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。

Github截图

本文主要讲解的是RxJava中的 背压控制策略,希望你们会喜欢。

本系列文章主要基于 Rxjava 2.0

接下来的时间,我将持续推出 Android中 Rxjava 2.0 的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注Carson_Ho的安卓开发笔记!!

示意图

目录

示意图

1. 引言

1.1 背景

观察者 & 被观察者 之间存在2种订阅关系:同步 & 异步。具体如下:

示意图

对于异步订阅关系,存在 被观察者发送事件速度 与观察者接收事件速度 不匹配的情况

发送 & 接收事件速度 = 单位时间内 发送&接收事件的数量

大多数情况,主要是 被观察者发送事件速度 > 观察者接收事件速度

1.2 问题

被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应 / 处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM

如,点击按钮事件:连续过快的点击按钮10次,则只会造成点击2次的效果;

解释:因为点击速度太快了,所以按钮来不及响应

下面再举个例子:

被观察者的发送事件速度 = 10ms / 个

观察者的接收事件速度 = 5s / 个

即出现发送 & 接收事件严重不匹配的问题

Observable.create(new ObservableOnSubscribe() {

// 1. 创建被观察者 & 生产事件

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

for (int i = 0; ; i++) {

Log.d(TAG, "发送了事件"+ i );

Thread.sleep(10);

// 发送事件速度:10ms / 个

emitter.onNext(i);

}

}

}).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Observer() {

// 2. 通过通过订阅(subscribe)连接观察者和被观察者

@Override

public void onSubscribe(Disposable d) {

Log.d(TAG, "开始采用subscribe连接");

}

@Override

public void onNext(Integer value) {

try {

// 接收事件速度:5s / 个

Thread.sleep(5000);

Log.d(TAG, "接收到了事件"+ value );

} catch (InterruptedException e) {

e.printStackTrace();

}

}

@Override

public void onError(Throwable e) {

Log.d(TAG, "对Error事件作出响应");

}

@Override

public void onComplete() {

Log.d(TAG, "对Complete事件作出响应");

}

});

结果

由于被观察者发送事件速度 > 观察者接收事件速度,所以出现流速不匹配问题,从而导致OOM

示意图

1.3 解决方案

采用 背压策略。

下面,我将开始介绍背压策略。

2. 背压策略简介

2.1 定义

一种 控制事件流速 的策略

2.2 作用

在 异步订阅关系 中,控制事件发送 & 接收的速度

注:背压的作用域 = 异步订阅关系,即 被观察者 & 观察者处在不同线程中

2.3 解决的问题

解决了 因被观察者发送事件速度 与 观察者接收事件速度 不匹配(一般是前者 快于 后者),从而导致观察者无法及时响应 / 处理所有 被观察者发送事件 的问题

2.4 应用场景

被观察者发送事件速度 与 观察者接收事件速度 不匹配的场景

具体场景就取决于 该事件的类型,如:网络请求,那么具体场景:有很多网络请求需要执行,但执行者的执行速度没那么快,此时就需要使用背压策略来进行控制。

3. 背压策略的原理

那么,RxJava实现背压策略(Backpressure)的原理是什么呢?

解决方案 & 思想主要如下:

示意图

示意图如下

示意图

与 RxJava1.0 中被观察者的旧实现 Observable 对比

示意图

好了,那么上图中在RxJava 2.0观察者模型中,Flowable到底是什么呢?它其实是RxJava 2.0中被观察者的一种新实现,同时也是背压策略实现的承载者

请继续看下一节的介绍:背压策略的具体实现 - Flowable

4. 背压策略的具体实现:Flowable

在 RxJava2.0中,采用 Flowable 实现 背压策略

正确来说,应该是 “非阻塞式背压” 策略

4.1 Flowable 介绍

定义:在 RxJava2.0中,被观察者(Observable)的一种新实现

同时,RxJava1.0 中被观察者(Observable)的旧实现: Observable依然保留

作用:实现 非阻塞式背压 策略

4.2 Flowable 特点

Flowable的特点 具体如下

示意图

下面再贴出一张RxJava2.0 与RxJava1.0的观察者模型的对比图

实际上,RxJava2.0 也有保留(被观察者)Observerble - Observer(观察者)的观察者模型,此处只是为了做出对比让读者了解

示意图

4.3 与 RxJava1.0 中被观察者的旧实现 Observable 的关系

具体如下图

示意图

那么,为什么要采用新实现Flowable实现背压,而不采用旧的Observable呢?

主要原因:旧实现Observable无法很好解决背压问题。

示意图

4.4 Flowable的基础使用

Flowable的基础使用非常类似于 Observable

具体如下

/**

* 步骤1:创建被观察者 = Flowable

*/

Flowable upstream = Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

emitter.onNext(1);

emitter.onNext(2);

emitter.onNext(3);

emitter.onComplete();

}

}, BackpressureStrategy.ERROR);

// 需要传入背压参数BackpressureStrategy,下面会详细讲解

/**

* 步骤2:创建观察者 = Subscriber

*/

Subscriber downstream = new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

// 对比Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription

// 相同点:Subscription具备Disposable参数的作用,即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接

// 不同点:Subscription增加了void request(long n)

Log.d(TAG, "onSubscribe");

s.request(Long.MAX_VALUE);

// 关于request()下面会继续详细说明

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "onNext: " + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

};

/**

* 步骤3:建立订阅关系

*/

upstream.subscribe(downstream);

示意图

更加优雅的链式调用

// 步骤1:创建被观察者 = Flowable

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

Log.d(TAG, "发送事件 1");

emitter.onNext(1);

Log.d(TAG, "发送事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件 3");

emitter.onNext(3);

Log.d(TAG, "发送完成");

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

// 步骤2:创建观察者 = Subscriber & 建立订阅关系

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(3);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

至此,Flowable的基础使用讲解完

关于更深层次的使用会结合 背压策略的实现 来讲解

5. 背压策略的使用

在本节中,我将结合 背压策略的原理 & Flowable的使用,为大家介绍在RxJava 2.0 中该如何使用Flowable来实现背压策略功能,即背压策略的使用

Flowable与Observable在功能上的区别主要是 多了背压的功能

下面,我将顺着第3节中讲解背压策略实现原理 & 解决方案(如下图),来讲解Flowable在背压策略功能上的使用

示意图

注:

由于第2节中提到,使用背压的场景 = 异步订阅关系,所以下文中讲解的主要是异步订阅关系场景,即 被观察者 & 观察者 工作在不同线程中

但由于在同步订阅关系的场景也可能出现流速不匹配的问题,所以在讲解异步情况后,会稍微讲解一下同步情况,以方便对比

5.1 控制 观察者接收事件 的速度

5.1.1 异步订阅情况

简介

示意图

具体原理图

示意图

具体使用

// 1. 创建被观察者Flowable

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 一共发送4个事件

Log.d(TAG, "发送事件 1");

emitter.onNext(1);

Log.d(TAG, "发送事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件 3");

emitter.onNext(3);

Log.d(TAG, "发送事件 4");

emitter.onNext(4);

Log.d(TAG, "发送完成");

emitter.onComplete();

}

}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

// 对比Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription

// 相同点:Subscription参数具备Disposable参数的作用,即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接

// 不同点:Subscription增加了void request(long n)

s.request(3);

// 作用:决定观察者能够接收多少个事件

// 如设置了s.request(3),这就说明观察者能够接收3个事件(多出的事件存放在缓存区)

// 官方默认推荐使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

效果图

示意图

有2个结论是需要大家注意的

示意图

下图 = 当缓存区存满时(128个事件)溢出报错的原理图

示意图

代码演示1:观察者不接收事件的情况下,被观察者继续发送事件 & 存放到缓存区;再按需取出

/**

* 步骤1:设置变量

*/

private static final String TAG = "Rxjava";

private Button btn; // 该按钮用于调用Subscription.request(long n )

private Subscription mSubscription; // 用于保存Subscription对象

/**

* 步骤2:设置点击事件 = 调用Subscription.request(long n )

*/

btn = (Button) findViewById(R.id.btn);

btn.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View view) {

mSubscription.request(2);

}

});

/**

* 步骤3:异步调用

*/

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

Log.d(TAG, "发送事件 1");

emitter.onNext(1);

Log.d(TAG, "发送事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件 3");

emitter.onNext(3);

Log.d(TAG, "发送事件 4");

emitter.onNext(4);

Log.d(TAG, "发送完成");

emitter.onComplete();

}

}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

mSubscription = s;

// 保存Subscription对象,等待点击按钮时(调用request(2))观察者再接收事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

代码演示2:观察者不接收事件的情况下,被观察者继续发送事件至超出缓存区大小(128)

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 一共发送129个事件,即超出了缓存区的大小

for (int i = 0;i< 129; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

emitter.onComplete();

}

}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

// 默认不设置可接收事件大小

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

5.1.2 同步订阅情况

同步订阅 & 异步订阅 的区别在于:

同步订阅中,被观察者 & 观察者工作于同1线程

同步订阅关系中没有缓存区

示意图

被观察者在发送1个事件后,必须等待观察者接收后,才能继续发下1个事件

/**

* 步骤1:创建被观察者 = Flowable

*/

Flowable upstream = Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送3个事件

Log.d(TAG, "发送了事件1");

emitter.onNext(1);

Log.d(TAG, "发送了事件2");

emitter.onNext(2);

Log.d(TAG, "发送了事件3");

emitter.onNext(3);

emitter.onComplete();

}

}, BackpressureStrategy.ERROR);

/**

* 步骤2:创建观察者 = Subscriber

*/

Subscriber downstream = new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(3);

// 每次可接收事件 = 3 二次匹配

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件 " + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

};

/**

* 步骤3:建立订阅关系

*/

upstream.subscribe(downstream);

示意图

所以,实际上并不会出现被观察者发送事件速度 > 观察者接收事件速度的情况。可是,却会出现被观察者发送事件数量 > 观察者接收事件数量的问题。

如:观察者只能接受3个事件,但被观察者却发送了4个事件,所以出现了不匹配情况

/**

* 步骤1:创建被观察者 = Flowable

*/

Flowable upstream = Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 被观察者发送事件数量 = 4个

Log.d(TAG, "发送了事件1");

emitter.onNext(1);

Log.d(TAG, "发送了事件2");

emitter.onNext(2);

Log.d(TAG, "发送了事件3");

emitter.onNext(3);

Log.d(TAG, "发送了事件4");

emitter.onNext(4);

emitter.onComplete();

}

}, BackpressureStrategy.ERROR);

/**

* 步骤2:创建观察者 = Subscriber

*/

Subscriber downstream = new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(3);

// 观察者接收事件 = 3个 ,即不匹配

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件 " + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

};

/**

* 步骤3:建立订阅关系

*/

upstream.subscribe(downstream);

示意图

所以,对于没有缓存区概念的同步订阅关系来说,单纯采用控制观察者的接收事件数量(响应式拉取)实际上就等于 “单相思”,虽然观察者控制了要接收3个事件,但假设被观察者需要发送4个事件,还是会出现问题。

在下面讲解 5.2 控制被观察者发送事件速度 时会解决这个问题。

有1个特殊情况需要注意

示意图

代码演示

/**

* 同步情况

*/

/**

* 步骤1:创建被观察者 = Flowable

*/

Flowable upstream = Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

Log.d(TAG, "发送了事件1");

emitter.onNext(1);

Log.d(TAG, "发送了事件2");

emitter.onNext(2);

Log.d(TAG, "发送了事件3");

emitter.onNext(3);

emitter.onComplete();

}

}, BackpressureStrategy.ERROR);

/**

* 步骤2:创建观察者 = Subscriber

*/

Subscriber downstream = new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

// 不设置request(long n)

// s.request(Long.MAX_VALUE);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "onNext: " + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

};

/**

* 步骤3:建立订阅关系

*/

upstream.subscribe(downstream);

在被观察者发送第1个事件后, 就抛出MissingBackpressureException异常 & 观察者没有收到任何事件

示意图

5.2 控制 被观察者发送事件 的速度

简介

示意图

FlowableEmitter类的requested()介绍

public interface FlowableEmitter extends Emitter {

// FlowableEmitter = 1个接口,继承自Emitter

// Emitter接口方法包括:onNext(),onComplete() & onError

long requested();

// 作用:返回当前线程中request(a)中的a值

// 该request(a)则是措施1中讲解的方法,作用 = 设置

....// 仅贴出关键代码

}

每个线程中的requested()的返回值 = 该线程中的request(a)的a值

对应于同步 & 异步订阅情况 的原理图

示意图

为了方便大家理解该策略中的requested()使用,该节会先讲解同步订阅情况,再讲解异步订阅情况

5.2.1 同步订阅情况

原理说明

示意图

即在同步订阅情况中,被观察者 通过 FlowableEmitter.requested()获得了观察者自身接收事件能力,从而根据该信息控制事件发送速度,从而达到了观察者反向控制被观察者的效果

具体使用

下面的例子 = 被观察者根据观察者自身接收事件能力(10个事件),从而仅发送10个事件

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 调用emitter.requested()获取当前观察者需要接收的事件数量

long n = emitter.requested();

Log.d(TAG, "观察者可接收事件" + n);

// 根据emitter.requested()的值,即当前观察者需要接收的事件数量来发送事件

for (int i = 0; i < n; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

// 设置观察者每次能接受10个事件

s.request(10);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

特别注意

在同步订阅情况中使用FlowableEmitter.requested()时,有以下几种使用特性需要注意的:

示意图

情况1:可叠加性

即:观察者可连续要求接收事件,被观察者会进行叠加并一起发送

Subscription.request(a1);

Subscription.request(a2);

FlowableEmitter.requested()的返回值 = a1 + a2

代码演示

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 调用emitter.requested()获取当前观察者需要接收的事件数量

Log.d(TAG, "观察者可接收事件" + emitter.requested());

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(10); // 第1次设置观察者每次能接受10个事件

s.request(20); // 第2次设置观察者每次能接受20个事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

情况2:实时更新性

即,每次发送事件后,emitter.requested()会实时更新观察者能接受的事件

即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个

仅计算Next事件,complete & error事件不算。

Subscription.request(10);

// FlowableEmitter.requested()的返回值 = 10

FlowableEmitter.onNext(1); // 发送了1个事件

// FlowableEmitter.requested()的返回值 = 9

代码演示

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 1. 调用emitter.requested()获取当前观察者需要接收的事件数量

Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());

// 2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件

// 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个

Log.d(TAG, "发送了事件 1");

emitter.onNext(1);

Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());

Log.d(TAG, "发送了事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件2后, 还需要发送事件数量 = " + emitter.requested());

Log.d(TAG, "发送了事件 3");

emitter.onNext(3);

Log.d(TAG, "发送事件3后, 还需要发送事件数量 = " + emitter.requested());

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(10); // 设置观察者每次能接受10个事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

情况3:异常

当FlowableEmitter.requested()减到0时,则代表观察者已经不可接收事件

此时被观察者若继续发送事件,则会抛出MissingBackpressureException异常

如观察者可接收事件数量 = 1,当被观察者发送第2个事件时,就会抛出异常

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 1. 调用emitter.requested()获取当前观察者需要接收的事件数量

Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());

// 2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件

// 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个

Log.d(TAG, "发送了事件 1");

emitter.onNext(1);

Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());

Log.d(TAG, "发送了事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件2后, 还需要发送事件数量 = " + emitter.requested());

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(1); // 设置观察者每次能接受1个事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

额外

若观察者没有设置可接收事件数量,即无调用Subscription.request()

那么被观察者默认观察者可接收事件数量 = 0,即FlowableEmitter.requested()的返回值 = 0

5.2.2 异步订阅情况

原理说明

示意图

从上面可以看出,由于二者处于不同线程,所以被观察者 无法通过 FlowableEmitter.requested()知道观察者自身接收事件能力,即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度。具体请看下面例子

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 调用emitter.requested()获取当前观察者需要接收的事件数量

Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());

}

}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(150);

// 该设置仅影响观察者线程中的requested,却不会影响的被观察者中的FlowableEmitter.requested()的返回值

// 因为FlowableEmitter.requested()的返回值 取决于RxJava内部调用request(n),而该内部调用会在一开始就调用request(128)

// 为什么是调用request(128)下面再讲解

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

而在异步订阅关系中,反向控制的原理是:通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度

那么该什么时候调用被观察者线程中的request(n) & n 的值该是多少呢?请继续往下看。

具体使用

关于RxJava内部调用request(n)(n = 128、96、0)的逻辑如下:

示意图

至于为什么是调用request(128) & request(96) & request(0),感兴趣的读者可自己阅读 Flowable的源码

代码演示

下面我将用一个例子来演示该原理的逻辑

// 被观察者:一共需要发送500个事件,但真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0

// 观察者:每次接收事件数量 = 48(点击按钮)

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());

boolean flag; //设置标记位控制

// 被观察者一共需要发送500个事件

for (int i = 0; i < 500; i++) {

flag = false;

// 若requested() == 0则不发送

while (emitter.requested() == 0) {

if (!flag) {

Log.d(TAG, "不再发送");

flag = true;

}

}

// requested() ≠ 0 才发送

Log.d(TAG, "发送了事件" + i + ",观察者可接收事件数量 = " + emitter.requested());

emitter.onNext(i);

}

}

}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

mSubscription = s;

// 初始状态 = 不接收事件;通过点击按钮接收事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

// 点击按钮才会接收事件 = 48 / 次

btn = (Button) findViewById(R.id.btn);

btn.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View view) {

mSubscription.request(48);

// 点击按钮 则 接收48个事件

}

});

整个流程 & 测试结果 请看下图

示意图

5.3 采用背压策略模式:BackpressureStrategy

5.3.1 背压模式介绍

在Flowable的使用中,会被要求传入背压模式参数

示意图

面向对象:针对缓存区

作用:当缓存区大小存满、被观察者仍然继续发送下1个事件时,该如何处理的策略方式

缓存区大小存满、溢出 = 发送事件速度 > 接收事件速度 的结果 = 发送 & 接收事件不匹配的结果

5.3.2 背压模式类型

示意图

下面我将对每种模式逐一说明。

模式1:BackpressureStrategy.ERROR

问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

处理方式:直接抛出异常MissingBackpressureException

// 创建被观察者Flowable

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送 129个事件

for (int i = 0;i< 129; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

emitter.onComplete();

}

}, BackpressureStrategy.ERROR) // 设置背压模式 = BackpressureStrategy.ERROR

.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

模式2:BackpressureStrategy.MISSING

问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

处理方式:友好提示:缓存区满了

// 创建被观察者Flowable

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送 129个事件

for (int i = 0;i< 129; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

emitter.onComplete();

}

}, BackpressureStrategy.MISSING) // 设置背压模式 = BackpressureStrategy.MISSING

.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

模式3:BackpressureStrategy.BUFFER

问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

处理方式:将缓存区大小设置成无限大

即 被观察者可无限发送事件 观察者,但实际上是存放在缓存区

但要注意内存情况,防止出现OOM

// 创建被观察者Flowable

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送 129个事件

for (int i = 1;i< 130; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

emitter.onComplete();

}

}, BackpressureStrategy.BUFFER) // 设置背压模式 = BackpressureStrategy.BUFFER

.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

可以接收超过原先缓存区大小(128)的事件数量了

示意图

模式4: BackpressureStrategy.DROP

问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

处理方式:超过缓存区大小(128)的事件丢弃

如发送了150个事件,仅保存第1 - 第128个事件,第129 -第150事件将被丢弃

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送150个事件

for (int i = 0;i< 150; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

emitter.onComplete();

}

}, BackpressureStrategy.DROP) // 设置背压模式 = BackpressureStrategy.DROP

.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

mSubscription = s;

// 通过按钮进行接收事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

btn = (Button) findViewById(R.id.btn);

btn.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View view) {

mSubscription.request(128);

// 每次接收128个事件

}

});

被观察者一下子发送了150个事件,点击按钮接收时观察者接收了128个事件;再次点击接收时却无法接受事件,这说明超过缓存区大小的事件被丢弃了。

示意图

模式5:BackpressureStrategy.LATEST

问题:发送事件速度 > 接收事件 速度,即流速不匹配

具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时

处理方式:只保存最新(最后)事件,超过缓存区大小(128)的事件丢弃

即如果发送了150个事件,缓存区里会保存129个事件(第1-第128 + 第150事件)

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

for (int i = 0;i< 150; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

emitter.onComplete();

}

}, BackpressureStrategy.LATEST) // // 设置背压模式 = BackpressureStrategy.LATEST

.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

mSubscription = s;

// 通过按钮进行接收事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

btn = (Button) findViewById(R.id.btn);

btn.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View view) {

mSubscription.request(128);

// 每次接收128个事件

}

});

被观察者一下子发送了150个事件,点击按钮接收时观察者接收了128个事件;

再次点击接收时却接收到1个事件(第150个事件),这说明超过缓存区大小的事件仅保留最后的事件(第150个事件)

示意图

5.3.3 特别注意

在使用背压策略模式的时候,有1种情况是需要注意的:

a. 背景

FLowable 可通过自己创建(如上面例子),或通过其他方式自动创建,如interval操作符

interval操作符简介

作用:每隔1段时间就产生1个数字(Long型),从0开始、1次递增1,直至无穷大

默认运行在1个新线程上

与timer操作符区别:timer操作符可结束发送

b. 冲突

对于自身手动创建FLowable的情况,可通过传入背压模式参数选择背压策略

(即上面描述的)

可是对于自动创建FLowable,却无法手动传入传入背压模式参数,那么出现流速不匹配的情况下,该如何选择 背压模式呢?

// 通过interval自动创建被观察者Flowable

// 每隔1ms将当前数字(从0开始)加1,并发送出去

// interval操作符会默认新开1个新的工作线程

Flowable.interval(1, TimeUnit.MILLISECONDS)

.observeOn(Schedulers.newThread()) // 观察者同样工作在一个新开线程中

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

mSubscription = s;

s.request(Long.MAX_VALUE); //默认可以接收Long.MAX_VALUE个事件

}

@Override

public void onNext(Long aLong) {

Log.d(TAG, "onNext: " + aLong);

try {

Thread.sleep(1000);

// 每次延时1秒再接收事件

// 因为发送事件 = 延时1ms,接收事件 = 延时1s,出现了发送速度 & 接收速度不匹配的问题

// 缓存区很快就存满了128个事件,从而抛出MissingBackpressureException异常,请看下图结果

} catch (InterruptedException e) {

e.printStackTrace();

}

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

示意图

c. 解决方案

RxJava 2.0内部提供 封装了背压策略模式的方法

onBackpressureBuffer()

onBackpressureDrop()

onBackpressureLatest()

默认采用BackpressureStrategy.ERROR模式

具体使用如下:

Flowable.interval(1, TimeUnit.MILLISECONDS)

.onBackpressureBuffer() // 添加背压策略封装好的方法,此处选择Buffer模式,即缓存区大小无限制

.observeOn(Schedulers.newThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

mSubscription = s;

s.request(Long.MAX_VALUE);

}

@Override

public void onNext(Long aLong) {

Log.d(TAG, "onNext: " + aLong);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

从而很好地解决了发送事件 & 接收事件 速度不匹配的问题。

封装方法的示意图.gif

其余方法的作用类似于上面的说背压模式参数,此处不作过多描述。

背压策略模式小结

示意图

至此,对RxJava 2.0的背压模式终于讲解完毕

6. 总结

本文主要对 Rxjava 的背压模式知识进行讲解

接下来的时间,我将持续推出 Android中 Rxjava 2.0 的一系列文章,包括原理、操作符、应用场景、背压等等

示意图

感兴趣的同学可以继续关注本人运营的Wechat Public Account:

请点赞!因为你的鼓励是我写作的最大动力!

不定期分享关于安卓开发的干货,追求短、平、快,但却不缺深度。

rxjava背压_Android RxJava :图文详解 背压策略相关推荐

  1. Carson带你学Android:图文详解RxJava背压策略

    前言 Rxjava,由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. 本文主要讲解的是RxJava中的 背压控制策略,希望你们会喜欢. Cars ...

  2. 广联达2018模板算量步骤_老师傅带你学造价,广联达GTJ2018图文详解,小白也能学会的软件...

    在GTJ2018问世之前,土建造价人员有三个软件是必会的,一个是GGJ主打钢筋算量,一个是GCL主打土建算量,还有一个是GBQ主要是套定额用来计价的软件. 那时候如果计算一个工程的工程量,首先要用GG ...

  3. 计算机刷新的作用,图文详解Win8重置和刷新功能:超强自我治愈

    直接自愈,Windows8出故障之后,伴随着重置和刷新两大新功能,世上无难事了啊.微软Windows8团队今日在官方博客详细向用户解释Win8的重置和刷新PC功能,将可一键复位系统到最佳状态.视频演示 ...

  4. 计算机网络管理的常用命令,网络管理常用命令图文详解.pdf

    网络工程师必备 – 网络管理常用命令图文详解 网络工程师必备 网络管理常用命令 图文详解 V1.0 V1.0 包含 ping.ipconfig.netstat.nbtstat.tracert. pat ...

  5. 全网最全的Windows下Anaconda2 / Anaconda3里Python语言实现定时发送微信消息给好友或群里(图文详解)...

    不多说,直接上干货! 缘由: (1)最近看到情侣零点送祝福,感觉还是很浪漫的事情,相信有很多人熬夜为了给爱的人送上零点祝福,但是有时等着等着就睡着了或者时间并不是卡的那么准就有点强迫症了,这是也许程序 ...

  6. SSL之CA证书颁发机构安装图文详解

    上一节我们说到,在验证公钥安全性时,是在CA机构颁发的包含用户的公钥及其身份信息的数字证书,数字证书由权威机构--CA签发.这个CA权威机构可以是自己的服务器也可以是国际公认的CA权威机构.下面我就来 ...

  7. spark最新源码下载并导入到开发环境下助推高质量代码(Scala IDEA for Eclipse和IntelliJ IDEA皆适用)(以spark2.2.0源码包为例)(图文详解)...

    不多说,直接上干货! 前言   其实啊,无论你是初学者还是具备了有一定spark编程经验,都需要对spark源码足够重视起来. 本人,肺腑之己见,想要成为大数据的大牛和顶尖专家,多结合源码和操练编程. ...

  8. x264代码剖析(一):图文详解x264在Windows平台上的搭建

    x264代码剖析(一):图文详解x264在Windows平台上的搭建 X264源码下载地址:http://ftp.videolan.org/pub/videolan/x264/ 平台:win7 PC. ...

  9. 电脑连接电视方法详解_查看电脑配置的几种方法(图文详解)

    很多朋友想要了解自己电脑详细的配置的时候,一般都是通过第三方的工具检测的.那么有没有其他更好的方法可以在win系统下查看电脑配置呢?今天我就给大家分享一下如何查看电脑配置. 查看电脑配置的几种方法图文 ...

  10. Git学习系列之Git基本操作提交项目(图文详解)

    前面博客 Git学习系列之Git基本操作克隆项目(图文详解) 然后可以 cd 切换到 LispGentleIntro 目录, 新增或者修改某些文件.这里只是模拟一下操作, 实际情况可能是 使用 Ecl ...

最新文章

  1. springboot 成员变量_springboot系列之04-提高开发效率必备工具lombok
  2. 疫苗有效,先抛股票,今日官宣94.5%结果的Moderna高管早有信心
  3. html学习文档-3、HTML元素
  4. php生成唯一的加密串,hashids.php-master整数生成唯一字符串的加密库
  5. [BTS06]BizTalk2006 SDK阅读笔记(一) 角色
  6. link rel=canonical 用法
  7. UE3 移动设备分析
  8. Leetcode每日一题:203.remove-linked-list-elements(移除链表元素)
  9. gps nmea数据格式解析与生成
  10. Leetcode 每日一题——845. 数组中的最长山脉
  11. ubuntu18.04使用网易云音乐 ubuntu网易云音乐打不开怎么办? ubuntu安装网易云音乐
  12. Elasticsearch:InteliJ Elasticsearch plugin 集成
  13. 【tensorflow学习】Ftrl学习
  14. Windows下查看MySQL安装的版本
  15. js-视频播放插件Video.js简单使用
  16. 求助!网站重构需要帮手(前端)
  17. 规范化、标准化、归一化、正则化
  18. idea 打开项目所有java类变成咖啡图标 打开类上面是0110图标
  19. Linux之uevent与ueventd区别
  20. 嵩天python语言程序设计Python123

热门文章

  1. cadence 提示lic找不到怎么办
  2. 华为HCNA技术配置小型公司网络
  3. catch小说内容-从gui到爬虫(2)
  4. Hadoop大数据解决方案
  5. 双11购书大优惠!独家优惠券,折后再减,赶紧来抢啊!
  6. 求关系模式的候选码的方法
  7. “毕竟,你胜利了......敬胜利者一杯。”
  8. OneZero第一次会议(非正式)
  9. 印刷电路板丝网设计的十大技巧
  10. linux下pdb文件除水,blast+本地化中blastp操作(基于PDB库)—linux