系列目录:

  1. Spring WebFlux运用中的思考与对比
  2. CompletableFuture与Spring的Sleuth结合工具类
  3. CommpetableFuture使用anyOf过程中的一些优化思考
  4. 结合CompletableFuture与Spring的Sleuth结合工具类与allOf以及anyOf

本文基于Spring Cloud Finchley SR4

本文通过几个问题,解析下Spring WebFlux用法最佳实践,并与另一框架Vertx作对比

1. 是否一定要用默认的Web容器,用自己的Web容器是否可以,同时是否可以有web和webflux

是可以的,这样的依赖是可行的(容器用tomcat和undertow或者其他都可以,这里使用undertow):

2. 怎样实现真正的异步背压的Reactor模型呢?

这个问题,除此运用像WebFlux和Vertx的框架的人,都会对这个有误解。认为仅仅简单的把webFlux的依赖添加进来,之后接口返回Mono就实现了异步背压的Reactor模型。实际上并不是这样的。
我们来举几个例子,分步骤深入了解下。
首先为了测试方便,我们将web容器的处理http请求线程池的大小改成唯一一个,对于Tomcat,配置:

server.thread.max-thread=1

对于UnderTow(我们这里用的是underTow):

# 设置IO线程数, 它主要执行非阻塞的任务,它们会负责多个连接, 默认设置每个CPU核心一个线程
server.undertow.io-threads=1
# 阻塞任务线程池, 当执行类似servlet请求阻塞IO操作, undertow会从这个线程池中取得线程
# 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8
server.undertow.worker-threads=1

之后,配置Log4J2日志格式为:

<Property name="springAppName">test</Property><Property name="LOG_ROOT">log</Property><Property name="LOG_DATEFORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property><Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property><Property name="LOG_LEVEL_PATTERN">%5p</Property><Property name="logFormat">%d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} [${springAppName},%X{X-B3-TraceId},%X{X-B3-SpanId}] [${sys:PID}] [%t][%C:%L]: %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}</Property>

这样的格式可以使我们看到线程号,还有sleuth的traceId和spanId(我们的项目依赖了sleuth)。
首先编写测试代码,看看直接简单调用并just是否实现了异步背压:

@Log4j2
@RestController
public class TestController {@Autowiredprivate TestService testService;@RequestMapping("/test")public Mono<String> test() {log.info("test started");return Mono.just(testService.simulateIOTest());}@Servicepublic static class TestService {public String simulateIOTest() {try {//simulate iolog.info("simulate start");TimeUnit.SECONDS.sleep(5);log.info("simulate end");} catch (InterruptedException e) {e.printStackTrace();}return "hello";}}
}

并发调用接口,查看日志,发现:

2019-11-12 09:05:41.595  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:41.596  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:46.598  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:51.636  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:56.644  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end

可以明显看出,请求是串行处理的,因为只有一个线程,并且这个线程还在等待请求处理完。这就不符合Reactor模型,处理http请求的线程XNIO-2 task-1应该不等待请求处理完而直接处理下一个请求才对。
Mono.just(testService.simulateIOTest())替换成Mono.fromCallable(() -> testService.simulateIOTest())等等类似的是一样的效果,这里必须自己用其他的线程池,去处理实际请求,处理结束的时候,将结果填写到最外层的Mono里面。这样的话,考虑到代码整洁性不采用纯回调写法,要求每一个调用方法返回的都是Future类型的。这里我们返回CompletableFuture。

@Log4j2
@RestController
public class TestController {@Autowiredprivate TestService testService;@RequestMapping("/test")public Mono<String> test() {log.info("test started");return Mono.create(stringMonoSink -> testService.simulateIOTest().thenApply(s -> {log.info("apply");//填写成功结果stringMonoSink.success(s);return s;}));}@Servicepublic static class TestService {public CompletableFuture<String> simulateIOTest() {return CompletableFuture.supplyAsync(() -> {try {//simulate iolog.info("simulate start");TimeUnit.SECONDS.sleep(5);log.info("simulate end");} catch (InterruptedException e) {e.printStackTrace();}return "hello";});}}
}

结果是:

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.155  INFO [test,c654462e159fd43e,c654462e159fd43e] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.962  INFO [test,8366a95d002ca25a,8366a95d002ca25a] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.963  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:05.756  INFO [test,5f851d9e2ef49f14,5f851d9e2ef49f14] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:05.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply

这样,才真正实现了Reactor模型。

3. CompletableFuture线程池管理还有日志追踪

CompletableFuture可以指定线程池,亦可以不指定。如果像上面不指定的话,那么使用的线程池就是Java8之后会默认启动一个大小为CPU核数减一的CommonForkJoinPool去执行。需要指定的话,基本上每个方法都可以额外传入一个线程池作为参数。

最佳实践是,只要涉及到IO的,就交给不同的线程池去做,不同种类的IO的线程池不同。例如,用于数据库IO的线程池,用于RPC的线程池,用于缓存访问的线程池等等。

这里还有一个问题存在,就是异步调用,导致spanId和traceId丢失了,例如上面的例子:

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start

8d6eddc9cc80612f这个丢失了,导致微服务调用链日志追踪变得不可行,所以,这里我们对于异步的代码,也需要在异步调用前强制设置下spanId和traceId。

综上之后,修改的代码是:

@Log4j2
@RestController
public class TestController {@Autowiredprivate TestService testService;@RequestMapping("/test")public Mono<String> test() {log.info("test started");return Mono.fromFuture(testService.simulateIOTest());}@Servicepublic static class TestService {@Autowiredprivate Tracer tracer;ThreadFactory build = (new ThreadFactoryBuilder()).setNameFormat("test_service_executor-%d").build();private ExecutorService executorService = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(131072), build, new ThreadPoolExecutor.AbortPolicy());public CompletableFuture<String> simulateIOTest() {Span span = tracer.currentSpan();return CompletableFuture.supplyAsync(() -> {try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {//simulate iolog.info("simulate start");TimeUnit.SECONDS.sleep(5);log.info("simulate end");return "hello";} catch (Exception e) {throw new RuntimeException(e);}}, executorService);}}
}

结果是:

2019-11-12 09:44:30.953  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:28]:test started
2019-11-12 09:44:30.991  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:44:35.991  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end

3. 与Vertx对比,有哪些异同?

实际上,从设计上看,基本思路是一样的。对于任意一个IO操作,如果有原生的异步客户端(返回是一个Future),则运用Future封装交给其他线程池处理,不影响http请求线程接受其他请求。

主要区别在于:

  1. WebFlux框架并没有涉及到线程池,Vertx连异步线程池也封装成为Vertx的WorkerExecutor类。
  2. WebFlux异步Future使用的还是Java原生的,Vetx框架自己封装了Future。
  3. WebFlux与Spring在结合方面更完善,但是Spring生态里面并没有提供原生的NIO客户端,例如实现了MySQL协议栈的NIO mysql客户端,这个Vertx是有的,但是是否健壮还有待考证。这些进一步限制了WebFlux的性能。
  4. Vertx是一个跨语言的框架

Spring WebFlux运用中的思考与对比相关推荐

  1. 处理Spring WebFlux中出现的错误

    处理Spring WebFlux中出现的错误 案例概述 在本教程中,我们将看一下处理Spring WebFlux项目中错误的各种策略,同时介绍一个实际案例. 我们还将指出在一个策略中使用另一个策略并在 ...

  2. Spring WebFlux和Spring Cloud开发响应式微服务

    作者:Piotr Mińkowski 译者:大萝卜爱上小白菜 原文:https://dzone.com/articles/reactive-microservices-with-spring-webf ...

  3. 在Spring WebFlux中创建多个RouterFunction

    在这篇文章中,我们将研究在Spring WebFlux中为不同的逻辑域定义多个路由器功能. 如果您正在创建"微服务",则可能不会出现问题,因为您很可能仅在每个服务的单个域中工作,但 ...

  4. java中spring的web支持nio,Spring WebClient NIO功能和问题域,与Spring Webflux一起使用

    我正在使用最新版本的Spring - Spring 5 . 我正在开发http客户端的WebService"聚合器",有点像路由请求到外部WebServices,接收响应后接收响应 ...

  5. Spring WebFlux 响应式编程学习笔记(一)

    各位Javaer们,大家都在用SpringMVC吧?当我们不亦乐乎的用着SpringMVC框架的时候,Spring5.x又悄(da)无(zhang)声(qi)息(gu)的推出了Spring WebFl ...

  6. (6)Spring WebFlux性能测试——响应式Spring的道法术器

    本系列其他文章见:<响应式Spring的道法术器>. 前情提要:Reactor快速上手 | Spring WebFlux快速上手 本文源码 1.4 从负载测试看异步非阻塞的优势 前面总是& ...

  7. Spring Webflux: Kotlin DSL [片断]

    原文链接:https://dzone.com/articles/spring-webflux-kotlin-dsl-snippets 作者:Biju Kunjummen 译者:Jackie Tang ...

  8. Spring Webflux 响应式编程 (二) - WebFlux编程实战

    第一章 Reactive Stream 第1节 jdk9的响应式流 就是reactive stream,也就是flow.其实和jdk8的stream没有一点关系.说白了就一个发布-订阅模式,一共只有4 ...

  9. 响应式编程之Spring Webflux

    文章目录 一 .响应式编程 二 .响应式流 (1)JDK9响应式流: (2)Reactor响应式流库 三.Spring WebFlux 1.整合Webflux 2.事件推送 3.实现背压 四.配置数据 ...

最新文章

  1. 第三个Python程序:Python函数
  2. docker容器运行mysql持久化_docker容器实现数据持久化的两种方式及其区别
  3. nginx-rtmp常用指令
  4. java axis webservice_Axis Webservice框架使用案例
  5. Jinja的基础知识
  6. 使用SaxParser和完整代码进行XML解析
  7. BringWindowToTop
  8. 32位dll转64位工具_如何在64位系统中运行32位或16位程序
  9. 光环PMP下午茶做题时间
  10. excel如何比对两列数据是否相同
  11. EA使用小技巧-控制图面拷贝时的边框
  12. ansys转子动力学分析
  13. Kubernetes资源配额
  14. 萤石云视频播放器来回切换视频报错bug
  15. 《构建之法》读书笔记(2)
  16. [Linux]搭建Anki同步服务器(针对Anki2.0)
  17. 开源风控系统radar部署
  18. 微信小程序如何直接分享到朋友圈?
  19. 重温马云英文演讲:最伟大的成功
  20. 中国天井钻机行业运行现状分析及发展前景预测报告2022-2028年

热门文章

  1. 为南通市2017中考数学试卷画的图
  2. 使用install shield制作安装程序问题集锦
  3. vscode终端运行vue报错:无法加载文件 C:\Users\14353\AppData\Roaming\npm\vue.ps1,因为在此系统上禁止运行脚本
  4. Token一般存放在哪里
  5. 学python的书-学习python求推荐一波书籍?
  6. 成本中心主数据维护及其导致常见报错解析(如:消息号 KI261 成本中心 / 冻结而不能直接对 收入记帐)
  7. FICO配置详解之四:成本中心会计
  8. python测试开发教程_python3测试工具开发快速入门教程
  9. python哪个方向工资高_学完Python的7大就业方向,哪个赚钱多?
  10. 联想台式机linux系统安装教程,商用台式一键恢复软件各版本使用介绍(适用于现有上市机型)...