【SpringWebFlux】Project Reactor的使用
Spring Framework从版本5开始,基于Project Reactor支持响应式编程。
Project Reactor是用于在JVM上构建非阻塞应用程序的Reactive库,基于Reactive Streams规范。
Project Reactor是Spring生态系统中响应式的基础,并且与Spring密切合作进行开发。
Spring WebFlux要求Project Reactor作为核心依赖项。
Project Reactor介绍
Project Reactor主要由下面的模块组成:
- Reactor Core:包含响应式类型Flux和Mono,它们实现了Reactive Stream的Publisher接口以及一组可应用于这些类型的运算符。
- Reactor Test:提供一些实用程序来测试响应流。
- Reactor Extra:提供一些额外的Flux运算符。
- Reactor Netty:无阻塞且支持背压的TCP,HTTP和UDP的客户端和服务器。
- Reactor Adapter:用于与其他响应式库(例如RxJava2和Akka Streams)的适配。
- Reactor Kafka:用于Kafka的响应式API,作为Kafka的生产者和消费者。
要想在应用程序中使用Project Reactor,需要在pom.xml中添加如下maven依赖项:
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.4.0</version>
</dependency>
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.4.0</version><scope>test</scope>
</dependency>
Project Reactor的使用
创建Flux和Mono序列
响应流规范只定义了4个接口,即
- Publisher
- Subscriber
- Subscription
- Processor
Project Reactor提供了Publisher接口的实现,即Flux和Mono。
Flux定义了一个通用的响应式流,它可以产生零个、一个或多个元素,乃至无限元素。
package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;import java.util.Arrays;/*** Flux的使用*/
public class FluxDemo {public static void main(String[] args) {Flux<String> justFlux = Flux.just("hello", "world");justFlux.subscribe(System.out::println);Flux<String> fromArrayFlux = Flux.fromArray(new String[]{"spring", "webflux"});fromArrayFlux.subscribe(System.out::println);Flux<Integer> fromIterableFlux = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5,6, 7, 8));fromIterableFlux.subscribe(System.out::println);Flux<Integer> rangeFlux = Flux.range(1000, 5);rangeFlux.subscribe(System.out::println);Flux<String> publish = Flux.just("hello", "world");Flux<String> fromFlux = Flux.from(publish);fromFlux.subscribe(System.out::println);}}
与Flux相比,Mono类型定义了一个最多可以生成一个元素的流。
package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Mono;import java.util.Optional;/*** Mono的使用*/
public class MonoDemo {public static void main(String[] args) {Mono.just("just").subscribe(System.out::println);Mono.fromCallable(() -> "fromCallable").subscribe(System.out::println);Mono.fromSupplier(() -> "fromSupplier").subscribe(System.out::println);Mono.justOrEmpty(null).subscribe(System.out::println);Mono.justOrEmpty(Optional.empty()).subscribe(System.out::println);}
}
订阅响应式流
Flux和Mono提供了对subscribe()方法的基于lambda的重载,简化了订阅的开发。
subscribe()方法的所有重载都返回Disposable接口的实例,可以用于取消基础的订阅过程。
subscribe()的重载方法:
// 忽略所有事件
subscribe();// 只处理onNext事件
subscribe(Consumer<T> dataConsumer);// 处理onNext事件,处理onError事件
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);// 处理onNext事件,处理onError事件,处理onComplete事件
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer);// 处理onNext事件,处理onError事件,处理onComplete事件,可以获得Subscription
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer, Consumer<Subscription>
subscriptionConsumer);// 走最原始的方式,自己创建Subscriber
subscribe(Subscriber<T> subscriber);
subscribe()的使用:
package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;/*** 订阅流*/
public class SubscribeDemo {public static void main(String[] args) {Flux<String> justFlux = Flux.just("hello");justFlux.subscribe();System.out.println("---------");Flux<String> justFlux2 = Flux.just("hello");justFlux2.subscribe(System.out::println);System.out.println("---------");Flux.from(subscriber -> {for (int i = 0; i < 5; i++) {subscriber.onNext(i);}subscriber.onError(new Exception("异常测试"));subscriber.onComplete();}).subscribe(item -> System.out.println("onNext:" + item),ex -> System.out.println("异常情况:" + ex));System.out.println("---------");Flux.from(subscriber -> {for (int i = 0; i < 5; i++) {subscriber.onNext(i);}subscriber.onComplete();}).subscribe(item -> System.out.println("onNext:" + item),ex -> System.out.println("异常情况:" + ex),() -> System.out.println("onComplete"));}
}
操作响应式流
使用响应式流,除了需要能够创建和使用流,还必须能够完美地转换和操作。
转换响应式流:
package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;/*** 响应流的操作*/
public class StreamOperateDemo {public static void main(String[] args) {// mapFlux.range(1, 10).map(t -> "hello" + t).subscribe(System.out::println);// indexFlux.range(1, 10).map(t -> "hello" + t).index().subscribe(t -> System.out.println(t.getT1() + ":" + t.getT2()));// timestampFlux.range(1, 10).map(t -> "hello" + t).timestamp().subscribe(t -> System.out.println(t.getT1() + ":" + t.getT2()));}
}
过滤响应式流:
- filter操作符仅传递满足条件的元素。
- ignoreElements操作符返回Mono并过滤所有元素。结果序列仅在原始序列结束后结束。
- take(n)操作符限制所获取的元素,该方法忽略除前n个元素之外的所有元素。
- takeLast仅返回流的最后一个元素。
- takeUntil(Predicate)传递一个元素直到满足某个条件。
- elementAt(n)只可用于获取序列的第n个元素。
过滤响应式流的使用:
package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;import java.util.Objects;/*** 响应流的过滤*/
public class StreamFilterDemo {public static void main(String[] args) {// filter操作符仅传递满足条件的元素Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);// ignoreElements操作符返回Mono<T>并过滤所有元素,结果序列仅在原始序列结束后结束Flux.range(1, 10).ignoreElements().subscribe();// take(n)操作符限制所获取的元素,该方法忽略除前n个元素之外的所有元素。Flux.range(1, 10).take(3).subscribe(System.out::println);// takeLast(n)仅返回流的最后n个元素。Flux.range(1, 10).takeLast(3).subscribe(System.out::println);// takeUntil(Predicate)一直传递直到满足某个条件。Flux.range(1, 10).takeUntil( t -> Objects.equals(t, 5)).subscribe(System.out::println);// elementAt(n)只可用于获取序列的第n个元素。Flux.range(1, 10).elementAt(3).subscribe(System.out::println);}
}
错误处理
响应式流的语义定义了onError()事件是一个终止操作,该操作之后响应式流会停止执行。
不对异常进行处理:
Flux.from(subscriber -> subscriber.onError(new RuntimeException("error test"))).subscribe(System.out::println);
运行结果会抛出如下错误:
13:47:32.620 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: error test
Caused by: java.lang.RuntimeException: error testat com.morris.spring.webflux.projectreactor.ErrorHandlerDemo.lambda$main$0(ErrorHandlerDemo.java:11)at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66)at reactor.core.publisher.Flux.subscribe(Flux.java:8095)at reactor.core.publisher.Flux.subscribeWith(Flux.java:8268)at reactor.core.publisher.Flux.subscribe(Flux.java:8065)at reactor.core.publisher.Flux.subscribe(Flux.java:7989)at reactor.core.publisher.Flux.subscribe(Flux.java:7932)at com.morris.spring.webflux.projectreactor.ErrorHandlerDemo.main(ErrorHandlerDemo.java:12)
可以通过以下方式进行错误处理:
- 在subscribe()中指定onError事件的处理逻辑
- 通过onErrorReturn()指定发生异常后,返回固定值
- 通过onErrorResume()指定异常发生后执行备用工作流
- 通过onErrorMap()捕获异常并将其转换为另一个异常
- 定义一个在发生错误时重新执行的响应式工作流。如果源响应序列发出错误信号,那么retry()会重新订阅该序列。
错误处理的使用:
package com.morris.spring.webflux.projectreactor;import reactor.core.publisher.Flux;/*** 异常处理*/
public class ErrorHandlerDemo {public static void main(String[] args) {// 不对异常进行处理Flux.from(subscriber -> subscriber.onError(new RuntimeException("error test"))).subscribe(System.out::println);// 有异常返回一个固定值Flux.just(10).map(t -> t / 0).onErrorReturn(0).subscribe(System.out::println);// 有异常返回一个备用流Flux.just(10).map(t -> t / 0).onErrorResume(e -> Flux.just(0)).subscribe(System.out::println);// 有异常可将异常封装为一个新的异常Flux.just(10).map(t -> t / 0).onErrorMap(e -> new RuntimeException(e.getMessage())).subscribe(System.out::println);}
}
【SpringWebFlux】Project Reactor的使用相关推荐
- 使用Spring Boot和Project Reactor处理SQS消息
我最近参与了一个项目,在该项目中,我不得不有效地处理通过AWS SQS Queue流入的大量消息. 在这篇文章(可能还有一篇)中,我将介绍使用出色的Project Reactor处理消息的方法. 以下 ...
- 使用Spring Boot和Project Reactor处理SQS消息-第2部分
这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章 我在第一部分中列出了一些方法上的差距. 1.处理SQS客户端调用中的失败 2.该方法一次只能 ...
- Project Reactor展开方法
最近,我的一位同事向我介绍了Project Reactor类型的expand运算符,在这篇文章中,我想介绍几种使用它的方式. 展开分页结果 考虑在名为City的模型上基于Spring Data的存储库 ...
- ⒈响应式编程 Project Reactor 概述
文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...
- spring-webflux理解
了解spring-webflux之前,我们要先了解一个词Reactive Streams ,是一套反应式编程 标准 和 规范, Reactive Streams 由以下几个组件组成: 发布者:发布元素 ...
- Reactor响应式编程 之 简介
1 reactor 出现的背景.初衷和要达到什么样的目标 Reactor 项目始于 2012 年. 经过长时间的内部孵化,于 2013 年发布 Reactor 1.x 版本. Reactor 1 在各 ...
- response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题
现在, Java 的各种基于 Reactor 模型的响应式编程库或者框架越来越多了,像是 RxJava,Project Reactor,Vert.x 等等等等.在 Java 9, Java 也引入了自 ...
- webflux系列--reactor源码(一)
文章目录 基础 顶级接口 reactor 核心原理 core 声明阶段 FluxOperator OptimizableOperator InternalFluxOperator InnerOpera ...
- reactor使用方法_Project Reactor展开方法
reactor使用方法 最近,我的一位同事向我介绍了Project Reactor类型的expand运算符,在这篇文章中,我想介绍几种使用它的方式. 展开分页结果 考虑在名为City的模型上基于Spr ...
最新文章
- webapi控制器怎么接收json_一个秒杀系统的登录系统到底是怎么工作的
- MySQL支持的分区类型
- 单源最短路——dijkstra算法
- linux文件操作常见考题_linux试题
- 1.3. kermit
- 团队项目—每日记录2
- 服务器芯片镜像测试,模拟镜像服务器磁盘问题的两个测试【转】
- 使用VB.net建立excel文件
- C语言 空指针 NULL - C语言零基础入门教程
- 用c语言编程求分数和,用C语言编程平均分数
- 详解java类的生命周期 .
- JAVA开发的APP怎么上传_苹果App Store上传应用流程详解
- 简单的Python少儿编程
- 关于html5毕业论文设计任务书,毕业论文设计任务书(精选多篇)
- IDE,SCSI,SATA硬盘接口比较
- AndroidStudio打包AAR供Unity使用流程
- android 压力和温度 传感器测试,通过智能无源传感器,实现监测温度、湿度或压力...
- 如何判断样本标注的靠谱程度?置信度学习(CL)简述
- Linux 日历和计算器命令
- 项目实战:Qt+Android模拟操作器(模拟操作app,打开,点击,输入,获取验证码等等)
热门文章
- 每日必做8月17日更新
- 2022-2028年中国羽绒工业调研分析及投资战略研究报告
- EXCEL-VBA:选中单元格后,聚光灯效果
- C语言分段函数程序(示例)
- 矩阵和稀疏矩阵相互转化(数组方式实现)
- 为了节约用电,将用户的用电量分成3个区间,针对不同的区间给出不同的收费标准。Java代码
- php获取图片的高和宽,PHP 获取图片的宽和高
- ApacheCN C/C++ 译文集 20211201 更新
- 从“弄潮儿”到“追风者”,康师傅还能引领市场吗?
- suse 添加网络源、本地源