本文主要研究一下reactive streams的schedulers

背景

默认情况下Mono以及Flux都在主线程上运行,有时候可能会阻塞主线程,可以通过设定schedulers让其在其他线程运行。

原始输出

没有使用publishOn及subscribeOn时输出如下

11:26:10.668 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:26:11.097 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - subscribe thread:[main],data :2
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.117 [main] INFO com.example.demo.SchedulerTest - subscribe thread:[main],data :4

publishOn(给subscriber配置线程)

    @Testpublic void testPublisherThread(){Scheduler pubScheduler = Schedulers.newSingle("pub-thread");Flux.defer(() -> {LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());return Flux.range(1,4);}).filter(e -> {LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());return e % 2 == 0;}).publishOn(pubScheduler).subscribe(e -> {LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);});}

输出

11:31:23.691 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:31:23.871 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
11:31:23.880 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :2
11:31:23.881 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4

可以发现,配置publishOn,改变了subscribe的运行线程

subscribeOn(给publisher配置线程)

    @Testpublic void testSubscriberThread() throws InterruptedException {Scheduler subScheduler = Schedulers.newSingle("sub-thread");Flux.defer(() -> {LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());return Flux.range(1,4);}).filter(e -> {LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());return e % 2 == 0;}).subscribeOn(subScheduler).subscribe(e -> {LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);});Thread.sleep(10*1000);}

输出如下:

11:31:58.294 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:31:58.528 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - defer thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[subscriber-thread-1],data :2
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[subscriber-thread-1],data :4

可以发现,配置了subscribeOn,所有的都在这个线程运行,包括defer、包括filter、包括subscribe

publishOn和subscribeOn

    @Testpublic void testPublisherAndSubscriberThread() throws InterruptedException {Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");Scheduler subScheduler = Schedulers.newSingle("subscriber-thread");Flux.defer(() -> {LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());return Flux.range(1,4);}).filter(e -> {LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());return e % 2 == 0;}).publishOn(pubScheduler).subscribeOn(subScheduler).subscribe(e -> {LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);});Thread.sleep(10*1000);}

输出

11:33:00.964 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:33:01.125 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - defer thread:[subscriber-thread-1]
11:33:01.134 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [publisher-thread-2] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-2],data :2
11:33:01.135 [publisher-thread-2] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-2],data :4

都配置了话,可以看到subscriber运行在publishOn配置的线程,而defer、filter等运行在subscribeOn配置的线程

publishOn及filter

    @Testpublic void testFilterThread(){Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");Flux.defer(() -> {LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());return Flux.range(1,4);}).publishOn(pubScheduler) //NOTE 注意这里放到了filter之前.filter(e -> {LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());return e % 2 == 0;}).subscribe(e -> {LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);});}

输出

13:19:01.606 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:19:01.754 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :2
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.767 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4

这里将publishOn放在了filter之前,可以发现filter线程也变成publisher线程了
在publishOn之后的filter或map等将使用publishOn配置的线程;之前的话,使用的是main线程或subscribeOn配置的线程

subscribeOn及filter

将subscribeOn放在filter之前,跟之后没有区别,因为没有配置publishOn时,subscribeOn作用于所有,包括filter

window scheduler

还可以给window方法设定线程池

    @Testpublic void testWindowScheduler() throws InterruptedException {Scheduler windowScheduler = Schedulers.newSingle("window-thread");Flux.defer(() -> {LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());return Flux.range(1,4);}).delayElements(Duration.ofMillis(200)) //默认会创建parallel线程,作用于subscribe线程.windowTimeout(1, Duration.ofMillis(100), windowScheduler).onErrorReturn(Flux.<Integer>just(-1)).flatMap(e -> {return e.map(item -> item*10);}).subscribe(e -> {LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);});Thread.sleep(10*1000);}

输出

14:15:28.523 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:15:28.701 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
14:15:28.961 [parallel-1] INFO com.example.demo.SchedulerTest - subscribe thread:[parallel-1],data :10
14:15:29.167 [window-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[window-thread-1],data :20
14:15:29.370 [window-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[window-thread-1],data :30
14:15:29.573 [parallel-4] INFO com.example.demo.SchedulerTest - subscribe thread:[parallel-4],data :40

注意delayElements方法默认给subscriber创建了parallel线程
timeout(),skip()等方法也默认会创建线程

scheduleGroup

前面在publishOn以及subscribeOn使用的都是Schedulers.newSingle,也可以使用多个线程组成的group,比如

Scheduler parallelGroup = Schedulers.newParallel("parallel-group", 8);

也可以使用elastic类型,比较适合IO类型的操作

    /*** {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches* the thread pools, reusing them once the Workers have been shut down.* <p>* The maximum number of created thread pools is unbounded.* <p>* The default time-to-live for unused thread pools is 60 seconds, use the appropriate* factory to push a different value.* <p>* This scheduler is not restartable.** @param name Thread prefix** @return a new {@link Scheduler} that hosts a fixed pool of single-threaded* ExecutorService-based workers and is suited for parallel work*/public static Scheduler newElastic(String name) {return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS);}

实例

    @Testpublic void testElasticGroup() throws InterruptedException {Scheduler elastic = Schedulers.newElastic("elastic-group");Flux.defer(() -> {LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());return Flux.range(1,4);}).filter(e -> {LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());return e % 2 == 0;}).publishOn(elastic).map(e -> {LOGGER.info("map thread:[{}]",Thread.currentThread().getName());return e * 10;}).subscribeOn(elastic).subscribe(e -> {LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);});Thread.sleep(10*1000);}

输出

13:58:37.356 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:58:37.514 [elastic-group-2] INFO com.example.demo.SchedulerTest - defer thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-3] INFO com.example.demo.SchedulerTest - map thread:[elastic-group-3]
13:58:37.520 [elastic-group-3] INFO com.example.demo.SchedulerTest - subscribe thread:[elastic-group-3],data :20
13:58:37.521 [elastic-group-3] INFO com.example.demo.SchedulerTest - map thread:[elastic-group-3]
13:58:37.521 [elastic-group-3] INFO com.example.demo.SchedulerTest - subscribe thread:[elastic-group-3],data :40

小结

  • 命名

这个publishOn及subscribeOn方法名有点晦涩,更直白一点相当于subscriberThreadPools以及publisherThreadPools。

  • publishOn与operations的位置

在publishOn之后的filter或map等将使用publishOn配置的线程;之前的话,使用的是main线程或subscribeOn配置的线程

  • subscribeOn

在没有配置publishOn,只配置subscribeOn的话,则作用所有

  • 方法内置线程

delayElements(),timeout(),skip()内置会使用额外的线程

doc

  • schedulers

聊聊reactive streams的schedulers相关推荐

  1. 聊聊reactive streams的processors

    序 本文主要研究一下reactive streams的processors processors分类 processors既是Publisher也是Subscriber.在project reacto ...

  2. Reactive Streams

    2019独角兽企业重金招聘Python工程师标准>>> Reactive Streams is an initiative to provide a standard for asy ...

  3. Reactive Streams规范及常见库

    一.什么是Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous strea ...

  4. JDK11的新特性:HTTP API和reactive streams

    文章目录 简介 怎么在java中使用reactive streams POST请求的例子 总结 简介 在JDK11的新特性:新的HTTP API中,我们介绍了通过新的HTTP API,我们可以发送同步 ...

  5. 什么是Reactive Streams in Java 译

    数式编程对于Reactive Programming很重要,但我不会在这篇文章中深入探讨函数式编程. 在这篇文章中,我想看看Java中的整体Reactive发展环境. Reactive Program ...

  6. JVM平台上的响应式流(Reactive Streams)规范

    Reactive Streams 响应式流是一个倡议,用来为具有非阻塞后压的异步流处理提供一个标准.大家努力的目标集中在运行时环境(JVM和JavaScript)和网络协议上. 注:响应式流其实就是一 ...

  7. Reactive Streams规范

    Reactive Streams的目的是提供一个带有 非阻塞背压 特征的 异步流处理 标准. 最新的发布版本如下 <dependency><groupId>org.reacti ...

  8. reactive streams的Mono及Flux

    实例 Mono @Testpublic void testMonoBasic(){Mono.fromSupplier(() -> "Hello").subscribe(Sys ...

  9. FunDA(6)- Reactive Streams:Play with Iteratees、Enumerator and Enumeratees

    在上一节我们介绍了Iteratee.它的功能是消耗从一些数据源推送过来的数据元素,不同的数据消耗方式代表了不同功能的Iteratee.所谓的数据源就是我们这节要讨论的Enumerator.Enumer ...

最新文章

  1. 解决Debian-7.1下Chrome浏览器字体难看的问题
  2. c 自动生成html报告,Pytest框架之 - Allure生成漂亮的HTML图形测试报告
  3. mysql 管理端口_MySQL8新增管理端口
  4. springboot学习笔记(五)
  5. 201671010117 2016-2017-2 《Java程序设计》Java第二周学习心得
  6. C++中的(unsigned int)代表的意思
  7. C语言中字符串的处理方式(一)
  8. UOS桌面专业版下载链接
  9. oracle dba 培训教程 第11章 索引的管理与维护
  10. 你为什么总是很忙碌,却还在原地踏步!
  11. 基于QT5实现的心率变异与心率减速力分析软件
  12. DBMS_AW_EXP: BIN$XXXXXXX==$0 not AW$
  13. Excel日期按秒自增公式
  14. Wi-Fi 工作频段
  15. L. Spicy Restaurant(bfs)
  16. 小微贷款按小时算利息,会是互联网巨头“放贷梦“的终局吗?
  17. 微信小程序开发需要注意的29个坑
  18. 【drawio笔记】将图表导出为更高分辨率的 PNG 图像
  19. ubuntu 的远程桌面客户端工具tsclient
  20. mysql gtid 主备搭建

热门文章

  1. putty ubuntu服务器 上传文件,教你如何使用PuTTY上传文件?
  2. php索引数组转键数组,php索引数组和关联数组
  3. 脑电实验注意事项及实验过程中伪迹识别
  4. 打印表单_超市生鲜常用表单,打印出来直接用!(可收藏)
  5. 25个视频神同步,还能给视频声音移花接木,谷歌开源最新自监督算法
  6. 围剿Sci-Hub力度升级!全球最大学术出版商:网址你也不要提,不然就发律师函...
  7. 20165302第八周总结
  8. 深度学习专家Karpathy加入特斯拉,或将负责自动驾驶视觉研究
  9. Java基础/利用fastjson序列化对象为JSON
  10. CFCC百套计划2 CodeChef December Challenge 2017 Chef And Easy Xor Queries