基本介绍

前面文章响应式编程基本介绍介绍了响应式编程的基础,其中就有响应式流规范,这是比较底层的规范,如果使用响应式流,要基于此开发,还是很有难度的。
而我们每天都在使用的 Spring 就是这样一款支持响应式编程的开发框架,Spring 5 的响应式编程模型以 Project Reactor 库为基础,而后者则实现了响应式流规范。

Reactor 异步数据序列

响应式流规范的基本组件是一个异步的数据序列,在 Reactor 框架中,我们可以把这个异步数据序列表示成如下形式:

上图中的异步序列模型从语义上可以用如下公式表示。

onNext x 0..N [onError | onComplete]

以上公式中包含了三种消息通知,分别对应在异步数据序列执行过程中的三种不同数据处理场景:

  • onNext:表示正常的包含元素的消息通知;
  • onComplete:表示序列结束的消息通知;
  • onError:表示序列出错的消息通知。

当触发这些消息通知时,异步序列的订阅者(Subscriber)中对应的这三个同名方法将被调用。正常情况下,onNext() 和 onComplete() 方法都应该被调用,用来正常消费数据并结束序列。如果没有调用 onComplete() 方法就会生成一个无界数据序列,在业务系统中,这通常是不合理的。而 onError() 方法只有序列出现异常时才会被调用。

基于上述异步数据序列,Reactor 框架提供了两个核心组件来发布数据,分别是 Flux 和 Mono 组件。这两个组件可以说是应用程序开发过程中最基本的编程对象。这篇文章,主要就是介绍 Flux 和 Mono 的基本使用。

通过 Flux 对象创建响应式流

基本介绍

Flux 代表的是一个包含 0 到 n 个元素的异步序列,Reactor 官网给出了它弹珠图(Marble Diagram)示意图:

类 Flux 说明,或者直接通过 IDEA 工具查看。

  • operator:代表操作符,对数据进行正常操作。
  • 红色的叉号:代表异常。
  • 最后的一个符号:代表序列正常结束。
    这三种消息,和数据系列完全一致,即:序列的三种消息通知都适用于 Flux。

创建 Flux 的方式非常多,但是大体可以分成两大类:一类是 基于各种工厂模式的静态创建方法;而另一类则 采用编程的方式 动态创建 Flux。相对而言,静态方法在使用上都比较简单,但不如动态方法来得灵活。

操作符

操作符(operator)并不是响应式流规范的一部分,但为了改进响应式代码的可读性并降低开发成本,Reactor 库中的 API 提供了一组丰富的操作符,这些操作符为响应式流规范提供了最大的附加值。操作符的执行效果如下所示:

在 Reactor 中,可以把操作符分成转换、过滤、组合、条件、数学、日志、调试等几大类,每一类中都提供了一批有用的操作符。尤其是针对转换场景,操作符非常健全。

通过静态方法创建 Flux

Reactor 中静态创建 Flux 的方法常见的包括 just()、range()、interval() 以及各种以 from- 为前缀的方法组等。同时,因为 Flux 可以代表 0 个数据,所以也有一些专门用于创建空序列的工具方法。

  • just() 方法

它可以指定序列中包含的全部元素,创建出来的 Flux 序列在发布这些元素之后会自动结束。一般情况下,在已知元素数量和内容时,使用 just() 方法是创建 Flux 的最简单直接的做法。使用 just() 方法创建 Flux 对象的示例代码如下所示:

Flux.just("data1", "data2").subscribe(System.out::println);

执行以上代码,我们将在系统控制台中得到如下结果:

data1
data2

这里我们对 Flux 执行了用于订阅的 subscribe() 方法,并通过使用 Lambda 表达式调用了 System.out.println() 方法,这意味着将结果打印到系统控制台。

  • fromXXX() 方法组
    如果我们已经有了一个数组、一个 Iterable 对象或 Stream 对象,那么就可以通过 Flux 提供的 fromXXX() 方法组来从这些对象中自动创建 Flux,包括 fromArray()、fromIterable() 和 fromStream() 方法。

上一讲我们提到了 Flux.fromIterable() 方法,这里再给出一个使用 fromArray() 方法创建 Flux 对象的示例代码,如下所示。

Flux.fromArray(new Integer[]{1, 2, 3}).subscribe(System.out::println);List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Flux.fromIterable(list).subscribe(System.out::println);

这段代码的执行结果就是在控制台中输出三行记录。

1
2
3
  • range() 方法

如果你快速生成一个整数数据流,那么可以采用 range() 方法,该方法允许我们指定目标整数数据流的起始元素以及所包含的个数,序列中的所有对象类型都是 Integer,这在创建连续的年份信息或序号信息等场景下非常有用。使用 range() 方法创建 Flux 对象的示例代码如下所示。

Flux.range(100, 3).subscribe(System.out::println);

这段代码会在控制台中打印出 3 行记录,从 100 开始,到 102 结束。

100
101
102
  • interval() 方法

在 Reactor 框架中,interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列。通过 interval() 所具备的一组重载方法,我们可以分别指定这个数据序列中第一个元素发布之前的延迟时间,以及每个元素之间的时间间隔。interval() 方法相对复杂,其弹珠图如下所示:

可以看到,上图中每个元素发布时相当于添加了一个定时器的效果。使用 interval() 方法的示例代码如下所示。

Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(200)).subscribe(System.out::println);

这段代码的执行效果相当于在等待 2 秒钟之后,生成一个从 0 开始逐一递增的无界数据序列,每 200 毫秒推送一次数据。

  • empty()、error() 和 never()

根据前面介绍的 Reactor 异步数据序列的语义,我们可以分别使用 empty()、error() 和 never() 这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法,使用示例如下所示。显然,这时候控制台应该没有任何的输出结果。

Flux.empty().subscribe(System.out::println);

然后,通过 error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。

不难看出,静态创建 Flux 的方法简单直接,一般用于生成那些事先已经定义好的数据序列。而如果数据序列事先无法确定,或者生成过程中包含复杂的业务逻辑,那么就需要用到动态创建方法。

通过动态方法创建 Flux

动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。

  • generate() 方法

generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件,定义如下。

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。这里要注意的是 next() 方法只能最多被调用一次。使用 generate() 方法创建 Flux 的示例代码如下。

Flux.generate(sink -> {sink.next("Tang");sink.complete();
}).subscribe(System.out::println);

运行该段代码,会在系统控制台上得到“Tang”。我们在这里调用了一次 next() 方法,并通过 complete() 方法结束了这个数据流。如果不调用 complete() 方法,那么就会生成一个所有元素均为“Tang”的无界数据流。

如果在上面多次调用 next() 方法,就会报错:Caused by: java.lang.IllegalStateException: More than one call to onNext。

这个示例非常简单,但已经具备了动态创建一个 Flux 序列的能力。如果想要在序列生成过程中引入状态,那么可以使用如下所示的 generate() 方法重载。
generate() 重载方法:

public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}

使用示例:

Flux.generate(() -> 1, (i, sink) -> {sink.next(i);if (i == 5) {sink.complete();}return ++i;
}).subscribe(System.out::println);

这里我们引入了一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。

  • create()

create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件,定义如下。

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 create() 方法创建 Flux 的示例代码如下。

Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next("Tang" + i);}sink.complete();
}).subscribe(System.out::println);

运行该程序,我们会在系统控制台上得到从“Tang0”到“Tang4”的 5 个数据。通过 create() 方法创建 Flux 对象的方式非常灵活。
以上就是通过Flux 对象创建响应式流的方法。

通过 Mono 对象创建响应式流

上面介绍了使用 Flux 创建响应式流,除了 Flux 还可以通过 Mono 对象来创建响应式流,其操作如下。

基本介绍

Mono 数据序列中只包含 0 个或 1 个元素,弹珠图(Marble Diagram)如下图所示:

类 Mono 说明,或者直接通过 IDEA 工具查看。

对于 Mono 而言,可以认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。

justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下。

Mono.justOrEmpty(Optional.of("Tang")).subscribe(System.out::println);

另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件,示例代码如下。

Mono.create(sink -> sink.success("Tang")).subscribe(System.out::println);

订阅响应式流

介绍完如何创建响应式流,接下来就需要讨论如何订阅响应式流。想要订阅响应式流,就需要用到 subscribe() 方法。在前面的示例中我们已经演示了 subscribe 操作符的用法,知道可以通过 subscribe() 方法来添加相应的订阅逻辑。同时,在调用 subscribe() 方法时可以指定需要处理的消息通知类型。正如前面内容所看到的,Flux 和 Mono 提供了一批非常有用的 subscribe() 方法重载方法,大大简化了订阅的开发例程。这些重载方法包括如下几种。

//订阅流的最简单方法,忽略所有消息通知
subscribe();//对每个来自 onNext 通知的值调用 dataConsumer,但不处理 onError 和 onComplete 通知
subscribe(Consumer<T> dataConsumer);//在前一个重载方法的基础上添加对 onError 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);//在前一个重载方法的基础上添加对 onComplete 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer);//这种重载方法允许通过请求足够数量的数据来控制订阅过程
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer, Consumer<Subscription> subscriptionConsumer);//订阅序列的最通用方式,可以为我们的 Subscriber 实现提供所需的任意行为
subscribe(Subscriber<T> subscriber);

Reactor 中的消息通知类型有三种,即正常消息、错误消息和完成消息。显然,通过上述 subscribe() 重载方法,我们可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。例如,下面这段代码示例展示了同时处理正常和错误消息的实现方法。

Mono.just("Tang").concatWith(Mono.error(new IllegalStateException())).subscribe(System.out::println, System.err::println);

以上代码的执行结果如下所示,控制台输出字符串“Tang”,同时输出了 IllegalStateException 这个异常。

Tang
java.lang.IllegalStateException

有时候我们不想直接抛出异常,而是希望采用一种容错策略来返回一个默认值,就可以采用如下方式。

Mono.just("Tang").concatWith(Mono.error(new IllegalStateException())).onErrorReturn("defaultError").subscribe(System.out::println);

以上代码的执行结果如下所示,当产生异常时我们使用 onErrorReturn() 方法返回一个默认值“defaultError”。

Tang
defaultError

另外一种容错策略是通过 switchOnError() 方法使用另外的流来产生元素,以下代码演示了这种策略,执行结果与上面的示例一致。

Mono.just("Tang").concatWith(Mono.error(new IllegalStateException())).switchOnError(Mono.just("defaultError")).subscribe(System.out::println);

这种方式,在后续的版本中已经去掉了,可查看:https://github.com/reactor/reactor-core/issues/535

我们还可以充分利用 Lambda 表达式来使用 subscribe() 方法,例如下面这段代码。

Flux.just("Tang1", "Tang2", "Tang3").subscribe(data -> System.out.println("onNext:" + data), err -> {}, () -> System.out.println("onComplete"));

这段代码的执行效果如下所示,可以看到,我们分别对 onNext 通知和 onComplete 通知进行了处理。

onNext:Tang1
onNext:Tang2
onNext:Tang3
onComplete

二、如何使用 Flux 和 Mono 构建响应式数据流?相关推荐

  1. 02-Spring WebFlux Flux和Mono构建响应式数据流

    在上一节入门课程中,我们知道 SpringWebFlux 是借助Reactor来实现的.该框架实现了响应式流规范.我们知道在响应式流规范中,存在代表发布者的 Publisher 接口,而 Reacto ...

  2. 小菜鸡的html初步教程(第十二章 初步构建响应式网站)

    小菜鸡的第三篇博客  今天是3/19,天气不错,跑到自习室来更新博客. 本系列文章仅仅是对基础的HTML5以及CSS进行讲解,更加详细的内容均会附上链接,以便查阅和版权保护.  昨晚我思考了下,决定对 ...

  3. 分享九款构建响应式网站的最佳PHP框架

    目前,网上有大量的框架供大家选择,本文作者分享了9款各方面都兼具优势的PHP框架,主要用来构建响应式网站,开发人员可以根据自己的需求来选择下面的某一个框架. 1.Symfony 2 Symfony是一 ...

  4. vue如何获取年月日_BootstrapVue——Vue和Bootstrap的相结合,构建响应式应用更简单...

    介绍 BootStrap是世界上最受欢迎的构建响应式移动优先网站的框架,Vue是当前最流行的前端框架之一,BootstrapVue则是将两者相结合,使用BootstrapVue,可以使用Vue.js和 ...

  5. Skeljs – 用于构建响应式网站的前端开发框架

    skelJS 是一个轻量级的前端框架,用于构建响应式站点和应用程序.让设计人员和开发人员可能够使用四个强大的组件:CSS 网格系统,响应式处理程序,CSS 的快捷方式和插件系统. 您可能感兴趣的相关文 ...

  6. 使用 Responsive Elements 快速构建响应式网站

    Responsive Elements 可以使任何元素来适应和应对他们所占据的区域.这是一个轻量的 JavaScript 库,你可以轻松嵌入到你的项目.元素会更具自己的宽度,自动响应和适应空间的增加或 ...

  7. vue css隐藏_Vue+BootStrapV4,构建响应式、移动优先项目——BootstrapVue

    介绍 BootStrap是世界上最受欢迎的构建响应式移动优先网站的框架,Vue是当前最流行的前端框架之一,BootstrapVue则是将两者相结合,使用BootstrapVue,可以使用Vue.js和 ...

  8. 一盒用于构建响应式电子邮件的技巧

    photo: fishbulb1022 照片:fishbulb1022 In my previous article on newsletter authoring we've seen how a ...

  9. bone骨骼转换为cs骨骼_使用骨骼构建响应式布局:从入门开始

    bone骨骼转换为cs骨骼 Dave Gamache的Skeleton Boilerplate为快速,可靠地构建响应式网站提供了完美的基础. 最近,我们将使用Skeleton并根据Webdesignt ...

最新文章

  1. Pinpoint 插件开发
  2. jar - 操作jar包的工具
  3. 两次备考信息系统项目管理师长篇心路历程附考试技巧
  4. java对excel经行读写
  5. UVA 1264 - Binary Search Tree(BST+计数)
  6. C++中的多重继承(二)
  7. yuv420p 详解_YUV格式介绍
  8. Java 并发编程如何入门
  9. 考研没过线也能录取?13种特殊录取方式!
  10. 现代黑科技版“指鹿为马:使用CycleGAN实现男女“无痛变性”
  11. 两个数组合成一个json对象_js把两个json数组根据相同键值合并成一个数组
  12. 【对比Java学Kotlin】协程-创建和取消
  13. 博途软件中多重背景块的建立_TIA博途软件中创建多重背景函数块
  14. TCP的三次握手和四次挥手及常见面试题
  15. php 删除文件 unlink,php 删除文件函数unlink及删除文件夹示例
  16. zotero+better bibtex+overleaf(latex):如何批量导出参考文献及相关设置
  17. Linux - 常见端口和服务的对照和解释
  18. c#设计模式(3)—— 利用观察者模式模拟推送消息
  19. vscode + cmake调试配置
  20. 深度学习是什么?深度学习和神经网络的区别是什么

热门文章

  1. webshell之一句话木马变形
  2. 九阴服务器位置,九阴真经合服注意什么_九阴真经服务器互通升级注意事项_牛游戏网...
  3. Vue实战篇二十九:模拟一个简易留言板
  4. 《黄帝内经.四季调神大论篇》四季养生法
  5. linux系统把驱动编译成.ko模块 insmod动态加载
  6. 我的世界服务器显示未知指令,《我的世界》懂这些作弊码就能一个指令呼风唤雨 瞬间环游世界...
  7. 双色球辅助选号工具测试版本
  8. 【干货】选择外贸邮件群发软件,外贸邮件营销软件,邮件群发代发软件15条建议!
  9. 2015最新款连衣裙的搭配
  10. 计算机密码学思路,密码学中加密算法的研究和实现