Java并发编程实践 目录

并发编程 01—— ThreadLocal

并发编程 02—— ConcurrentHashMap

并发编程 03—— 阻塞队列和生产者-消费者模式

并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier

并发编程 05—— Callable和Future

并发编程 06—— CompletionService : Executor 和 BlockingQueue

并发编程 07—— 任务取消

并发编程 08—— 任务取消 之 中断

并发编程 09—— 任务取消 之 停止基于线程的服务

并发编程 10—— 任务取消 之 关闭 ExecutorService

并发编程 11—— 任务取消 之 “毒丸”对象

并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性

并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略

并发编程 14—— 线程池 之 整体架构

并发编程 15—— 线程池 之 原理一

并发编程 16—— 线程池 之 原理二

并发编程 17—— Lock

并发编程 18—— 使用内置条件队列实现简单的有界缓存

并发编程 19—— 显式的Conditon 对象

并发编程 20—— AbstractQueuedSynchronizer 深入分析

并发编程 21—— 原子变量和非阻塞同步机制

概述

第1部分 问题引入及实例

第2部分 实例

第1部分 问题引入

《Java并发编程实践》一书6.3.5节CompletionService:Executor和BlockingQueue,有这样一段话:

"如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。"

这是什么意思呢?通过一个例子,分别使用繁琐的做法和CompletionService来完成,清晰的对比能让我们更好的理解上面的一段话和CompletionService这个API提供的初衷。

第2部分 实例

考虑这样的场景,有5个Callable任务分别返回5个整数,然后我们在main方法中按照各个任务完成的先后顺序,在控制台打印返回结果。

 1 package com.concurrency.TaskExecution_6;
 2
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.TimeUnit;
 5
 6 /**
 7  *
 8  * @ClassName: ReturnAfterSleepCallable
 9  * TODO
10  * @author Xingle
11  * @date 2014-9-16 上午9:20:34
12  */
13 public class ReturnAfterSleepCallable implements Callable<Integer>{
14
15     private int sleepSeconds;
16     private int returnValue;
17
18     public ReturnAfterSleepCallable(int sleepSeconds,int returnValue){
19         this.sleepSeconds = sleepSeconds;
20         this.returnValue = returnValue;
21     }
22
23
24     @Override
25     public Integer call() throws Exception {
26         System.out.println("begin to execute ");
27
28         TimeUnit.SECONDS.sleep(sleepSeconds);
29         return returnValue;
30     }
31
32 }

1.繁琐的做法

  通过一个List来保存每个任务返回的Future,然后轮询这些Future,直到每个Future都已完成。我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0。

 1 package com.concurrency.TaskExecution_6;
 2
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ExecutionException;
 6 import java.util.concurrent.ExecutorService;
 7 import java.util.concurrent.Executors;
 8 import java.util.concurrent.Future;
 9
10 /**
11  * 传统的繁琐做法
12  * @ClassName: TraditionalTest
13  * TODO
14  * @author Xingle
15  * @date 2014-9-16 上午10:06:21
16  */
17 public class TraditionalTest {
18
19     public static void main(String[] args){
20         int taskSize = 5;
21         ExecutorService executor = Executors.newFixedThreadPool(taskSize);
22         List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
23
24         for(int i= 1; i<=taskSize; i++){
25             int sleep = taskSize -1;
26             int value = i;
27             //向线程池提交任务
28             Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value));
29             //保留每个任务的Future
30             futureList.add(future);
31         }
32         // 轮询,获取完成任务的返回结果
33         while(taskSize > 0){
34             for (Future<Integer> future : futureList){
35                 Integer result = null;
36                 try {
37                     result = future.get();
38                 } catch (InterruptedException e) {
39                     e.printStackTrace();
40                 } catch (ExecutionException e) {
41                     e.printStackTrace();
42                 }
43                 //任务已经完成
44                 if(result!=null){
45                     System.out.println("result = "+result);
46                     //从future列表中删除已经完成的任务
47                     futureList.remove(future);
48                     taskSize --;
49                     break;
50                 }
51             }
52         }
53         // 所有任务已经完成,关闭线程池
54         System.out.println("all over ");
55         executor.shutdown();
56     }
57
58 }

执行结果:

2.使用CompletionService

 1 package com.concurrency.TaskExecution_6;
 2
 3 import java.util.concurrent.CompletionService;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.ExecutorCompletionService;
 6 import java.util.concurrent.ExecutorService;
 7 import java.util.concurrent.Executors;
 8
 9 /**
10  * 使用CompletionService
11  * @ClassName: CompletionServiceTest
12  * TODO
13  * @author Xingle
14  * @date 2014-9-16 上午11:32:45
15  */
16 public class CompletionServiceTest {
17
18     public static void main(String[] args){
19         int taskSize = 5;
20         ExecutorService executor = Executors.newFixedThreadPool(taskSize);
21         // 构建完成服务
22         CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
23
24         for (int i=1;i<= taskSize; i++){
25             // 睡眠时间
26             int sleep = taskSize - i;
27             // 返回结果
28             int value = i;
29             //向线程池提交任务
30             completionService.submit(new ReturnAfterSleepCallable(sleep, value));
31             try {
32                 System.out.println("result:"+completionService.take().get());
33             } catch (InterruptedException e) {
34                 e.printStackTrace();
35             } catch (ExecutionException e) {
36                 e.printStackTrace();
37             }
38         }
39
40         System.out.println("all over. ");
41         executor.shutdown();
42
43     }
44
45 }

执行结果:

3.CompletionService和ExecutorCompletionService的实现

JDK源码中CompletionService的javadoc说明如下:

/*** A service that decouples the production of new asynchronous tasks* from the consumption of the results of completed tasks.  Producers* <tt>submit</tt> tasks for execution. Consumers <tt>take</tt>* completed tasks and process their results in the order they* complete. */

也就是说,CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。

ExecutorCompletionService是CompletionService的实现,融合了线程池Executor和阻塞队列BlockingQueue的功能。
 public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();}

到这里可以推测,按照任务的完成顺序获取结果,就是通过阻塞队列实现的,阻塞队列刚好具有这样的性质:阻塞和有序。
ExecutorCompletionService任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture
public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;
}

QueueingFuture是FutureTask的一个子类,通过改写FutureTask类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}protected void done() { completionQueue.add(task); }private final Future<V> task;}

FutureTask.done(),这个方法默认什么都不做,就是一个回调,当提交的线程池中的任务完成时,会被自动调用。这也就说时候,当任务完成的时候,会自动执行QueueingFuture.done()方法,将返回结果加入到阻塞队列中,加入的顺序就是任务完成的先后顺序。

转载于:https://www.cnblogs.com/xingele0917/p/3974187.html

并发编程 06—— CompletionService :Executor 和 BlockingQueue相关推荐

  1. 并发编程-06线程安全性之可见性 (synchronized + volatile)

    文章目录 线程安全性文章索引 脑图 可见性定义 导致不可见的原因 可见性 -synchronized (既保证原子性又保证可见性) 可见性 - volatile(但不保证操作的原子性) volatil ...

  2. Java并发编程(08):Executor线程池框架

    本文源码:GitHub·点这里 || GitEE·点这里 一.Executor框架简介 1.基础简介 Executor系统中,将线程任务提交和任务执行进行了解耦的设计,Executor有各种功能强大的 ...

  3. 实战并发编程 - 10Guarded Suspension模式在BlockingQueue源码中应用

    文章目录 Pre Another Case 源码分析 概 继承关系 核心方法 非阻塞式方法(offer . add) offer(E e) add(E e) 阻塞式方法 (put(E e) / tak ...

  4. Java并发编程(06):Lock机制下API用法详解

    本文源码:GitHub·点这里 || GitEE·点这里 一.Lock体系结构 1.基础接口简介 Lock加锁相关结构中涉及两个使用广泛的基础API:ReentrantLock类和Condition接 ...

  5. java 单线程执行器_Java基础-并发编程-线程执行器executor

    线程实现方式 Thread.Runnable.Callable //实现Runnable接口的类将被Thread执行,表示一个基本任务 public interface Runnable { //ru ...

  6. java executor_Java并发编程(08):Executor线程池框架

    一.Executor框架简介 1.基础简介 Executor系统中,将线程任务提交和任务执行进行了解耦的设计,Executor有各种功能强大的实现类,提供便捷方式来提交任务并且获取任务执行结果,封装了 ...

  7. 并发编程-concurrent指南-阻塞队列BlockingQueue

    阻塞队列BlockingQueue,java.util.concurrent下的BlockingQueue接口表示一个线程放入和提取实例的队列. 适用场景: BlockingQueue通常用于一个线程 ...

  8. 实战并发编程 - 06线程在执行过程中的状态是如何流转的

    文章目录 线程的生命周期 线程的状态流转图 线程各个状态说明 NEW(初始化状态) RUNNABLE(就绪,运行中状态) RUNNING 运行中状态 BLOCKED(阻塞状态) WAITING(等待状 ...

  9. 【JUC并发编程06】多线程锁 (公平锁和非公平锁,死锁,可重锁)

    文章目录 6 多线程锁 (公平锁和非公平锁,死锁,可重锁) 6.1 synchronized 锁的八种情况 6.2 对上述例子的总结 6.3 公平锁和非公平锁 6.4 可重入锁 6.5 死锁 6 多线 ...

  10. 并发编程 07—— 任务取消

    Java并发编程实践 目录 并发编程 01-- ThreadLocal 并发编程 02-- ConcurrentHashMap 并发编程 03-- 阻塞队列和生产者-消费者模式 并发编程 04-- 闭 ...

最新文章

  1. Spark2.x写入Elasticsearch的性能测试
  2. reduce 轻松将cookie转化为对象
  3. mongon命令(转)
  4. 【机器学习】scikit-learn 1.0 版本重要新特性一览
  5. postman 发送json请求
  6. 吴恩达入驻知乎首答:如何系统学习机器学习?
  7. 你可能不知道的C#语言特性
  8. linux bash and,linux bash shell中for的用法and示例
  9. python colorbar位置大小调整_python - matplotlib相邻子图:添加colorbar更改子图的大小 - 堆栈内存溢出...
  10. 哪个NBA球队会夺冠?用深度学习预测最有潜力的球员!
  11. 微信小程序 图片上传+php后台源码
  12. 你真的懂协程 (Coroutine) 吗 ? Kotlin Coroutines — Suspending Functions
  13. JS匹配域名的正则表达式
  14. 培训三天敏捷我懂了这些
  15. 车载测试常见关心问题解答
  16. 根据先序和中序(中序和后序)确定二叉树
  17. 【SQL语言】数据库原理与设计
  18. ROsalind 014 Finding a Shared Motif
  19. Spring 6.0 堪称最强!新特性,惊爆了!
  20. Infor与AI的美丽邂逅

热门文章

  1. 博客园鼠标点击烟花特效
  2. 详解Angular开发中的登陆与身份验证
  3. javascript traverse object attributes 遍历对象属性
  4. tkinter中frame布局控件
  5. vue引入bootstrap.min.css报错:Cannot find module ./assets/css/bootstrap.min.css
  6. JSP脚本连接数据库
  7. [转]int String 互转的多种方法
  8. 【转】解密微软的架构师之路
  9. Bitmap缩放(二)
  10. 2018-2019-1 20165319 《信息安全系统设计基础》第八周学习总结