前言

之前讲述了国产微服务网关Soul底层用了 Webflux 技术,本文将将详细剖析其底层的技术细节。

本文知识点架构:

一、什么是WebFlux?

我们从Spring的官网拉下一点点就可以看到介绍WebFlux的地方了

从官网的简介中我们能得出什么样的信息?

  • 我们程序员往往根据不同的应用场景选择不同的技术,有的场景适合用于同步阻塞的,有的场景适合用于异步非阻塞的。而 Spring 5 提供了一整套响应式(非阻塞)的技术栈供我们使用(包括Web控制器、权限控制、数据访问层等等)。
  • 而左侧的图则是技术栈的对比啦;
  • 响应式一般用 Netty 或者 Servlet 3.1的容器(因为支持异步非阻塞),而 Servlet 技术栈用的是 Servlet 容器
  • 在 Web 端,响应式用的是 WebFlux,Servlet 用的是 SpringMVC
  • .....

总结起来,WebFlux 只是响应式编程中的一部分(在 Web 控制端),所以一般我们用它与 SpringMVC 来对比。

二、如何理解响应式编程?

在上面提到了响应式编程(Reactive Programming),而 WebFlux 只是响应式编程的其中一个技术栈而已,所以我们先来探讨一下什么是响应式编程

从维基百科里边我们得到的定义:

reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change

响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式

在维基百科上也举了个小例子:

意思大概如下:

  • 在命令式编程(我们的日常编程模式)下,式子a=b+c,这就意味着a的值是由bc计算出来的。如果b或者c后续有变化,不会影响a的值
  • 在响应式编程下,式子a:=b+c,这就意味着a的值是由bc计算出来的。但如果b或者c的值后续有变化,会影响a的值

我认为上面的例子已经可以帮助我们理解变化传递(propagation of change)

那数据流(data stream)和声明式(declarative)怎么理解呢?那可以提一提我们的Stream流了。之前写过Lambda表达式和Stream流的文章,大家可以先去看看:

  • Lambda表达式基础知识入门
  • 一文带你深入体验Stream流

Lambda的语法是这样的(Stream流的使用会涉及到很多Lambda表达式的东西,所以一般先学Lambda再学Stream流):

Stream流的使用分为三个步骤(创建Stream流、执行中间操作、执行最终操作):

执行中间操作实际上就是给我们提供了很多的API去操作Stream流中的数据(求和/去重/过滤)等等

说了这么多,怎么理解数据流和声明式呢?其实是这样的:

  • 本来数据是我们自行处理的,后来我们把要处理的数据抽象出来(变成了数据流),然后通过API去处理数据流中的数据(是声明式的)

比如下面的代码;将数组中的数据变成数据流,通过显式声明调用.sum()来处理数据流中的数据,得到最终的结果:

public static void main(String[] args) {int[] nums = { 1, 2, 3 };int sum2 = IntStream.of(nums).parallel().sum();System.out.println("结果为:" + sum2);
}

如图下所示:

2.1 响应式编程->异步非阻塞

上面讲了响应式编程是什么:

响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式

也讲解了数据流/变化传递/声明式是什么意思,但说到响应式编程就离不开异步非阻塞

从Spring官网介绍WebFlux的信息我们就可以发现asynchronous, nonblocking 这样的字样,因为响应式编程它是异步的,也可以理解成变化传递它是异步执行的。

如下图,合计的金额会受其他的金额影响(更新的过程是异步的):

我们的JDK8 Stream流是同步的,它就不适合用于响应式编程(但基础的用法是需要懂的,因为响应式流编程都是操作嘛)

而在JDK9 已经支持响应式流了,下面我们来看一下

三、JDK9 Reactive

响应式流的规范早已经被提出了:里面提到了:

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure ----->http://www.reactive-streams.org/

翻译再加点信息:

响应式流(Reactive Streams)通过定义一组实体,接口和互操作方法,给出了实现异步非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等。

规范里头实际上就是定义了四个接口:

Java 平台直到 JDK 9才提供了对于Reactive的完整支持,JDK9也定义了上述提到的四个接口,在java.util.concurrent包上

一个通用的流处理架构一般会是这样的(生产者产生数据,对数据进行中间处理,消费者拿到数据消费):

  • 数据来源,一般称为生产者(Producer)
  • 数据的目的地,一般称为消费者(Consumer)
  • 在处理时,对数据执行某些操作一个或多个处理阶段。(Processor)

到这里我们再看回响应式流的接口,我们应该就能懂了:

  • Publisher(发布者)相当于生产者(Producer)
  • Subscriber(订阅者)相当于消费者(Consumer)
  • Processor就是在发布者与订阅者之间处理数据用的

在响应式流上提到了back pressure(背压)这么一个概念,其实非常好理解。在响应式流实现异步非阻塞是基于生产者和消费者模式的,而生产者消费者很容易出现的一个问题就是:生产者生产数据多了,就把消费者给压垮了

而背压说白了就是:消费者能告诉生产者自己需要多少量的数据。这里就是Subscription接口所做的事。

下面我们来看看JDK9接口的方法,或许就更加能理解上面所说的话了:

// 发布者(生产者)
public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}
// 订阅者(消费者)
public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}
// 用于发布者与订阅者之间的通信(实现背压:订阅者能够告诉生产者需要多少数据)
public interface Subscription {public void request(long n);public void cancel();
}
// 用于处理发布者 发布消息后,对消息进行处理,再交由消费者消费
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

3.1 看个例子

代码中有大量的注释,我就不多BB了,建议直接复制跑一下看看:

class MyProcessor extends SubmissionPublisher<String>implements Processor<Integer, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("处理器接受到数据: " + item);// 过滤掉小于0的, 然后发布出去if (item > 0) {this.submit("转换后的数据:" + item);}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理器处理完了!");// 关闭发布者this.close();}}public class FlowDemo2 {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisherSubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义处理器, 对数据进行过滤, 并转换为String类型MyProcessor processor = new MyProcessor();// 3. 发布者 和 处理器 建立订阅关系publiser.subscribe(processor);// 4. 定义最终订阅者, 消费 String 类型数据Subscriber<String> subscriber = new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 5. 处理器 和 最终订阅者 建立订阅关系processor.subscribe(subscriber);// 6. 生产数据, 并发布publiser.submit(-111);publiser.submit(111);// 7. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);}}

输出的结果如下:

流程实际上非常简单的:

参考资料:

  • https://yanbin.blog/java-9-talk-reactive-stream/#more-8877
  • https://blog.csdn.net/wudaoshihun/article/details/83070086
  • http://www.spring4all.com/article/6826
  • https://www.cnblogs.com/IcanFixIt/p/7245377.html

Java 8 的 Stream 主要关注在流的过滤,映射,合并,而 Reactive Stream 更进一层,侧重的是流的产生与消费,即流在生产与消费者之间的协调

说白了就是:响应式流是异步非阻塞+流量控制的(可以告诉生产者自己需要多少的量/取消订阅关系)

展望响应式编程的场景应用:

比如一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;
再比如一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。

四、入门WebFlux

扯了一大堆,终于回到WebFlux了。经过上面的基础,我们现在已经能够得出一些结论的了:

  • WebFlux是Spring推出响应式编程的一部分(web端)
  • 响应式编程是异步非阻塞的(是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式)

我们再回来看官网的图:

4.1 简单体验WebFlux

Spring官方为了让我们更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是说:我们可以像使用SpringMVC一样使用着WebFlux

WebFlux使用的响应式流并不是用JDK9平台的,而是一个叫做Reactor响应式流库。所以,入门WebFlux其实更多是了解怎么使用Reactor的API,下面我们来看看~

Reactor是一个响应式流,它也有对应的发布者(Publisher ),Reactor的发布者用两个类来表示:

  • Mono(返回0或1个元素)
  • Flux(返回0-n个元素)

而订阅者则是Spring框架去完成

下面我们来看一个简单的例子(基于WebFlux环境构建):

// 阻塞5秒钟
private String createStr() {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {}return "some string";
}// 普通的SpringMVC方法
@GetMapping("/1")
private String get1() {log.info("get1 start");String result = createStr();log.info("get1 end.");return result;
}// WebFlux(返回的是Mono)
@GetMapping("/2")
private Mono<String> get2() {log.info("get2 start");Mono<String> result = Mono.fromSupplier(() -> createStr());log.info("get2 end.");return result;
}

首先,值得说明的是,我们构建WebFlux环境启动时,应用服务器默认是Netty的:

我们分别来访问一下SpringMVC的接口和WebFlux的接口,看一下有什么区别:

SpringMVC:

WebFlux:

从调用者(浏览器)的角度而言,是感知不到有什么变化的,因为都是得等待5s才返回数据。但是,从服务端的日志我们可以看出,WebFlux是直接返回Mono对象的(而不是像SpringMVC一直同步阻塞5s,线程才返回)。

这正是WebFlux的好处:能够以固定的线程来处理高并发(充分发挥机器的性能)。

WebFlux还支持服务器推送(SSE - >Server Send Event),我们来看个例子:

/*** Flux : 返回0-n个元素* 注:需要指定MediaType* @return*/
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}return "flux data--" + i;}));return result;
}

效果就是每秒会给浏览器推送数据:

WebFlux入门极简相关推荐

  1. 深度学习入门极简教程(二)

    深度学习入门极简教程(二) 摘要: 现在的人工智能,大致就是用"硅基大脑"模拟或重现"碳基大脑的过程".那么,在未来会不会出现"碳硅合一"的 ...

  2. 数据分析入门极简书单

    本文授权转载自麻瓜编程 如果你找一个熟悉的朋友给你推荐书单,他会倾向于越短越好,因为他想把他知道的最好的推荐给你,让你少花时间在不重要的事情上. 但如果你在网上看到一个书单,往往会发现都很长长长长长, ...

  3. Maven入门极简使用教程

    Maven入门级别使用 回顾 拦截器 统一异常处理 ​ 实现接口的方式. ​ 注解方式.(分享) ​ 如果是ajax请求,会将异常信息发送到客户端的响应中. 复习spring springmvc SS ...

  4. Pygame游戏入门极简思维导图

    你好,我是zhenguo 最近几天总结的Pygame游戏入门的思维导图,分享需要的诸位伙伴:

  5. 拥抱Node.js 8.0,N-API入门极简例子

    本文摘录自<Nodejs学习笔记>,更多章节及更新,请访问 github主页地址.欢迎加群交流,群号 197339705. N-API简介 Node.js 8.0 在2017年6月份发布, ...

  6. 拥抱 Node.js 8.0,N-API 入门极简例子

    本文摘录自<Nodejs学习笔记>,更多章节及更新,请访问 github主页地址.欢迎加群交流,群号 197339705. N-API简介 Node.js 8.0 在2017年6月份发布, ...

  7. Qt入门极简教程(二)

    <QMainWindow_菜单栏和工具栏> QMainWindow:菜单栏(menu bar).多个工具栏(tool bars).多个铆接部件(浮动窗口dock widgets).中心部件 ...

  8. [转载]芋道 Soul 极简入门(国产微服务网关)

    摘要: 原创出处 http://www.iocoder.cn/Soul/install/ 「芋道源码」欢迎转载,保留摘要,谢谢! 由于原著写作时间时间有点久了:有部分类容需要更新,后去个人会再发布文章 ...

  9. 芋道 Apollo 极简入门

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

最新文章

  1. 人类史上最伟大的 PPT,马斯克的 39 页火星计划PPT
  2. Xen Server二安装xc及管理xen主机
  3. 函数指针和函数指针数组及其应用
  4. 虾皮如何注册店铺_虾皮跨境电商怎样注册店铺?做(shopee)虾皮电商靠谱吗
  5. Spring Data JPA 从入门到精通~关键字列表
  6. C++学习笔记系列之继承多态
  7. 第22课 抽奖 《小学生C++趣味编程》
  8. Celery参数详解、配置参数
  9. MySQL优化SQL性能问题
  10. 为了自动驾驶,Uber每月烧钱1.3亿
  11. Android实例-利用WebBrowser实现浏览器(XE8+小米2)
  12. 利用 Composer 完善自己的 PHP 框架(二)——发送邮件
  13. hbase 基本命令
  14. Mac磁盘如何分区?教你Mac系统磁盘自由分区教程!
  15. 英语单词记忆(词缀 / 词根)
  16. 高德地图Amap绘制路线首尾相连问题
  17. 解决ECharts中使用tooltip时鼠标移入抖动问题
  18. php silk v3 decoder,微信小程序语音搜索踩坑:silk文件格式转换,在PHP中使用
  19. 计算html的高度,html计算各对象的宽高
  20. 【Block-Level Verification】 芯片开发通识_验证目标_ 验证语言_ 验证职业前景 _挑战和瓶颈_验证周期_功能描述文档_验证计划_回归测试_硅后测试_逃逸分析...

热门文章

  1. 输入三个字符串,按照由小到大的顺序输出(指针方法处理)——C语言
  2. 医院病房听音与定点寻呼广播系统方案
  3. 排序算法——冒泡排序原理动图详解及实现
  4. C++学习——坚持(二)
  5. .net core项目iis10上出现 HTTP 错误 500.19,错误代码:0x8007000d
  6. 打开一个英文文本文件,将其中大写字母变成小写,小写字母变成大写。
  7. BZOJ5301:[CQOI2018]异或序列——题解
  8. 2022年网络安全比赛--压缩包文件暴力破解中职组(超详细)
  9. html轮播图3d翻转,jQuery自适应-3D旋转轮播图
  10. splunk 开机启动