文章目录

  • 一 、响应式编程
  • 二 、响应式流
    • (1)JDK9响应式流:
    • (2)Reactor响应式流库
  • 三、Spring WebFlux
    • 1、整合Webflux
    • 2、事件推送
    • 3、实现背压
  • 四、配置数据源(以mysql为例)
    • 1、CRUD
    • 2、R2DBC
    • (1)如何体现异步?
    • (2)如何使用事务?
    • (3)线程池配置
  • 五、远程调用
  • 六、与Spring MVC的比较
  • 七、扩展
  • 附录:

一 、响应式编程

响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式
(1)数据流

将要处理的数据抽象成流

通用的流式处理:
生产者产生数据,对数据进行中间处理,消费者拿到数据消费。

(2)变化传递

● 在命令式编程(我们的日常编程模式)下,式子a=b+c,这就意味着a的值是由b和c计算出来的。如果b或者c后续有变化,不会影响到a的值。
● 在响应式编程下,式子a:=b+c,这就意味着a的值是由b和c计算出来的。但如果b或者c的值后续有变化,会影响到a的值。

(3)声明式
通过API来处理数据流中的数据。

适用场景:异步非阻塞。响应式系统具有某些特性,使其成为低延迟、高吞吐量工作负载的理想选择。

比如:
一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;
一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。

二 、响应式流

响应式流(Reactive Streams)通过定义一组实体,接口和互操作方法,给出了实现异步非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等。

常见的响应流库简介

(1)JDK9响应式流:

Publisher:发布者,数据来源,又称生产者。
Subscriber:订阅者,数据目的地,又称消费者。
Processor:发布者和订阅者之间处理数据。

(2)Reactor响应式流库

发布者:Mono(返回0或1个元素)、Flux(返回0~n个元素)。
订阅者:Spring框架来完成。

背压:
Backpressure
在异步场景中,被观察者发送事件的速度远远快于观察者的处理速度情况下,一种告知上游的被观察者降低发送速度的策略。

生产者和消费者模式下,如果生产者生产的消费多了,消费者消费不过来,就容易被压垮了。

借助背压,消费者可以告诉生产者自己需要多少量的数据。

响应式流=异步非阻塞+流量控制。

三、Spring WebFlux

Spring提供了两条并行的技术栈,一条是基于Servlet API的Spring MVC和Spring Data,另一个条是完整的响应式栈,包括Spring WebFlux和Spring Data’s reactive repositories。

SpringBoot提供了自动配置的Spring Webflux,简化了响应式web应用的开发。

1、整合Webflux

pom引入依赖:

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency></dependencies>

启动类添加@EnableWebFlux。

两种方式:
(1)基于注解

@RestController
@RequestMapping("/user")
public class MyController {@GetMapping("/list")public Flux<User> getList(){return Flux.fromIterable(list.stream().filter(k->{return k.getId()>3;}).collect(Collectors.toList()));}@GetMapping("/{id}")public Mono<User> getUser(@PathVariable Long userId) {return this.userRepository.findById(userId);}
}

(2)函数式
使用handler代替controller,做流的处理。
引入路由,类似SpringMVC中的@RequestMapping。

@Component
public class UserHandler {@ResourceUserServiceImpl userService;public Mono<ServerResponse> getList(ServerRequest serverRequest){Flux<User> userFlux=userService.getList();return ServerResponse.ok().body(userFlux, User.class);}
}@Configuration(proxyBeanMethods = false)
public class UserRouting {@Beanpublic RouterFunction<ServerResponse> monoRouterFunction(UserHandler userHandler){return RouterFunctions.route(GET("/user/list"),userHandler::getList).andRoute(GET("/hello").and(accept(TEXT_PLAIN)),userHandler::hello).andRoute(GET("/hello/stream").and(accept(APPLICATION_STREAM_JSON)),userHandler::helloStream);}
}

启动后,可以看到使用的是内置的netty。

访问http://localhost:20001/user/list 即可。

   @GetMapping("list/getStringSleep")public Mono<String> getStringSleep() {log.info("invoke start");Mono<String> s=Mono.fromSupplier(()->createStr());log.info("invoke end");return s;}private String createStr() {try {TimeUnit.SECONDS.sleep(5);System.out.println("sleep");} catch (InterruptedException e) {}return "some string";}

先执行invoke start invoke end 再执行异步流处理中的sleep日志。

2022-02-18 13:47:26.364  INFO 61992 --- [ctor-http-nio-2] com.example.controller.MyController      : invoke start
2022-02-18 13:47:26.366  INFO 61992 --- [ctor-http-nio-2] com.example.controller.MyController      : invoke end
2022-02-18 13:47:26.369 DEBUG 61992 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Using 'text/html' given [text/html, application/xhtml+xml, image/avif, image/webp, image/apng, application/xml;q=0.9, application/signed-exchange;v=b3;q=0.9, */*;q=0.8] and supported [text/plain;charset=UTF-8, text/event-stream, text/plain;charset=UTF-8, */*]
2022-02-18 13:47:26.369 DEBUG 61992 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] 0..1 [java.lang.String]
sleep
2022-02-18 13:47:31.375 TRACE 61992 --- [ctor-http-nio-2] o.s.core.codec.CharSequenceEncoder       : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Writing "some string"
2022-02-18 13:47:31.376 DEBUG 61992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Decreasing pending responses, now 0
2022-02-18 13:47:31.377 DEBUG 61992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Last HTTP packet was sent, terminating the channel
2022-02-18 13:47:31.377 TRACE 61992 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Completed 200 OK, headers={masked}
2022-02-18 13:47:31.377 TRACE 61992 --- [ctor-http-nio-2] o.s.h.s.r.ReactorHttpHandlerAdapter      : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Handling completed
2022-02-18 13:47:31.377 DEBUG 61992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [fceca854-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Last HTTP response frame
2022-02-18 13:47:31.378 TRACE 61992 --- [ctor-http-nio-2] reactor.netty.channel.ChannelOperations  : [fceca854, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:53707] Disposing ChannelOperation from a channel

2、事件推送

Server-Sent Event(SSEs)

 @GetMapping(value = "push",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> push() {log.info("push start");Flux<String> result= Flux.fromStream(IntStream.range(1,5).mapToObj(i->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}return "push data--" + i;}));log.info("push end");return  result;}

在浏览器中执行http://localhost:8080/demo/push,在服务器终端可以看到日志,已经执行完成。但此时页面上的推送还在进行中。

从浏览器的监控可以看到,浏览器接收的是一个事件流 EventStream。

3、实现背压

WebFlux中可以用limitRate()设置背压的限制参数。

测试代码:

其中list为内存中构造的数据集合,此处忽略。

//不使用背压
@GetMapping(value = "list")public Flux<User> getList() {log.info("invoke start");Flux<User> user=Flux.fromStream(list.stream());log.info("invoke end");return user;}//使用背压@GetMapping(value = "list/setLimit",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<User> getListLimit() {log.info("invoke start");Flux<User> user=Flux.fromStream(list.stream()).limitRate(2);log.info("invoke end");return user;}

对比使用背压和不适用背压:

● 接口的总耗时

● 详细耗时

使用背压:

不使用背压:

浏览器接收的数据格式略有差异:

不使用背压:
[{“id”:0,“name”:“user0”},{“id”:1,“name”:“user1”},{“id”:2,“name”:“user2”},{“id”:3,“name”:“user3”},{“id”:4,“name”:“user4”},{“id”:5,“name”:“user5”}]

使用背压:

data:{“id”:0,“name”:“user0”}

data:{“id”:1,“name”:“user1”}

data:{“id”:2,“name”:“user2”}

data:{“id”:3,“name”:“user3”}

data:{“id”:4,“name”:“user4”}

data:{“id”:5,“name”:“user5”}

四、配置数据源(以mysql为例)

   我们最常使用的 JDBC 其实是同步的,而我们使用 WebFlux 是为了通过异步的方式来提高服务端的响应效率,WebFlux 虽然实现了异步,但是由于 JDBC 还是同步的,而大部分应用都是离不开数据库的,所以其实效率本质上还是没有提升。常见的异步JDBC有:ADAB、R2DBC。下面以R2DBC为例,介绍webflux配置mysql。

1、CRUD

● pom引入

   <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency><groupId>dev.miku</groupId><artifactId>r2dbc-mysql</artifactId></dependency>

● yaml配置数据源地址
当然,使用java config的方式配置也可以。

spring:r2dbc:url: r2dbcs:mysql://localhost:3306/database?sslMode=DISABLEDusername: password:

注意,这里url中的数据库连接协议不在是jdbc,而是r2dbcs

● 操作数据
这里有两种方式,都可以实现。
//方法一:通过ReactiveCrudRepository

//类似DAO
public interface UserRepository extends ReactiveCrudRepository<User,Long> {}//在Controller中注入,并使用。@Resource
UserRepository userRepository;@GetMapping("/list")
public Flux<User> getListFromDB(){return userRepository.findAll();
}

//方法二:使用 DatabaseClient

@Resource
DatabaseClient databaseClient;@GetMapping("/{id}")
public Mono<Map<String,Object>> getDetail(@PathVariable("id") Integer id){return databaseClient.sql("select * from user where id="+id).fetch().one();
}

2、R2DBC

Reactive Relational Database Connectivity,是一个使用响应式驱动集成关系型数据库的孵化器。Spring Data R2DBC运用熟悉的Spring抽象和repository支持R2DBC。

(1)如何体现异步?

测试代码:

     @GetMapping("/{id}")public Mono<Map<String,Object>> getDetail(@PathVariable("id") Integer id){log.info("invoke start");Mono<Map<String,Object>> map= databaseClient.sql("select * from user where id="+id).fetch().one();log.info("invoke end");return map;}

浏览器请求 http://localhost:20001/demo/1,从下面的日志可以看出,日志中invoke start、invoke end相继执行,后续才建立连接,进行数据库查询操作。

2022-02-11 10:39:36.108  INFO 15483 --- [ctor-http-nio-2] com.example.controller.MyController      : invoke start
2022-02-11 10:39:36.108  INFO 15483 --- [ctor-http-nio-2] com.example.controller.MyController      : invoke end
2022-02-11 10:39:36.109 DEBUG 15483 --- [ctor-http-nio-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the driver
2022-02-11 10:39:36.110 DEBUG 15483 --- [ctor-http-nio-2] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [select * from user where id=1]
2022-02-11 10:39:36.110 TRACE 15483 --- [ctor-http-nio-2] o.s.r2dbc.core.DefaultDatabaseClient     : Expanded SQL [select * from user where id=1]
2022-02-11 10:39:36.111 DEBUG 15483 --- [ctor-http-nio-2] dev.miku.r2dbc.mysql.MySqlConnection     : Create a simple statement provided by text query
2022-02-11 10:39:36.111 DEBUG 15483 --- [ctor-http-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-11 10:39:36.112 TRACE 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0x42250dba, L:/127.0.0.1:49383 - R:localhost/127.0.0.1:3306] READ: 121B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 01 02 28 00 00 02 03 64 65 66 06 66 61 |.....(....def.fa|
|00000010| 6d 69 6c 79 04 75 73 65 72 04 75 73 65 72 02 69 |mily.user.user.i|
|00000020| 64 02 69 64 0c 3f 00 0b 00 00 00 03 00 00 00 00 |d.id.?..........|
|00000030| 00 2c 00 00 03 03 64 65 66 06 66 61 6d 69 6c 79 |.,....def.family|
|00000040| 04 75 73 65 72 04 75 73 65 72 04 6e 61 6d 65 04 |.user.user.name.|
|00000050| 6e 61 6d 65 0c 2d 00 90 01 00 00 fd 00 00 00 00 |name.-..........|
|00000060| 00 09 00 00 04 01 31 06 e5 bc a0 e4 b8 89 07 00 |......1.........|
|00000070| 00 05 fe 00 00 22 00 00 00                      |....."...       |
+--------+-------------------------------------------------+----------------+
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec    : Decode context change to DecodeContext-Result
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.m.m.server.MetadataDecodeContext   : Respond a metadata bundle by filled-up
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Response: SyntheticMetadataMessage{completed=false, messages=[DefinitionMetadataMessage{database='family', table='user' (origin:'user'), column='id' (origin:'id'), collationId=63, size=11, type=3, definitions=0, decimals=0}, DefinitionMetadataMessage{database='family', table='user' (origin:'user'), column='name' (origin:'name'), collationId=45, size=400, type=253, definitions=0, decimals=0}], eof=null}
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Response: RowMessage(encoded)
2022-02-11 10:39:36.113 DEBUG 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec    : Decode context change to DecodeContext-Command
2022-02-11 10:39:36.114 DEBUG 15483 --- [actor-tcp-nio-2] io.r2dbc.pool.PooledConnection           : Releasing connection
2022-02-11 10:39:36.114 TRACE 15483 --- [actor-tcp-nio-2] o.s.http.codec.json.Jackson2JsonEncoder  : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Encoding [{id=1, name=张三}]
2022-02-11 10:39:36.115 TRACE 15483 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0x42250dba, L:/127.0.0.1:49383 - R:localhost/127.0.0.1:3306] READ COMPLETE
2022-02-11 10:39:36.115 DEBUG 15483 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Decreasing pending responses, now 0
2022-02-11 10:39:36.116 TRACE 15483 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Completed 200 OK, headers={masked}
2022-02-11 10:39:36.116 TRACE 15483 --- [ctor-http-nio-2] o.s.h.s.r.ReactorHttpHandlerAdapter      : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Handling completed
2022-02-11 10:39:36.116 DEBUG 15483 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Last HTTP response frame
2022-02-11 10:39:36.116 DEBUG 15483 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [8cf12dd9-2, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Last HTTP packet was sent, terminating the channel
2022-02-11 10:39:36.116 TRACE 15483 --- [ctor-http-nio-2] reactor.netty.channel.ChannelOperations  : [8cf12dd9, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:49377] Disposing ChannelOperation from a channel

(2)如何使用事务?

非响应式事务:
代码在同一线程上执行,默认同步的方式实现。采用ThreadLocal的方式,保存连接和会话信息。

响应式事务:
代码在不同线程上执行,使用Context来代替ThreadLocal。
类似一个不可变的Map<Object, Object>,采用CopyOnWrite策略,绑定在每一个订阅者上。

两种使用方式:
声明式事务

Spring 5.2后,已经支持响应式事务,可以直接使用@Transactional注解。

   @Resourceprivate DatabaseClient db;@Override@Transactionalpublic Mono<Void> insertRows() {     return db.sql("INSERT INTO user (id,name) VALUES(110, 'aaa')").fetch().rowsUpdated().then(db.sql("INSERT INTO user (id,name) VALUES(111, 'aaa')").then());}

主要执行日志:

2022-02-14 10:59:06.077 DEBUG 22687 --- [ctor-http-nio-2] o.s.r.c.R2dbcTransactionManager          : Creating new transaction with name [com.example.service.UserServiceImpl.insertRows]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2022-02-14 10:59:06.077 DEBUG 22687 --- [ctor-http-nio-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the driver
2022-02-14 10:59:06.078 DEBUG 22687 --- [ctor-http-nio-2] o.s.r.c.R2dbcTransactionManager          : Acquired Connection [MonoRetry] for R2DBC transaction
2022-02-14 10:59:06.078 DEBUG 22687 --- [ctor-http-nio-2] o.s.r.c.R2dbcTransactionManager          : Switching R2DBC Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@4f99ad97]] to manual commit
2022-02-14 10:59:06.078 DEBUG 22687 --- [ctor-http-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Request: SimpleQueryMessage{sql=REDACTED}+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 53 45 54 20 61 75 74 6f 63 6f 6d 6d 69 74 3d |.SET autocommit=|
|00000010| 30                                              |0               |
+--------+-------------------------------------------------+----------------++-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 42 45 47 49 4e                               |.BEGIN          |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.080 TRACE 22687 --- [actor-tcp-nio-2] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.example.service.UserServiceImpl.insertRows]
2022-02-14 10:59:06.081 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [INSERT INTO user (id,name) VALUES(110, 'aaa')]
2022-02-14 10:59:06.081 TRACE 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient     : Expanded SQL [INSERT INTO user (id,name) VALUES(110, 'aaa')]
+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 49 4e 53 45 52 54 20 49 4e 54 4f 20 75 73 65 |.INSERT INTO use|
|00000010| 72 20 28 69 64 2c 6e 61 6d 65 29 20 56 41 4c 55 |r (id,name) VALU|
|00000020| 45 53 28 31 31 30 2c 20 27 61 61 61 27 29       |ES(110, 'aaa')  |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.082 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [INSERT INTO user (id,name) VALUES(111, 'aaa')]
2022-02-14 10:59:06.082 TRACE 22687 --- [actor-tcp-nio-2] o.s.r2dbc.core.DefaultDatabaseClient     : Expanded SQL [INSERT INTO user (id,name) VALUES(111, 'aaa')]
2022-02-14 10:59:06.082 DEBUG 22687 --- [actor-tcp-nio-2] dev.miku.r2dbc.mysql.MySqlConnection     : Create a simple statement provided by text query
2022-02-14 10:59:06.082 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-14 10:59:06.083 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 46B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 49 4e 53 45 52 54 20 49 4e 54 4f 20 75 73 65 |.INSERT INTO use|
|00000010| 72 20 28 69 64 2c 6e 61 6d 65 29 20 56 41 4c 55 |r (id,name) VALU|
|00000020| 45 53 28 31 31 31 2c 20 27 61 61 61 27 29       |ES(111, 'aaa')  |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec    : Decode context change to DecodeContext-Command
2022-02-14 10:59:06.083 TRACE 22687 --- [actor-tcp-nio-2] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.example.service.UserServiceImpl.insertRows]
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager          : Initiating transaction commit
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager          : Committing R2DBC transaction on Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@4f99ad97]]
2022-02-14 10:59:06.083 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-14 10:59:06.084 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 4B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 07 00 00 00                                     |....            |
+--------+-------------------------------------------------+----------------+
2022-02-14 10:59:06.084 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 7B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 43 4f 4d 4d 49 54                            |.COMMIT         |
+--------+-------------------------------------------------+----------------+2022-02-14 10:59:06.098 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0xc62edac7, L:/127.0.0.1:61272 - R:localhost/127.0.0.1:3306] WRITE: 17B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 53 45 54 20 61 75 74 6f 63 6f 6d 6d 69 74 3d |.SET autocommit=|
|00000010| 31                                              |1               |2022-02-14 10:59:06.099 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.MessageDuplexCodec    : Decode context change to DecodeContext-Command
2022-02-14 10:59:06.099 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager          : Releasing R2DBC Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@4f99ad97]] after transaction
2022-02-14 10:59:06.099 DEBUG 22687 --- [actor-tcp-nio-2] io.r2dbc.pool.PooledConnection           : Releasing connection若执行异常,需要回滚时,日志如下:
2022-02-14 11:10:31.410 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Response: ErrorMessage{errorCode=1062, sqlState='23000', errorMessage='Duplicate entry '110' for key 'PRIMARY''}
2022-02-14 11:10:31.419 TRACE 22687 --- [actor-tcp-nio-2] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.example.service.UserServiceImpl.insertRows] after exception: org.springframework.dao.DataIntegrityViolationException: execute; SQL [INSERT INTO user (id,name) VALUES(110, 'aaa')]; Duplicate entry '110' for key 'PRIMARY'; nested exception is io.r2dbc.spi.R2dbcDataIntegrityViolationException: [1062] [23000] Duplicate entry '110' for key 'PRIMARY'
2022-02-14 11:10:31.421 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager          : Initiating transaction rollback
2022-02-14 11:10:31.421 DEBUG 22687 --- [actor-tcp-nio-2] o.s.r.c.R2dbcTransactionManager          : Rolling back R2DBC transaction on Connection [PooledConnection[dev.miku.r2dbc.mysql.MySqlConnection@47e30501]]
2022-02-14 11:10:31.421 DEBUG 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : Request: SimpleQueryMessage{sql=REDACTED}
2022-02-14 11:10:31.422 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0x55800452, L:/127.0.0.1:61274 - R:localhost/127.0.0.1:3306] WRITE: 4B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 09 00 00 00                                     |....            |
+--------+-------------------------------------------------+----------------+
2022-02-14 11:10:31.422 TRACE 22687 --- [actor-tcp-nio-2] d.m.r.mysql.client.ReactorNettyClient    : [id: 0x55800452, L:/127.0.0.1:61274 - R:localhost/127.0.0.1:3306] WRITE: 9B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 52 4f 4c 4c 42 41 43 4b                      |.ROLLBACK       |
+--------+-------------------------------------------------+----------------+

具体源码查看,在TransactionAspectSupport类中。
其中,根据ReactiveTransactionManager接口类型,决定走响应式事务或非响应式事务。下面在编程式事务中提到的R2dbcTransactionManager就实现了ReactiveTransactionManager接口。

   protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {TransactionAttributeSource tas = this.getTransactionAttributeSource();TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;TransactionManager tm = this.determineTransactionManager(txAttr);if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);boolean hasSuspendingFlowReturnType = isSuspendingFunction && "kotlinx.coroutines.flow.Flow".equals((new MethodParameter(method, -1)).getParameterType().getName());if (isSuspendingFunction && !(invocation instanceof TransactionAspectSupport.CoroutinesInvocationCallback)) {throw new IllegalStateException("Coroutines invocation not supported: " + method);} else {TransactionAspectSupport.CoroutinesInvocationCallback corInv = isSuspendingFunction ? (TransactionAspectSupport.CoroutinesInvocationCallback)invocation : null;TransactionAspectSupport.ReactiveTransactionSupport txSupport = (TransactionAspectSupport.ReactiveTransactionSupport)this.transactionSupportCache.computeIfAbsent(method, (key) -> {Class<?> reactiveType = isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType();ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);if (adapter == null) {throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType());} else {return new TransactionAspectSupport.ReactiveTransactionSupport(adapter);}});TransactionAspectSupport.InvocationCallback callback = invocation;if (corInv != null) {callback = () -> {return CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());};}Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager)tm);if (corInv != null) {Publisher<?> pr = (Publisher)result;return hasSuspendingFlowReturnType ? TransactionAspectSupport.KotlinDelegate.asFlow(pr) : TransactionAspectSupport.KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation());} else {return result;}}} else {//略}}

编程式事务
与声明式事务相比,编程式事务使用的较少,因为配置相对繁琐。但可以帮助理解执行逻辑。

 @ResourceConnectionFactory connectionFactory;@Overridepublic Mono<Void> insertRows() {ReactiveTransactionManager tm=new R2dbcTransactionManager(connectionFactory);DatabaseClient db=DatabaseClient.create(connectionFactory);TransactionalOperator transactionalOperator=TransactionalOperator.create(tm);return db.sql("INSERT INTO user (id,name) VALUES(119, 'bbb')").fetch().rowsUpdated().then(db.sql("INSERT INTO user (id,name) VALUES(111, 'aaa')").then()).as(transactionalOperator::transactional);}

R2dbcTransactionManager:响应式事务管理器。
DatabaseClient:客户端使用R2DBC驱动程序提供对SQL数据库的访问。
TransactionalOperator:事务运算符将所有上游R2DBC发布者与事务上下文相关联。

(3)线程池配置

为了提升数据库的执行效率,减少建立连接的开销,一般数据库连接都会有连接池的概念,同样的r2dbc也有一个叫做r2dbc-pool的连接池。

<dependency><groupId>io.r2dbc</groupId><artifactId>r2dbc-pool</artifactId><version>${version}</version>
</dependency>

五、远程调用

Spring webflux提供WebClient进行远程调用,这里使用webclient做示例,okhttp可以后面作补充。

简单服务

 WebClient webClient= WebClient.builder().build();@GetMapping("/remote/webClient")public Flux<User> queryByWebclient(){log.info("invoke start");Flux<User> list=webClient.get().uri(url).retrieve().bodyToFlux(User.class);log.info("invoke end");return list;}

主要执行流程:
main thread 打印 invoke start invoke end
创建线程池,进行远程调用,初始化pipeline。
连接调用url,建立channel。
接收响应,释放channel。

2022-02-15 13:58:32.231 DEBUG 27846 --- [ctor-http-nio-5] s.w.r.r.m.a.RequestMappingHandlerMapping : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Mapped to com.example.controller.MyController#queryByWebclient()
2022-02-15 13:58:32.231  INFO 27846 --- [ctor-http-nio-5] com.example.controller.MyController      : invoke start
2022-02-15 13:58:32.232  INFO 27846 --- [ctor-http-nio-5] com.example.controller.MyController      : invoke end
2022-02-15 13:58:32.234 DEBUG 27846 --- [ctor-http-nio-5] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Using 'application/json' given [text/html, application/xhtml+xml, image/avif, image/webp, image/apng, application/xml;q=0.9, application/signed-exchange;v=b3;q=0.9, */*;q=0.8] and supported [application/json, application/*+json, application/x-ndjson, text/event-stream]
2022-02-15 13:58:32.234 DEBUG 27846 --- [ctor-http-nio-5] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] 0..N [com.example.model.User]
2022-02-15 13:58:32.236 TRACE 27846 --- [ctor-http-nio-5] o.s.w.r.f.client.ExchangeFunctions       : [6c2270b1] HTTP GET http://localhost:8082/web/test?str=webclient, headers={}
2022-02-15 13:58:32.247 DEBUG 27846 --- [ctor-http-nio-5] r.n.resources.PooledConnectionProvider   : [ae492444] Created a new pooled channel, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
2022-02-15 13:58:32.247 DEBUG 27846 --- [ctor-http-nio-5] reactor.netty.transport.TransportConfig  : [ae492444] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.httpDecompressor = io.netty.handler.codec.http.HttpContentDecompressor), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.netty.transport.TransportConnector     : [ae492444] Connecting to [localhost/127.0.0.1:8082].
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Registering 。 release on close event for channel
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.resources.PooledConnectionProvider   : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Channel connected, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}, [connected])
2022-02-15 13:58:32.252 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [configured])
2022-02-15 13:58:32.253 DEBUG 27846 --- [ctor-http-nio-5] r.netty.http.client.HttpClientConnect    : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Handler is being applied: {uri=http://localhost:8082/web/test?str=webclient, method=GET}
2022-02-15 13:58:32.253 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [request_prepared])
2022-02-15 13:58:32.254 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [request_sent])
2022-02-15 13:58:33.384 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.client.HttpClientOperations     : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Received response (auto-read:false) : [Date=Tue, 15 Feb 2022 05:58:33 GMT, content-length=0]
2022-02-15 13:58:33.384 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [response_received])
2022-02-15 13:58:33.384 TRACE 27846 --- [ctor-http-nio-5] o.s.w.r.f.client.ExchangeFunctions       : [6c2270b1] [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Response 200 OK, headers={masked}
2022-02-15 13:58:33.385 DEBUG 27846 --- [ctor-http-nio-5] reactor.netty.channel.FluxReceive        : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2022-02-15 13:58:33.386 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.client.HttpClientOperations     : [ae492444-1, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Received last HTTP packet
2022-02-15 13:58:33.387 TRACE 27846 --- [ctor-http-nio-5] reactor.netty.channel.ChannelOperations  : [ae492444, L:/127.0.0.1:61048 -
2022-02-15 13:58:33.432 TRACE 27846 --- [ctor-http-nio-5] o.s.http.codec.json.Jackson2JsonEncoder  : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Encoding [[]]
2022-02-15 13:58:33.433 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [response_completed])
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] onStateChange(GET{uri=/web/test?str=webclient, connection=PooledConnection{channel=[id: 0xae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082]}}, [disconnecting])
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.r.DefaultPooledConnectionProvider    : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Releasing channel
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.resources.PooledConnectionProvider   : [ae492444, L:/127.0.0.1:61048 - R:localhost/127.0.0.1:8082] Channel cleaned, now: 0 active connections, 1 inactive connections and 0 pending acquire requests.
2022-02-15 13:58:33.434 TRACE 27846 --- [ctor-http-nio-5] o.s.w.s.adapter.HttpWebHandlerAdapter    : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Completed 200 OK, headers={masked}
2022-02-15 13:58:33.434 TRACE 27846 --- [ctor-http-nio-5] o.s.h.s.r.ReactorHttpHandlerAdapter      : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Handling completed
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.server.HttpServerOperations     : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Last HTTP response frame
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.server.HttpServerOperations     : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Decreasing pending responses, now 0
2022-02-15 13:58:33.434 DEBUG 27846 --- [ctor-http-nio-5] r.n.http.server.HttpServerOperations     : [7a8c56a7-1, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Last HTTP packet was sent, terminating the channel
2022-02-15 13:58:33.435 TRACE 27846 --- [ctor-http-nio-5] reactor.netty.channel.ChannelOperations  : [7a8c56a7, L:/0:0:0:0:0:0:0:1:20001 - R:/0:0:0:0:0:0:0:1:61046] Disposing ChannelOperation from a channel

首次进行远程调用,会创建一个http的客户端线程池,再次发起远程调用时,会直接使用现有线程池。

leasingStrategy:FIFO 先进先出。
evictionInterval:实例销毁的间隔 0s。
maxConnections:最大连接数,500。
maxIdleTime: 最大空闲时间
maxLifeTime: 最大存活时间
metricsEnabled: 是否启用指标
pendingAcquireMaxCount:
pendingAcquireTimeout:

Creating a new [http] client pool [PoolFactory{evictionInterval=PT0S, leasingStrategy=fifo, maxConnections=500, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false, pendingAcquireMaxCount=1000, pendingAcquireTimeout=45000}] for [localhost:8082]

Created a new pooled channel, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.

依赖服务
如果两个远程调用方法之间有彼此依赖关系,应该如何使用?比如rpcB需要使用rpcA返回的结果值做处理。

示例:

@GetMapping("/remote/webClient")public Flux<Integer> queryByWebclient(){log.info("invoke start");long start =System.nanoTime();Flux<Integer> userFlux = userService.getListDependency(0 ).flatMap(k -> userService.getListDependency(k));userFlux.subscribe(k-> System.out.println(" controller time"+(System.nanoTime()-start)));log.info("invoke end");return userFlux;}
//远程调用的service
public Flux<Integer> getListDependency(Integer str) {Flux<Integer> list=webClient.get().uri(url+str).retrieve().bodyToFlux(Integer.class);list.subscribe(k-> System.out.print("service time"+new Date()));return list;}

六、与Spring MVC的比较

示例
SpringMVC方式

   @GetMapping("/list/db")public Map<String, User> getListFromDB(){log.info("enter controller");Map<String, User> listBySpringMVC = userService.getListBySpringMVC();log.info("end controller");return listBySpringMVC;}@Overridepublic Map<String,User> getListBySpringMVC() {log.info("enter service");//无实际意义,方便打印日志查看。System.out.println("this is userMap");log.info("end service");return userMap;}

日志执行结果:

WebFlux的响应式处理:

 @GetMapping("/list/reactor")public Flux<User> getListReactor(){log.info("enter controller");Flux<User> listFromDB = userService.getListByReactor();log.info("end controller");return listFromDB;}@Overridepublic  Flux<User> getListByReactor() {log.info("enter service");List<User> userList = new ArrayList<>();Set<Map.Entry<String,User>> entries =userMap.entrySet();entries.stream().forEach(entry->userList.add(entry.getValue()));Flux<User> userFlux = Flux.fromStream(() -> createStr(userList));log.info("end service");return userFlux;}private Stream<User> createStr(List<User> userList) {try {System.out.println("start sleep");TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {}return userList.stream();}

执行结果:

小结:
1、WebFlux是异步处理的,SpringMVC是同步的,但也可以实现异步非阻塞。
2、Servlet3.1已经支持异步非阻塞了,但无论是回调或是使用CompletableFuture,代码实现起来会比较复杂。WebFlux提供的API会更加方便。Spring WebFlux在应对高并发的请求时,借助于异步IO,能够以少量而稳定的线程处理更高吞吐量的请求。

七、扩展

应用:

Gateway是基于Webflux实现的,它通过扩展HandlerMapping与WebHandler来处理用户的请求,先通过Predicate定位到Router然后在经过FilterChain的过滤处理,最后定位到下层服务。

熔断:
待补充

附录:

https://r2dbc.io/
https://zhuanlan.zhihu.com/p/144258269
https://docs.spring.io/spring-boot/docs/current/reference/html/web.html#web.reactive
https://juejin.cn/post/6844903999141838855

《Pro Spring MVC with WebFlux Web Development in Spring Framework 5 and Spring Boot 2 by Marten Deinum Iuliana Cosmina (z-lib.org)》
《函数式编程思维》

响应式编程之Spring Webflux相关推荐

  1. 响应式编程之二:RxJava概述:在Android平台上开发详解

    RxJava 到底是什么 RxJava 好在哪 API 介绍和原理简析 1. 概念:扩展的观察者模式 观察者模式 RxJava 的观察者模式 2. 基本实现 1) 创建 Observer 2) 创建 ...

  2. Android框架式编程之BufferKnife

    BufferKnife作为框架式编程的重要组成部分,使用BufferKnife能够极大的精简View层面的代码量,并为MVP/MVC方式提供辅助. 一.配置 compile 'com.jakewhar ...

  3. Android框架式编程之MVP架构

    MVP是Google官方发布的Android开发相关的架构知识.本文要讲解的是一种最基本的MVP的实现方式,它使用手动的依赖注入来提供具有本地和远程数据源的存储库.异步任务处理回调. 基本的MVP示例 ...

  4. 反应式编程之flux concatmap

    本文基于project reactor,,reactor-bom版本为Dysprosium-SR4,flux concatMap方法作用是将多个publisher组合起来,然后依次消费,消费的顺序跟传 ...

  5. 艿艿连肝了几个周末,写了一篇贼长的 Spring 响应式 Web 框架 WebFlux!市面第二完整~

    本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labs 的 lab-27 目录. 原创不易,给点个 Star 嘿,一起冲鸭! 1. 概述 友情 ...

  6. 艿艿连肝了几个周末,写了一篇贼长的 Spring 响应式 Web 框架 WebFlux!市面第二完整~...

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  7. corda_使用Spring WebFlux从Corda节点流式传输数据

    corda 自上次发布以来已经有一段时间了,但我终于回来了! 由于我仍在我的项目中,因此我将再次撰写有关使用Corda的文章. 这次,我们将不再关注Corda,而是将Spring与Corda结合使用. ...

  8. 使用Spring WebFlux从Corda节点流式传输数据

    自上次发布以来已经有一段时间了,但我终于回来了! 由于我仍在我的项目中,因此我将再次撰写有关使用Corda的文章. 这次,我们将不再关注Corda,而是将Spring与Corda结合使用. 更具体地说 ...

  9. ⒈响应式编程 Project Reactor 概述

    文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...

最新文章

  1. OpenCV 高级API:TextDetectionModel和TextRecognitionModel
  2. 阿里云大数据利器Maxcompute学习之-假如你使用过hive
  3. php explode txt,PHP explode():字符串转数组
  4. gc java root_C#技术漫谈之垃圾回收机制(GC)
  5. Spring IOC学习心得之源码级分析ContextLoaderListener的作用(IOC容器初始化入口)
  6. 浏览器输入一个url会发生什么
  7. python报表自动化系列 - 获得指定目录下所有文件名
  8. 移动web UI推荐
  9. Java jks转换pem,PEM证书转JKS
  10. android12.0(S) 从SD卡导入vCard文件到通讯录 号码带“-“ 如何把横线去除
  11. 如何实现 JS 运行时的 Inspector 能力
  12. Android传感器、语音识别、定位系统、Google Map API、快捷方式、widget编程总结及示例
  13. Lync 2013兼容性
  14. 冥想基础知识(总结于冥想相关书)
  15. Nodejs纯esm模块的迁移方法、社区冲击、评价浅论
  16. u-checkbox
  17. 卧槽!竟然可以直接白嫖 Github Action 的 2C7G 服务器。。
  18. 图像处理中几个基本的处理方法c#代码实现
  19. 在休斯敦巨型人造湖中,NASA“养”了一只海底变形金刚
  20. 接触NIRVANA Band-涅磐乐队

热门文章

  1. PostgreSQL多维空间对象——cube插件
  2. 跳棋游戏利用树构建合法路径出现循环的解决办法
  3. 全新自适应地址发布页HTML源码
  4. wow语音聊天服务器无法使用,无法正常使用《星际争霸II》语音聊天
  5. c语言左值中有运算符,C语言左值,运算符的优先级以及结合性探讨
  6. 来,吃了许嵩这颗毒药
  7. Revit二次开发手记【1】
  8. DPABISurf使用教材
  9. 2019最新《妙堂Javascript全套项目实战》
  10. mysql的url参数_mysql-url参数