Panda白话 Reactor -背压策略
上回书我们讲了元素采样和延迟响应操作符 - 传送门
五、背压策略
背压:下游消费速度跟不上上游发送数据速度,导致阻塞或异常,进而影响上游。
reactor提供了集中背压策略
- onBackPressureBuffer - 顾名思义,来不及消费的数据先缓存在队列里
- onBackPressureDrop - drop,丢掉,来不及消费的数据直接扔掉
- onBackPressureLatest - 保留最新数据,一旦下游请求出现,立即推向下游
- onBackPressureError - 下游消费速度跟不上,直接抛异常报错
- limitRate(n) - 限速,上游一次最多发n个
onBackPressureBuffer- Reactor默认使用的背压策略
public final Flux<T> onBackpressureBuffer(int maxSize)
弹珠图:
demo:
- 创建一个流1,放从1递增的100个数据
- 每隔10毫秒从流1订阅一个元素,形成新的响应式流2
- 添加背压策略onBackpressureBuffer,设置一个50个元素大小的缓存区
- 每隔100毫秒从流2订阅一个元素,形成新的响应式流3
- 订阅响应式流3元素,
- 数据信号onNext - 打印
- 异常信号onError - 打印
- 完成信号onComplete - 打印
- 线程阻塞5秒
Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureBuffer(50).delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item +" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);
打印结果:
订阅了8个元素就异常了,
因为缓存区设置了50个大小,不够用
画个图示意一下,更好理解:
Stream2隔10毫秒发射一个数据,Stream3隔100毫秒订阅一个数据,
Stream2欻欻欻扔了10个数据,Stream3才拿走一个,没订阅的就扔缓存队列里,
缓存50个数据满了时,Stream3才订阅了5个,
后面的数据缓存也没地了,消费速度也跟不上,直接抛异常了就
我们可以将缓存空间设置大一点就ok了
onBackPressureDrop- 来不及消费的数据我就扔掉
弹珠图:
发布者欻欻欻就是扔数据,订阅者来不及订阅的元素直接drop扔掉
demo:
步骤同上,只是背压策略换成onBackPressureDrop
Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureDrop().delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item+" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);
打印结果:
可以看到,没有报错,打印了onComplete,说明是正常订阅完成
但是数据只订阅了32个,后面来不及订阅的68个全部扔掉了,嘎嘎
onBackPressureLast - 也是丢弃来不及订阅的数据,但是保留最新的
弹珠图
demo:
步骤同上,只是背压策略换成onBackpressureLatest
Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureLatest().delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item+" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);
打印结果:
可以看到:
1、正常订阅完成
2、订阅到元素32速度就跟不上了,后面数据丢弃
3、但是保留了最新数据100,被订阅到
onBackpressureError - 下游速度跟不上、发错误信号
弹珠图:
demo:
步骤同上,只是背压策略换成onBackpressureError
Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureError().delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item+" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);
打印结果:
可以看到,订阅了4个元素就报错了
错误信息: 流溢出异常:接收者被比预期更多的信号给累蒙圈了,哈哈
onError: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue…)
Panda白话 Reactor -背压策略相关推荐
- Reactor响应式编程系列(二)- 背压策略BackPressure
Reactor响应式编程系列(二)- 背压策略BackPressure 一. Reactor中的背压 声明背压策略 不同的背压策略下的结果 二. request()限制请求 Reactor响应式编程系 ...
- RxJava之背压策略
转载请以链接形式标明出处: 本文出自:103style的博客 本文基于 RxJava 2.x 版本 目录 RxJava背压策略简介 Observable背压导致崩溃的原因 Flowable 使用介绍 ...
- Carson带你学Android:图文详解RxJava背压策略
前言 Rxjava,由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. 本文主要讲解的是RxJava中的 背压控制策略,希望你们会喜欢. Cars ...
- 继续谈谈从Rxjava迁移到Flow的背压策略
前言 对于背压问题不久前就讨论过了,这里就不过多介绍了,总之它是一个非常复杂的话题,本文的主要目的是分析我们如何从Rxjava迁移到Flow并且使用其背压方案,由于本身技术的限制以及协程内部的复杂性, ...
- Panda白话 - Synchronized底层同步机制
Synchronized 都用过,那么它是怎么做到简单修饰一下就能做到并发场景下资源同步的呢?让我们来揭开它神秘的面纱~ JDK6以前Synchronized 只有重量级锁,很笨重,就是当一个线程t1 ...
- Panda白话 - G1垃圾收集器 之 Refine线程
先上一张Panda总结图,欢迎大佬们指正: DCQ - Dirty Catd Queue DCQS - Dirty Catd Queue Set Mutator - 用户线程 RSet - Remem ...
- webflux系列--reactor功能
创建一个新的Flux just 指定序列中包含的全部元素.创建出来的 Flux 序列在发布这些元素之后会自动结束. 即有限序列. public static <T> Flux<T&g ...
- ⒈响应式编程 Project Reactor 概述
文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...
- rxjava背压_Android Rxjava :最简单全面背压讲解 (Flowable)
1.前言 阅读本文需要对Rxjava了解,如果还没有了解或者使用过Rxjava的兄die们,可以观看我另外一篇 Android Rxjava:不一样的诠释进行学习. Rxjava背压:被观察者发送事件 ...
最新文章
- good way for university professors accumulating the citation
- JavaScript日期格式化处理
- transporter上传卡正在交付_【iOS】Xcode11使用Transporter将APP上传到App Store,卡在正在验证APP...
- ASP在 Web.config 中创建数据库连接字符串
- 首次使用物联网设备进行的 Wi-Fi 6 试用测试宣布成功
- linux安装mysql配置,linux安装mysql,配置mysql文件
- 西电计科《算法分析与设计》上机(源码+实验报告+历次作业)(渗透问题+排序算法性能比较+地图路由+文本索引)(2019级 霍红卫老师)
- 三角函数公式和图像大全
- 资深3D角色建模师对于游戏角色设计浅析:细节与风格
- 20-统一网关Gateway-全局过滤器
- kvm迁移镜像启动报错
- Python re模块 —— 从零基础开始入门正则表达式
- 边缘计算的深刻详细解读
- 使用adb的时候出现 adb不是内部或者外部命令如何解决
- SuperMap地图绘制线段
- 打破电动车定义,星越L Hi·P雷神增程电动SUV上市
- ccs一次替换一个多出使用的变量
- 用filebeat读不了log文件,出现“message“:“Non-zero metrics in the last 30s“
- 全网最详细的Android实现三方分享ShareSDK分享功能
- 2019年JAVA最新面试题部分