rxjava背压_Rx系列第十八篇:RxJava之背压策略
(1)背压的存在背景
默认情况下,上游是在主线程执行的,那么下游也必然在主线程中运行,比如:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
for (int i=0;;i++){
e.onNext(String.valueOf(i));
}
}
})
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(3000);
Log.d("aaa", String.valueOf(s));
}
});
当使用subscribeOn来控制上游线程时,比如:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
for (int i=0;;i++){
e.onNext(String.valueOf(i));
Log.d("aaa", "==========="+Thread.currentThread().getName());
}
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(3000);
Log.d("aaa", String.valueOf(s)+"----"+Thread.currentThread().getName());
}
});
subscribeOn将上游的线程切换到IO线程,那么下游也自然而然在IO线程执行。
以上两种情况(没有控制线程或者subscribeOn控制上游线程),当上游发送一个数据之后,等到下游接收到数据之后上游才能继续发送数据,这样也就不会发生异常。
当我们使用observeOn时,上游和下游的执行就会独自运行了,即使如以下代码:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
for (int i=0;;i++){
e.onNext(String.valueOf(i));
Log.d("aaa", "==========="+Thread.currentThread().getName());
}
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(3000);
Log.d("aaa", String.valueOf(s)+"----"+Thread.currentThread().getName());
}
});
以上代码上游和下游都是运行在主线程,但是经过测试,只要使用了observeOn控制了下游的线程,那么第二次发送数据就不需要等到下游接收到数据之后才能发送了。也就是说,只要使用observeOn,上游和下游就会分别独自运行。
大部分情况,发送数据比较快,接收数据相对比较慢,也就是说:
发送的数据个数 > 接收数据的个数
当下游来不及处理上游发送的数据时,这些发送的数据会存放在一个缓存区,当缓存区越来越大时,会发生OOM的现象,日志请看下图
图片.png
我们来看一下内存情况
38.gif
由于缓存越来越大,导致内存泄漏非常严重,等缓存大到一定程度就会发生OOM。
为了解决这样的问题,出现了背压策略。
(2)Flowable
在RxJava2中,采用Flowable来处理背压问题,Flowable的效率要比Observable低,所以最好当需要处理背压问题时再使用Flowable。
先贴一下代码实现
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter e) throws Exception {
for(int i=0;i<129;i++){
e.onNext(String.valueOf(i));
Log.d("aaa","已发送数据:"+i);
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
//设置最多可接受数据的数量
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
Log.d("aaa", s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
Log.d("aaa", "t:"+t.getMessage());
}
@Override
public void onComplete() {
Log.d("aaa", "===完成===");
}
});
前面讲到的上游和下游分别是Observable和Observer, 以上代码上游变成了Flowable,下游变成了Subscriber,上游和下游由subscribe() 来连通。
代码中有两点比较重要
request
s.request(Long.MAX_VALUE);
这句话的意思是说,设置最大接收数据的数量,这里设置Long.MAX_VALUE就可以了。
BackpressureStrategy
Flowable.create的第二个参数就是策略常量。
(3)BackpressureStrategy
先看一下源码
/**
* Represents the options for applying backpressure to a source sequence.
*/
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
*
Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers all onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
在RxJava2中,给我们提供了5种背压策略
BackpressureStrategy.MISSING
OnNext事件是在不进行任何缓冲或删除的情况下写入的下游必须处理任何溢出
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter e) throws Exception {
for(int i=0;;i++){
e.onNext(String.valueOf(i));
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
//设置最多可接受数据的数量
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
Log.d("aaa", s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
Log.d("aaa", "t:"+t.getMessage());
}
@Override
public void onComplete() {
Log.d("aaa", "===完成===");
}
});
日志显示,只要缓存队列满了就会抛出MissingBackpressureException异常,下游消费的数量是随机的。
图片.png
图片.png
BackpressureStrategy.MISSING策略本身不会产生多余缓存。
BackpressureStrategy.ERROR
如果下游无法跟上上游发送的速度,则会发出反向压力异常信号。
图片.png
解决这个问题,还是限制一下发送数据的速度为好。
BackpressureStrategy.BUFFER
缓存上游发送的数据,直到下游消费为止。
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter e) throws Exception {
for(int i=0;;i++){
e.onNext(String.valueOf(i));
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
//设置最多可接受数据的数量
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("aaa", s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
Log.d("aaa", "t:"+t.getMessage());
}
@Override
public void onComplete() {
Log.d("aaa", "===完成===");
}
});
这种策略依然会发生OOM,消耗的内存比Observable慢。
BackpressureStrategy.DROP
这个策略存在一个长度为128大小的缓存区,当缓存区满时下游则不再接收数据,等到缓存区清理的时候才可以再次接收数据。
BackpressureStrategy.LATEST
只保留最新的onnext值,如果下游无法跟上,则覆盖任何以前的值。
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据。
rxjava背压_Rx系列第十八篇:RxJava之背压策略相关推荐
- “睡服”面试官系列第十八篇之generator函数的语法(建议收藏学习)
目录 1简介 1.1基本概念 1.2yield 表达式 1.3与 Iterator 接口的关系 2. next 方法的参数 3. for...of 循环 4. Generator.prototype. ...
- 【SQL开发实战技巧】系列(十八):数据仓库中时间类型操作(进阶)INTERVAL、EXTRACT以及如何确定一年是否为闰年及周的计算
系列文章目录 [SQL开发实战技巧]系列(一):关于SQL不得不说的那些事 [SQL开发实战技巧]系列(二):简单单表查询 [SQL开发实战技巧]系列(三):SQL排序的那些事 [SQL开发实战技巧] ...
- Go入门系列(十八) 反射、包和测试工具
本系列文章目录 展开/收起 Go入门系列(一) 初识Go语言 Go入门系列(二) 变量.指针.数据类型简介和作用域 Go入门系列(三) 基础类型--整型.浮点型.布尔类型和字符串 Go入门系列(四) ...
- Reflex WMS入门系列二十八:空白标签打印
Reflex WMS入门系列二十八:空白标签打印 贴在托盘上的标签,因托盘上的货物的移动,使用等缘故可能会导致标签丢失.在很多场景下又需要扫描托盘标签,所以Reflex WMS系统提供了打印空白标签的 ...
- Debezium报错处理系列之三十八:Timeout expired while fetching topic metadata
Debezium报错处理系列之三十八:'trace': 'org.apache.kafka.common.errors.TimeoutException: Timeout expired while ...
- 2021年安全生产工作总结及2022年思路计划(二十八篇)PPTX(附下载)
摘要:2021年安全生产工作总结及2022年思路计划(二十八篇) 公众号:安全生产星球
- Oracle数据库从入门到精通系列之十八:Oracle进程
Oracle数据库从入门到精通系列之十八:Oracle进程 一.Oracle进程 二.服务器进程server process 三.后台进程background process 四.从属进程(slave ...
- ComicEnhancerPro 系列教程十八:JPG文件长度与质量
作者:马健 邮箱:stronghorse_mj@hotmail.com 主页:http://www.comicer.com/stronghorse/ 发布:2017.07.23 教程十八:JPG文件长 ...
- 算法系列之十八:用天文方法计算二十四节气(上) .
二十四节气在中国古代历法中扮演着非常重要的角色,本文将介绍二十四节气的基本知识,以及如何使用VSOP82/87行星运行理论计算二十四节气发生的准确时间. 中国古代历法都是以月亮运行规律为主,严格按照朔 ...
- 算法系列之十八:用天文方法计算二十四节气(上)
二十四节气在中国古代历法中扮演着非常重要的角色,本文将介绍二十四节气的基本知识,以及如何使用VSOP82/87行星运行理论计算二十四节气发生的准确时间. 中国古代历法都是以月亮运行规律为主,严格按照朔 ...
最新文章
- 分布式与集群是一回事儿么?别让这么简单的问题难住你!
- 用户管理和su,id 命令
- Bash scripts
- python asyncio_Python 的异步 IO:Asyncio 简介
- python爬虫教程(一)
- cat /proc/meminfo 各字段详解
- 四十七、Ansible自动化入门
- docker-machine create -d generic 运行的波折过程及遇见的问题
- java udp 接收不定长_JAVA UDP通信为什么只能接收一次数据,我想要时刻接收数据,并更新UI,大神们帮我看看程序吧?...
- 看懂 IPv6+,这篇就够了
- 深入学习heritrix---体系结构(Overview of the crawler)
- Makefile中的ifeq 多条件使用 ***
- 安卓帧数监测软件_手机帧数测试软件-手机fps帧数显示软件1.6 免root版-东坡下载...
- powerdesigner 导出mysql 库,自动生成ER图
- C/C++程序设计与算法第十一周:零点定理求方程的根
- 吃饭 睡觉 打豆豆!!!
- 整数乘法的计算机方法,太实用了!小学数学四则运算技巧及简便方法
- Errors were encountered while processing(Ubuntu系统报错)
- 《SysML精粹》学习记录--第一章
- cavium CN71XX芯片 GSER Interface总结
热门文章
- 历史论文比赛TCR介绍
- 如何在CSS中解决长英文单词的页面显示问题?CSS3
- 极简主义、人工智能与Readhub的产品哲学
- 用Credential Harvester Attack Method登录人人网
- 苹果快捷键怎么调出来_原来还有这么好用的CAD快捷键,文末附赠快捷键鼠标垫!留言走起...
- Coursera | Applied Plotting, Charting Data Representation in Python(UMich)| Assignment3
- NB-IOT相关的术语 SGW、PGW、LTE、RRC、E-UTRAN、EPC
- 回顾2020,谈谈“拥抱变化”的新理解
- 鼠鼠百科——普适计算
- 十大api接口平台(接口商)