文章目录

  • What is Reactive Programming?
  • Project Reactor & Reactive Programming
    • 总览
      • Features
      • Best practices
      • 代价
    • 测试用例
      • 提高吞吐量
      • Back pressure (背压)
  • Conclusion
  • Reference

What is Reactive Programming?

响应式编程 (Reactive Programming) 是一种流行的编程方法, 编写代码是基于对变化的反应. 它的灵感来自日常生活.

在执行日常生活活动时, 我们会尽可能多任务. 但大脑无法处理多任务, 不管我们如何努力去做. 我们人类实现多任务的唯一办法是在时间线上在任务之间切换. 事实上, 我们总是切换任务, 即使我们没有意识到它. 例如, 要执行一个任务: 在星巴克喝一杯咖啡饮料, 你需要发出一个命令, 等待准备好, 然后接受你的饮料. 当你在等待的时候, 你很可能会找到别的事情做. 这是最简单的执行任务的反应 (响应) 形式, 你会在你等待来自咖啡师的 “响应” 时做别的事情.

响应式编程依赖于事件, 面向 “流”: 代码运行的顺序. 不是代码的顺序, 而是和一个以上的事件有关. 这些事件发生是以随着时间的推移的序列. 我们把这一系列事件称为 “流”.

响应式编程就是因为我们得 “响应” 这些事件而得以命名. 想象一下, 你在等待某人发送一个很酷商品的促销码, 当这条促销码发出时, 你会立即响应去购买这个商品. 这正是响应式编程的原理.

为了能够对事件作出反应, 我们必须监听它. 在没有监听的情况下, 我们永远不会知道什么时候它会有事件发生. 在微博上, 可以设置监测微博的事件, 我们设置我们的电话, 每次发微博事件时就会第一时间通知我们.

在响应式编程中, 监听事件被称为侦听或订阅该事件. 这一点其实和订阅简讯类似. 当你订阅 Web 上发布的新闻, 你得提供电子邮件地址, 每次有新的文章发布时, 你的电子邮箱将得到该新闻的一个副本. 同样, 我们使用某个函数订阅事件流, 每当有一个新的事件, 流将激活这个函数, 以使代码能够对事件作出响应. 在这个例子中, 新闻通讯平台是事件流, 电子报的每一个新闻是一个事件, 电子邮箱是使用订阅事件流的函数.

Project Reactor & Reactive Programming

总览

Project Reactor 是 响应式编程 (Reactive Programming) 理论的实现. 可以总结为:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).

- Reactive programming - Wikipedia

在上述响应式编程的定义中, 除了异步编程, 还包含两个重要的关键词:

  • Data streams: 数据流. 分为 静态数据流 (数组, 文件等) 和 动态数据流 (事件流, 日志流等). 基于数据流模型, 响应式编程得以提供一套统一的 Stream 风格的数据处理接口. 和 Java 8 的 Stream API 比起来, 响应式编程 API 除了支持静态数据流, 还支持动态数据流, 并且允许复用和同时接入多个订阅者.
  • The propagation of change: 变化传播. 简单来说就是以一个数据流作为输入, 经过一连串操作转化为另一个数据流. 然后分发给订阅者的过程. 这就有点像函数式编程中的组合函数, 将多个函数串联起来, 把一组输入数据转化为其他结构的输出数据.

Features

本质上, Project Reactor 是一种异步编程框架, 是 JVM 之上的完全非阻塞的响应式编程基础. 和其他框架相比, 响应式编程包含了以下三个特性:

  1. 描述而非执行: 在最终调用 subscribe() 方法之前, 从发布端到订阅端没有任何事情发生. 就好比无论多长的水管. 只要水龙头不拧开, 水管里的水就不会流动. 为了提高描述能力, 响应式编程提供了比 Java 8 的 Stream API 多得多的 API, 如 buffer(), merge(), orErrorMap() 等.
  2. 提高吞吐量: 类似 HTTP/2 中的连接复用, 响应式编程通过线程复用来提高吞吐量. 在传统的 Servlet 容器中, 每进来一个请求就会申请一个线程来处理. 受限于机器硬件资源, 单台服务器所能支撑的线程数存在极限的. 假设为 T, 那么应用同时能处理的请求数 (吞吐量) 必然也不会超过 T. 但是对于一个使用 Spring 5 开发的基于响应式编程框架的应用, 通过线程复用技术, 最大吞吐量是有可能超过 T 的.
  3. Backpressure (背压) 支持: 简单来说, 背压就是一种反馈机制. 在一般的 Push 模型中, 发布者既不知道也不关心订阅者的处理速度, 当数据的发布速度超过处理速度时, 需要订阅者自己决定缓存还是丢弃. 如果使用响应式编程框架, 决定权就交回给发布者, 订阅者只需要根据自己的处理能力向发布者请求相应数量的数据. 你可能会问这不就是 Pull 模型嘛? 其实是不同的: 在Pull模型中, 订阅者每次处理完数据, 都要重新发起一次请求拉取新的数据. 使用背压, 订阅者只需要发起一次请求, 就能连续不断的重复请求数据.

Best practices

一般来说, 响应式编程适用于高并发, 带延迟操作的场景. 比如:

  • 一次请求涉及多次外部服务调用

  • 非可靠的网络传输

  • 高并发下的消息处理

  • 弹性计算网络

代价

响应式编程的两个比较大的问题是:

  1. 虽然复用线程有助于提高吞吐量, 但是一旦在某个回调函数中线程被卡住, 那么这个想成上的所有请求都会被阻塞. 最严重的的情况是整个应用会被拖垮.
  2. 难以调试. 由于响应式编程 API 的强大的描述能力, 在一个典型应用中, 大部分代码都是以链式表达式的形式构成, 将很难定位到具体哪个环节出了问题. 需要专门的调试方法和工具 (以后介绍).

测试用例

以下内容摘录自: https://github.com/emac/demo-reactor

下面有两个测试用例, 用以演示响应式编程的两个关键特性: 提高吞吐量和背压.

提高吞吐量

@Test
public void testImperative() throws InterruptedException {_runInParallel(CONCURRENT_SIZE, () -> {ImperativeRestaurantRepository.INSTANCE.insert(load);});
}private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(nThreads);for (int i = 0; i < nThreads; i++) {executorService.submit(task);}executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);
}@Test
public void testReactive() throws InterruptedException {CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);for (int i = 0; i < CONCURRENT_SIZE; i++) {ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {}, e -> latch.countDown(), latch::countDown);}latch.await();
}

第一个测试用例使用的是 “多线程 + MongoDB Driver”, 同时起 100 个线程, 每个线程往 MongoDB 中插入 1W 条数据, 总共 100W 条数据, 平均用时 15s 左右.

第二个测试用例使用的是 “Reactor + MongoDB Reactive Streams Driver”, 同样是插入 100W 条数据, 平均用时不到 10s.

Back pressure (背压)

在演示测试用例之前, 先看两张图帮助你更形象的理解什么是背压. 两张图一看没啥区别, 但其实是完全两种不同的背压策略:

第一张图, 发布速度 (100/s) 远大于订阅速度 (1/s), 但是由于背压的关系, 发布者严格按照订阅者的请求数量发送数据

第二张图, 发布速度 (1/s) 小于订阅速度 (100/s), 当订阅者请求 100 个数据时, 发布者会积满所需个数的数据再开始发送. 可以看到, 通过背压机制, 发布者可以根据各个订阅者的能力动态调整发布速度.

@BeforeEach
public void beforeEach() {// initialize publisherAtomicInteger count = new AtomicInteger();timerPublisher = Flux.create(s ->new Timer().schedule(new TimerTask() {@Overridepublic void run() {s.next(count.getAndIncrement());if (count.get() == 10) {s.complete();}}}, 100, 100));
}@Test
public void testNormal() throws InterruptedException {CountDownLatch latch = new CountDownLatch(1);timerPublisher.subscribe(r -> System.out.println("Continuous consuming " + r),e -> latch.countDown(),latch::countDown);latch.await();
}@Test
public void testBackpressure() throws InterruptedException {CountDownLatch latch = new CountDownLatch(1);AtomicReference<Subscription> timerSubscription = new AtomicReference<>();Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {timerSubscription.set(subscription);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("consuming " + value);}@Overrideprotected void hookOnComplete() {latch.countDown();}@Overrideprotected void hookOnError(Throwable throwable) {latch.countDown();}};timerPublisher.onBackpressureDrop().subscribe(subscriber);new Timer().schedule(new TimerTask() {@Overridepublic void run() {timerSubscription.get().request(1);}}, 100, 200);latch.await();
}

第一个测试用例演示了在理想情况下, 即订阅者的处理速度能够跟上发布者的发布速度 (以 100ms 为间隔产生 10 个数字), 控制台从 0 打印到 9, 一共 10 个数字, 和发布端一致;

第二个测试用例故意条漫了订阅者的处理速度 (每 200ms 处理一个数字), 同时发布者采用了 Drop 的背压策略, 结果控制台只打印了一半的数字 (0, 2, 4, 6, 8), 另外一半由于背压的原因被发布者 Drop 了, 并没有发给订阅者.

Conclusion

本文简单概括了 Reactive Programming 和 Project Reactor. 部分内容来源网络, 部分源于总结. 接下来开始响应式编程之旅……

Reference

  • Official project reactor reference
  • Github demo reactor
  • PSUG #52 Dataflow and simplified reactive programming with Akka-streams

⒈响应式编程 Project Reactor 概述相关推荐

  1. Spring笔记(4):响应式编程、Reactor、WebFlux、Flow

    目录 1.Spring Webflux 介绍 2.响应式编程(Java 实现) 3.响应式编程(Reactor 实现) 4.SpringWebflux 执行流程和核心 API 5.SpringWebf ...

  2. 响应式编程实现技术概述

    响应式编程就是利用异步数据流进行编程,本质上就是观察者(Observer)模式的一种表现形式.我们首先讨论实现异步操作的几种常见方式,然后引出响应式编程的主流实现技术. 1. 实现异步的常见方式 在J ...

  3. 响应式编程之一:概述

    什么是响应式编程(Reactive Programming) In computing, reactive programming is an asynchronous programming par ...

  4. 响应式圣经:10W字,实现Spring响应式编程自由

    前言 全链路异步化改造的基础是响应式编程 随着业务的发展,微服务应用的流量越来越大,使用到的资源也越来越多. 在微服务架构下,大量的应用都是 SpringCloud 分布式架构,这种架构总体上是全链路 ...

  5. java9 响应式编程支持

    文章目录 响应式编程系列文章 概述 响应式编程接口 demo 响应式编程系列文章 响应式编程规范 java设计模式之 观察者模式 reactor3 源码分析 java9 响应式编程支持 spring ...

  6. 响应式编程以及反应式编程框架Reactor3的简单介绍

    前言 Reactor 3是一个围绕Reactive Streams规范构建的库,它在JVM上引入了响应式编程的一个范例.目前Spring5 引入的Webflux就是reactor 3实现的一个响应式w ...

  7. response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题

    现在, Java 的各种基于 Reactor 模型的响应式编程库或者框架越来越多了,像是 RxJava,Project Reactor,Vert.x 等等等等.在 Java 9, Java 也引入了自 ...

  8. Reactor响应式编程 之 简介

    1 reactor 出现的背景.初衷和要达到什么样的目标 Reactor 项目始于 2012 年. 经过长时间的内部孵化,于 2013 年发布 Reactor 1.x 版本. Reactor 1 在各 ...

  9. Spring Boot 实践折腾记(10):响应式编程支持库Reactor

    Spring Boot 2.0发布已经过去了2个多月,随着微服务的流行,Spring Boot也越来越受到青睐,更好的隔离编程范式得到了越来越多项目的应用,这是一件值得高兴的事. Spring Boo ...

最新文章

  1. 做行业研究时如何获取相关数据?
  2. 湘潭2020计算机考试,湖南省湘潭市2020年3月计算机等级考试注意事项
  3. java递归实现 堆的删除_java 递归实现删除或查询指定目录下的全部文件
  4. 【TensorFlow-windows】学习笔记二——低级API
  5. ocsng mysql connection problem_OCSNG 介绍及其工作原理
  6. Anatomy of a Flink Program(Flink程序的剖析)
  7. 18.ChainOfResponsibility-职责链模式
  8. [Ext JS]5.11 轻量版的树- treelist
  9. DLA SQL分析函数:SQL语句审计与分析的利器
  10. php压缩解压缩插件,PHP压缩解压缩类:PclZip
  11. 【编译原理笔记06】语法分析,移入-归约分析:自底向上的分析,LR(0)分析法,LR(0)分析表的构建(基于自动机)
  12. mac安装helm工具_部署利器 helm v3 安装以及使用指南
  13. 昨天我遇到一个令我敬佩的人
  14. 最小值最大化问题(贪心系列)
  15. Linux 开源词典工具及下载链接
  16. MyBatis-Plus 分页查询以及自定义sql分页
  17. html embed页面无法播放视频,embed标签什么意思 手机无法播放网页上embed标签的视频...
  18. 关于M0、M1、M2货币最近所学
  19. tpshop index.php,第三讲、TPshop linux安装
  20. 文科生学计算机能考研吗,求推荐文科生可以跨考计算机的名校

热门文章

  1. Unity中协程的yield return
  2. Python venv虚拟环境跨设备迁移
  3. ZJOI2017 NGU!
  4. eclipse代码提示,高亮相同变量颜色设置
  5. Python 玩转 Excel
  6. 热风焊盘和隔离焊盘关系
  7. 影视后期制作学习第五天(AE)
  8. IDEA上移除项目(逻辑删除)
  9. (Spring Cloud面试题)spring cloud断路器的作用是什么?
  10. 解决Can't find bundle for...