文章目录

  • 并发任务编排实现
    • 不带返回值/参数传递任务
      • 串行执行
      • 并行执行
      • 并行执行-自定义线程池
      • 阻塞等待:多并行任务执行完再执行
      • 任意一个任务并发执行完就执行下个任务
      • 串并行任务依赖场景
    • 带返回值/参数传递任务
      • 带返回值实现
      • 串行执行
    • 多线程任务串行执行
      • 对任务并行执行,返回值combine
    • 写在最后

并发任务编排实现

其实Java8中提供了并发编程框架CompletableFuture,以下结合不同场景进行使用。

不带返回值/参数传递任务

模拟任务代码:

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");class TaskA implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(2000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter)));}}class TaskB implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(1000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter)));}}class TaskC implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(50);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter)));}}

串行执行

A、B、C任务串行执行

CompletableFuture,runAsync():异步执行
thenRun():上个任务结束再执行(不带上一个返回值结果)下一个任务
get():阻塞等待任务执行完成

实现方式:

    @Testvoid thenRunTest() throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(new TaskA()).thenRun(new TaskB()).thenRun(new TaskC());future.get();}

输出:

threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-01 22:56:51]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务B] time:[2021-06-01 22:56:52]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务C] time:[2021-06-01 22:56:53]

从日志就能看出串行执行就是通过单线程执行多个任务。

并行执行

A、B、C任务并行执行

CompletableFuture.allOf():等待所有的CompletableFuture执行完成,无返回值

代码实现:

    /*** 并发执行ABC任务*/@SneakyThrows@Testvoid SeqTest(){String start = LocalDateTime.now().format(formatter);System.out.println(String.format("start task [%s]", start));CompletableFuture[] futures = new CompletableFuture[3];futures[0] = CompletableFuture.runAsync(new TaskA());futures[1] = CompletableFuture.runAsync(new TaskB());futures[2] = CompletableFuture.runAsync(new TaskC());CompletableFuture.allOf(futures).get();String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}

输出:

start task [2021-06-01 23:03:49]
threadName: [ForkJoinPool.commonPool-worker-3] taskName:[任务C] time:[2021-06-01 23:03:49]
threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务B] time:[2021-06-01 23:03:50]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-01 23:03:51]
end task [2021-06-01 23:03:51]

上述这种方式执行可以看出CompletableFuture默认使用的是ForkJoinPool.commonPool线程池,居然用的默认线程池那线程数是如何配置的呢?后来找到源码发现commonPool线程池配置代码如下

  1. 先去看看java环境变量有没有制定线程数(如果没有特殊制定默认没有)
  2. 如果没有配置则通过操作系统的核心数减一来设置线程数(我理解的减一应该是为了给main thread执行)
  3. 这种默认配置方式适合用于CPU密集型任务,如果IO型需要我们自己去配置线程池

并行执行-自定义线程池

不是所有任务都是CPU密集型,为了解决上述问题,尤其是IO场景,我们需要根据业务场景配置合理线程数充分使其利用cpu资源。
如何合理配置线程数可以参考我之前文章

    @SneakyThrows@Testvoid ParTestWithThreadPool(){String start = LocalDateTime.now().format(formatter);System.out.println(String.format("start task [%s]", start));ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(24, 32, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[3];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);futures[2] = CompletableFuture.runAsync(new TaskC(), customThreadPool);CompletableFuture.allOf(futures).get();String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}

输出:

start task [2021-06-02 00:00:05]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 00:00:05]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 00:00:06]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 00:00:07]
end task [2021-06-02 00:00:07]

阻塞等待:多并行任务执行完再执行

A、B并行都执行完后再执行C任务

    @AfterTestvoid after(){String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}@Testvoid SeqAndParTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);CompletableFuture.allOf(futures).get();CompletableFuture.runAsync(new TaskC(), customThreadPool).get();}

输出:

start task [2021-06-02 16:56:42]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 16:56:43]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 16:56:44]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 16:56:44]
end task [2021-06-02 16:56:44]

从输出中能看出B、A任务并发执行完成以后再执行C任务

任意一个任务并发执行完就执行下个任务

A、B并发执行,只要有一个执行完就执行C任务

anyOf:只要有任意一个CompletableFuture结束,就可以做接下来的事情,而无须像AllOf那样,等待所有的CompletableFuture结束

    @Testvoid anyOf() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);CompletableFuture.anyOf(futures).get();CompletableFuture.runAsync(new TaskC(), customThreadPool).get();}

输出:

start task [2021-06-02 17:43:42]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 17:43:43]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 17:43:43]
-----------
end task [2021-06-02 17:43:43]

串并行任务依赖场景

    @Testvoid multiSeqAndParTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture.runAsync(new TaskA(), customThreadPool).get();CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskB(), customThreadPool).thenRun(new TaskC());futures[1] = CompletableFuture.runAsync(new TaskD(), customThreadPool).thenRun(new TaskE());CompletableFuture.allOf(futures).get();CompletableFuture.runAsync(new TaskF(), customThreadPool).get();}

输出:

start task [2021-06-02 17:33:35]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 17:33:37]
-----------
threadName: [pool-1-thread-3] taskName:[任务D] time:[2021-06-02 17:33:37]
threadName: [pool-1-thread-3] taskName:[任务E] time:[2021-06-02 17:33:37]
-----------
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 17:33:38]
threadName: [pool-1-thread-2] taskName:[任务C] time:[2021-06-02 17:33:38]
-----------
threadName: [pool-1-thread-4] taskName:[任务F] time:[2021-06-02 17:33:38]
end task [2021-06-02 17:33:38]

带返回值/参数传递任务

模拟任务

String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));return v;}String taskB(){try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));return v;}String taskC(){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));return v;}

带返回值实现

supplyAsync():异步执行并带返回值

    @Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> taskA());String result = stringCompletableFuture.get();System.out.println(result);}String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}

串行执行

thenApply(): 后面跟的是一个有参数、有返回值的方法,称为Function。返回值是CompletableFuture类型。
thenAccept():上个任务结束再执行(前面任务的结果作为下一个任务的入参)下一个任务

    String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));return v;}void taskC(String param){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));System.out.println(param + "\n ->" + v);}@Testvoid seqTest1() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> taskA()).thenApply(param -> {String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}).thenAccept(param -> taskC(param));completableFuture.get();}

输出:

start task [2021-06-03 11:14:27]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务B] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务C] time:[2021-06-03 11:14:31]
end task [2021-06-03 11:14:31]

多线程任务串行执行

A、B、C任务在多个线程环境下执行,但是执行需要带要带参数传递A->B->C,感觉这种使用场景比较少

thenCompose():第1个参数是一个CompletableFuture类型,第2个参数是一个方法,并且是一个BiFunction,也就是该方法有2个输入参数,1个返回值。从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个CompletableFuture的返回值传进去,再额外做一些事情。

模拟任务:

    String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));return v;}String taskB(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}String taskC2(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}

实现一:

        @Testvoid multiCompletableFutureSeqTest() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCompose(firstTaskReturn -> CompletableFuture.supplyAsync(() -> taskB(firstTaskReturn))).thenCompose(secondTaskReturn -> CompletableFuture.supplyAsync(() -> taskC2(secondTaskReturn)));System.out.println(future.get());}

输出:

start task [2021-06-03 15:04:45]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-03 15:04:48]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务B] time:[2021-06-03 15:04:51]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务C] time:[2021-06-03 15:04:54]
end task [2021-06-03 15:04:54]

对任务并行执行,返回值combine

如果希望返回值是一个非嵌套的CompletableFuture,可以使用thenCompose

    @SneakyThrows@Testvoid multiCombineTest(){CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCombine(CompletableFuture.supplyAsync(() -> taskB2()), (s1, s2) -> s1 + "\n" +  s2 + "\n" + "combine: " + Thread.currentThread().getName()).thenCombine(CompletableFuture.supplyAsync(() -> taskC2()), (s1, s2) -> s1 + "\n" +  s2 + "\n" + "combine: " + Thread.currentThread().getName());System.out.println(future.get());}

写在最后

推荐一个大佬的并发编程框架,文章思路是照着他的readme去写的

基于CompletableFuture并发任务编排实现相关推荐

  1. JAVA基于CompletableFuture的流水线并行处理深度实践,满满干货

    在项目开发中,后端服务对外提供API接口一般都会关注响应时长.但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最 ...

  2. 基于DAG的任务编排框架/平台

    一.任务编排工作流 任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖.复杂一点的编排之后就能形成一个 workflow 工作 ...

  3. 【多线程】优雅使用线程池结合CompletableFuture实现异步编排

    文章目录 参考 1.线程池引入 2.Executors 2.1.概述 2.2.Executors缺陷 3.优雅的创建线程池 3.1.正确挑选方法 3.2.线程池配置类 4.线程池执行流程 5.Comp ...

  4. 基于高并发的数据采集器

    项目背景: 数据采集是当前很多智能设备都需要的.数据类型有很多种,有字符串,有json等等.交互协议有基于tcp的,有基于http的.现在针对原先项目面临的问题做出解决方案. 2.面临问题 1:并发量 ...

  5. 打造基于大并发通信技术及大数据技术的O2O系统

    2019独角兽企业重金招聘Python工程师标准>>> 本文来自于个推CTO叶新江在2015Qcon的分享整理. 截止2015年6月,个推SDK累计接入总用户数达50亿 (其中海外近 ...

  6. 基于多线程并发-原子量实现自旋锁

    一.数据竞争的两种处理方式 多线程的核心是CPU的时间分片,同一时刻只能有一个线程获取到锁.对于没有获取到锁的线程通常有两种处理方式:自旋锁,没有获取到锁的线程会一直循环等待判断资源是否已经释放锁,不 ...

  7. JAVA基于CompletableFuture的流水线并行处理深度实践

    在项目开发中,后端服务对外提供API接口一般都会关注响应时长.但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最 ...

  8. oracle EBS 基于Host并发程序的开发(转)

    参考此编文章 http://www.doc88.com/p-0972680953307.html http://www.cnblogs.com/benio/archive/2011/06/10/207 ...

  9. 并发编程之深入理解十三:CompletionService CompletableFuture

    CompletionService Callable + Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果. 而 ...

最新文章

  1. 安装eclipse时一直跳转JRE Missing页面
  2. Hugepages你用了吗?--原理概念篇
  3. 用python turtle库画正方形_用Python Turtle画一个正方形
  4. Function.prototype.bind相关知识点
  5. php和c语言的字符数组中,字符数组和字符串的区别,C语言字符数组和字符串区别详解...
  6. C++11 thread使用
  7. 调整VirtualBox虚拟机分辨率的方法
  8. Go 并发 多线程 goroutine channel 实例
  9. 从cross entropy 推导到 KL Divergence
  10. ERP系统和ERP软件的介绍
  11. 自己设计过App的数据库框架?还是只是停留在使用ormlite greenDao这类框架,一篇文章帮你解答...
  12. 小米手机 开启 开发者模式
  13. win7远程桌面链接
  14. 【NLP】模型压缩与蒸馏!BERT的忒修斯船
  15. 刨根问底:什么是yum源,yum的工作原理又是什么
  16. 农夫、羊、菜和狼的故事
  17. 关于chrome、edge浏览器f12开发者模式的application中无法添加参数的问题
  18. Affinity Designer Beta(mac设计绘图工具)
  19. get_article_info
  20. qt-gui的GUI hint参数

热门文章

  1. idea怎么更改推到github的路径_IDEA 拉取、上传、更新 项目到 Gitee+GitHub_超详细超简单版...
  2. server sql 众数_sql 语句系列(众数中位数与百分比)[八百章之第十五章]
  3. python 二分法调试代码,Python实现二分法
  4. php 如何守护进程_PHP 如何实现守护进程
  5. java修改已创建程序界面_Java应用程序的Web用户界面
  6. python提取文本中的字符串到新的txt_Python实现jieba对文本分词并写入新的文本文件,然后提取出文本中的关键词...
  7. java list能作为入参吗_springmvc 不支持 List 对象作为 方法的参数
  8. 【LeetCode笔记】112 113. 路径总和 I II(Java、递归、DFS)
  9. python报错defined_python问卷星报错NameError: name 'filename' is not defined
  10. java监听剪贴板_在java中实现windows剪贴板监视