原文:Java 8 CompletableFutures Part I

  • 作者:Bill Bejeck
  • 译者:noONE

译者前言

JDK1.5就增加了Future接口,但是接口使用不是很能满足异步开发的需求,使用起来不是那么友好。所以出现了很多第三方封装的Future,Guava中就提供了一个更好的 ListenableFuture 类,Netty中则提供了一个自己的Future。所以,Java8中的CompletableFuture可以说是解决Future了一些痛点,可以优雅得进行组合式异步编程,同时也更加契合函数式编程。

Java8已经发布了很长一段时间,其中新增了一个很棒的并发控制工具,就是CompletableFuture类。CompletableFuture实现了Future接口,并且它可以显式地设定值,更有意思的是我们可以进行链式处理,并且支持依赖行为,这些行为由CompletableFuture完成所触发。CompletableFuture类似于Guava中的 ListenableFuture 类。它们两个提供了类似的功能,本文不会再对它们进行对比。我已经在之前的文章中介绍过ListenableFutrue。虽然对于ListenableFutrue的介绍有点过时,但是绝大数的知识仍然适用。CompletableFuture的文档已经非常全面了,但是缺少如何使用它们的具体示例 。本文意在通过单元测试中的一系列的简单示例来展示如何使用CompletableFuture。最初我想在一篇文章中介绍完CompleteableFuture,但是信息太多了,分成三部分似乎更好一些:

  1. 创建/组合任务以及为它们增加监听器。
  2. 处理错误以及错误恢复。
  3. 取消或者强制完成。

CompletableFuture 入门

在开始使用CompletableFuture之前, 我们需要了解一些背景知识。CompletableFuture实现了 CompletionStage 接口。javadoc中简明地介绍了CompletionStage

一个可能的异步计算的阶段,当另外一个CompletionStage 完成时,它会执行一个操作或者计算一个值。一个阶段的完成取决于它本身结算的结果,同时也可能反过来触发其他依赖阶段。

CompletionStage 的全部文档的内容很多,所以,我们在这里总结几个关键点:

  1. 计算可以由 Future ,Consumer 或者 Runnable 接口中的 applyaccept 或者 run等方法表示。

  2. 计算的执行主要有以下

    a. 默认执行(可能调用线程)

    b. 使用默认的CompletionStage的异步执行提供者异步执行。这些方法名使用someActionAsync这种格式表示。

    c. 使用 Executor 提供者异步执行。这些方法同样也是someActionAsync这种格式,但是会增加一个Executor参数。

接下来,我会在本文中直接引用CompletableFuture 和 CompletionStage

创建一个CompleteableFuture

创建一个CompleteableFuture很简单,但是不是很清晰。最简单的方法就是使用CompleteableFuture.completedFuture方法,该方法返回一个新的且完结的CompleteableFuture

@Test
public void test_completed_future() throws Exception {String expectedValue = "the expected value";CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);assertThat(alreadyCompleted.get(), is(expectedValue));
}
复制代码

这样看起来有点乏味,稍后,我们就会看到如何创建一个已经完成的CompleteableFuture 会派上用场。

现在,让我们看一下如何创建一个表示异步任务的CompleteableFuture

private static ExecutorService service = Executors.newCachedThreadPool();@Test
public void test_run_async() throws Exception {CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() ->                           System.out.println("running async task"), service);//utility testing methodpauseSeconds(1);assertThat(runAsync.isDone(), is(true));
}@Test
public void test_supply_async() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(simulatedTask(1, "Final Result"), service);assertThat(completableFuture.get(), is("Final Result"));
}
复制代码

在第一个方法中,我们看到了runAsync任务,在第二个方法中,则是supplyAsync的示例。这可能是显而易见的,然而使用runAsync还是使用supplyAsync,这取决于任务是否有返回值。在这两个例子中,我们都提供了一个自定义的Executor,它作为一个异步执行提供者。当使用supplyAsync方法时,我个人认为使用 Callable 而不是一个Supplier似乎更自然一些。因为它们都是函数式接口,Callable与异步任务的关系更紧密一些,并且它还可以抛出受检异常,而Supplier则不会(尽管我们可以通过少量的代码让Supplier抛出受检异常)。

增加监听器

现在,我们可以创建CompleteableFuture 对象去运行异步任务,让我们开始学习如何去“监听”任务的完成,并且执行随后的一些动作。这里重点提一下,当增加对 CompletionStage 对象的追随时,之前的任务需要彻底成功,后续的任务和阶段才能运行。本文会介绍介绍一些处理失败任务的方法,而在CompleteableFuture中链式处理错误的方案会在后续的文章中介绍。

 @Test
public void test_then_run_async() throws Exception {Map<String,String> cache = new HashMap<>();cache.put("key","value");CompletableFuture<String> taskUsingCache =           CompletableFuture.supplyAsync(simulatedTask(1,cache.get("key")),service);CompletableFuture<Void> cleanUp = taskUsingCache.thenRunAsync(cache::clear,service);cleanUp.get();String theValue = taskUsingCache.get();assertThat(cache.isEmpty(),is(true));assertThat(theValue,is("value"));
}
复制代码

这个例子主要展示在第一个CompletableFuture成功结束后,运行一个清理的任务。 在之前的例子中,当最初的任务成功结束后,我们使用Runnable任务执行。我们也可以定义一个后续任务,它可以直接获取之前任务的成功结果。

@Test
public void test_accept_result() throws Exception {CompletableFuture<String> task = CompletableFuture.supplyAsync(simulatedTask(1, "add when done"), service);CompletableFuture<Void> acceptingTask = task.thenAccept(results::add);pauseSeconds(2);assertThat(acceptingTask.isDone(), is(true));assertThat(results.size(), is(1));assertThat(results.contains("add when done"), is(true));
}
复制代码

这是一个使用Accept 方法的例子,该方法会获取CompletableFuture的结果,然后将结果传给一个 Consumer 对象。在Java 8中, Consumer 实例是没有返回值的 ,如果想得到运行的副作用,需要把结果放到一个列表中。

组合与构成任务

除了增加监听器去运行后续任务或者接受CompletableFuture的成功结果,我们还可以组合或者构成任务。

构成任务

构成意味着获取一个成功的CompletableFuture结果作为输入,通过 一个Function 返回另外一个 CompletableFuture。下面是一个使用CompletableFuture.thenComposeAsync的例子:

@Test
public void test_then_compose() throws Exception {Function<Integer,Supplier<List<Integer>>> getFirstTenMultiples = num ->()->Stream.iterate(num, i -> i + num).limit(10).collect(Collectors.toList());Supplier<List<Integer>> multiplesSupplier = getFirstTenMultiples.apply(13);//Original CompletionStageCompletableFuture<List<Integer>> getMultiples = CompletableFuture.supplyAsync(multiplesSupplier, service);//Function that takes input from orignal CompletionStageFunction<List<Integer>, CompletableFuture<Integer>> sumNumbers = multiples ->CompletableFuture.supplyAsync(() -> multiples.stream().mapToInt(Integer::intValue).sum());//The final CompletableFuture composed of previous two.CompletableFuture<Integer> summedMultiples = getMultiples.thenComposeAsync(sumNumbers, service);assertThat(summedMultiples.get(), is(715));
}
复制代码

在这个列子中,第一个CompletionStage提供了一个列表,该列表包含10个数字,每个数字都乘以13。这个提供的Function获取这些结果,并且创建另外一个CompletionStage,它将对列表中的数字求和。

组合任务

组合任务的完成是通过获取两个成功的CompletionStages,并且从中获取BiFunction类型的参数,进而产出另外的结果。以下是一个非常简单的例子用来说明从组合的CompletionStages中获取结果。

@Test
public void test_then_combine_async() throws Exception {CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(simulatedTask(3, "combine all"), service);CompletableFuture<String> secondTask = CompletableFuture.supplyAsync(simulatedTask(2, "task results"), service);CompletableFuture<String> combined = firstTask.thenCombineAsync(secondTask, (f, s) -> f + " " + s, service);assertThat(combined.get(), is("combine all task results"));
}
复制代码

这个例子展示了如何组合两个异步任务的CompletionStage,然而,我们也可以组合已经完成的CompletableFuture的异步任务。 组合一个已知的需要计算的值,也是一种很好的处理方式:

@Test
public void test_then_combine_with_one_supplied_value() throws Exception {CompletableFuture<String> asyncComputedValue = CompletableFuture.supplyAsync(simulatedTask(2, "calculated value"), service);CompletableFuture<String> knowValueToCombine = CompletableFuture.completedFuture("known value");BinaryOperator<String> calcResults = (f, s) -> "taking a " + f + " then adding a " + s;CompletableFuture<String> combined = asyncComputedValue.thenCombine(knowValueToCombine, calcResults);assertThat(combined.get(), is("taking a calculated value then adding a known value"));
}
复制代码

最后,是一个使用CompletableFuture.runAfterbothAsync的例子

@Test
public void test_run_after_both() throws Exception {CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {pauseSeconds(2);results.add("first task");}, service);CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {pauseSeconds(3);results.add("second task");}, service);CompletableFuture<Void> finisher = run1.runAfterBothAsync(run2,() -> results. add(results.get(0)+ "&"+results.get(1)),service);pauseSeconds(4);assertThat(finisher.isDone(),is(true));assertThat(results.get(2),is("first task&second task"));
}
复制代码

监听第一个结束的任务

在之前所有的例子中,所有的结果需要等待所有的CompletionStage结束,然而,需求并不总是这样的。我们可能需要获取第一个完成的任务的结果。下面的例子展示使用Consumer接受第一个完成的结果:

@Test
public void test_accept_either_async_nested_finishes_first() throws Exception {CompletableFuture<String> callingCompletable = CompletableFuture.supplyAsync(simulatedTask(2, "calling"), service);CompletableFuture<String> nestedCompletable = CompletableFuture.supplyAsync(simulatedTask(1, "nested"), service);CompletableFuture<Void> collector = callingCompletable.acceptEither(nestedCompletable, results::add);pauseSeconds(2);assertThat(collector.isDone(), is(true));assertThat(results.size(), is(1));assertThat(results.contains("nested"), is(true));
}
复制代码

类似功能的CompletableFuture.runAfterEither

@Test
public void test_run_after_either() throws Exception {CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {pauseSeconds(2);results.add("should be first");}, service);CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {pauseSeconds(3);results.add("should be second");}, service);CompletableFuture<Void> finisher = run1.runAfterEitherAsync(run2,() -> results.add(results.get(0).toUpperCase()),service);pauseSeconds(4);assertThat(finisher.isDone(),is(true));assertThat(results.get(1),is("SHOULD BE FIRST"));}
复制代码

多重组合

到目前为止,所有的组合/构成的例子都只有两个CompletableFuture对象。这里是有意为之,为了让例子尽量的简单明了。我们可以组合任意数量的CompletionStage。请注意,下面例子仅仅是为了说明而已!

@Test
public void test_several_stage_combinations() throws Exception {Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase());CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick ");CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox ");CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2);CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction);CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over"));CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service);CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into"));CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service);CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service);CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service);assertThat(finalStage.get(),is("THE QUICK BROWN FOX JUMPED OVER the lazy dog"));
}
复制代码

需要注意的是,组合CompletionStage的时候并不保证顺序。在这些单元测试中,提供了一个时间去模拟任务以确保完成顺序。

小结

本文主要是使用CompletableFuture类的第一部分。在后续文章中,将主要介绍错误处理及恢复,强制完成或取消。

资源

  • CompletableFuture
  • CompletionStage
  • Source Code

关注我:

Java 8 CompletableFuture相关推荐

  1. Java 8 CompletableFuture 教程

    Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...

  2. JUC系列(十一) | Java 8 CompletableFuture 异步编程

    多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!! 沉下去,再浮上来,我想我们会变的不一样 ...

  3. Java 8 CompletableFuture 教程 1

    Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...

  4. rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable

    rx.observable 我想探索一个使用Java 8 CompletableFuture和Rx-Java Observable的简单分散聚集场景. 场景很简单–产生大约10个任务,每个任务返回一个 ...

  5. 使用Java 8 CompletableFuture和Rx-Java Observable

    我想使用Java 8 CompletableFuture和Rx-Java Observable探索一个简单的分散聚集场景. 场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个 ...

  6. JAVA基于CompletableFuture的流水线并行处理深度实践,满满干货

    在项目开发中,后端服务对外提供API接口一般都会关注响应时长.但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最 ...

  7. 一篇文章搞清楚Java中CompletableFuture的使用

    ---------- Yesterday is history, tomorrow is a mystery, but today is a gift. That is why it's called ...

  8. Java 9 CompletableFuture 进化小脚步

    简介 Java 9附带了对CompletableFuture类的一些更改.这些更改是作为JEP 266的一部分引入的,以便解决自JDK 8引入以来的一些问题,更具体地说,支持延迟和超时,抽象出一个工具 ...

  9. Java 8 CompletableFuture 浅入

    Java 1.5 有了 Future, 可谓是跨了一大步,继而 Java 1.8 新加入一个 Future 的实现 CompletableFuture, 从此线程与线程之间可以愉快的对话了.最初两个线 ...

最新文章

  1. 北京大学北京天然气水合物国际研究中心招聘生信博后
  2. 异常处理——上传文件到HDFS,put: `.': No such file or directory
  3. php的防csrf攻击,zblog php添加Token防止CSRF攻击
  4. java socket smtp_JAVA Socket实现smtp发送邮件
  5. Codeforces Round #592 (Div. 2) G. Running in Pairs 构造(水)
  6. MacOSX环境上的多个Java JDK
  7. GitHub的嵌入式开源项目
  8. php 判断编码 错误,请教一个 PHP 代码出错的原因(一个简单的计算器)
  9. php 编码规范哪些_PHP 代码规范有哪些【详细讲解】
  10. 成为一名架构师得学习哪些知识?
  11. chrome插件推荐
  12. 深度学习优化算法大全系列6:Adam
  13. GPRS DTU是什么?其工作原理是什么?
  14. ENVI软件图像放缩出现重影的解决办法
  15. The content of element type mapper must match (cache-ref|cache|resultMap*|parameterMap*|sql*|inse
  16. 与其去雄安买房,不如找中企动力建自己的平台
  17. Jquery鼠标点击后变色,点击另一个按钮颜色还原
  18. 【数据挖掘】-决策树算法+代码实现(七)
  19. c语言浮点数能用八进制输出不,深析C语言浮点型数据的输入输出
  20. php保存文件快捷键,word保存快捷键是ctrl加什么

热门文章

  1. java读取16位深png_读取16位灰度TIFF
  2. 80核处理器_标压版锐龙处理器更香!联想小新Pro 13轻薄笔记本评测
  3. xpath contains_Python 爬虫进阶: Scrapy Shell 和 Xpath 学习心得
  4. websocket中发生数据丢失_什么是WebSocket,它与HTTP有何不同?
  5. python模拟鼠标拖动_Python+Selenium自动化篇-6-模拟鼠标操作
  6. mysqldump单个库导出_初相识 | 全方位认识 sys 系统库
  7. python有哪些方面_Python学习中最基本的内容,看看有哪些需要我们学习的
  8. 如何判断两个平面相交_数学提高平面与平面垂直的判定方法是什么
  9. 将字符串转换为数组_LeetCode 树 108.将有序数组转换为二叉搜索树
  10. 商品领域ddd_DDD 领域驱动设计-商品建模之路