背景

在一些业务场景下,可能需要对一些任务进行异步化,以提高系统的吞吐量,在微服务的服务调用场景下显得尤为突出。
比如某个接口有五个任务:

  • 任务A:执行时间2秒。
  • 任务B:执行时间2秒。
  • 任务C:执行时间1秒。
  • 任务D:执行时间1秒。
  • 任务E:执行时间1秒。

如果不使用异步任务,使用的是同步任务,那么这个接口的执行总时间至少要7秒中,这在很多场景下是不能接受的。
如果使用异步化,前提是这些任务互不相干,也就是各个任务都不依赖于其他任务的执行结果,那就可以使用任务异步来提高吞吐量,接口执行任务A、B、C、D、E立即返回,然后在最后面对异步任务的执行结果进行汇总即可。那么该接口的执行总时间可能就可以减少到2秒,就可以提高吞吐量。

异步任务可以使用线程来实现,为了实现资源的可控、可重复利用,通常使用线程池来对线程进行管理。

例子:

public class AsyncDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {asyncFun();}//异步任务public static void asyncFun() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();//创建个核心线程7,最大线程7,等待队列为5,解决策略为抛出异常的固定线程池。ExecutorService executorService = new ThreadPoolExecutor(7,7,0L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());//执行异步任务Future<Integer> tackA = executorService.submit(() -> {return taskA();});Future<Integer> tackB = executorService.submit(() -> {return taskB();});Future<Integer> tackC = executorService.submit(() -> {return taskC();});Future<Integer> tackD = executorService.submit(() -> {return taskD();});Future<Integer> tackE = executorService.submit(() -> {return taskE();});//上面的任务都是异步化执行的,最后对执行结果进行汇总合并Integer A = tackA.get();Integer B = tackB.get();Integer C = tackC.get();Integer D = tackD.get();Integer E = tackE.get();System.out.println("异步任务总用时:" + (System.currentTimeMillis() - start) + "毫秒,执行结果为:" + (A + B + C + D + E));}//同步任务public static void syncFun(){long start = System.currentTimeMillis();int A = taskA();int B = taskB();int C = taskC();int D = taskD();int E = taskE();System.out.println("同步任务总用时:" + (System.currentTimeMillis() - start) + "毫秒,执行结果为:" + (A + B + C + D + E));}public static int taskA(){try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 2;}public static int taskB(){try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 2;}public static int taskC(){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 1;}public static int taskD(){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 1;}public static int taskE(){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 1;}
}

执行结果:

但是 如果单单使用线程池来实现任务异步化的话,会有一个弊端,因为在实际业务场景中,会有一些任务的执行依赖于另一任务的执行结果,那么这个任务就要等到另一个任务执行完成才可以执行。这些复杂的场景下如果使用线程池的话,就会把复杂的操作交给我们,但是利用CompletableFuture实现任务编排的话,就可以比较简单的实现这一场景。

CompletableFuture

CompletableFuture可以实现复杂任务的编排。要看懂下面的内容,建议要对java8的lambda表达式和函数式接口有一定的了解。

API方法的说明:

启动一个异步任务CompletableFuture:

//开启一个异步任务,返回CompletableFuture,使用默认线程池,没有返回值。
public static CompletableFuture<Void> runAsync(Runnable task)
//开启一个异步任务,返回CompletableFuture,使用传入自定义线程池,没有返回值。
//一般使用这个而不使用上面的,因为使用自定义线程池更加能做到心中有数。
public static CompletableFuture<Void> runAsync(Runnable task, Executor executor)//如果要返回值的话,就要使用这个了,开启一个异步任务,返回CompletableFuture,使用默认线程池,有返回值。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> task)
//开启一个异步任务,返回CompletableFuture,使用默认线程池,有返回值。
//一般使用这个而不使用上面的,因为使用自定义线程池更加能做到心中有数。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> task, Executor executor)

获取返回值

//因为是异步任务,所以这个方法会阻塞住当前线程,知道异步任务执行完成。
public T get() throws InterruptedException, ExecutionException//阻塞住当前线程,可以设置超时方法,如果超时时间内异步任务还没执行完成,就抛出InterruptedException,中断阻塞。
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

执行完成回调和异常回调


//当异步任务执行完成时的回调,不管是否出异常都会执行这个回调。回调方法继续使用执行异步任务的线程执行。
//当任务正常执行结束时,异常e为null,当任务抛出异常结束时,异常e不为null。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> e)//当异步任务执行完成时的回调,不管是否出异常都会执行这个回调。回调方法使用执行异步任务的线程池执行。(可能是同一线程,也可能不同)
//当任务正常执行结束时,异常e为null,当任务抛出异常结束时,异常e不为null。
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> e)//当异步任务执行完成时的回调,不管是否出异常都会执行这个回调。这里指定执行回调方法的线程池,回调方法使用该指定线程池执行。
//当任务正常执行结束时,异常e为null,当任务抛出异常结束时,异常e不为null。
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> e, Executor executor )//这个是专门用于处理异步任务抛出异常时的回调,正常结束不会执行该回调
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> e)

线程串行化方法,进行任务的编排

//当任务B的执行要依赖于任务A的执行完成后才能,并且任务B不需要获得任务A的返回结果,并且任务B也不需要返回结果时,就可以使用该方法。A.thenRun(B)
//传入的参数是任务B,调用方是任务A。表示当任务A执行完成以后,才能执行任务B。
public CompletableFuture<Void> thenRun(Runnable task)//与上面方法类似,只是上面方法两个任务是使用同一线程来完成,而这个方法两个任务使用的线程可能相同,
//也可能不相同,因为使用的是同一线程池,所以可能执行任务B也有可能会使用到了与A一样的线程。
public CompletableFuture<Void> thenRunAsync(Runnable task)//与第一个方法类似,只是这个指定了新的线程池来执行任务B。因为每个线程池都是根据任务的特点来设置参数的,如果任务B适合另外特点的线程池,可以指定一下。
public CompletableFuture<Void> thenRunAsync(Runnable task, Executor executor )//这三个方法与上面上个方法的作用类似,只是这个方法线程B可以接收线程A执行完成的结果作为参数,但是线程B还是没有返回值。
public CompletableFuture<Void> thenAccept(Consumer<? super T> var1)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> var1)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> var1, Executor executor)//这三个方法也是类似的,只是这三个方法线程B既可以接受使用线程A执行完成的返回结果,线程B本身也可以返回结果。
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> var1)public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> var1)public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> var1)

双任务合并,两个任务都执行完成才进行合并:

//A.runAfterBoth(B,task)  A任务与B任务合并,只有A、B任务都完成,才会执行task任务。
//该方法task任务不能获得A、B任务的返回值,并且task任务也没有返回值。
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> CompletionStage, Runnable task)//这三个方法的区别与上面的方法一样。
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> CompletionStage, Runnable task)public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> CompletionStage, Runnable task, Executor executor)//这三个方法与上面的方法作用类似,只是这三个方法的的合并任务可以接受被合并的两个任务的返回值作为参数。但是该合并任务还是没有返回值。public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> var1, BiConsumer<? super T, ? super U> task) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> var1, BiConsumer<? super T, ? super U> task)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> var1, Executor var2)public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> var1, BiConsumer<? super T, ? super U> task, Executor var3)//这三个方法与上面的方法作用类似,只是这三个方法的的合并任务可以接受被合并的两个任务的返回值作为参数。并且该合并任务可以返回值。
public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> var1, BiFunction<? super T, ? super U, ? extends V> var2)public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> var1, BiFunction<? super T, ? super U, ? extends V> var2)public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> var1, BiFunction<? super T, ? super U, ? extends V> var2, Executor var3)

双任务合并,只要其中一个任务执行完成就会进行任务合并

//这九个方法与上面的九个方法类似,只是上面的九个方法要合并的两个任务都完成,才能执行合并任务,而这九个方法只要有一个任务执行完成,就开始执行合并任务。
public CompletableFuture<Void> runAfterEither(CompletionStage<?> var1, Runnable var2)public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> var1, Runnable var2)public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> var1, Runnable var2, Executor var3)public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> var1, Consumer<? super T> var2)public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> var1, Consumer<? super T> var2)public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> var1, Consumer<? super T> var2, Executor var3)public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> var1, Function<? super T, U> var2)public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> var1, Function<? super T, U> var2)public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> var1, Function<? super T, U> var2, Executor var3)

多任务组合:

//多个任务组合,然后使用返回的值调用get方法就行阻塞,直到所有任务都执行完成才取消阻塞。
public static CompletableFuture<Void> allOf(CompletableFuture... var0)//多个任务组合,然后使用返回的值调用get方法就行阻塞,只要其中的一个任务执行完成就取消阻塞。
public static CompletableFuture<Object> anyOf(CompletableFuture... var0)

下面场景对上面的部分方法进行演示:
有五个任务:

  • 任务A。有返回值
  • 任务B:要依赖任务A返回的结果。并且有返回值。
  • 任务C:独立。有返回值。
  • 任务D和E的执行完成后进行操作。并有返回值。

最后三个任务结果相加得到最后结果返回。

public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {testCompletableFuture();}public static void testCompletableFuture() throws ExecutionException, InterruptedException {//创建一个线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20,20,0L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());//异步任务AbCompletableFuture<Integer> taskAB = CompletableFuture.supplyAsync(()->{return taskA();},threadPoolExecutor).thenApplyAsync((res)->{//传入的res是taskA任务执行的返回结果。return taskB(res);});//异步任务CCompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(()->{return taskC();},threadPoolExecutor);//异步任务DECompletableFuture<Integer> taskDE = CompletableFuture.supplyAsync(()->{return taskD();},threadPoolExecutor).thenCombineAsync(CompletableFuture.supplyAsync(()->{return taskE();},threadPoolExecutor),(resD,ResE)->{//任务DE的结果相加。return resD + ResE;});Integer taskABRes = taskAB.get();Integer taskCRes = taskC.get();Integer taskDERes = taskDE.get();System.out.println(taskABRes + taskCRes + taskDERes);}public static boolean taskA(){return true;}public static int taskB(boolean flag){return flag?10:20;}public static int taskC(){return 15;}public static int taskD(){return 18;}public static int taskE(){return 12;}
}

结果:

因为task总是返回true,而taskB如果是true就返回10,taskC返回15、taskD返回18,taskE返回12,相加等于30,所以最后结果等于10+15+30=55,没错。

java并发编程之CompletableFuture相关推荐

  1. zbb20180929 thread java并发编程之Condition

    java并发编程之Condition 引言 在java中,对于任意一个java对象,它都拥有一组定义在java.lang.Object上监视器方法,包括wait(),wait(long timeout ...

  2. java并发编程之4——Java锁分解锁分段技术

    转载自 java并发编程之4--Java锁分解锁分段技术 并发编程的所有问题,最后都转换成了,"有状态bean"的状态的同步与互斥修改问题.而最后提出的解决"有状态bea ...

  3. Java 并发编程之美:并发编程高级篇之一-chat

    借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...

  4. Java 并发编程之美:并发编程高级篇之一

    借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...

  5. Java并发编程之CAS第三篇-CAS的缺点

    Java并发编程之CAS第三篇-CAS的缺点 通过前两篇的文章介绍,我们知道了CAS是什么以及查看源码了解CAS原理.那么在多线程并发环境中,的缺点是什么呢?这篇文章我们就来讨论讨论 本篇是<凯 ...

  6. Java并发编程之CyclicBarrier详解

    简介 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生.栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,而栅栏用于等待其他线程. CyclicBarrier ...

  7. java并发编程之AbstractQueuedSynchronizer

    引言 AbstractQueuedSynchronizer,队列同步器,简称AQS,它是java并发用来构建锁或者其他同步组件的基础框架. 一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法 ...

  8. Java并发编程之synchronized关键字解析

    前言 公司加班太狠了,都没啥时间充电,这周终于结束了.这次整理了Java并发编程里面的synchronized关键字,又称为隐式锁,与JUC包中的Lock显示锁相对应:这个关键字从Java诞生开始就有 ...

  9. Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析

    自己一个人随便看看源码学习的心得,分享一下啦,不过我觉得还是建议去买本Java并发编程的书来看会比较好点,毕竟个人的理解有限嘛. 独占锁和共享锁 首先先引入这两个锁的概念: 独占锁即同一时刻只有一个线 ...

最新文章

  1. 《C#精彩实例教程》小组阅读08 -- C#流程控制语句
  2. 写给计算机的大学生!
  3. winrar压缩文件但是排除指定目录
  4. 类和对象—对象特性—函数的分类和调用
  5. 【python】 读取Excel文件并绘制图表
  6. GPU(CUDA)学习日记(十一)------ 深入理解CUDA线程层次以及关于设置线程数的思考
  7. 【译】Privacy on the Blockchain
  8. boost::format模块演示添加到 printf 语法的功能
  9. PHP客户端缓存控制
  10. Android 下拉刷新上拉载入 多种应用场景 超级大放送(上)
  11. NumPy 算术函数
  12. 大数据分析,利用向外扩展技术深入挖掘商业价值
  13. 60套漂亮的的免费 PSD 界面设计元素包资源(系列二)
  14. 【LeetCode】【数组】题号:414,第三大的数
  15. ECCV 2018 papers+ oral+ 开源+导读
  16. WolframTones:用一种新科学谱写一种新音乐
  17. windows 安装 perl 教程
  18. win10系统电脑IP地址怎么查找,教程来啦,Windows10系统如何查找ip地址
  19. 《中华颂》朗诵比赛准备
  20. com.lbx:xTools

热门文章

  1. Java讲课笔记08:数组
  2. 【BZOJ3172】单词,AC自动机练习
  3. android 打印流程图,Android实现Activities之间进行数据传递的方法
  4. 【英语学习】【Daily English】U13 Holiday L01 I have been waiting for it for ages!
  5. Intel 64/x86_64/IA-32/x86处理器 - 通用指令(7) - 标志寄存器/标志控制指令 段寄存器指令
  6. 计算机组成原理 中央处理器(CPU) 指令系统
  7. python中range 函数_Python
  8. linux c 多线程终止耗时长的任务,Linux C:从main()返回是否导致多线程应用程序终止?...
  9. 调用另外一个文件_从零开始学Python-Day52-文件读写
  10. 使用Adreno Profiler分析android游戏