ExecutorCompletionService分析及使用
当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:
方式一:
通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0
1 public class ExecutorCompletionServiceTest { 2 3 static class Task implements Callable<String> { 4 private int i; 5 6 public Task(int i) { 7 this.i = i; 8 } 9 10 @Override 11 public String call() throws Exception { 12 Thread.sleep(10000); 13 return Thread.currentThread().getName() + "执行完任务:" + i; 14 } 15 } 16 17 public static void main(String[] args) { 18 testUseFuture(); 19 } 20 21 private static void testUseFuture() { 22 int numThread = 5; 23 ExecutorService executor = Executors.newFixedThreadPool(numThread); 24 List<Future<String>> futureList = new ArrayList<Future<String>>(); 25 for (int i = 0; i < numThread; i++) { 26 Future<String> future = executor 27 .submit(new ExecutorCompletionServiceTest.Task(i)); 28 futureList.add(future); 29 } 30 31 while (numThread > 0) { 32 for (Future<String> future : futureList) { 33 String result = null; 34 try { 35 result = future.get(0, TimeUnit.SECONDS); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } catch (ExecutionException e) { 39 e.printStackTrace(); 40 } catch (TimeoutException e) { 41 // 超时异常直接忽略 42 } 43 if (null != result) { 44 futureList.remove(future); 45 numThread--; 46 System.out.println(result); 47 // 此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决) 48 break; 49 } 50 } 51 } 52 } 53 }
方式二:
第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。
1 public class ExecutorCompletionServiceTest { 2 3 static class Task implements Callable<String> { 4 private int i; 5 6 public Task(int i) { 7 this.i = i; 8 } 9 10 @Override 11 public String call() throws Exception { 12 Thread.sleep(10000); 13 return Thread.currentThread().getName() + "执行完任务:" + i; 14 } 15 } 16 17 public static void main(String[] args) throws InterruptedException, ExecutionException { 18 testExecutorCompletionService(); 19 } 20 21 private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ 22 int numThread = 5; 23 ExecutorService executorService = Executors.newFixedThreadPool(numThread); 24 CompletionService<String> completionService = new ExecutorCompletionService<>(executorService); 25 for (int i = 0; i < numThread; i++) { 26 completionService.submit(new ExecutorCompletionServiceTest.Task(i)); 27 } 28 for (int i = 0; i < numThread; i++) { 29 System.out.println(completionService.take().get()); 30 } 31 executorService.shutdown(); 32 } 33 }
ExecutorCompletionService实现了CompletionService接口,CompletionService是Executor和BlockingQueue的结合体。可以看下构造函数
1 public ExecutorCompletionService(Executor executor) { 2 if (executor == null) 3 throw new NullPointerException(); 4 this.executor = executor; 5 this.aes = (executor instanceof AbstractExecutorService) ? 6 (AbstractExecutorService) executor : null; 7 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); 8 }
任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture,
1 public Future<V> submit(Callable<V> task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<V> f = newTaskFor(task); 4 executor.execute(new QueueingFuture(f)); 5 return f; 6 }
QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。
1 private class QueueingFuture extends FutureTask<Void> { 2 QueueingFuture(RunnableFuture<V> task) { 3 super(task, null); 4 this.task = task; 5 } 6 protected void done() { completionQueue.add(task); } 7 private final Future<V> task; 8 }
而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。
1 public Future<V> take() throws InterruptedException { 2 return completionQueue.take(); 3 } 4 5 public Future<V> poll() { 6 return completionQueue.poll(); 7 }
ExecutorCompletionService分析及使用相关推荐
- 获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用...
当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取: 方式一: 通过一个list来保存一组future,然后在循环中轮训这组future,直 ...
- Future 和 ExecutorCompletionService 对比和使用
附加:Java 4种线程池介绍请查看 谈谈new Thread的弊端及Java四种线程池的使用 当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可 ...
- ExecutorCompletionService 源码分析
概要 在ExecutorService的submit方法中可以获取返回值,通过Future的get方法,但是这个Future类存在缺陷,Future接口调用get()方法取得处理后的返回结果时具有阻塞 ...
- Solr初始化源码分析-Solr初始化与启动
用solr做项目已经有一年有余,但都是使用层面,只是利用solr现有机制,修改参数,然后监控调优,从没有对solr进行源码级别的研究.但是,最近手头的一个项目,让我感觉必须把solrn内部原理和扩展机 ...
- java.util.concurrent包详细分析--转
原文地址:http://blog.csdn.net/windsunmoon/article/details/36903901 概述 Java.util.concurrent 包含许多线程安全.测试良好 ...
- FutureTask源码分析
2019独角兽企业重金招聘Python工程师标准>>> 在JCU中,FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的 ...
- 原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析
原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析 - 一.大致介绍 1.了解过netty原理的童鞋,其实应该知道工作线程组的每个子线程都维护了一个任 ...
- 人脸识别SDK调用与分析
该demo时是基于springboot的人脸识别和后期判断的小项目,利用的是第三发的虹软人脸识别的SDK. 下面主要展示页面功能:照相或照片的方式解析图片,提取特征:照相或照片的方式对比图片,判断相似 ...
- 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析
目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...
最新文章
- Majority Element:主元素
- XamlReader动态使用xaml
- 重学 html の meta 标签
- LaTeX技巧006:使用pdfLaTeX时,添加PDF文件属性的方法
- 【大佬漫谈】数字科技驱动的信贷风险技术——乔杨
- JQ实现仿淘宝条件筛选
- Python类方法和静态方法
- 55种数据可视化开源工具_8种出色的开源数据可视化工具
- html日历菜鸟,菜鸟第一次发代码 -- JS日历
- 922. 按奇偶排序数组 II
- PDF.js插件 | 在线PDF阅读插件,支持手机在线阅读
- 人工智能算法:遗传算法
- svn安装打开不弹出登录认证页面
- Android关闭屏幕时不锁屏
- phxqueue java_微信开源PhxQueue:高可用、高可靠、高性能的分布式队列的几个问题...
- Android文件解压
- HDUSec-安全隔离网闸项目总结
- Tengine 服务健康检查
- mysql 基本语句_mysql 基本语句
- swift实战-豆瓣电台
热门文章
- python发红包问题_一个关于红包的问题引发的python算法初体验
- 机器值计算机组成,计算机组成原理_数据的机器运算.ppt
- 魔术师发牌问题 java_魔术师发牌问题--java实现
- C# 解决串口接收数据不完整
- SpringMVC第五次课 SSM整合
- 最新数据分析与商业智能趋势前瞻
- 产品小白的知识点1——用户周期
- v-model双向绑定
- python3mysql包_python3读取MySQL-Front的MYSQL密码
- Squid-4.1(最新)安装及构建代理服务器