前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化。

// 以下两个方法都是耗时操作

doBizA();

doBizB();

//创建两个子线程去执行就可以了,两个操作已经被异步化了。

new Thread(()->doBizA())

.start();

new Thread(()->doBizB())

.start();

异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。

CompletableFuture 的核心优势

为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。

烧水泡茶分工方案

// 任务 1:洗水壶 -> 烧开水

CompletableFuture f1 =

CompletableFuture.runAsync(()->{

System.out.println("T1: 洗水壶...");

sleep(1, TimeUnit.SECONDS);

System.out.println("T1: 烧开水...");

sleep(15, TimeUnit.SECONDS);

});

// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶

CompletableFuture f2 =

CompletableFuture.supplyAsync(()->{

System.out.println("T2: 洗茶壶...");

sleep(1, TimeUnit.SECONDS);

System.out.println("T2: 洗茶杯...");

sleep(2, TimeUnit.SECONDS);

System.out.println("T2: 拿茶叶...");

sleep(1, TimeUnit.SECONDS);

return " 龙井 ";

});

// 任务 3:任务 1 和任务 2 完成后执行:泡茶

CompletableFuture f3 =

f1.thenCombine(f2, (__, tf)->{

System.out.println("T1: 拿到茶叶:" + tf);

System.out.println("T1: 泡茶...");

return " 上茶:" + tf;

});

// 等待任务 3 执行结果

System.out.println(f3.join());

void sleep(int t, TimeUnit u) {

try {

u.sleep(t);

}catch(InterruptedException e){}

}

// 一次执行结果:

T1: 洗水壶...

T2: 洗茶壶...

T1: 烧开水...

T2: 洗茶杯...

T2: 拿茶叶...

T1: 拿到茶叶: 龙井

T1: 泡茶...

上茶: 龙井

从整体上来看,我们会发现

无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;

语义更清晰,例如f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;

代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

领略 CompletableFuture 异步编程的优势之后,下面我们详细介绍 CompletableFuture 的使用。

创建 CompletableFuture 对象

创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两个。在烧水泡茶的例子中,我们已经使用了runAsync(Runnable runnable) 和 supplyAsync(Supplier supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。

前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

// 使用默认线程池

static CompletableFuture

runAsync(Runnable runnable)

static CompletableFuture

supplyAsync(Supplier supplier)

// 可以指定线程池

static CompletableFuture

runAsync(Runnable runnable, Executor executor)

static CompletableFuture

supplyAsync(Supplier supplier, Executor executor)

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法我们该如何理解呢?

理解 CompletionStage 接口

可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。

串行关系

并行关系

汇聚关系

CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的

f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND 指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有 AND 聚合关系,那就一定还有 OR 聚合关系,所谓 OR 指的是依赖的任务只要有一个完成就可以执行当前任务。

最后就是异常,CompletionStage 接口也可以方便地描述异常处理。

下面我们就来一一介绍,CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。

1. 描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的方法是R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。

而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage

thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage

这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。

CompletionStage thenApply(fn);

CompletionStage thenApplyAsync(fn);

CompletionStage thenAccept(consumer);

CompletionStage thenAcceptAsync(consumer);

CompletionStage thenRun(action);

CompletionStage thenRunAsync(action);

CompletionStage thenCompose(fn);

CompletionStage thenComposeAsync(fn);

通过下面的示例代码,你可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

CompletableFuture f0 =

CompletableFuture.supplyAsync(

() -> "Hello World") //①

.thenApply(s -> s + " QQ") //②

.thenApply(String::toUpperCase);//③

System.out.println(f0.join());

// 输出结果

HELLO WORLD QQ

2. 描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage thenCombine(other, fn);

CompletionStage thenCombineAsync(other, fn);

CompletionStage thenAcceptBoth(other, consumer);

CompletionStage thenAcceptBothAsync(other, consumer);

CompletionStage runAfterBoth(other, action);

CompletionStage runAfterBothAsync(other, action);

3. 描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage applyToEither(other, fn);

CompletionStage applyToEitherAsync(other, fn);

CompletionStage acceptEither(other, consumer);

CompletionStage acceptEitherAsync(other, consumer);

CompletionStage runAfterEither(other, action);

CompletionStage runAfterEitherAsync(other, action);

CompletableFuture f1 =

CompletableFuture.supplyAsync(()->{

int t = getRandom(5, 10);

sleep(t, TimeUnit.SECONDS);

return String.valueOf(t);

});

CompletableFuture f2 =

CompletableFuture.supplyAsync(()->{

int t = getRandom(5, 10);

sleep(t, TimeUnit.SECONDS);

return String.valueOf(t);

});

CompletableFuture f3 =

f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4. 异常处理

虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常 ,例如下面的代码,执行

CompletableFuture

f0 = CompletableFuture.

.supplyAsync(()->(7/0))

.thenApply(r->r*10);

System.out.println(f0.join());

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

CompletionStage exceptionally(fn);

CompletionStage whenComplete(consumer);

CompletionStage whenCompleteAsync(consumer);

CompletionStage handle(fn);

CompletionStage handleAsync(fn);

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。

whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。

whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

CompletableFuture

f0 = CompletableFuture

.supplyAsync(()->7/0))

.thenApply(r->r*10)

.exceptionally(e->0);

System.out.println(f0.join());

总结

不过最近几年,伴随着 ReactiveX 的发展(Java 语言的实现版本是 RxJava),回调地狱已经被完美解决了,Java 语言也开始官方支持异步编程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经完全工业化。

CompletableFuture 已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注 RxJava 这个项目,利用 RxJava,即便在 Java 1.6 版本也能享受异步编程的乐趣。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

java 并发 异步_Java并发 CompletableFuture异步编程的实现相关推荐

  1. java并发排它锁_Java并发编程进阶——锁(解析)

    一.锁是什么 java开发中进行并发编程时针对操作同一块区域时,如果不加锁会出现并发问题,数据不是自己预计得到的值.我觉得有点像mysql事务中脏读.不可重复读.幻读的问题.加锁的目的是为了保证同一时 ...

  2. java并发常量_Java并发编程-常量对象(七)

    在创建后状态不再发生改变的对象称作常量对象(Immutable Objects).常量对象其可靠性使其广泛地用作开发简单可靠代码的策略.常量对象在开发并发程序中非常有用.由于创建后不能被改变状态,它们 ...

  3. java 并发执行_Java并发执行器的懒惰开发人员简介

    java 并发执行 如果我告诉您util.concurrent API自2004年起提供此类服务,我就会自欺欺人.但是,我想回顾一下一些很酷的功能. 并发专家,现在是时候关闭该窗口了. 所有其他人,请 ...

  4. java8异步任务,Java8 - CompletableFuture 异步编程类

    其他异步编程方法 Java5中新增了Future接口,用来描述异步计算的结果.虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得 ...

  5. java executor 异步_Java并发编程11-异步执行框架Executor

    1 Executor框架的简介 1.5后引入的Executor框架的最大优点是把任务的提交和执行解耦.要执行任务的人只需把Task描述清楚,然后提交即可.这个Task是怎么被执行的,被谁执行的,什么时 ...

  6. java 并发统计_java并发编程|CountDownLatch计数器

    0x01,CountDownLatch介绍 CountDownLatch是一个计数器,作为java并发编程中三个组件之一,这个组件的使用频率还是很多的.这里分享下自己画的java并发编程组件的图,后面 ...

  7. java 并发队列_JAVA并发编程:阻塞队列BlockingQueue之SynchronousQueue

    前面在讲解Executors工厂创建可缓存线程的线程池(newCachedThreadPool)的时候有提到过SynchronousQueue队列,该线程池使用 SynchronousQueue 作为 ...

  8. java投票锁_Java并发编程锁之独占公平锁与非公平锁比较

    Java并发编程锁之独占公平锁与非公平锁比较 公平锁和非公平锁理解: 在上一篇文章中,我们知道了非公平锁.其实Java中还存在着公平锁呢.公平二字怎么理解呢?和我们现实理解是一样的.大家去排队本着先来 ...

  9. java lock 对象_Java并发编程锁系列之ReentrantLock对象总结

    Java并发编程锁系列之ReentrantLock对象总结 在Java并发编程中,根据不同维度来区分锁的话,锁可以分为十五种.ReentranckLock就是其中的多个分类. 本文主要内容:重入锁理解 ...

最新文章

  1. 史上最简洁的UITableView Sections 展示包含NSDicionary 的NSArray
  2. 教你用百度地图API抓取建筑物周边位置、房价信息(附代码)
  3. BZOJ2298 [HAOI2011]problem a 【dp】
  4. label自适应高度
  5. OpenShift — Overview
  6. MySQL事务与存储引擎相关设置
  7. Docker - 避免启动container后运行shell脚本执行完成后docker退出container
  8. 从zabbix的数据库获取数据
  9. 【三维路径规划】基于matlab人工蜂群算法无人机三维路径规划【含Matlab源码 021期】
  10. 嵌入式linux 中文输入法,一种用于嵌入式Linux系统的中文拼音输入法的制作方法...
  11. 微信小程序经典开源代码汇总
  12. Quartz的CronTrigger
  13. 【python量化】统计套利之配对交易策略实现(基于python)
  14. 在excel中创建日历
  15. 聚类分析通俗易懂解释
  16. ppt如何替换其他mo ban_有没有一个 PPT 技巧让自己觉得人生都亮了?
  17. qs计算机圣安排名,2020年QS世界大学排名圣安德鲁斯大学排名第100
  18. 将PNG序列帧图片合成视频
  19. Python:变身超级赛亚人
  20. AutoCAD 描图方法小结

热门文章

  1. 巨杉数据库完成数亿元D轮融资,引领金融级分布式数据库发展
  2. 复购分析实践中,Pandas 遇到了大难题
  3. 阿里影业“云智开放平台”炼成记!
  4. 麒麟 810 实体芯片亮相;1325 个安卓应用私自搜集数据;Linux Kernel 5.2 发布 | 极客头条...
  5. 程序员为什么焦虑于编程语言和框架?
  6. 时至 2018 年,还有必要学 Vim 吗?
  7. TIOBE 5 月编程语言排行榜:暴涨的 C,逆袭的 Scala
  8. 区块链技术人才严重不足,平均薪资 2.58 万
  9. 距离一个优秀程序员,你还差多少?
  10. Android 手机成监控:仍在“逃亡”的斯诺登开发了款反间谍应用