Mono 的执行流程
目录
- 前言
- 一、示例
- 二、流程
- 1、构建数据发布者
- 2、构建数据订阅者
- 3、建立订阅关系
- 4、请求数据
- 5、发布数据
- 6、发布完成
前言
本文主要同时简单的示例来分析一下Mono在发布订阅过程中的执行流程。
一、示例
@Testpublic void executeProcessTest() {Mono.just("hello mono").filter(v -> v != null).map(v -> v + " map").defaultIfEmpty("default value").subscribe(System.out::println);}
二、流程
1、构建数据发布者
(1)Mono.just(“hello mono”)
返回 MonoJust,包装值
public static <T> Mono<T> just(T data) {return onAssembly(new MonoJust<>(data));}MonoJust(T value) {this.value = Objects.requireNonNull(value, "value");}
(2)filter
返回 MonoFilterFuseable ,包装 MonoJust 和 predicate
public final Mono<T> filter(final Predicate<? super T> tester) {if (this instanceof Fuseable) {return onAssembly(new MonoFilterFuseable<>(this, tester));}return onAssembly(new MonoFilter<>(this, tester));}MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {super(source);this.predicate = Objects.requireNonNull(predicate, "predicate");}
(3)map
返回 MonoMapFuseable 包装 MonoFilterFuseable 和 mapper
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {if (this instanceof Fuseable) {return onAssembly(new MonoMapFuseable<>(this, mapper));}return onAssembly(new MonoMap<>(this, mapper));}MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {super(source);this.mapper = Objects.requireNonNull(mapper, "mapper");}
(4)defaultIfEmpty
返回MonoDefaultIfEmpty,包装 MonoMapFuseable 和 defaultValue
public final Mono<T> defaultIfEmpty(T defaultV) {if (this instanceof Fuseable.ScalarCallable) {try {T v = block();if (v == null) {return Mono.just(defaultV);}}catch (Throwable e) {//leave MonoError returns as this}return this;}return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));}MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {super(source);this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");}
数据发布者的发布流程:
数据 -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty
2、构建数据订阅者
从示例中的 subscribe() 开始
(1) subscribe()
传入 consumer 消费者
public final Disposable subscribe(Consumer<? super T> consumer) {Objects.requireNonNull(consumer, "consumer");return subscribe(consumer, null, null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer) {return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,completeConsumer, null, initialContext));}
创建 LambdaMonoSubscriber 对象,包装最终的消费者consumer
(2)subscribeWith()
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {subscribe(subscriber);return subscriber;}
public final void subscribe(Subscriber<? super T> actual) {//最后一层发布者,这里是 MonoDefaultIfEmptyCorePublisher publisher = Operators.onLastAssembly(this);//最后一层订阅者,这里是 LambdaMonoSubscriber CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);//发布者与订阅者建立联系try {if (publisher instanceof OptimizableOperator) {OptimizableOperator operator = (OptimizableOperator) publisher;while (true) {subscriber = operator.subscribeOrReturn(subscriber);if (subscriber == null) {// null means "I will subscribe myself", returning...return;}OptimizableOperator newSource = operator.nextOptimizableSource();if (newSource == null) {publisher = operator.source();break;}operator = newSource;}}//发布者发布数据给订阅者publisher.subscribe(subscriber);}catch (Throwable e) {Operators.reportThrowInSubscribe(subscriber, e);return;}}
(3)发布者与订阅者建立联系的过程
核心方法:
subscriber = operator.subscribeOrReturn(subscriber);
a). operator 是 MonoDefaultIfEmpty,subscriber 是 LambdaMonoSubscriber
返回 DefaultIfEmptySubscriber,作为 LambdaMonoSubscriber 的发布者
@Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);}
b). operator 是 MonoMapFuseable ,subscriber 是 DefaultIfEmptySubscriber
返回 MapFuseableSubscriber,作为 DefaultIfEmptySubscriber 的发布者
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {if (actual instanceof ConditionalSubscriber) {ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);}return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);}
c).operator 是 MonoFilterFuseable ,subscriber 是 MapFuseableSubscriber
返回 FilterFuseableSubscriber,作为 MapFuseableSubscriber 的发布者
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {if (actual instanceof ConditionalSubscriber) {return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);}return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);}
此时发布者与订阅者关系:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer
3、建立订阅关系
publisher.subscribe(subscriber);
此时publisher 是 MonoJust, subscriber 是 FilterFuseableSubscriber
创建 scalarSubscription ,包装 FilterFuseableSubscriber
@Overridepublic void subscribe(CoreSubscriber<? super T> actual) {actual.onSubscribe(Operators.scalarSubscription(actual, value));}
根据发布订阅关系依次调用订阅者的 onSubscribe() 建立订阅关系
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
进入 LambdaMonoSubscriber 的 onSubscribe()
@Overridepublic final void onSubscribe(Subscription s) {if (Operators.validate(subscription, s)) {this.subscription = s;if (subscriptionConsumer != null) {try {subscriptionConsumer.accept(s);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();onError(t);}}else {//请求数据s.request(Long.MAX_VALUE);}}}
4、请求数据
通过订阅关系调用 request() 请求数据,
s.request(Long.MAX_VALUE);
即根据下面的关系链反向请求数据
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
最终到了 Operators 类中
@Overridepublic void request(long n) {if (validate(n)) {if (ONCE.compareAndSet(this, 0, 1)) {Subscriber<? super T> a = actual;//发布数据a.onNext(value);if(once != 2) {//发布完成a.onComplete();}}}}
5、发布数据
从 FilterFuseableSubscriber 开始调用 onNext() 发布数据,根据依次发布给各自的订阅者,最终数据到了最后一个订阅者 LambdaMonoSubscriber
LambdaMonoSubscriber.java@Overridepublic final void onNext(T x) {Subscription s = S.getAndSet(this, Operators.cancelledSubscription());if (s == Operators.cancelledSubscription()) {Operators.onNextDropped(x, this.initialContext);return;}if (consumer != null) {try {//最终调用 consumer 消费数据consumer.accept(x);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();doError(t);}}if (completeConsumer != null) {try {completeConsumer.run();}catch (Throwable t) {Operators.onErrorDropped(t, this.initialContext);}}}
6、发布完成
在数据发布依次到消费者消费后,进入第4步中的 a.onComplete();
依次调用各自的订阅者调用 onComplete()。
Mono 的执行流程相关推荐
- springcloud gateway 请求执行流程分析
一.示例 pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http:// ...
- 动态执行流程分析和性能瓶颈分析的利器——gperftools的Cpu Profiler
在<动态执行流程分析和性能瓶颈分析的利器--valgrind的callgrind>中,我们领略了valgrind对流程和性能瓶颈分析的强大能力.本文将介绍拥有相似能力的gperftools ...
- 动态执行流程分析和性能瓶颈分析的利器——valgrind的callgrind
在<内存.性能问题分析的利器--valgrind>一文中我们简单介绍了下valgrind工具集,本文将使用callgrind工具进行动态执行流程分析和性能瓶颈分析.(转载请指明出于brea ...
- 使用Caffe进行手写数字识别执行流程解析
之前在 http://blog.csdn.net/fengbingchun/article/details/50987185 中仿照Caffe中的examples实现对手写数字进行识别,这里详细介绍下 ...
- Caffe中对MNIST执行train操作执行流程解析
之前在 http://blog.csdn.net/fengbingchun/article/details/49849225 中简单介绍过使用Caffe train MNIST的文章,当时只是仿照ca ...
- pip包管理工具-install执行流程简单查看
pip概述 pip是python提供的包管理工具,该工具提供了对python包的查找.下载.安装与卸载等功能的工具,当前是python中比较主流的管理工具. pip下载安装包的概述 pip工具的本质通 ...
- djangorestframework源码分析2:serializer序列化数据的执行流程
djangorestframework源码分析 本文环境python3.5.2,djangorestframework (3.5.1)系列 djangorestframework源码分析-serial ...
- djangorestframework源码分析1:generics中的view执行流程
djangorestframework源码分析 本文环境python3.5.2,djangorestframework (3.5.1)系列 djangorestframework源码分析-generi ...
- 一文搞懂select语句在MySQL中的执行流程!
MySQL作为互联网行业使用最多的关系型数据库之一,与其免费.开源的特性是密不可分的.然而,很多小伙伴工作了很多年,只知道使用MySQL进行CRUD操作,这也导致很多小伙伴工作多年后,想跳槽进入大厂, ...
- Java多线程- 线程池的基本使用和执行流程分析 - ThreadPoolExecutor
线程池的实现原理 池化技术 一说到线程池自然就会想到池化技术. 其实所谓池化技术,就是把一些能够复用的东西放到池中,避免重复创建.销毁的开销,从而极大提高性能. 常见池化技术的例如: 线程池 内存池 ...
最新文章
- LINQ之路19:LINQ to XML之X-DOM更新、和Value属性交互
- python数据池连接PG
- 【已解决】FileNotFoundError: [Errno 2] No such file or directory:_Python系列学习笔记
- 今天你多态了吗? 【转】
- 【Vue案例三】使用v-if指令 / component标签实现组件的切换
- curl 没有到主机的路由_安装RaspAP将树莓派变身为无线路由器
- 《当程序员的那些狗日日子》(五十四)转折
- EM从最大似然到EM算法浅解
- android通知栏应用程序更新,Android App自动更新之通知栏下载
- python rabitmq_python中RabbitMQ的使用(安装和简单教程)
- 用计算机怎么发邮件,怎么用qq邮箱发文件-你必须要学会的电脑操作——邮件收发...
- Spring学习笔记(五)--Spring概述
- sqlite3驱动文件
- 《东周列国志》第三十八回 周襄王避乱居郑 晋文公守信降原
- 空气净化器上亚马逊需要提交UL867测试报告
- CUDA_NVCC_FLAGS set
- vue中文本超出省略号
- 如何在资源社区上传图标素材
- TCP 包完整性检验
- MMGG热点 ▏AssangeDAO提案