本文主要研究下FluxFlatMap的concurrency及prefetch参数

实例

    @Testpublic void testConcurrencyAndPrefetch(){int concurrency = 3;int prefetch = 6;Flux.range(1,100).log().flatMap(i -> Flux.just(1,2,3,4,5,6,7,8,9,10).log(),concurrency,prefetch).subscribe();}

部分输出

23:29:38.515 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
23:29:38.534 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | request(3)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | onNext(1)
23:29:38.538 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(1)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(2)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(3)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(4)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(7)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(8)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(9)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(10)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.540 [main] INFO reactor.Flux.Array.2 - | onComplete()
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | request(1)
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | onNext(2)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | request(6)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(1)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(2)

但看外内两个flux的第一次request,可以初步看到分别是concurrency及prefetch

源码解析

Flux

reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/Flux.java

    /*** Transform the elements emitted by this {@link Flux} asynchronously into Publishers,* then flatten these inner publishers into a single {@link Flux} through merging,* which allow them to interleave.* <p>* There are three dimensions to this operator that can be compared with* {@link #flatMapSequential(Function) flatMapSequential} and {@link #concatMap(Function) concatMap}:* <ul>*     <li><b>Generation of inners and subscription</b>: this operator is eagerly*     subscribing to its inners.</li>*     <li><b>Ordering of the flattened values</b>: this operator does not necessarily preserve*     original ordering, as inner element are flattened as they arrive.</li>*     <li><b>Interleaving</b>: this operator lets values from different inners interleave*     (similar to merging the inner sequences).</li>* </ul>* The concurrency argument allows to control how many {@link Publisher} can be* subscribed to and merged in parallel. The prefetch argument allows to give an* arbitrary prefetch size to the merged {@link Publisher}.** <p>* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flatmapc.png" alt="">** @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher}* @param concurrency the maximum number of in-flight inner sequences* @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence* @param <V> the merged output sequence type** @return a merged {@link Flux}*/public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, intconcurrency, int prefetch) {return flatMap(mapper, false, concurrency, prefetch);}final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extendsV>> mapper, boolean delayError, int concurrency, int prefetch) {return onAssembly(new FluxFlatMap<>(this,mapper,delayError,concurrency,Queues.get(concurrency),prefetch,Queues.get(prefetch)));}

这里使用的是FluxFlatMap

FluxFlatMap

reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/FluxFlatMap.java

    FluxFlatMap(Flux<? extends T> source,Function<? super T, ? extends Publisher<? extends R>> mapper,boolean delayError,int maxConcurrency,Supplier<? extends Queue<R>> mainQueueSupplier,int prefetch,Supplier<? extends Queue<R>> innerQueueSupplier) {super(source);if (prefetch <= 0) {throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);}if (maxConcurrency <= 0) {throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);}this.mapper = Objects.requireNonNull(mapper, "mapper");this.delayError = delayError;this.prefetch = prefetch;this.maxConcurrency = maxConcurrency;this.mainQueueSupplier =Objects.requireNonNull(mainQueueSupplier, "mainQueueSupplier");this.innerQueueSupplier =Objects.requireNonNull(innerQueueSupplier, "innerQueueSupplier");}@Overridepublic void subscribe(CoreSubscriber<? super R> actual) {if (trySubscribeScalarMap(source, actual, mapper, false)) {return;}source.subscribe(new FlatMapMain<>(actual,mapper,delayError,maxConcurrency,mainQueueSupplier,prefetch, innerQueueSupplier));}    

这里可以看到subscribe的时候使用了FlatMapMain

FlatMapMain

static final class FlatMapMain<T, R> extends FlatMapTracker<FlatMapInner<R>>implements InnerOperator<T, R> {FlatMapMain(CoreSubscriber<? super R> actual,Function<? super T, ? extends Publisher<? extends R>> mapper,boolean delayError,int maxConcurrency,Supplier<? extends Queue<R>> mainQueueSupplier,int prefetch,Supplier<? extends Queue<R>> innerQueueSupplier) {this.actual = actual;this.mapper = mapper;this.delayError = delayError;this.maxConcurrency = maxConcurrency;this.mainQueueSupplier = mainQueueSupplier;this.prefetch = prefetch;this.innerQueueSupplier = innerQueueSupplier;this.limit = Operators.unboundedOrLimit(maxConcurrency);}@Overridepublic void request(long n) {if (Operators.validate(n)) {Operators.addCap(REQUESTED, this, n);drain();}}@Overridepublic void onSubscribe(Subscription s) {if (Operators.validate(this.s, s)) {this.s = s;actual.onSubscribe(this);s.request(Operators.unboundedOrPrefetch(maxConcurrency));}}@SuppressWarnings("unchecked")@Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t, actual.currentContext());return;}Publisher<? extends R> p;try {p = Objects.requireNonNull(mapper.apply(t),"The mapper returned a null Publisher");}catch (Throwable e) {onError(Operators.onOperatorError(s, e, t, actual.currentContext()));return;}if (p instanceof Callable) {R v;try {v = ((Callable<R>) p).call();}catch (Throwable e) {if (!delayError || !Exceptions.addThrowable(ERROR, this, e)) {onError(Operators.onOperatorError(s, e, t, actual.currentContext()));}return;}tryEmitScalar(v);}else {FlatMapInner<R> inner = new FlatMapInner<>(this, prefetch);if (add(inner)) {p.subscribe(inner);}}}//...
}                        

这个可以理解为对外层flux的操作,可以看到onSubscribe的时候,其内部request的大小为Operators.unboundedOrPrefetch(maxConcurrency),也就是第一个参数concurrency

在onNext操作里头,对里头的flux使用了FlatMapInner

FlatMapInner

static final class FlatMapInner<R>implements InnerConsumer<R>, Subscription {FlatMapInner(FlatMapMain<?, R> parent, int prefetch) {this.parent = parent;this.prefetch = prefetch;
//            this.limit = prefetch >> 2;this.limit = Operators.unboundedOrLimit(prefetch);}@Overridepublic void onSubscribe(Subscription s) {if (Operators.setOnce(S, this, s)) {if (s instanceof Fuseable.QueueSubscription) {@SuppressWarnings("unchecked") Fuseable.QueueSubscription<R> f =(Fuseable.QueueSubscription<R>) s;int m = f.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);if (m == Fuseable.SYNC) {sourceMode = Fuseable.SYNC;queue = f;done = true;parent.drain();return;}if (m == Fuseable.ASYNC) {sourceMode = Fuseable.ASYNC;queue = f;}// NONE is just fall-through as the queue will be created on demand}s.request(Operators.unboundedOrPrefetch(prefetch));}}        @Overridepublic void request(long n) {long p = produced + n;if (p >= limit) {produced = 0L;s.request(p);}else {produced = p;}}
}            

subscribe的时候,request的数量为Operators.unboundedOrPrefetch(prefetch)
这里可以看到这里对prefetch进行右移2操作,相当于除以4,作为limit,limit是个判断,用来对inner的flux的request数量进行限制

小结

flatMap的两个参数concurrency及prefetch,分别是作用于外头及里头的两个flux,第一次request都是使用该值,后续的话,其内部会对request的数量进行判断和调整。

doc

  • webflux-concurrency-model

聊聊FluxFlatMap的concurrency及prefetch参数相关推荐

  1. java的prefetch()_聊聊FluxFlatMap的concurrency及prefetch参数

    序 本文主要研究下FluxFlatMap的concurrency及prefetch参数 实例 @Test public void testConcurrencyAndPrefetch(){ int c ...

  2. 聊聊tomcat jdbc pool的默认参数及poolSweeper

    序 本文主要研究一下tomcat jdbc pool的默认参数及poolSweeper tomcat jdbc pool 参数默认值 initialSize = 10(默认值) maxActive=1 ...

  3. java的maxrow_聊聊pg jdbc statement的maxRows参数

    序 本文主要解析一下pg jdbc statement的maxRows参数 Statement.setMaxRows void setMaxRows(int max) throws SQLExcept ...

  4. python可变参数_Python 的四种共享传参详解

    点击上方"Python数据之道",选择"星标公众号" 精品文章,第一时间送达 作者 | 杨仁聪 编辑 | Lemon 出品 | Python数据之道 本文来自公 ...

  5. python argparse nargs_Python | 使用argparse解析命令行参数

    今天是Python专题第27篇文章,我们来聊聊Python当中的命令行参数工具argparse. 命令行参数工具是我们非常常用的工具,比如当我们做实验希望调节参数的时候,如果参数都是通过硬编码写在代码 ...

  6. SpringBoot 如何进行参数校验,老鸟们都这么玩的!

    大家好,我是飘渺. 前几天写了一篇 SpringBoot如何统一后端返回格式?老鸟们都是这样玩的! 阅读效果还不错,而且被很多号主都转载过,今天我们继续第二篇,来聊聊在SprinBoot中如何集成参数 ...

  7. 内存Prefetch

     最近在用vtune分析程序性能瓶颈时,发现一些内存访问的地方竟然成了cpu热点.经过仔细分析,发现这些热点主要是对大数组非连续位置的访问的引起的.比较消耗cpu的原因应该是cache不命中.因为 ...

  8. webflux系列--reactor功能

    创建一个新的Flux just 指定序列中包含的全部元素.创建出来的 Flux 序列在发布这些元素之后会自动结束. 即有限序列. public static <T> Flux<T&g ...

  9. MQ消息队列之RabbitMQ的安装和原理

    1. RabbitMQ安装 1.1 推荐方法:docker安装 一行命令搞定: docker pull rabbitmq:management 开启宿主机与容器的两个重要的端口映射即可: docker ...

  10. RabbitMQ队列阻塞该如何处理

    概述 事故经过 由于大量商户反应收不到推送,第一反应是不是推送系统挂了,导致没有进行推送.于是让运维检查推送系统个节点的情况,发现都正常.于是打开RabbitMQ的管控台看了一下,人都蒙了.已经有几万 ...

最新文章

  1. 【每日一念经】四轮面试,我如何拿到美团的offer?
  2. Springboot中给图片添加文字水印
  3. Unity3D安卓程序中常用静态方法封装
  4. Python: sklearn库——数据预处理
  5. 为什么晚结婚的离婚率低?与这个对于我们的启示。
  6. 一个java文件中可包含多个main方法
  7. android 捕捉home键
  8. Zuul异常Zuul spring cloud zuul com.netflix.zuul.exception.ZuulException GENERAL
  9. 微软官方首度回应黑屏恐慌:不强制 不诉讼
  10. java成组链接法的实现_c++磁盘存储空间的管理模拟(UNIX存储管理的成组链接法的设计与实现)...
  11. Java加载Class文件的原理机制
  12. 值得收藏的5款办公软件,PDF转Word不再头疼
  13. 道德经和译文_老子《道德经》全文解析及通俗译文
  14. C++ 侯捷系列视频汇总
  15. 无法向虚拟机中拷贝文件解决办法
  16. 语句摘抄——第29周
  17. kotlin写的加解密算法,包括模式和填充方式
  18. html实现气流动态图,新风系统的动态图,简单易懂
  19. 浮生寂的伤感非主流日志发布:相信有天我会比你幸福
  20. 使用SQLite的感想

热门文章

  1. python学习笔记 day44 表与表之间的关系
  2. Windows Phone 程序发布过程
  3. mysqldump导出数据
  4. Baidu 人脸识别FireFly 与PC连接调试
  5. Android使用adb命令查看CPU信息
  6. MyBatis 简介
  7. 修正win10部署JDK8(使用JDK文件夹中的自带JRE)
  8. 各种数据库的分页查询
  9. SQL中一种类似GUID值的函数实现
  10. 从客户端(content=span class=Apple-s...)中检测到有潜在危险的 Request.Form 值。