概要

在ExecutorService的submit方法中可以获取返回值,通过Future的get方法,但是这个Future类存在缺陷,Future接口调用get()方法取得处理后的返回结果时具有阻塞性,也就是说调用Future的get方法时,任务没有执行完成,则get方法要一直阻塞等到任务完成为止。 这样大大的影响了系统的性能,这就是Future的最大缺点。为此,java1.5以后提供了CompletionServlice来解决这个问题。

CompletionService 接口CompletionService的功能是异步的方式,一边生产任务,一边处理完成的任务结果,这样可以将执行的任务与处理任务隔离开来进行处理,使用submit执行任务,使用塔克获取已完成的任务,并按照这些任务的完成的时间顺序来处理他们的结果。

示例

向ExecutorService 提交一组任务,哪个任务先完成,就把完成任务的返回结果打印出来。

public class CompletionServiceExecutorDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(10);// 同时运行多个任务,那个任务先返回数据,就先获取该数据CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool);for (int i = 1; i <= 10; i++) {final int seq = i;completionService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {int waitTime = new Random().nextInt(10);TimeUnit.SECONDS.sleep(waitTime);return "callable:"+seq+" 执行时间:"+waitTime+"s";}});}for (int i = 1; i <= 10; i++) {try {Future<String> future = completionService.take();System.out.println(future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}threadPool.shutdown();}
}

执行结果如下:

callable:6 执行时间:1s
callable:2 执行时间:3s
callable:10 执行时间:3s
callable:1 执行时间:4s
callable:4 执行时间:5s
callable:8 执行时间:5s
callable:7 执行时间:7s
callable:5 执行时间:8s
callable:9 执行时间:9s
callable:3 执行时间:9s

从打印结果可以看出,这些任务是按照任务执行完成的顺序打印的,先执行完就先返回结果。

ExecutorCompletionService 源码分析

ExecutorCompletionService 类结构如下

public class ExecutorCompletionService<V> implements CompletionService<V> {private final Executor executor;  //线程池private final AbstractExecutorService aes;  private final BlockingQueue<Future<V>> completionQueue;  //任务完成队列private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}protected void done() { completionQueue.add(task); }private final Future<V> task;}

ExecutorCompletionService 类中定义了一个QueueingFuture 的内部类,继承于FutureTask类,内部重写了FutureTask的done方法,该方法是在FutureTask任务执行完成后会调用的方法,在FutureTask中该方法未实现任何逻辑。

重写done方法,在任务处理完成后把该FutureTask任务放入到阻塞队列(BlockingQueue)中,然后我们就可以从阻塞队列中take执行完成的任务,进行想用的处理。

这里是实现ExecutorCompletionService的核心逻辑。

newTaskFor 方法

private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}private RunnableFuture<V> newTaskFor(Runnable task, V result) {if (aes == null)return new FutureTask<V>(task, result);elsereturn aes.newTaskFor(task, result);}

ExecutorCompletionService 支持Callable和Runnable任务
1. 把用户提交的Callable任务转成FutureTask。
2. 把用户提交的Runnable任务转成FutureTask。

ExecutorCompletionService 构造方法1

    public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();}
  1. 连接池(executor)不能为空。
  2. 判断该线程池是否AbstractExecutorService类型,如果是则赋值给aes,否则赋值null
    (aes作用:把用户提交的Callable和Runnable任务转换成FutureTask)
  3. 创建一个阻塞队列。(存放执行完成的FutureTask任务)

ExecutorCompletionService 构造方法2

    public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = completionQueue;}

该构造可以指定一个阻塞队列,其它功能同上构造方法。

submit方法

    public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;}public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task, result);executor.execute(new QueueingFuture(f));return f;}

该方法可以向ExecutorCompletionService 中提交要执行的任务。
支持Callable和Runnable两种类型的任务。
如果提交的Runnable任务,则执行完后返回的结果为null。

    public Future<V> take() throws InterruptedException {return completionQueue.take();}

从阻塞队列中获取执行完成的任务的,如果队列为空且任务没有全部完成,则阻塞当前线程,直到有任务执行完成。

    public Future<V> poll() {return completionQueue.poll();}public Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException {return completionQueue.poll(timeout, unit);}}

ExecutorCompletionService支持非阻塞方式从阻塞队列中获取已完成的任务
1. 可以通过poll方法来从阻塞队列中获取任务,如果队列为空,则直接返回null,不会阻塞当前线程。
2.支持等待多长时间来从阻塞队列中获取已经完成的任务。

总结

ExecutorCompletionService的实现原理是内部使用了FutureTask来实现异步的任务执行。通过一个内部类继承FutureTask,并实现了FutureTask的一个done方法。该done方法会在任务执行完成之后调用该方法,在任务执行完之后把当前的FutureTask放入到阻塞队列中。这样就实现了先执行完成的任务先存放到阻塞队列中,应用程序可以从阻塞队列中提前获取先执行完的任务。

本人简书blog地址:http://www.jianshu.com/u/1f0067e24ff8    
点击这里快速进入简书

GIT地址:http://git.oschina.net/brucekankan/
点击这里快速进入GIT

ExecutorCompletionService 源码分析相关推荐

  1. FutureTask源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 在JCU中,FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的 ...

  2. Solr初始化源码分析-Solr初始化与启动

    用solr做项目已经有一年有余,但都是使用层面,只是利用solr现有机制,修改参数,然后监控调优,从没有对solr进行源码级别的研究.但是,最近手头的一个项目,让我感觉必须把solrn内部原理和扩展机 ...

  3. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  4. SpringBoot-web开发(四): SpringMVC的拓展、接管(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) SpringBoot-web开发(二): 页面和图标定制(源码分析) SpringBo ...

  5. SpringBoot-web开发(二): 页面和图标定制(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) 目录 一.首页 1. 源码分析 2. 访问首页测试 二.动态页面 1. 动态资源目录t ...

  6. SpringBoot-web开发(一): 静态资源的导入(源码分析)

    目录 方式一:通过WebJars 1. 什么是webjars? 2. webjars的使用 3. webjars结构 4. 解析源码 5. 测试访问 方式二:放入静态资源目录 1. 源码分析 2. 测 ...

  7. Yolov3Yolov4网络结构与源码分析

    Yolov3&Yolov4网络结构与源码分析 从2018年Yolov3年提出的两年后,在原作者声名放弃更新Yolo算法后,俄罗斯的Alexey大神扛起了Yolov4的大旗. 文章目录 论文汇总 ...

  8. ViewGroup的Touch事件分发(源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,View的touch事件分发相对比较简单,可参考 View的Touch事件分发(一.初步了解) View的Touch事 ...

  9. View的Touch事件分发(二.源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,先来看简单的View的touch事件分发. 主要分析View的dispatchTouchEvent()方法和onTou ...

最新文章

  1. oracle 时间相关
  2. python编码规范手册-Python官方竟然给出了一种编码规范PEP 8
  3. Mac解决终端显示乱码
  4. [Python人工智能] 一.TensorFlow环境搭建及神经网络入门
  5. hibernate 双向一对多 关联在多端维护
  6. Java中classLoader浅析
  7. 利用python读取点矢量对应栅格值
  8. 高精度加法(A+BProblemII)
  9. (17)VHDL实现编码器
  10. JavaScript中的输入输出语句
  11. ssh连接失败,排错经验
  12. 推荐几个好的粉碎文件的软件?这3款软件让你彻底摆脱无法删除文件的烦恼
  13. ALFA深度学习外观检测自学习人工智能软件
  14. 2022秋招前端面试题(一)(附答案)
  15. 我爱大自然教案计算机,我们热爱大自然教案.doc
  16. android3.0快捷键,向日葵新出版本安卓3.0手机远程控制手机详细教程
  17. 疫情期间在家办公信息安全指南
  18. IT行业发展前景如何?揭秘2019年IT行业发展趋势
  19. Android 增强版百分比布局库 为了适配而扩展
  20. 美国智能网联最新政策动态(2021年9-11月)

热门文章

  1. Django中的shell,和数据增删查改
  2. 博士申请 | 宾夕法尼亚州立大学招收机器学习/人工智能方向全奖博士
  3. WSDM 2021 | 基于双向推理的多跳知识库问答技术
  4. ACL 2020 开源论文 | 基于Span Prediction的共指消解模型
  5. ALBERT第一作者亲自讲解:词向量、BERT、ALBERT、XLNet全面解析
  6. 【腾讯面试题】Java集合:List、Set以及Map
  7. 零基础实践深度学习之数学基础
  8. 简单计算器 逆波兰表达式
  9. Python-面向对象的编程语言
  10. 西安科技大学计算机考研难度,西安科技大学考研难吗