上回书我们讲了元素采样和延迟响应操作符 - 传送门

五、背压策略

背压:下游消费速度跟不上上游发送数据速度,导致阻塞或异常,进而影响上游。

reactor提供了集中背压策略

  • onBackPressureBuffer - 顾名思义,来不及消费的数据先缓存在队列里
  • onBackPressureDrop - drop,丢掉,来不及消费的数据直接扔掉
  • onBackPressureLatest - 保留最新数据,一旦下游请求出现,立即推向下游
  • onBackPressureError - 下游消费速度跟不上,直接抛异常报错
  • limitRate(n) - 限速,上游一次最多发n个

onBackPressureBuffer- Reactor默认使用的背压策略

public final Flux<T> onBackpressureBuffer(int maxSize)

弹珠图:

demo:

  1. 创建一个流1,放从1递增的100个数据
  2. 每隔10毫秒从流1订阅一个元素,形成新的响应式流2
  3. 添加背压策略onBackpressureBuffer,设置一个50个元素大小的缓存区
  4. 每隔100毫秒从流2订阅一个元素,形成新的响应式流3
  5. 订阅响应式流3元素,
    • 数据信号onNext - 打印
    • 异常信号onError - 打印
    • 完成信号onComplete - 打印
  6. 线程阻塞5秒
Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureBuffer(50).delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item +" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);

打印结果:
订阅了8个元素就异常了,
因为缓存区设置了50个大小,不够用


画个图示意一下,更好理解:

Stream2隔10毫秒发射一个数据,Stream3隔100毫秒订阅一个数据,
Stream2欻欻欻扔了10个数据,Stream3才拿走一个,没订阅的就扔缓存队列里,
缓存50个数据满了时,Stream3才订阅了5个,
后面的数据缓存也没地了,消费速度也跟不上,直接抛异常了就
我们可以将缓存空间设置大一点就ok了

onBackPressureDrop- 来不及消费的数据我就扔掉

弹珠图:
发布者欻欻欻就是扔数据,订阅者来不及订阅的元素直接drop扔掉

demo:
步骤同上,只是背压策略换成onBackPressureDrop

 Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureDrop().delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item+" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);

打印结果:
可以看到,没有报错,打印了onComplete,说明是正常订阅完成
但是数据只订阅了32个,后面来不及订阅的68个全部扔掉了,嘎嘎

onBackPressureLast - 也是丢弃来不及订阅的数据,但是保留最新的

弹珠图

demo:
步骤同上,只是背压策略换成onBackpressureLatest

 Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureLatest().delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item+" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);

打印结果:
可以看到:
1、正常订阅完成
2、订阅到元素32速度就跟不上了,后面数据丢弃
3、但是保留了最新数据100,被订阅到

onBackpressureError - 下游速度跟不上、发错误信号

弹珠图:

demo:
步骤同上,只是背压策略换成onBackpressureError

 Flux.range(1,100).delayElements(Duration.ofMillis(10)).onBackpressureError().delayElements(Duration.ofMillis(100)).subscribe(item -> System.out.print(item+" "),ex -> System.out.println("onError: "+ex),() -> System.out.println("onComplete"));Thread.sleep(5000);

打印结果:
可以看到,订阅了4个元素就报错了

错误信息: 流溢出异常:接收者被比预期更多的信号给累蒙圈了,哈哈
onError: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue…)

Panda白话 Reactor -背压策略相关推荐

  1. Reactor响应式编程系列(二)- 背压策略BackPressure

    Reactor响应式编程系列(二)- 背压策略BackPressure 一. Reactor中的背压 声明背压策略 不同的背压策略下的结果 二. request()限制请求 Reactor响应式编程系 ...

  2. RxJava之背压策略

    转载请以链接形式标明出处: 本文出自:103style的博客 本文基于 RxJava 2.x 版本 目录 RxJava背压策略简介 Observable背压导致崩溃的原因 Flowable 使用介绍 ...

  3. Carson带你学Android:图文详解RxJava背压策略

    前言 Rxjava,由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. 本文主要讲解的是RxJava中的 背压控制策略,希望你们会喜欢. Cars ...

  4. 继续谈谈从Rxjava迁移到Flow的背压策略

    前言 对于背压问题不久前就讨论过了,这里就不过多介绍了,总之它是一个非常复杂的话题,本文的主要目的是分析我们如何从Rxjava迁移到Flow并且使用其背压方案,由于本身技术的限制以及协程内部的复杂性, ...

  5. Panda白话 - Synchronized底层同步机制

    Synchronized 都用过,那么它是怎么做到简单修饰一下就能做到并发场景下资源同步的呢?让我们来揭开它神秘的面纱~ JDK6以前Synchronized 只有重量级锁,很笨重,就是当一个线程t1 ...

  6. Panda白话 - G1垃圾收集器 之 Refine线程

    先上一张Panda总结图,欢迎大佬们指正: DCQ - Dirty Catd Queue DCQS - Dirty Catd Queue Set Mutator - 用户线程 RSet - Remem ...

  7. webflux系列--reactor功能

    创建一个新的Flux just 指定序列中包含的全部元素.创建出来的 Flux 序列在发布这些元素之后会自动结束. 即有限序列. public static <T> Flux<T&g ...

  8. ⒈响应式编程 Project Reactor 概述

    文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...

  9. rxjava背压_Android Rxjava :最简单全面背压讲解 (Flowable)

    1.前言 阅读本文需要对Rxjava了解,如果还没有了解或者使用过Rxjava的兄die们,可以观看我另外一篇 Android Rxjava:不一样的诠释进行学习. Rxjava背压:被观察者发送事件 ...

最新文章

  1. good way for university professors accumulating the citation
  2. JavaScript日期格式化处理
  3. transporter上传卡正在交付_【iOS】Xcode11使用Transporter将APP上传到App Store,卡在正在验证APP...
  4. ASP在 Web.config 中创建数据库连接字符串
  5. 首次使用物联网设备进行的 Wi-Fi 6 试用测试宣布成功
  6. linux安装mysql配置,linux安装mysql,配置mysql文件
  7. 西电计科《算法分析与设计》上机(源码+实验报告+历次作业)(渗透问题+排序算法性能比较+地图路由+文本索引)(2019级 霍红卫老师)
  8. 三角函数公式和图像大全
  9. 资深3D角色建模师对于游戏角色设计浅析:细节与风格
  10. 20-统一网关Gateway-全局过滤器
  11. kvm迁移镜像启动报错
  12. Python re模块 —— 从零基础开始入门正则表达式
  13. 边缘计算的深刻详细解读
  14. 使用adb的时候出现 adb不是内部或者外部命令如何解决
  15. SuperMap地图绘制线段
  16. 打破电动车定义,星越L Hi·P雷神增程电动SUV上市
  17. ccs一次替换一个多出使用的变量
  18. 用filebeat读不了log文件,出现“message“:“Non-zero metrics in the last 30s“
  19. 全网最详细的Android实现三方分享ShareSDK分享功能
  20. 2019年JAVA最新面试题部分

热门文章

  1. matlab锂电池p2d模型,锂电池P2D模型基础:电荷守恒
  2. 求(2Y-4)²-4(Y-2)(3Y+7)≥0得解题过程和思路
  3. Go语言环境搭建详解(2020版)
  4. 樱花动漫视频数据表分析樱花动漫
  5. Python【词云】《长恨歌》
  6. 清明节到来 祭扫各纷然
  7. 网易有道的面试也太难了吧,这都堪比我朋友三面的阿里了!
  8. 计算机 硕士 技术路线,(最新整理)硕士开题报告技术路线图
  9. 批量生成insert语句的方法(word转excel,excel用公式生成insert)
  10. 模拟转换芯片:ADC芯片参数及介绍