Spring Framework从版本5开始,基于Project Reactor支持响应式编程。

Project Reactor是用于在JVM上构建非阻塞应用程序的Reactive库,基于Reactive Streams规范。

Project Reactor是Spring生态系统中响应式的基础,并且与Spring密切合作进行开发。

Spring WebFlux要求Project Reactor作为核心依赖项。

Project Reactor介绍

Project Reactor主要由下面的模块组成:

  • Reactor Core:包含响应式类型Flux和Mono,它们实现了Reactive Stream的Publisher接口以及一组可应用于这些类型的运算符。
  • Reactor Test:提供一些实用程序来测试响应流。
  • Reactor Extra:提供一些额外的Flux运算符。
  • Reactor Netty:无阻塞且支持背压的TCP,HTTP和UDP的客户端和服务器。
  • Reactor Adapter:用于与其他响应式库(例如RxJava2和Akka Streams)的适配。
  • Reactor Kafka:用于Kafka的响应式API,作为Kafka的生产者和消费者。

要想在应用程序中使用Project Reactor,需要在pom.xml中添加如下maven依赖项:

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.4.0</version>
</dependency>
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.4.0</version><scope>test</scope>
</dependency>

Project Reactor的使用

创建Flux和Mono序列

响应流规范只定义了4个接口,即

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Project Reactor提供了Publisher接口的实现,即Flux和Mono。

Flux定义了一个通用的响应式流,它可以产生零个、一个或多个元素,乃至无限元素。

package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;import java.util.Arrays;/*** Flux的使用*/
public class FluxDemo {public static void main(String[] args) {Flux<String> justFlux = Flux.just("hello", "world");justFlux.subscribe(System.out::println);Flux<String> fromArrayFlux = Flux.fromArray(new String[]{"spring", "webflux"});fromArrayFlux.subscribe(System.out::println);Flux<Integer> fromIterableFlux = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5,6, 7, 8));fromIterableFlux.subscribe(System.out::println);Flux<Integer> rangeFlux = Flux.range(1000, 5);rangeFlux.subscribe(System.out::println);Flux<String> publish = Flux.just("hello", "world");Flux<String> fromFlux = Flux.from(publish);fromFlux.subscribe(System.out::println);}}

与Flux相比,Mono类型定义了一个最多可以生成一个元素的流。

package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Mono;import java.util.Optional;/*** Mono的使用*/
public class MonoDemo {public static void main(String[] args) {Mono.just("just").subscribe(System.out::println);Mono.fromCallable(() -> "fromCallable").subscribe(System.out::println);Mono.fromSupplier(() -> "fromSupplier").subscribe(System.out::println);Mono.justOrEmpty(null).subscribe(System.out::println);Mono.justOrEmpty(Optional.empty()).subscribe(System.out::println);}
}

订阅响应式流

Flux和Mono提供了对subscribe()方法的基于lambda的重载,简化了订阅的开发。

subscribe()方法的所有重载都返回Disposable接口的实例,可以用于取消基础的订阅过程。

subscribe()的重载方法:

// 忽略所有事件
subscribe();// 只处理onNext事件
subscribe(Consumer<T> dataConsumer);// 处理onNext事件,处理onError事件
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);// 处理onNext事件,处理onError事件,处理onComplete事件
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer);// 处理onNext事件,处理onError事件,处理onComplete事件,可以获得Subscription
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer, Consumer<Subscription>
subscriptionConsumer);// 走最原始的方式,自己创建Subscriber
subscribe(Subscriber<T> subscriber);

subscribe()的使用:

package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;/*** 订阅流*/
public class SubscribeDemo {public static void main(String[] args) {Flux<String> justFlux = Flux.just("hello");justFlux.subscribe();System.out.println("---------");Flux<String> justFlux2 = Flux.just("hello");justFlux2.subscribe(System.out::println);System.out.println("---------");Flux.from(subscriber -> {for (int i = 0; i < 5; i++) {subscriber.onNext(i);}subscriber.onError(new Exception("异常测试"));subscriber.onComplete();}).subscribe(item -> System.out.println("onNext:" + item),ex -> System.out.println("异常情况:" + ex));System.out.println("---------");Flux.from(subscriber -> {for (int i = 0; i < 5; i++) {subscriber.onNext(i);}subscriber.onComplete();}).subscribe(item -> System.out.println("onNext:" + item),ex -> System.out.println("异常情况:" + ex),() -> System.out.println("onComplete"));}
}

操作响应式流

使用响应式流,除了需要能够创建和使用流,还必须能够完美地转换和操作。

转换响应式流:

package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;/*** 响应流的操作*/
public class StreamOperateDemo {public static void main(String[] args) {// mapFlux.range(1, 10).map(t -> "hello" + t).subscribe(System.out::println);// indexFlux.range(1, 10).map(t -> "hello" + t).index().subscribe(t -> System.out.println(t.getT1() + ":" + t.getT2()));// timestampFlux.range(1, 10).map(t -> "hello" + t).timestamp().subscribe(t -> System.out.println(t.getT1() + ":" + t.getT2()));}
}

过滤响应式流:

  1. filter操作符仅传递满足条件的元素。
  2. ignoreElements操作符返回Mono并过滤所有元素。结果序列仅在原始序列结束后结束。
  3. take(n)操作符限制所获取的元素,该方法忽略除前n个元素之外的所有元素。
  4. takeLast仅返回流的最后一个元素。
  5. takeUntil(Predicate)传递一个元素直到满足某个条件。
  6. elementAt(n)只可用于获取序列的第n个元素。

过滤响应式流的使用:

package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;import java.util.Objects;/*** 响应流的过滤*/
public class StreamFilterDemo {public static void main(String[] args) {// filter操作符仅传递满足条件的元素Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);// ignoreElements操作符返回Mono<T>并过滤所有元素,结果序列仅在原始序列结束后结束Flux.range(1, 10).ignoreElements().subscribe();// take(n)操作符限制所获取的元素,该方法忽略除前n个元素之外的所有元素。Flux.range(1, 10).take(3).subscribe(System.out::println);// takeLast(n)仅返回流的最后n个元素。Flux.range(1, 10).takeLast(3).subscribe(System.out::println);// takeUntil(Predicate)一直传递直到满足某个条件。Flux.range(1, 10).takeUntil( t -> Objects.equals(t, 5)).subscribe(System.out::println);// elementAt(n)只可用于获取序列的第n个元素。Flux.range(1, 10).elementAt(3).subscribe(System.out::println);}
}

错误处理

响应式流的语义定义了onError()事件是一个终止操作,该操作之后响应式流会停止执行。

不对异常进行处理:

Flux.from(subscriber -> subscriber.onError(new RuntimeException("error test"))).subscribe(System.out::println);

运行结果会抛出如下错误:

13:47:32.620 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: error test
Caused by: java.lang.RuntimeException: error testat com.morris.spring.webflux.projectreactor.ErrorHandlerDemo.lambda$main$0(ErrorHandlerDemo.java:11)at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66)at reactor.core.publisher.Flux.subscribe(Flux.java:8095)at reactor.core.publisher.Flux.subscribeWith(Flux.java:8268)at reactor.core.publisher.Flux.subscribe(Flux.java:8065)at reactor.core.publisher.Flux.subscribe(Flux.java:7989)at reactor.core.publisher.Flux.subscribe(Flux.java:7932)at com.morris.spring.webflux.projectreactor.ErrorHandlerDemo.main(ErrorHandlerDemo.java:12)

可以通过以下方式进行错误处理:

  1. 在subscribe()中指定onError事件的处理逻辑
  2. 通过onErrorReturn()指定发生异常后,返回固定值
  3. 通过onErrorResume()指定异常发生后执行备用工作流
  4. 通过onErrorMap()捕获异常并将其转换为另一个异常
  5. 定义一个在发生错误时重新执行的响应式工作流。如果源响应序列发出错误信号,那么retry()会重新订阅该序列。

错误处理的使用:

package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;/*** 异常处理*/
public class ErrorHandlerDemo {public static void main(String[] args) {// 不对异常进行处理Flux.from(subscriber -> subscriber.onError(new RuntimeException("error test"))).subscribe(System.out::println);// 有异常返回一个固定值Flux.just(10).map(t -> t / 0).onErrorReturn(0).subscribe(System.out::println);// 有异常返回一个备用流Flux.just(10).map(t -> t / 0).onErrorResume(e -> Flux.just(0)).subscribe(System.out::println);// 有异常可将异常封装为一个新的异常Flux.just(10).map(t -> t / 0).onErrorMap(e -> new RuntimeException(e.getMessage())).subscribe(System.out::println);}
}

【SpringWebFlux】Project Reactor的使用相关推荐

  1. 使用Spring Boot和Project Reactor处理SQS消息

    我最近参与了一个项目,在该项目中,我不得不有效地处理通过AWS SQS Queue流入的大量消息. 在这篇文章(可能还有一篇)中,我将介绍使用出色的Project Reactor处理消息的方法. 以下 ...

  2. 使用Spring Boot和Project Reactor处理SQS消息-第2部分

    这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章 我在第一部分中列出了一些方法上的差距. 1.处理SQS客户端调用中的失败 2.该方法一次只能 ...

  3. Project Reactor展开方法

    最近,我的一位同事向我介绍了Project Reactor类型的expand运算符,在这篇文章中,我想介绍几种使用它的方式. 展开分页结果 考虑在名为City的模型上基于Spring Data的存储库 ...

  4. ⒈响应式编程 Project Reactor 概述

    文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...

  5. spring-webflux理解

    了解spring-webflux之前,我们要先了解一个词Reactive Streams ,是一套反应式编程 标准 和 规范, Reactive Streams 由以下几个组件组成: 发布者:发布元素 ...

  6. Reactor响应式编程 之 简介

    1 reactor 出现的背景.初衷和要达到什么样的目标 Reactor 项目始于 2012 年. 经过长时间的内部孵化,于 2013 年发布 Reactor 1.x 版本. Reactor 1 在各 ...

  7. response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题

    现在, Java 的各种基于 Reactor 模型的响应式编程库或者框架越来越多了,像是 RxJava,Project Reactor,Vert.x 等等等等.在 Java 9, Java 也引入了自 ...

  8. webflux系列--reactor源码(一)

    文章目录 基础 顶级接口 reactor 核心原理 core 声明阶段 FluxOperator OptimizableOperator InternalFluxOperator InnerOpera ...

  9. reactor使用方法_Project Reactor展开方法

    reactor使用方法 最近,我的一位同事向我介绍了Project Reactor类型的expand运算符,在这篇文章中,我想介绍几种使用它的方式. 展开分页结果 考虑在名为City的模型上基于Spr ...

最新文章

  1. webapi控制器怎么接收json_一个秒杀系统的登录系统到底是怎么工作的
  2. MySQL支持的分区类型
  3. 单源最短路——dijkstra算法
  4. linux文件操作常见考题_linux试题
  5. 1.3. kermit
  6. 团队项目—每日记录2
  7. 服务器芯片镜像测试,模拟镜像服务器磁盘问题的两个测试【转】
  8. 使用VB.net建立excel文件
  9. C语言 空指针 NULL - C语言零基础入门教程
  10. 用c语言编程求分数和,用C语言编程平均分数
  11. 详解java类的生命周期 .
  12. JAVA开发的APP怎么上传_苹果App Store上传应用流程详解
  13. 简单的Python少儿编程
  14. 关于html5毕业论文设计任务书,毕业论文设计任务书(精选多篇)
  15. IDE,SCSI,SATA硬盘接口比较
  16. AndroidStudio打包AAR供Unity使用流程
  17. android 压力和温度 传感器测试,通过智能无源传感器,实现监测温度、湿度或压力...
  18. 如何判断样本标注的靠谱程度?置信度学习(CL)简述
  19. Linux 日历和计算器命令
  20. 项目实战:Qt+Android模拟操作器(模拟操作app,打开,点击,输入,获取验证码等等)

热门文章

  1. 每日必做8月17日更新
  2. 2022-2028年中国羽绒工业调研分析及投资战略研究报告
  3. EXCEL-VBA:选中单元格后,聚光灯效果
  4. C语言分段函数程序(示例)
  5. 矩阵和稀疏矩阵相互转化(数组方式实现)
  6. 为了节约用电,将用户的用电量分成3个区间,针对不同的区间给出不同的收费标准。Java代码
  7. php获取图片的高和宽,PHP 获取图片的宽和高
  8. ApacheCN C/C++ 译文集 20211201 更新
  9. 从“弄潮儿”到“追风者”,康师傅还能引领市场吗?
  10. suse 添加网络源、本地源