哪个线程执行CompletableFuture的任务和回调?
尽管CompletableFuture
大约是两年前(!)于2014年3月在Java 8中引入的,但它仍然是一个相对较新的概念。但是,此类不是很广为人知是一件好事,因为它很容易被滥用,尤其是在线程和线程方面。一路涉及的线程池。 本文旨在描述如何将线程与CompletableFuture
一起使用。
运行任务
这是API的基本部分。 有一个便捷的supplyAsync()
方法类似于ExecutorService.submit()
,但是返回CompletableFuture
:
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {log.info("Downloading");return IOUtils.toString(is, StandardCharsets.UTF_8);} catch (IOException e) {throw new RuntimeException(e);}});
问题是, supplyAsync()
默认情况下使用ForkJoinPool.commonPool()
,所有CompletableFuture
,所有并行流以及部署在同一JVM上的所有应用程序之间共享的线程池(如果不幸的是,仍然使用具有许多已部署工件的应用程序服务器) 。 这个硬编码的,不可配置的线程池完全在我们的控制范围之外,难以监视和扩展。 因此,您应该始终指定自己的Executor
,例如此处(并查看我如何创建一个的一些技巧 ):
ExecutorService pool = Executors.newFixedThreadPool(10);final CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {//...}, pool);
但这仅仅是开始……
回调和转换
假设您要转换给定的CompletableFuture
,例如,提取String
的长度:
CompletableFuture<Integer> intFuture =future.thenApply(s -> s.length());
究竟是谁调用s.length()
代码? 坦白地说,我亲爱的开发人员,我们不给该死[1] 。 只要像thenApply
这样的所有运算符中的lambda表达式thenApply
便宜,我们就不在乎谁调用它。 但是,如果此表达式花费一点CPU时间来完成或进行阻塞的网络调用怎么办?
首先,默认情况下会发生什么? 想想看:我们有一个String
类型的后台任务,我们想在该值完成后异步应用一些特定的转换。 最简单的实现方法是包装原始任务(返回String
),并在完成任务时对其进行拦截。 内部任务完成后,我们的回调开始,应用转换并返回修改后的值。 这就像介于我们的代码和原始计算结果之间的一个方面。 话虽这么说,很明显s.length()
转换将在与原始任务相同的线程中执行,是吗? 不完全的!
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {sleepSeconds(2);return "ABC";}, pool);future.thenApply(s -> {log.info("First transformation");return s.length();
});future.get();
pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.MINUTES);future.thenApply(s -> {log.info("Second transformation");return s.length();
});
当任务仍在运行时,将注册thenApply()
的第一个转换。 因此,它将在任务完成后立即在与任务相同的线程中执行。 但是,在注册第二个转换之前,我们要等到任务实际完成为止。 更糟糕的是,我们完全关闭了线程池,以确保在那里没有其他代码可以执行。 那么哪个线程将运行第二次转换? 我们知道它必须立即发生,因为future
我们在已经完成的回调上进行注册。 事实证明,默认情况下使用客户端线程(!)! 输出如下:
pool-1-thread-1 | First transformation main | Second transformation
在注册了第二个转换后,它意识到CompletableFuture
已经完成,因此它立即执行了转换。 周围没有其他线程,因此在当前main
线程的上下文中调用thenApply()
。 当实际的转换成本很高时,就会出现这种行为容易出错的最大原因。 想象一下thenApply()
lambda表达式进行了一些繁重的计算或阻塞了网络调用。 突然,我们的异步CompletableFuture
阻止了调用线程!
控制回调的线程池
有两种技术可以控制哪个线程执行我们的回调和转换。 请注意,仅当您的转换成本很高时才需要这些解决方案。 否则,差异可以忽略不计。 因此,首先我们可以选择*Async
版本的运算符,例如:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
});
这次,第二个转换自动卸载给我们的朋友ForkJoinPool.commonPool()
:
pool-1-thread-1 | First transformation
ForkJoinPool.commonPool-worker-1 | Second transformation
但是我们不喜欢commonPool
所以我们提供自己的:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
}, pool2);
请注意,使用了不同的线程池( pool-1
与pool-2
):
pool-1-thread-1 | First transformation
pool-2-thread-1 | Second transformation
将回调视为另一个计算步骤
但是我相信,如果您在长时间运行的回调和转换方面遇到麻烦(请记住,本文适用于CompletableFuture
上的几乎所有其他方法),则应该简单地使用另一个显式的CompletableFuture
,例如:
//Imagine this is slow and costly
CompletableFuture<Integer> strLen(String s) {return CompletableFuture.supplyAsync(() -> s.length(),pool2);
}//...CompletableFuture<Integer> intFuture = future.thenCompose(s -> strLen(s));
这种方法更加明确。 知道我们的转换成本很高,因此我们不冒险在任意或不受控制的线程上运行它。 相反,我们将其显式建模为从String
到CompletableFuture<Integer>
异步操作。 但是,我们必须将thenApply()
替换为thenCompose()
,否则最终将获得CompletableFuture<CompletableFuture<Integer>>
。
但是,如果我们的转换没有一个与嵌套CompletableFuture
applyToEither()
的版本,例如, applyToEither()
等待第一个Future
完成并应用转换,该怎么办?
CompletableFuture<CompletableFuture<Integer>> poor = future1.applyToEither(future2, s -> strLen(s));
有一个方便的技巧可以“解包”这种晦涩的数据结构,称为flatten
,可以使用flatMap(identity)
(或flatMap(x -> x)
)轻松实现。 在我们的例子中, flatMap()
称为thenCompose
( duh! ):
CompletableFuture<Integer> good = poor.thenCompose(x -> x);
我由您自己决定如何运作以及为什么运作。 我希望本文CompletableFuture
您更清楚地了解如何在CompletableFuture
中涉及线程。
翻译自: https://www.javacodegeeks.com/2015/12/thread-executes-completablefutures-tasks-callbacks.html
哪个线程执行CompletableFuture的任务和回调?相关推荐
- c++ 异步下获取线程执行结果_异步编排(CompletableFuture异步调用)
1.问题背景 问题:当查询接口较复杂时候,数据的获取都需要远程调用,必然需要花费更多的时间. 假如查询文章详情页面,需要如下标注的时间才能完成: 那么,用户需要4s后才能统计的数据.很显然是不能接受的 ...
- java 多线程,及获取线程执行结果
2019独角兽企业重金招聘Python工程师标准>>> Thread.Runnable 创建多线程 new Thread(new Runnable() {public void ru ...
- CompletableFuture计算完成时回调方法
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action.主要是下面的方法: public CompletableFuture<T> whenCo ...
- shell 获取命令执行结果_java高并发系列 第31天:获取线程执行结果,这6种方法你都知道?...
这是java高并发系列第31篇. 环境:jdk1.8. java高并发系列已经学了不少东西了,本篇文章,我们用前面学的知识来实现一个需求: 在一个线程中需要获取其他线程的执行结果,能想到几种方式?各有 ...
- c++11线程池的实现原理及回调函数的使用
关于线程池 简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态.当有新的任务进来,从线程池中取出一个空闲的线程处理任务然后当任务处理完成之后,该线程被重新放回到线程池中,供其 ...
- 面试官:如何让主线程等待所有的子线程执行结束之后再执行
java 主线程等待所有子线程执行完毕在执行,在工作总往往会遇到异步去执行某段逻辑, 然后先处理其他事情, 处理完后再把那段逻辑的处理结果进行汇总(比如用户下单一个产品,后台会做一系列的处理,为了提高 ...
- 面试官:如何让主线程等待所有的子线程执行结束之后再执行?我懵了
使用Thread的join方法 package com.qcy.testThreadFinish;/*** @author qcy* @create 2020/09/09 17:05:23*/ pub ...
- C#线程池ThreadPool.QueueUserWorkItem接收线程执行的方法返回值
最近在项目中需要用到多线程,考虑了一番,选择了ThreadPool,我的需求是要拿到线程执行方法的返回值, 但是ThreadPool.QueueUserWorkItem的回调方法默认是没有返回值的,搜 ...
- 阻塞主线程,等待异步子线程执行完毕再退出主线程,有几种写法?
这里实际是考察线程间的通信,正常情况下,主线程里启动异步线程执行某个方法,理论上主线程和这个异步线程是并行执行,互不干扰,但是现在要求异步线程执行完毕方法之后,才能继续执行主线程,实际是如何阻塞主线程 ...
最新文章
- JavaScript对象的创建
- IOS开发笔记5-C语言基础复习
- C++中构造函数和析构函数可以抛出异常吗?
- 【转】Android 之最新最全的Intent传递数据方法
- linux mysql dns_Linux下搭建DNS服务器及踩坑
- 局部加权线性回归(Locally weighted linear regression)
- 【FFMPEG系列】FFMPEG linux下集成x264
- websocket连接相关的几个问题
- 通俗易懂的进程与线程解释
- 《js读取本地json文件》及浏览器跨域设置、《js保存json到本地》
- 2008年全国大学生数学建模D题(加附件)
- JS继承--圣杯模式的详解
- apache性能调优(转)
- 吃海鲜搭配什么菜好 搭配这些健康又美味
- 骑士amp;魔法 java_程序员穿越异世界,骑士与魔法!
- 联想笔记本声音太小怎么办_电脑声音特别的小是怎么回事?我的笔记本
- GitHub 上值得收藏的100个精选前端项目!
- Mac系统brew install 安装报错 Error: Failure while executing
- Android项目gen目下没有R.class文件 解决方法
- 有关研究生教育的话题
热门文章
- Tomcat集群session复制与Oracle的坑
- 人脸检测的model类facemodel
- 插值查找+代码实现+注意事项
- java notify 指定_java的notify/notifyAll:如何notify指定的线程?
- nbiot开发需要掌握什么_学习软件开发需要准备什么?
- 对象集合中如何用对象的某个属性给对象排序?
- 配置oracle网络连接命令,配置oracle网络环境
- 阿卡姆疯人院需要java吗_蝙蝠侠阿甘疯人院 这个报错 怎么解决 哪位大神知道...
- mybatis-启动源码分析
- HDU1864(01背包)