目录

  • 前言
  • 一、示例
  • 二、流程
    • 1、构建数据发布者
    • 2、构建数据订阅者
    • 3、建立订阅关系
    • 4、请求数据
    • 5、发布数据
    • 6、发布完成

前言

本文主要同时简单的示例来分析一下Mono在发布订阅过程中的执行流程。

一、示例

 @Testpublic void executeProcessTest() {Mono.just("hello mono").filter(v -> v != null).map(v -> v + " map").defaultIfEmpty("default value").subscribe(System.out::println);}

二、流程

1、构建数据发布者

(1)Mono.just(“hello mono”)

返回 MonoJust,包装值

public static <T> Mono<T> just(T data) {return onAssembly(new MonoJust<>(data));}MonoJust(T value) {this.value = Objects.requireNonNull(value, "value");}

(2)filter

返回 MonoFilterFuseable ,包装 MonoJust 和 predicate

public final Mono<T> filter(final Predicate<? super T> tester) {if (this instanceof Fuseable) {return onAssembly(new MonoFilterFuseable<>(this, tester));}return onAssembly(new MonoFilter<>(this, tester));}MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {super(source);this.predicate = Objects.requireNonNull(predicate, "predicate");}

(3)map

返回 MonoMapFuseable 包装 MonoFilterFuseable 和 mapper

public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {if (this instanceof Fuseable) {return onAssembly(new MonoMapFuseable<>(this, mapper));}return onAssembly(new MonoMap<>(this, mapper));}MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {super(source);this.mapper = Objects.requireNonNull(mapper, "mapper");}

(4)defaultIfEmpty

返回MonoDefaultIfEmpty,包装 MonoMapFuseable 和 defaultValue

public final Mono<T> defaultIfEmpty(T defaultV) {if (this instanceof Fuseable.ScalarCallable) {try {T v = block();if (v == null) {return Mono.just(defaultV);}}catch (Throwable e) {//leave MonoError returns as this}return this;}return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));}MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {super(source);this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");}

数据发布者的发布流程:

数据 -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty

2、构建数据订阅者

从示例中的 subscribe() 开始

(1) subscribe()

传入 consumer 消费者

public final Disposable subscribe(Consumer<? super T> consumer) {Objects.requireNonNull(consumer, "consumer");return subscribe(consumer, null, null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer) {return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,completeConsumer, null, initialContext));}

创建 LambdaMonoSubscriber 对象,包装最终的消费者consumer

(2)subscribeWith()

public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {subscribe(subscriber);return subscriber;}
public final void subscribe(Subscriber<? super T> actual) {//最后一层发布者,这里是 MonoDefaultIfEmptyCorePublisher publisher = Operators.onLastAssembly(this);//最后一层订阅者,这里是 LambdaMonoSubscriber CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);//发布者与订阅者建立联系try {if (publisher instanceof OptimizableOperator) {OptimizableOperator operator = (OptimizableOperator) publisher;while (true) {subscriber = operator.subscribeOrReturn(subscriber);if (subscriber == null) {// null means "I will subscribe myself", returning...return;}OptimizableOperator newSource = operator.nextOptimizableSource();if (newSource == null) {publisher = operator.source();break;}operator = newSource;}}//发布者发布数据给订阅者publisher.subscribe(subscriber);}catch (Throwable e) {Operators.reportThrowInSubscribe(subscriber, e);return;}}

(3)发布者与订阅者建立联系的过程

核心方法:

subscriber = operator.subscribeOrReturn(subscriber);

a). operator 是 MonoDefaultIfEmpty,subscriber 是 LambdaMonoSubscriber

返回 DefaultIfEmptySubscriber,作为 LambdaMonoSubscriber 的发布者

@Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);}

b). operator 是 MonoMapFuseable ,subscriber 是 DefaultIfEmptySubscriber

返回 MapFuseableSubscriber,作为 DefaultIfEmptySubscriber 的发布者

public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {if (actual instanceof ConditionalSubscriber) {ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);}return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);}

c).operator 是 MonoFilterFuseable ,subscriber 是 MapFuseableSubscriber

返回 FilterFuseableSubscriber,作为 MapFuseableSubscriber 的发布者

public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {if (actual instanceof ConditionalSubscriber) {return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);}return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);}

此时发布者与订阅者关系:

FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer

3、建立订阅关系

publisher.subscribe(subscriber);

此时publisher 是 MonoJust, subscriber 是 FilterFuseableSubscriber
创建 scalarSubscription ,包装 FilterFuseableSubscriber

 @Overridepublic void subscribe(CoreSubscriber<? super T> actual) {actual.onSubscribe(Operators.scalarSubscription(actual, value));}

根据发布订阅关系依次调用订阅者的 onSubscribe() 建立订阅关系

FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber

进入 LambdaMonoSubscriber 的 onSubscribe()

 @Overridepublic final void onSubscribe(Subscription s) {if (Operators.validate(subscription, s)) {this.subscription = s;if (subscriptionConsumer != null) {try {subscriptionConsumer.accept(s);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();onError(t);}}else {//请求数据s.request(Long.MAX_VALUE);}}}

4、请求数据

通过订阅关系调用 request() 请求数据,

s.request(Long.MAX_VALUE);

即根据下面的关系链反向请求数据

FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber

最终到了 Operators 类中

     @Overridepublic void request(long n) {if (validate(n)) {if (ONCE.compareAndSet(this, 0, 1)) {Subscriber<? super T> a = actual;//发布数据a.onNext(value);if(once != 2) {//发布完成a.onComplete();}}}}

5、发布数据

从 FilterFuseableSubscriber 开始调用 onNext() 发布数据,根据依次发布给各自的订阅者,最终数据到了最后一个订阅者 LambdaMonoSubscriber

    LambdaMonoSubscriber.java@Overridepublic final void onNext(T x) {Subscription s = S.getAndSet(this, Operators.cancelledSubscription());if (s == Operators.cancelledSubscription()) {Operators.onNextDropped(x, this.initialContext);return;}if (consumer != null) {try {//最终调用 consumer 消费数据consumer.accept(x);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();doError(t);}}if (completeConsumer != null) {try {completeConsumer.run();}catch (Throwable t) {Operators.onErrorDropped(t, this.initialContext);}}}

6、发布完成

在数据发布依次到消费者消费后,进入第4步中的 a.onComplete();

依次调用各自的订阅者调用 onComplete()。

Mono 的执行流程相关推荐

  1. springcloud gateway 请求执行流程分析

    一.示例 pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http:// ...

  2. 动态执行流程分析和性能瓶颈分析的利器——gperftools的Cpu Profiler

    在<动态执行流程分析和性能瓶颈分析的利器--valgrind的callgrind>中,我们领略了valgrind对流程和性能瓶颈分析的强大能力.本文将介绍拥有相似能力的gperftools ...

  3. 动态执行流程分析和性能瓶颈分析的利器——valgrind的callgrind

    在<内存.性能问题分析的利器--valgrind>一文中我们简单介绍了下valgrind工具集,本文将使用callgrind工具进行动态执行流程分析和性能瓶颈分析.(转载请指明出于brea ...

  4. 使用Caffe进行手写数字识别执行流程解析

    之前在 http://blog.csdn.net/fengbingchun/article/details/50987185 中仿照Caffe中的examples实现对手写数字进行识别,这里详细介绍下 ...

  5. Caffe中对MNIST执行train操作执行流程解析

    之前在 http://blog.csdn.net/fengbingchun/article/details/49849225 中简单介绍过使用Caffe train MNIST的文章,当时只是仿照ca ...

  6. pip包管理工具-install执行流程简单查看

    pip概述 pip是python提供的包管理工具,该工具提供了对python包的查找.下载.安装与卸载等功能的工具,当前是python中比较主流的管理工具. pip下载安装包的概述 pip工具的本质通 ...

  7. djangorestframework源码分析2:serializer序列化数据的执行流程

    djangorestframework源码分析 本文环境python3.5.2,djangorestframework (3.5.1)系列 djangorestframework源码分析-serial ...

  8. djangorestframework源码分析1:generics中的view执行流程

    djangorestframework源码分析 本文环境python3.5.2,djangorestframework (3.5.1)系列 djangorestframework源码分析-generi ...

  9. 一文搞懂select语句在MySQL中的执行流程!

    MySQL作为互联网行业使用最多的关系型数据库之一,与其免费.开源的特性是密不可分的.然而,很多小伙伴工作了很多年,只知道使用MySQL进行CRUD操作,这也导致很多小伙伴工作多年后,想跳槽进入大厂, ...

  10. Java多线程- 线程池的基本使用和执行流程分析 - ThreadPoolExecutor

    线程池的实现原理 池化技术 一说到线程池自然就会想到池化技术. 其实所谓池化技术,就是把一些能够复用的东西放到池中,避免重复创建.销毁的开销,从而极大提高性能. 常见池化技术的例如: 线程池 内存池 ...

最新文章

  1. LINQ之路19:LINQ to XML之X-DOM更新、和Value属性交互
  2. python数据池连接PG
  3. 【已解决】FileNotFoundError: [Errno 2] No such file or directory:_Python系列学习笔记
  4. 今天你多态了吗? 【转】
  5. 【Vue案例三】使用v-if指令 / component标签实现组件的切换
  6. curl 没有到主机的路由_安装RaspAP将树莓派变身为无线路由器
  7. 《当程序员的那些狗日日子》(五十四)转折
  8. EM从最大似然到EM算法浅解
  9. android通知栏应用程序更新,Android App自动更新之通知栏下载
  10. python rabitmq_python中RabbitMQ的使用(安装和简单教程)
  11. 用计算机怎么发邮件,怎么用qq邮箱发文件-你必须要学会的电脑操作——邮件收发...
  12. Spring学习笔记(五)--Spring概述
  13. sqlite3驱动文件
  14. 《东周列国志》第三十八回 周襄王避乱居郑 晋文公守信降原
  15. 空气净化器上亚马逊需要提交UL867测试报告
  16. CUDA_NVCC_FLAGS set
  17. vue中文本超出省略号
  18. 如何在资源社区上传图标素材
  19. TCP 包完整性检验
  20. MMGG热点 ▏AssangeDAO提案

热门文章

  1. java截取文件名后缀
  2. 7、核心芯片说明文档
  3. 【GTK】【C】GTK学习教程
  4. html网页漂浮广告原理js,JS实现弹性漂浮效果的广告代码
  5. word2010中设置页码起始页从任意一页开始
  6. 鼠标指针乱跑的解决方案
  7. linux限制进程带宽,再Linux系统中限制网络带宽使用的教程
  8. HR面试常见问题汇总
  9. java 解析 office系列文档
  10. 分式智能计算机在线计算,全能智能计算器