聊聊FluxFlatMap的concurrency及prefetch参数
序
本文主要研究下FluxFlatMap的concurrency及prefetch参数
实例
@Testpublic void testConcurrencyAndPrefetch(){int concurrency = 3;int prefetch = 6;Flux.range(1,100).log().flatMap(i -> Flux.just(1,2,3,4,5,6,7,8,9,10).log(),concurrency,prefetch).subscribe();}
部分输出
23:29:38.515 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
23:29:38.534 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | request(3)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | onNext(1)
23:29:38.538 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(1)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(2)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(3)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(4)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(7)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(8)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(9)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(10)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.540 [main] INFO reactor.Flux.Array.2 - | onComplete()
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | request(1)
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | onNext(2)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | request(6)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(1)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(2)
但看外内两个flux的第一次request,可以初步看到分别是concurrency及prefetch
源码解析
Flux
reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/Flux.java
/*** Transform the elements emitted by this {@link Flux} asynchronously into Publishers,* then flatten these inner publishers into a single {@link Flux} through merging,* which allow them to interleave.* <p>* There are three dimensions to this operator that can be compared with* {@link #flatMapSequential(Function) flatMapSequential} and {@link #concatMap(Function) concatMap}:* <ul>* <li><b>Generation of inners and subscription</b>: this operator is eagerly* subscribing to its inners.</li>* <li><b>Ordering of the flattened values</b>: this operator does not necessarily preserve* original ordering, as inner element are flattened as they arrive.</li>* <li><b>Interleaving</b>: this operator lets values from different inners interleave* (similar to merging the inner sequences).</li>* </ul>* The concurrency argument allows to control how many {@link Publisher} can be* subscribed to and merged in parallel. The prefetch argument allows to give an* arbitrary prefetch size to the merged {@link Publisher}.** <p>* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flatmapc.png" alt="">** @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher}* @param concurrency the maximum number of in-flight inner sequences* @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence* @param <V> the merged output sequence type** @return a merged {@link Flux}*/public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, intconcurrency, int prefetch) {return flatMap(mapper, false, concurrency, prefetch);}final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extendsV>> mapper, boolean delayError, int concurrency, int prefetch) {return onAssembly(new FluxFlatMap<>(this,mapper,delayError,concurrency,Queues.get(concurrency),prefetch,Queues.get(prefetch)));}
这里使用的是FluxFlatMap
FluxFlatMap
reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/FluxFlatMap.java
FluxFlatMap(Flux<? extends T> source,Function<? super T, ? extends Publisher<? extends R>> mapper,boolean delayError,int maxConcurrency,Supplier<? extends Queue<R>> mainQueueSupplier,int prefetch,Supplier<? extends Queue<R>> innerQueueSupplier) {super(source);if (prefetch <= 0) {throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);}if (maxConcurrency <= 0) {throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);}this.mapper = Objects.requireNonNull(mapper, "mapper");this.delayError = delayError;this.prefetch = prefetch;this.maxConcurrency = maxConcurrency;this.mainQueueSupplier =Objects.requireNonNull(mainQueueSupplier, "mainQueueSupplier");this.innerQueueSupplier =Objects.requireNonNull(innerQueueSupplier, "innerQueueSupplier");}@Overridepublic void subscribe(CoreSubscriber<? super R> actual) {if (trySubscribeScalarMap(source, actual, mapper, false)) {return;}source.subscribe(new FlatMapMain<>(actual,mapper,delayError,maxConcurrency,mainQueueSupplier,prefetch, innerQueueSupplier));}
这里可以看到subscribe的时候使用了FlatMapMain
FlatMapMain
static final class FlatMapMain<T, R> extends FlatMapTracker<FlatMapInner<R>>implements InnerOperator<T, R> {FlatMapMain(CoreSubscriber<? super R> actual,Function<? super T, ? extends Publisher<? extends R>> mapper,boolean delayError,int maxConcurrency,Supplier<? extends Queue<R>> mainQueueSupplier,int prefetch,Supplier<? extends Queue<R>> innerQueueSupplier) {this.actual = actual;this.mapper = mapper;this.delayError = delayError;this.maxConcurrency = maxConcurrency;this.mainQueueSupplier = mainQueueSupplier;this.prefetch = prefetch;this.innerQueueSupplier = innerQueueSupplier;this.limit = Operators.unboundedOrLimit(maxConcurrency);}@Overridepublic void request(long n) {if (Operators.validate(n)) {Operators.addCap(REQUESTED, this, n);drain();}}@Overridepublic void onSubscribe(Subscription s) {if (Operators.validate(this.s, s)) {this.s = s;actual.onSubscribe(this);s.request(Operators.unboundedOrPrefetch(maxConcurrency));}}@SuppressWarnings("unchecked")@Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t, actual.currentContext());return;}Publisher<? extends R> p;try {p = Objects.requireNonNull(mapper.apply(t),"The mapper returned a null Publisher");}catch (Throwable e) {onError(Operators.onOperatorError(s, e, t, actual.currentContext()));return;}if (p instanceof Callable) {R v;try {v = ((Callable<R>) p).call();}catch (Throwable e) {if (!delayError || !Exceptions.addThrowable(ERROR, this, e)) {onError(Operators.onOperatorError(s, e, t, actual.currentContext()));}return;}tryEmitScalar(v);}else {FlatMapInner<R> inner = new FlatMapInner<>(this, prefetch);if (add(inner)) {p.subscribe(inner);}}}//...
}
这个可以理解为对外层flux的操作,可以看到onSubscribe的时候,其内部request的大小为Operators.unboundedOrPrefetch(maxConcurrency),也就是第一个参数concurrency
在onNext操作里头,对里头的flux使用了FlatMapInner
FlatMapInner
static final class FlatMapInner<R>implements InnerConsumer<R>, Subscription {FlatMapInner(FlatMapMain<?, R> parent, int prefetch) {this.parent = parent;this.prefetch = prefetch;
// this.limit = prefetch >> 2;this.limit = Operators.unboundedOrLimit(prefetch);}@Overridepublic void onSubscribe(Subscription s) {if (Operators.setOnce(S, this, s)) {if (s instanceof Fuseable.QueueSubscription) {@SuppressWarnings("unchecked") Fuseable.QueueSubscription<R> f =(Fuseable.QueueSubscription<R>) s;int m = f.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);if (m == Fuseable.SYNC) {sourceMode = Fuseable.SYNC;queue = f;done = true;parent.drain();return;}if (m == Fuseable.ASYNC) {sourceMode = Fuseable.ASYNC;queue = f;}// NONE is just fall-through as the queue will be created on demand}s.request(Operators.unboundedOrPrefetch(prefetch));}} @Overridepublic void request(long n) {long p = produced + n;if (p >= limit) {produced = 0L;s.request(p);}else {produced = p;}}
}
subscribe的时候,request的数量为Operators.unboundedOrPrefetch(prefetch)
这里可以看到这里对prefetch进行右移2操作,相当于除以4,作为limit,limit是个判断,用来对inner的flux的request数量进行限制
小结
flatMap的两个参数concurrency及prefetch,分别是作用于外头及里头的两个flux,第一次request都是使用该值,后续的话,其内部会对request的数量进行判断和调整。
doc
- webflux-concurrency-model
聊聊FluxFlatMap的concurrency及prefetch参数相关推荐
- java的prefetch()_聊聊FluxFlatMap的concurrency及prefetch参数
序 本文主要研究下FluxFlatMap的concurrency及prefetch参数 实例 @Test public void testConcurrencyAndPrefetch(){ int c ...
- 聊聊tomcat jdbc pool的默认参数及poolSweeper
序 本文主要研究一下tomcat jdbc pool的默认参数及poolSweeper tomcat jdbc pool 参数默认值 initialSize = 10(默认值) maxActive=1 ...
- java的maxrow_聊聊pg jdbc statement的maxRows参数
序 本文主要解析一下pg jdbc statement的maxRows参数 Statement.setMaxRows void setMaxRows(int max) throws SQLExcept ...
- python可变参数_Python 的四种共享传参详解
点击上方"Python数据之道",选择"星标公众号" 精品文章,第一时间送达 作者 | 杨仁聪 编辑 | Lemon 出品 | Python数据之道 本文来自公 ...
- python argparse nargs_Python | 使用argparse解析命令行参数
今天是Python专题第27篇文章,我们来聊聊Python当中的命令行参数工具argparse. 命令行参数工具是我们非常常用的工具,比如当我们做实验希望调节参数的时候,如果参数都是通过硬编码写在代码 ...
- SpringBoot 如何进行参数校验,老鸟们都这么玩的!
大家好,我是飘渺. 前几天写了一篇 SpringBoot如何统一后端返回格式?老鸟们都是这样玩的! 阅读效果还不错,而且被很多号主都转载过,今天我们继续第二篇,来聊聊在SprinBoot中如何集成参数 ...
- 内存Prefetch
最近在用vtune分析程序性能瓶颈时,发现一些内存访问的地方竟然成了cpu热点.经过仔细分析,发现这些热点主要是对大数组非连续位置的访问的引起的.比较消耗cpu的原因应该是cache不命中.因为 ...
- webflux系列--reactor功能
创建一个新的Flux just 指定序列中包含的全部元素.创建出来的 Flux 序列在发布这些元素之后会自动结束. 即有限序列. public static <T> Flux<T&g ...
- MQ消息队列之RabbitMQ的安装和原理
1. RabbitMQ安装 1.1 推荐方法:docker安装 一行命令搞定: docker pull rabbitmq:management 开启宿主机与容器的两个重要的端口映射即可: docker ...
- RabbitMQ队列阻塞该如何处理
概述 事故经过 由于大量商户反应收不到推送,第一反应是不是推送系统挂了,导致没有进行推送.于是让运维检查推送系统个节点的情况,发现都正常.于是打开RabbitMQ的管控台看了一下,人都蒙了.已经有几万 ...
最新文章
- 【每日一念经】四轮面试,我如何拿到美团的offer?
- Springboot中给图片添加文字水印
- Unity3D安卓程序中常用静态方法封装
- Python: sklearn库——数据预处理
- 为什么晚结婚的离婚率低?与这个对于我们的启示。
- 一个java文件中可包含多个main方法
- android 捕捉home键
- Zuul异常Zuul spring cloud zuul com.netflix.zuul.exception.ZuulException GENERAL
- 微软官方首度回应黑屏恐慌:不强制 不诉讼
- java成组链接法的实现_c++磁盘存储空间的管理模拟(UNIX存储管理的成组链接法的设计与实现)...
- Java加载Class文件的原理机制
- 值得收藏的5款办公软件,PDF转Word不再头疼
- 道德经和译文_老子《道德经》全文解析及通俗译文
- C++ 侯捷系列视频汇总
- 无法向虚拟机中拷贝文件解决办法
- 语句摘抄——第29周
- kotlin写的加解密算法,包括模式和填充方式
- html实现气流动态图,新风系统的动态图,简单易懂
- 浮生寂的伤感非主流日志发布:相信有天我会比你幸福
- 使用SQLite的感想