1.前言

阅读本文需要对Rxjava了解,如果还没有了解或者使用过Rxjava的兄die们,可以观看我另外一篇

Android Rxjava:不一样的诠释进行学习。

Rxjava背压:被观察者发送事件的速度大于观察者接收事件的速度时,观察者内会创建一个无限制大少的缓冲池存储未接收的事件,因此当存储的事件越来越多时就会导致OOM的出现。(注:当subscribeOn与observeOn不为同一个线程时,被观察者与观察者内存在不同时长耗时任务,就会使发送与接收速度存在差异。)

背压例子

public void backpressureSample(){

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

int i = 0;

while(true){

Thread.sleep(500);

i++;

e.onNext(i);

Log.i(TAG,"每500ms发送一次数据:"+i);

}

}

}).subscribeOn(Schedulers.newThread())//使被观察者存在独立的线程执行

.observeOn(Schedulers.newThread())//使观察者存在独立的线程执行

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

Thread.sleep(5000);

Log.e(TAG,"每5000m接收一次数据:"+integer);

}

});

}

例子执行效果

上述代码执行效果

backpressure.png

通过上述例子可以大概了解背压是如何产生,因此Rxjava2.0版本提供了 Flowable 解决背压问题。

本文章就是使用与分析 Flowable 是如何解决背压问题。

2.目录

目录.png

3.简介

简介.png

_______________________________________________________________________________

4.使用与原理详解

4.1 Flowable 与 Observable 的区别

flowable与observable对比.png

上图可以很清楚看出二者的区别,其实Flowable 出来以上的区别之外,它其他所有使用与Observable完全一样。

Flowable 的create例子

public void flowable(){

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

for(int j = 0;j<=150;j++){

e.onNext(j);

Log.i(TAG," 发送数据:"+j);

try{

Thread.sleep(50);

}catch (Exception ex){

}

}

}

},BackpressureStrategy.ERROR)

.subscribeOn(Schedulers.newThread())

.observeOn(Schedulers.newThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

s.request(Long.MAX_VALUE); //观察者设置接收事件的数量,如果不设置接收不到事件

}

@Override

public void onNext(Integer integer) {

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

Log.e(TAG,"onNext : "+(integer));

}

@Override

public void onError(Throwable t) {

Log.e(TAG,"onError : "+t.toString());

}

@Override

public void onComplete() {

Log.e(TAG,"onComplete");

}

});

}

4.2 BackpressureStrategy媒体类

从Flowable源码查看,缓存池默认大少为:128

public abstract class Flowable implements Publisher {

/** The default buffer size. */

static final int BUFFER_SIZE;

static {

BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));

}

.....

}

通过上面的例子,我们可以看到create方法中的包含了一个BackpressureStrategy媒体类,其包含5种类型:

4.2.1. ERROR

把上面例子改为ERROR类型,执行结果如下:

error.png

总结 :当被观察者发送事件大于128时,观察者抛出异常并终止接收事件,但不会影响被观察者继续发送事件。

4.2.2. BUFFER

把上面例子改为BUFFER类型,执行结果如下:

buffer.gif

总结 :与Observable一样存在背压问题,但是接收性能比Observable低,因为BUFFER类型通过BufferAsyncEmitter添加了额外的逻辑处理,再发送至观察者。

4.2.3. DROP

把上面例子改为DROP类型,执行结果如下:

drop.png

总结 :每当观察者接收128事件之后,就会丢弃部分事件。

4.2.4. LATEST

把上面例子改为LATEST类型,执行结果如下:

laster.gif

总结 :LATEST与DROP使用效果一样,但LATEST会保证能接收最后一个事件,而DROP则不会保证。

4.2.5. MISSING

把上面例子改为MISSING类型,执行结果如下:

buffer.gif

总结 :MISSING就是没有采取背压策略的类型,效果跟Obserable一样。

在设置MISSING类型时,可以配合onBackPressure相关操作符使用,也可以到达上述其他类型的处理效果。

4.3 onBackPressure相关操作符

使用例子:

Flowable.interval(50,TimeUnit.MILLISECONDS)

.onBackpressureDrop()//效果与Drop类型一样

.subscribeOn(Schedulers.newThread())

.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Long aLong) throws Exception {

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

Log.e(TAG,"onNext : "+(aLong));

}

});

onBackpressureBuffer :与BUFFER类型一样效果。

onBackpressureDrop :与DROP类型一样效果。

onBackpressureLaster :与LASTER类型一样效果。

4.4 request()

4.4.1 request(int count):设置接收事件的数量.

例子:

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

for(int j = 0;j<50;j++){

e.onNext(j);

Log.i(TAG," 发送数据:"+j);

try{

Thread.sleep(50);

}catch (Exception ex){

}

}

}

},BackpressureStrategy.BUFFER)

.subscribeOn(Schedulers.newThread())

.observeOn(Schedulers.newThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件

}

@Override

public void onNext(Integer integer) {

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

Log.e(TAG,"onNext : "+(integer));

}

@Override

public void onError(Throwable t) {

Log.e(TAG,"onError : "+t.toString());

}

@Override

public void onComplete() {

Log.e(TAG,"onComplete");

}

});

request.png

4.4.2 request扩展使用

request还可进行扩展使用,当遇到在接收事件时想追加接收数量(如:通信数据通过几次接收,验证准确性的应用场景),可以通过以下方式进行扩展:

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

for(int j = 0;j<50;j++){

e.onNext(j);

Log.i(TAG," 发送数据:"+j);

try{

Thread.sleep(50);

}catch (Exception ex){

}

}

}

},BackpressureStrategy.BUFFER)

.subscribeOn(Schedulers.newThread())

.observeOn(Schedulers.newThread())

.subscribe(new Subscriber() {

private Subscription subscription;

@Override

public void onSubscribe(Subscription s) {

subscription = s;

s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件

}

@Override

public void onNext(Integer integer) {

if(integer==5){

subscription.request(3);

}

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

Log.e(TAG,"onNext : "+(integer));

}

@Override

public void onError(Throwable t) {

Log.e(TAG,"onError : "+t.toString());

}

@Override

public void onComplete() {

Log.e(TAG,"onComplete");

}

});

request扩展.png

总结:可以动态设置观察者接收事件的数量,但不影响被观察者继续发送事件。

4.5 requested

requested 与 request不是同一的函数,但它们都是属于FlowableEmitter类里的方法,那么requested()是有什么作用呢,看看以下例子:

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

for(int j = 0;j<15;j++){

e.onNext(j);

Log.i(TAG,e.requested()+" 发送数据:"+j);

try{

Thread.sleep(50);

}catch (Exception ex){

}

}

}

},BackpressureStrategy.BUFFER)

// .subscribeOn(Schedulers.newThread())

// .observeOn(Schedulers.newThread())

.subscribe(new Subscriber() {

private Subscription subscription;

@Override

public void onSubscribe(Subscription s) {

subscription = s;

s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件

}

@Override

public void onNext(Integer integer) {

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

Log.e(TAG,"onNext : "+(integer));

}

@Override

public void onError(Throwable t) {

Log.e(TAG,"onError : "+t.toString());

}

@Override

public void onComplete() {

Log.e(TAG,"onComplete");

}

});

requested.png

从图中我们可以发现,requested打印的结果就是 剩余可接收的数量 ,它的作用就是可以检测剩余可接收的事件数量。

5.总结

到此,Flowable讲解完毕。

如果喜欢我的分享,可以点击 关注 或者 赞,你们支持是我分享的最大动力 。

不定期分享关于安卓开发的干货。

写技术文章初心

技术知识积累

技术知识巩固

技术知识分享

技术知识交流

rxjava背压_Android Rxjava :最简单全面背压讲解 (Flowable)相关推荐

  1. android RxJava(RxAndroid)的简单使用

    今天,简单讲讲android里如何使用RxJava(RxAndroid). Android框架系列: 一.android EventBus的简单使用 二.android Glide简单使用 三.and ...

  2. OkHttpUtils-2.0.0 升级后改名 OkGo,全新完美支持 RxJava,比 Retrofit 更简单易用。

    okhttp-OkGo 项目地址:jeasonlzy/okhttp-OkGo 简介:OkHttpUtils-2.0.0 升级后改名 OkGo,全新完美支持 RxJava,比 Retrofit 更简单易 ...

  3. 动脑学院 java_动脑学院Rxjava预习资料 Rxjava入门

    前言 Rxjava由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. Github截图 本文主要: 面向 刚接触Rxjava的初学者 提供了一份 ...

  4. 路由器RIP简单配置和讲解

    路由器RIP简单配置和讲解 1.RIP特点 2.RIP简单配置 pc配置 三层交换机基本配置 路由器Router0 基本配置 路由器Router2 基本配置 RIP协议配置 三层交换机RIP协议 Ro ...

  5. Kali Linux 2021.2在VMware和VirtualBox安装教程 超简单 步骤详细讲解

    Kali Linux 2021.2在VMware和VirtualBox安装教程 超简单 步骤详细讲解 一. 资源下载及工作站安装 二. 使用VMware平台安装 三. 使用VirtualBox平台安装 ...

  6. java 轮询请求接口_Android RxJava 实际应用讲解:(无条件)网络请求轮询

    前言 Rxjava,由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. Github截图 RxJava如此受欢迎的原因,在于其提供了丰富 & ...

  7. java监听网络连接_Android RxJava 之网络链接监听示范

    RxJava在stream events处理上真的是一个利器.下面的示范代码显示如何用它来监听android设备的网络连接状况,实时接收change信息. IntentFilter filter = ...

  8. 一起来造一个RxJava,揭秘RxJava的实现原理

    RxJava是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕.我读源码时,确实有点似懂非懂的感觉.网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾 ...

  9. android rxjava 过滤,解剖 RxJava 之过滤操作符

    介绍 此文章结合 Github AnalyseRxJava 项目,给 Android 开发者带来 RxJava 详细的解说.参考自 RxJava Essential 及书中的例子 关于 RxJava ...

最新文章

  1. 小程序的事件处理参数不能取得
  2. python字符串写入excel-python 将数据写入excel
  3. Day 27: Restify —— 在Node.js中构建正确的REST Web服务
  4. C++ Primer 5th笔记(chap 14 重载运算和类型转换)函数匹配与重载运算符
  5. java final 方法重载_java方法重载和覆写的定义,static和final修饰符的讲解,java面试题...
  6. 一款精美的漂亮的EMLOG模板
  7. 执行一次怎么会写入两次数据_Java进阶知识:一文详解缓存Redis的持久化机制,新手看完也会用
  8. vs2012如何为进行单元测试
  9. 嵌入式Linux内核,文件系统的制作
  10. 深度学习图片卷积输出大小计算公式
  11. JDK笔记-IO流读写
  12. android带人脸识别码,Android自带的人脸识别
  13. ​【预测模型】基于粒子群算法优化最小二乘支持向量机实现数据分类matlab代码
  14. item在C语言中是什么意思中文,单词item中文表达的是什么意思
  15. 【Bioinfo Blog 011】【R Code 008】——功能富集分析
  16. centos7安装大数据平台
  17. flush=true 的含义
  18. 【操作系统】Linux内核和Windows系统的内核有什么区别?
  19. Python邮件发送SMATP模块详细总结(含qq邮箱及163邮箱服务开启及授权码获取,多附件发送)
  20. NETPLIER : 一款基于概率的网络协议逆向工具(一)理论

热门文章

  1. Asp.Net MVC 自定义登录过滤器
  2. 个人作业1——四则运算题目生成程序(基于java)
  3. 2016年第14本:毅力----如何培养自律的习惯(漫画版)
  4. iOS自动布局之autoresizingi
  5. 【PAT】B1004 成绩排名
  6. Java业务代理模式~
  7. 互动中国分享: 15例HTML5酷站欣赏
  8. 【Java从0到架构师】Maven - 依赖冲突、分模块构建项目
  9. 520晚上,我用python破解了前女友的加密文件,结果却发现。。。
  10. [转]netstat 输出内容详解,TCP链接握手对应state