Reactor响应式编程系列(二)- 背压策略BackPressure

  • 一. Reactor中的背压
    • 声明背压策略
    • 不同的背压策略下的结果
  • 二. request()限制请求

Reactor响应式编程系列导航

我在上一篇文章中有涉及到背压策略,有一个案例中,我将消费的代码特意进行睡眠1秒的操作,但是依旧能将所有的数据最后都获取到,这是由于其默认的背压策略是Buffer,也就是将下游来不及消费的数据进行缓存,那么最终下游总是能把所有数据都接收到。 而写的下游向上游的request()操作根本看不出来有什么效果,这时候就会觉得难搞哦~


那么这篇文章就开始讲讲背压策略。那么首先肯定是从概念来讲起。

一. Reactor中的背压

我们知道一个前提:即Reactor是响应式编程框架,也就是从项目的角度来出发,后端数据发生变化,发出响应,主动推给前端进行更新。而背压——BackPressure就是为了控制上游向下游推送数据的一个流量,避免上游数据流动太快,导致下游处理不过来的情况发生。

背压策略的相关定义在类FluxSink中的内部静态枚举类OverflowStrategy中:

public static enum OverflowStrategy {IGNORE,ERROR,DROP,LATEST,BUFFER;private OverflowStrategy() {}
}

详细来说就是:

  • IGNORE:完全忽略下游背压请求。当下游队列充满时会导致IllegalStateException。
  • ERROR:在下游无法跟上时发出错误信号IllegalStateException。
  • DROP:如果下游尚未准备好接收信号,则丢弃该信号。
  • LATEST:让下游只从上游获取最新信号。
  • BUFFER:(默认值)以在下游无法跟上时缓冲所有信号。(这会实现无限缓冲,并可能导致OutOfMemoryError)。

声明背压策略

我们知道,有两种生成序列的方法是用于异步的:

  • create()
  • push()

他们支持背压操作,以create()方法为例,其方法接口如下:

// 在不带第二个参数的情况下,默认的背压是Buffer策略
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {return create(emitter, OverflowStrategy.BUFFER);
}public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {return onAssembly((Flux)(new FluxCreate(emitter, backpressure, CreateMode.PUSH_PULL)));
}

案例如下(参考 享学IT 的博客):

public class Test3 {private final int EVENT_DURATION = 10;    // 生成的事件间隔时间,单位毫秒private final int EVENT_COUNT = 10;    // 生成的事件个数private final int PROCESS_DURATION = 30;    // 订阅者处理每个元素的时间,单位毫秒// 一个事件源,用于创建多个事件,我们用上面的两个参数控制了事件创建的频率// 通过Flux.create来转换成一个个Flux数据流。private Flux<EventSource.Event> fastPublisher;// 慢的订阅者,会继承BaseSubscriber,重写对应的方法private SlowSubscriber slowSubscriber;private EventSource eventSource;private CountDownLatch countDownLatch;private Flux<EventSource.Event> createFlux(FluxSink.OverflowStrategy strategy) {// 事件源注册了一个监听器,负责监听新事件的创建以及事件源的停止return Flux.create(sink -> eventSource.register(new MyListener() {@Overridepublic void newEvent(EventSource.Event event) {System.out.println("上游------>数据源创建了新事件:" + event.getMsg());sink.next(event);}@Overridepublic void eventSourceStopped() {sink.complete();}}), strategy); // 别忘了这里还有个背压策略的参数}// 用来往数据源中添加事件用的,并通过定义的变量来控制频率,但是依旧发布的很快private void generateEvent(int times, int millis) {// 循环生成MyEvent,每个MyEvent间隔millis毫秒for (int i = 0; i < times; i++) {try {TimeUnit.MILLISECONDS.sleep(millis);} catch (InterruptedException e) {}eventSource.newEvent(new EventSource.Event(new Date(), "Event-" + i));}eventSource.eventStopped();}// 进行一些初始化操作@Beforepublic void setup() {countDownLatch = new CountDownLatch(1);slowSubscriber = new SlowSubscriber();eventSource = new EventSource();}/*** 触发订阅,使用CountDownLatch等待订阅者处理完成。*/@Afterpublic void subscribe() throws InterruptedException {// 也就是Flux.subscribe(subscriber)fastPublisher.subscribe(slowSubscriber);// 创建数据源generateEvent(EVENT_COUNT, EVENT_DURATION);countDownLatch.await(1, TimeUnit.MINUTES);}@Testpublic void testCreateBackPressureStratety() {fastPublisher =createFlux(FluxSink.OverflowStrategy.DROP)// 请求发生的时候,打印内容.doOnRequest(n -> System.out.println("下游------>向上游请求" + n + "个数据"))// 将任务运行于一个单线程上,并且指定订阅者每次向上游request的个数。默认是256// 因为一般情况下,create是一个多线程的方法,发布者和订阅者不在同一个线程上.publishOn(Schedulers.newSingle("newSingle"), 1);}class SlowSubscriber extends BaseSubscriber<EventSource.Event> {@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 订阅时请求1个数据request(1);}@Overrideprotected void hookOnNext(EventSource.Event event) {System.out.println("线程" + Thread.currentThread().getName() + "接收数据:" + event.getMsg());try {TimeUnit.MILLISECONDS.sleep(PROCESS_DURATION);} catch (InterruptedException e) {}// 每处理完1个数据,就再请求1个request(1);}@Overrideprotected void hookOnComplete() {countDownLatch.countDown();}}
}

运行结果如下:

什么意思呢?就是:

  1. 在上游,也就是发布者,以10ms的频率(代码中定义的)去生成一个事件,并添加到事件源中。
  2. 事件源由于注册了监听器,因此监听到对应的事件,并将每个事件当做Flux序列中的一个元素(sink.next(event))。
  3. 而下游,也就是订阅者,订阅了发布者的数据源,想要去获取他的数据。而处理数据的过程需要耗费30ms(代码中定义的)。
  4. 而我代码用的背压策略是Drop,也就是下游来不及处理的数据会进行丢弃,因此会接收最近刚发布的数据,也就获得了上面的结果。
  5. 而在代码中添加一个CountDownLatch,是为了让事件源创建好指定数量的事件后,关闭sink,也就是需要调用Complete()方法,否则这个代码会一直跑下去。(不信大家可以把代码中最后面的countDownLatch.countDown();这个给注释掉再跑一下)

其实我一开始并不是很懂这个程序的逻辑,但是我在这篇文章里尽量把注释都写了上去,并且将流程摆了出来,希望大家能够理解。
同时我还是注明1点:

  • 在代码里用到了doOnRequest()这个方法,这个是干嘛的呢?

Publisher使用subscribe()方法的时候,Subscriber触发回触发一系列的on方法,如onSubscribe()为了更好的监控以及观测异步序列的传递情况,设置了一系列的doOn方法,在触发on方法的时候作behavior的副作用发生用于监控行为的运行情况。

不同的背压策略下的结果

上面的案例使用的是Drop策略,那如果改为Buffer策略,也就是默认的情况下会怎样呢?

  • 可以发现一开始下游的处理速度<上游创建事件速度。
  • 但是最后下游还是能够将上游的所有事件全部接收完毕。

背压策略改为:ERROR


背压策略改为:LATEST

结果类似于DROP,但是却有本质的不同,大家需要仔细观察顺序:

  • DROP策略下,来不及处理的数据全部扔掉了,因此下游获取的数据时候可能会等待,直到上游刚推出一个新的数据。
  • 而LATEST策略下,来不及处理的数据是不会扔的,因此下游获取的数据是目前Flux序列中最新的一个元素。

所以结果上来看:

  • DROP策略下:肯定是先有上游数据的创建,再有下游数据的接收,即我还需要等待上游发出数据。
  • LATEST策略下:在Flux序列不是空的情况下,输出和上游是否创建数据无关,即我尽管自己输出即可。

背压策略改为:IGNORE

可见队列满了就直接抛异常了,忽略所谓的背压策略。

二. request()限制请求

我觉得有必要让request()这个方法的作用彻底的显现出来,因此又补了这个模块给大家,我相信大家看了这个就更能明白所谓的背压了。

废话不多说,上代码:

@Test
public void () {Flux<Integer> flux = Flux.range(1, 10).log();flux.subscribe(new BaseSubscriber<Integer>() {private int count = 0;private final int requestCount = 4;@Overrideprotected void hookOnSubscribe(Subscription subscription) {request(requestCount);}@SneakyThrows@Overrideprotected void hookOnNext(Integer value) {count++;// 通过count控制每次request两个元素if (count == requestCount) {Thread.sleep(1000);request(requestCount);count = 0;}}});
}

来看看结果(每个输出停顿1秒,大家可以自己跑下,延长下睡眠的时间):

如果我将变量requestCount改为2,看看结果咋样:结果会是2个2个输出

这样一来是不是觉得对request()这个方法更加清晰了呢?

接下来准备好好学习下Reactor中的几个Operator操作符,其实操作符很多地方是通用的,比如Stream、Spark、Flink等等,都有这些filter、map啥的API。那我们就在下一篇博客见喽~

Reactor响应式编程系列(二)- 背压策略BackPressure相关推荐

  1. Reactor响应式编程

    Reactor响应式编程 介绍响应式编程 响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明 ...

  2. Reactor响应式编程 之 简介

    1 reactor 出现的背景.初衷和要达到什么样的目标 Reactor 项目始于 2012 年. 经过长时间的内部孵化,于 2013 年发布 Reactor 1.x 版本. Reactor 1 在各 ...

  3. java reactor 响应式编程

    Spring framework 5 的一大新特性:响应式编程(Reactive Programming). 响应式编程就是基于reactor的思想,当你做一个带有一定延迟的才能够返回的io操作时,不 ...

  4. 使用Reactor响应式编程

    介绍 响应式编程 响应式编程不同于我们熟悉的命令式编程,我们熟悉的命令式编程即代码就是一行接一行的指令,按照它们的顺序一次一条地出现.一个任务被执行,程序就需要等到它执行完了,才能执行下一个任务.每一 ...

  5. SpringBoot2.0响应式编程系列(一)-导读

  6. Spring笔记(4):响应式编程、Reactor、WebFlux、Flow

    目录 1.Spring Webflux 介绍 2.响应式编程(Java 实现) 3.响应式编程(Reactor 实现) 4.SpringWebflux 执行流程和核心 API 5.SpringWebf ...

  7. RXJava2响应式编程框架设计三---Rxjava2背压、生命周期

    在上一次https://www.cnblogs.com/webor2006/p/12348890.html中已经完成了对RxJava2的整个线程切换原理的详细剖析了,这次继续来学习它其它比较重要的知识 ...

  8. 响应式圣经:10W字,实现Spring响应式编程自由

    前言 全链路异步化改造的基础是响应式编程 随着业务的发展,微服务应用的流量越来越大,使用到的资源也越来越多. 在微服务架构下,大量的应用都是 SpringCloud 分布式架构,这种架构总体上是全链路 ...

  9. 赠书:响应式编程到底是什么?

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 最近几年,随着Go.Node 等新语言.新技术的出现,J ...

最新文章

  1. 苹果小圆点怎么关闭_苹果手机连按2下屏幕,就能自动截图,不知道的来学一学...
  2. 禁掉人脸识别!一群音乐人正在号召,禁止在音乐节上动用人脸识别
  3. iconfont图标_小程序使用 Iconfont 的正确姿势
  4. C#中关键字ref与out的区别(转)
  5. Tomcat 7 Connector 精读(1)
  6. crossplatform---Nodejs in Visual Studio Code 01.简单介绍Nodejs
  7. POJ 1195 Mobile phones(裸的二维树状数组)
  8. POJ - 3347 Kadj Squares(思维+几何)
  9. import lombok 报错_Android上使用Lombok和set、get方法告别
  10. 阿里云神龙团队拿下TPCx-BB排名第一的背后技术
  11. 双核CPU揭密:英特尔/AMD没有告诉你的五项事实【ZZ】
  12. 运算除法的计算机函数,2、Python基础--除法、常用数学函数(示例代码)
  13. 解决cacti创建ping主机时不出图的问题
  14. 使用UTL_SMTP包发送邮件
  15. centos 搭建nfs服务器
  16. PS使用:windows解决Adobe Photoshop 2020(PS2020)闪退
  17. java案例代码10--购物车案例--重要
  18. 封装条形码MaHelper
  19. VM虚拟机 自定义(高级)安装win 7系统史上超详细图文教程(附win7 iso镜像资源)
  20. 电流速断保护c语言程序,[单选] 电流电压联锁速断保护与过电流保护十分相似,只是取消了()继电器。...

热门文章

  1. 算法训练 Cowboys(DP)
  2. 传统企业触网,打造完美用户体验是关键
  3. 单例模式深入浅出---详细注释
  4. 精确计算时,不要使用float或double
  5. python爬取数据时报错:`aiohttp.client_exceptions.ClientConnectorCertificateError: Cannot connect to host sea
  6. 《LeetCode》数据结构入门板块
  7. 怎么制作搞笑的GIF
  8. PHP图片表情制作生成源码
  9. 基于GEC6818的智能家居系统[完整源码/项目报告/笔记分享]
  10. android锁屏快捷键设置,【Android高级】锁屏功能简单实现