尽管CompletableFuture大约是两年前(!)于2014年3月在Java 8中引入的,但它仍然是一个相对较新的概念。但是,此类不是很广为人知是一件好事,因为它很容易被滥用,尤其是在线程和线程方面。一路涉及的线程池。 本文旨在描述如何将线程与CompletableFuture一起使用。

运行任务

这是API的基本部分。 有一个便捷的supplyAsync()方法类似于ExecutorService.submit() ,但是返回CompletableFuture

CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {log.info("Downloading");return IOUtils.toString(is, StandardCharsets.UTF_8);} catch (IOException e) {throw new RuntimeException(e);}});

问题是, supplyAsync()默认情况下使用ForkJoinPool.commonPool() ,所有CompletableFuture ,所有并行流以及部署在同一JVM上的所有应用程序之间共享的线程池(如果不幸的是,仍然使用具有许多已部署工件的应用程序服务器) 。 这个硬编码的,不可配置的线程池完全在我们的控制范围之外,难以监视和扩展。 因此,您应该始终指定自己的Executor ,例如此处(并查看我如何创建一个的一些技巧 ):

ExecutorService pool = Executors.newFixedThreadPool(10);final CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {//...}, pool);

但这仅仅是开始……

回调和转换

假设您要转换给定的CompletableFuture ,例如,提取String的长度:

CompletableFuture<Integer> intFuture =future.thenApply(s -> s.length());

究竟是谁调用s.length()代码? 坦白地说,我亲爱的开发人员,我们不给该死[1] 。 只要像thenApply这样的所有运算符中的lambda表达式thenApply便宜,我们就不在乎谁调用它。 但是,如果此表达式花费一点CPU时间来完成或进行阻塞的网络调用怎么办?

首先,默认情况下会发生什么? 想想看:我们有一个String类型的后台任务,我们想在该值完成后异步应用一些特定的转换。 最简单的实现方法是包装原始任务(返回String ),并在完成任务时对其进行拦截。 内部任务完成后,我们的回调开始,应用转换并返回修改后的值。 这就像介于我们的代码和原始计算结果之间的一个方面。 话虽这么说,很明显s.length()转换将在与原始任务相同的线程中执行,是吗? 不完全的!

CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {sleepSeconds(2);return "ABC";}, pool);future.thenApply(s -> {log.info("First transformation");return s.length();
});future.get();
pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.MINUTES);future.thenApply(s -> {log.info("Second transformation");return s.length();
});

当任务仍在运行时,将注册thenApply()的第一个转换。 因此,它将在任务完成后立即在与任务相同的线程中执行。 但是,在注册第二个转换之前,我们要等到任务实际完成为止。 更糟糕的是,我们完全关闭了线程池,以确保在那里没有其他代码可以执行。 那么哪个线程将运行第二次转换? 我们知道它必须立即发生,因为future我们在已经完成的回调上进行注册。 事实证明,默认情况下使用客户端线程(!)! 输出如下:

pool-1-thread-1 | First transformation main | Second transformation

在注册了第二个转换后,它意识到CompletableFuture已经完成,因此它立即执行了转换。 周围没有其他线程,因此在当前main线程的上下文中调用thenApply() 。 当实际的转换成本很高时,就会出现这种行为容易出错的最大原因。 想象一下thenApply() lambda表达式进行了一些繁重的计算或阻塞了网络调用。 突然,我们的异步CompletableFuture阻止了调用线程!

控制回调的线程池

有两种技术可以控制哪个线程执行我们的回调和转换。 请注意,仅当您的转换成本很高时才需要这些解决方案。 否则,差异可以忽略不计。 因此,首先我们可以选择*Async版本的运算符,例如:

future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
});

这次,第二个转换自动卸载给我们的朋友ForkJoinPool.commonPool()

pool-1-thread-1                  | First transformation
ForkJoinPool.commonPool-worker-1 | Second transformation

但是我们不喜欢commonPool所以我们提供自己的:

future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
}, pool2);

请注意,使用了不同的线程池( pool-1pool-2 ):

pool-1-thread-1 | First transformation
pool-2-thread-1 | Second transformation

将回调视为另一个计算步骤

但是我相信,如果您在长时间运行的回调和转换方面遇到麻烦(请记住,本文适用于CompletableFuture上的几乎所有其他方法),则应该简单地使用另一个显式的CompletableFuture ,例如:

//Imagine this is slow and costly
CompletableFuture<Integer> strLen(String s) {return CompletableFuture.supplyAsync(() -> s.length(),pool2);
}//...CompletableFuture<Integer> intFuture = future.thenCompose(s -> strLen(s));

这种方法更加明确。 知道我们的转换成本很高,因此我们不冒险在任意或不受控制的线程上运行它。 相反,我们将其显式建模为从StringCompletableFuture<Integer>异步操作。 但是,我们必须将thenApply()替换为thenCompose() ,否则最终将获得CompletableFuture<CompletableFuture<Integer>>

但是,如果我们的转换没有一个与嵌套CompletableFuture applyToEither()的版本,例如, applyToEither()等待第一个Future完成并应用转换,该怎么办?

CompletableFuture<CompletableFuture<Integer>> poor = future1.applyToEither(future2, s -> strLen(s));

有一个方便的技巧可以“解包”这种晦涩的数据结构,称为flatten ,可以使用flatMap(identity) (或flatMap(x -> x) )轻松实现。 在我们的例子中, flatMap()称为thenComposeduh! ):

CompletableFuture<Integer> good = poor.thenCompose(x -> x);

我由您自己决定如何运作以及为什么运作。 我希望本文CompletableFuture您更清楚地了解如何在CompletableFuture中涉及线程。

翻译自: https://www.javacodegeeks.com/2015/12/thread-executes-completablefutures-tasks-callbacks.html

哪个线程执行CompletableFuture的任务和回调?相关推荐

  1. c++ 异步下获取线程执行结果_异步编排(CompletableFuture异步调用)

    1.问题背景 问题:当查询接口较复杂时候,数据的获取都需要远程调用,必然需要花费更多的时间. 假如查询文章详情页面,需要如下标注的时间才能完成: 那么,用户需要4s后才能统计的数据.很显然是不能接受的 ...

  2. java 多线程,及获取线程执行结果

    2019独角兽企业重金招聘Python工程师标准>>> Thread.Runnable 创建多线程 new Thread(new Runnable() {public void ru ...

  3. CompletableFuture计算完成时回调方法

    当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action.主要是下面的方法: public CompletableFuture<T> whenCo ...

  4. shell 获取命令执行结果_java高并发系列 第31天:获取线程执行结果,这6种方法你都知道?...

    这是java高并发系列第31篇. 环境:jdk1.8. java高并发系列已经学了不少东西了,本篇文章,我们用前面学的知识来实现一个需求: 在一个线程中需要获取其他线程的执行结果,能想到几种方式?各有 ...

  5. c++11线程池的实现原理及回调函数的使用

    关于线程池 简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态.当有新的任务进来,从线程池中取出一个空闲的线程处理任务然后当任务处理完成之后,该线程被重新放回到线程池中,供其 ...

  6. 面试官:如何让主线程等待所有的子线程执行结束之后再执行

    java 主线程等待所有子线程执行完毕在执行,在工作总往往会遇到异步去执行某段逻辑, 然后先处理其他事情, 处理完后再把那段逻辑的处理结果进行汇总(比如用户下单一个产品,后台会做一系列的处理,为了提高 ...

  7. 面试官:如何让主线程等待所有的子线程执行结束之后再执行?我懵了

    使用Thread的join方法 package com.qcy.testThreadFinish;/*** @author qcy* @create 2020/09/09 17:05:23*/ pub ...

  8. C#线程池ThreadPool.QueueUserWorkItem接收线程执行的方法返回值

    最近在项目中需要用到多线程,考虑了一番,选择了ThreadPool,我的需求是要拿到线程执行方法的返回值, 但是ThreadPool.QueueUserWorkItem的回调方法默认是没有返回值的,搜 ...

  9. 阻塞主线程,等待异步子线程执行完毕再退出主线程,有几种写法?

    这里实际是考察线程间的通信,正常情况下,主线程里启动异步线程执行某个方法,理论上主线程和这个异步线程是并行执行,互不干扰,但是现在要求异步线程执行完毕方法之后,才能继续执行主线程,实际是如何阻塞主线程 ...

最新文章

  1. JavaScript对象的创建
  2. IOS开发笔记5-C语言基础复习
  3. C++中构造函数和析构函数可以抛出异常吗?
  4. 【转】Android 之最新最全的Intent传递数据方法
  5. linux mysql dns_Linux下搭建DNS服务器及踩坑
  6. 局部加权线性回归(Locally weighted linear regression)
  7. 【FFMPEG系列】FFMPEG linux下集成x264
  8. websocket连接相关的几个问题
  9. 通俗易懂的进程与线程解释
  10. 《js读取本地json文件》及浏览器跨域设置、《js保存json到本地》
  11. 2008年全国大学生数学建模D题(加附件)
  12. JS继承--圣杯模式的详解
  13. apache性能调优(转)
  14. 吃海鲜搭配什么菜好 搭配这些健康又美味
  15. 骑士amp;魔法 java_程序员穿越异世界,骑士与魔法!
  16. 联想笔记本声音太小怎么办_电脑声音特别的小是怎么回事?我的笔记本
  17. GitHub 上值得收藏的100个精选前端项目!
  18. Mac系统brew install 安装报错 Error: Failure while executing
  19. Android项目gen目下没有R.class文件 解决方法
  20. 有关研究生教育的话题

热门文章

  1. Tomcat集群session复制与Oracle的坑
  2. 人脸检测的model类facemodel
  3. 插值查找+代码实现+注意事项
  4. java notify 指定_java的notify/notifyAll:如何notify指定的线程?
  5. nbiot开发需要掌握什么_学习软件开发需要准备什么?
  6. 对象集合中如何用对象的某个属性给对象排序?
  7. 配置oracle网络连接命令,配置oracle网络环境
  8. 阿卡姆疯人院需要java吗_蝙蝠侠阿甘疯人院 这个报错 怎么解决 哪位大神知道...
  9. mybatis-启动源码分析
  10. HDU1864(01背包)