ExecutorService VS CompletionService

假设我们有 4 个任务(A, B, C, D)用来执行复杂的计算,每个任务的执行时间随着输入参数的不同而不同,如果将任务提交到 ExecutorService, 相信你已经可以“信手拈来”

ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (Future future:futures) {Integer result = future.get();// 其他业务逻辑
}

先直入主题,用 CompletionService 实现同样的场景

ExecutorService executorService = Executors.newFixedThreadPool(4);// ExecutorCompletionService 是 CompletionService 唯一实现类
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (int i=0; i<futures.size(); i++) {Integer result = executorCompletionService.take().get();// 其他业务逻辑
}

两种方式在代码实现上几乎一毛一样,我们曾经说过 JDK 中不会重复造轮子,如果要造一个新轮子,必定是原有的轮子在某些场景的使用上有致命缺陷

既然新轮子出来了,二者到底有啥不同呢?

如果 Future 结果没有完成,调用 get() 方法,程序会阻塞在那里,直至获取返回结果

先来看第一种实现方式,假设任务 A 由于参数原因,执行时间相对任务 B,C,D 都要长很多,但是按照程序的执行顺序,程序在 get() 任务 A 的执行结果会阻塞在那里,导致任务 B,C,D 的后续任务没办法执行。又因为每个任务执行时间是不固定的,所以无论怎样调整将任务放到 List 的顺序,都不合适,这就是致命弊端

新轮子自然要解决这个问题,它的设计理念就是哪个任务先执行完成,get() 方法就会获取到相应的任务结果,这么做的好处是什么呢?来看个图你就瞬间理解了

两张图一对比,执行时长高下立判了,在当今高并发的时代,这点时间差,在吞吐量上起到的效果可能不是一点半点了

那 CompletionService 是怎么做到获取最先执行完的任务结果的呢?

 

远看CompletionService 轮廓

如果你使用过消息队列,你应该秒懂我要说什么了,CompletionService 实现原理很简单

就是一个将异步任务的生产和任务完成结果的消费解耦的服务

用人话解释一下上面的抽象概念我只能再画一张图了

说白了,哪个任务执行的完,就直接将执行结果放到队列中,这样消费者拿到的结果自然就是最早拿到的那个了

从上图中看到,有任务,有结果队列,那 CompletionService 自然也要围绕着几个关键字做文章了

  • 既然是异步任务,那自然可能用到 Runnable 或 Callable

  • 既然能获取到结果,自然也会用到 Future 了

带着这些线索,我们走进 CompletionService 源码看一看

 

近看 CompletionService 源码

CompletionService  是一个接口,它简单的只有 5 个方法:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

关于 2 个 submit 方法, 我在 不会用Java Future,我怀疑你泡茶没我快 文章中做了非常详细的分析以及案例使用说明,这里不再过多赘述

另外 3 个方法都是从阻塞队列中获取并移除阻塞队列第一个元素,只不过他们的功能略有不同

  • Take: 如果队列为空,那么调用 take() 方法的线程会被阻塞

  • Poll: 如果队列为空,那么调用 poll() 方法的线程会返回 null

  • Poll-timeout: 以超时的方式获取并移除阻塞队列中的第一个元素,如果超时时间到,队列还是空,那么该方法会返回 null

所以说,按大类划分上面5个方法,其实就是两个功能

  • 提交异步任务 (submit)

  • 从队列中拿取并移除第一个元素 (take/poll)

CompletionService 只是接口,ExecutorCompletionService 是该接口的唯一实现类

ExecutorCompletionService 源码分析

先来看一下类结构, 实现类里面并没有多少内容

ExecutorCompletionService 有两种构造函数:

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = completionQueue;
}

两个构造函数都需要传入一个 Executor 线程池,因为是处理异步任务的,我们是不被允许手动创建线程的,所以这里要使用线程池也就很好理解了

另外一个参数是 BlockingQueue,如果不传该参数,就会默认队列为 LinkedBlockingQueue,任务执行结果就是加入到这个阻塞队列中的

所以要彻底理解 ExecutorCompletionService ,我们只需要知道一个问题的答案就可以了:

它是如何将异步任务结果放到这个阻塞队列中的?

想知道这个问题的答案,那只需要看它提交任务之后都做了些什么?

public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;
}

我们前面也分析过,execute 是提交 Runnable 类型的任务,本身得不到返回值,但又可以将执行结果放到阻塞队列里面,所以肯定是在 QueueingFuture 里面做了文章

从上图中看一看出,QueueingFuture 实现的接口非常多,所以说也就具备了相应的接口能力。

重中之重是,它继承了 FutureTask ,FutureTask 重写了 Runnable 的 run() 方法 (方法细节分析可以查看FutureTask源码分析 ) 文中详细说明了,无论是set() 正常结果,还是setException() 结果,都会调用 finishCompletion() 方法:

private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}// 重点 重点 重点done();callable = null;        // to reduce footprint
}

上述方法会执行 done() 方法,而 QueueingFuture 恰巧重写了 FutureTask 的 done() 方法:

方法实现很简单,就是将 task 放到阻塞队列中

protected void done() { completionQueue.add(task);
}

执行到此的 task 已经是前序步骤 set 过结果的 task,所以就可以通过消费阻塞队列获取相应的结果了

相信到这里,CompletionService 在你面前应该没什么秘密可言了

 

CompletionService 的主要用途

在 JDK docs 上明确给了两个例子来说明 CompletionService 的用途:

假设你有一组针对某个问题的solvers,每个都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用(Result r)

其实就是文中开头的使用方式

 void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException, ExecutionException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);for (Callable<Result> s : solvers)ecs.submit(s);int n = solvers.size();for (int i = 0; i < n; ++i) {Result r = ecs.take().get();if (r != null)use(r);}}

假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务

void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);int n = solvers.size();List<Future<Result>> futures= new ArrayList<Future<Result>>(n);Result result = null;try {for (Callable<Result> s : solvers)futures.add(ecs.submit(s));for (int i = 0; i < n; ++i) {try {Result r = ecs.take().get();if (r != null) {result = r;break;}} catch (ExecutionException ignore) {}}}finally {for (Future<Result> f : futures)// 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中f.cancel(true);}if (result != null)use(result);}

这两种方式都是非常经典的 CompletionService 使用 范式 ,请大家仔细品味每一行代码的用意

范式没有说明 Executor 的使用,使用 ExecutorCompletionService,需要自己创建线程池,看上去虽然有些麻烦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险 (这也是我们反复说过多次的,不要所有业务共用一个线程池)

 

总结

CompletionService 的应用场景还是非常多的,比如

  • Dubbo 中的 Forking Cluster

  • 多仓库文件/镜像下载(从最近的服务中心下载后终止其他下载过程)

  • 多服务调用(天气预报服务,最先获取到的结果)

CompletionService 不但能满足获取最快结果,还能起到一定 "load balancer" 作用,获取可用服务的结果,使用也非常简单, 只需要遵循范式即可

并发系列 讲了这么多,分析源码的过程也碰到各种队列,接下来我们就看看那些让人眼花缭乱的队列

 

灵魂追问

  1. 通常处理结果还会用异步方式进行处理,如果采用这种方式,有哪些注意事项?

  2. 如果是你,你会选择使用无界队列吗?为什么?

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

“既生 ExecutorService, 何生 CompletionService?”相关推荐

  1. 小学生计算机舞蹈,最近“泼水成画”很火?舞蹈生VS体育生,看到计算机:你是来添乱的?...

    最近泼水拍照非常的流行,不知道大家在私底下有没有关注过这个视频,而且在这个视频中,这些花放在水里确实也特别的好看,接下来就一起来看一下,不同的学生拍出来的泼水照片都是什么样的. 首先大家看到的就是舞蹈 ...

  2. 既生瑜何生亮 access_token VS refresh_token

    中国有句老话, 既生瑜何生亮, 既然有我周瑜在世, 为什么老天还要一个诸葛亮啊? 同样的, 众所周知, 在 OAuth 2.0 授权协议中, 也有两个令牌 token , 分别是 access_tok ...

  3. excel如何找到高频词_拟录取后:应届生和往届生档案哪里找;重灾院校区;高频词背诵表...

    今日消息1.应届生和往届生档案哪里找?2.重灾院校区3.考研云督学班高频词背诵表汇总1.应届生和往届生档案哪里找? 往年这个时候论文答辩.复试已经结束,已经进入毕业季!现在你们毕业答辩结束了吗?你们都 ...

  4. python 数学基础_Python3数学基础 - 随笔分类 - 既生喻何生亮 - 博客园

    本系列主要集中于数学知识点,利用python编程描述以往学过的数学知识. 摘要:Kronecker delta 克罗内克函数 Wiki "维基百科" Kronecker delta ...

  5. 转贴:既生瑜何生亮:FreeBSD与Linux再比较

    原贴:http://www.phpchina.com/8051/viewspace_8240.html 传说中FreeBSD比linux稳定,大型网站几乎都建立在FreeBSD系统上,我一直疑惑难道l ...

  6. 计算机系男生生的都是女儿吗,IT男只能生女孩,生男孩几率很小吗?

    "IT男"即指男性网络编辑员.计算机维修工.数据库系统管理员.游戏程序开发师等,"IT"系信息技术.互联网技术.信息论等的缩写.据业内人士传辐射会降低精子活力, ...

  7. 应届生和往届生,报名条件区别汇总!

    即将预报名,今天给大家整理了一下应届生和往届生报名需要注意的点.填写信息和报名需要的材料,大家一定要认真对待. 1 关于应届生和往届生身份确认 应届生是指2022年的毕业生,含普通高校.成人高校.普通 ...

  8. 应届生和往届生,谁更容易考研成功?

    据教育部统计,2017年共201万人报考,其中,应届考生113万人,往届考生88万人:2018年共238万人报考,其中,应届考生131万人,往届考生107万人.从数据可知,2017年,往届生考研人数占 ...

  9. 计算机往届生考研失败找工作,终于发现应届生和往届生考研复试会被歧视吗-考研复习...

    对于20考研的学生来说,或许这一段时间比较的焦虑,一方面是初试成绩公布在即,另一方面,复试准备还在摸索中,有很多问题困扰着我们的考研大学生.而在诸多的复试问题中,其中关于应届生和往届生二者之间,是否会 ...

最新文章

  1. 华人一作统一「视觉-语言」理解与生成:一键生成图像标注,完成视觉问答,Demo可玩...
  2. MFC的“不知从哪调用”的消息处理函数
  3. QWidget::size()和QResizeEvent::size()不一定相同!
  4. Readhat中作安全基线
  5. 复杂推理模型从服务器移植到Web浏览器的理论和实战
  6. 解密昇腾AI处理器--DaVinci架构(控制单元)
  7. vs2013使用remote debug
  8. LINUX安装cuDNN
  9. 关于python编程语法_Python编程入门——基础语法详解
  10. 32.768KHz晶振DST310S成就时钟产业的无限可能
  11. [渝粤题库]西北工业大学大学物理
  12. JavaEE 企业级分布式高级架构师(七)MongoDB学习笔记(3)
  13. 亚马逊Alexa Connect Kit(ACK)
  14. 如何使用css3做简单的动画效果?
  15. Java实现格式化打印慢SQL日志
  16. Python爬虫技巧--selenium解除webdriver特征值
  17. iGuard6.0 — 有序组织的网页防篡改
  18. 三分钟解决文档编辑难题-【文档编辑命令- cat echo vi/vim tail rmdir 】
  19. 恢复W ndows10系统方法步骤,Windows 10系统恢复电脑(刷新电脑)的方法步骤图文教程详解...
  20. 基于C语言的网络电子词典

热门文章

  1. java unsafe 类_Java的Unsafe类
  2. html5视频播放器隐藏控制,HTML5 video标签(播放器)学习笔记(二):播放控制
  3. python语言训练教程_PYTHON零基础快乐学习之旅(K12实战训练)
  4. web程序前后台功能实现_好程序员web前端教程之JS继承实现方式解析
  5. Linux 多线程编程使用pthread_creat()函数条件
  6. 使用foreach循环遍历集合元素
  7. 使用shell脚本或命令行添加、删除 crontab 定时任务
  8. Collections和Collection的区别:
  9. docker命令收集
  10. HTML5 2D平台游戏开发#4状态机