新类库中的构件

JDK1.5引入了java.util.concurrent包,它提供了大量的新类,用于安全而高效地解决并发问题。下面将通过例子一一介绍。

CountDownLatch-倒数计数器

CountDownLatch被称为倒数计步器,它是Java内置的同步器的一种(还有信号量、CyclicBarrier等同步器,后续将作介绍)。它的功能是阻塞一个或多个线程,这些阻塞的线程需要等待其他线程中的某一个或几个条件成立,一旦成立,这些阻塞的线程将并发执行。比如,有若干运动员等待着(若干线程阻塞)发令枪响起(使解除线程阻塞的条件成立),一旦枪声响起,运动员将开始起跑(解除阻塞),这里发令枪是条件,若干运动员是等待条件成立的线程。而当所有运动员冲过终点线时,或者说最后一个运动员冲过终点线时,计时器停止计时,在这里,所有的运动员又成了条件,而计时器成了达成条件的结果——当计时器在等待着最后一名运动员冲过终点线,条件达成,计时终止。

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

方法说明:

  • public void countDown()

    • 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。如果当前计数等于零,则不发生任何操作。
  • public boolean await(long timeout,
    TimeUnit unit)
    throws InterruptedException

    • 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回 true 值。
      如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下三种情况之一前,该线程将一直处于休眠状态:1、由于调用 countDown() 方法的次数还不够计数到达零;2、其他某个线程中断当前线程;3、已超出指定的等待时间。如果计数到达零,则该方法返回 true 值。如果当前线程:
      在进入此方法时已经设置了该线程的中断状态;或者
      在等待时被中断,
      则抛出 InterruptedException,并且清除当前线程的已中断状态。如果超出了指定的等待时间,则返回值为 false。如果该时间小于等于零,则此方法根本不会等待。

参数:
timeout - 要等待的最长时间
unit - timeout 参数的时间单位。
返回:
如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false

抛出:
InterruptedException - 如果当前线程在等待时被中断

public class CountDownLatchTest {// 模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。public static void main(String[] args) throws InterruptedException {// 开始的倒数锁 final CountDownLatch begin = new CountDownLatch(1);  // 结束的倒数锁 final CountDownLatch end = new CountDownLatch(10);  // 十名选手 final ExecutorService exec = Executors.newFixedThreadPool(10);  for (int index = 0; index < 10; index++) {final int NO = index + 1;  Runnable run = new Runnable() {public void run() {  try {  // 如果当前计数为零,则此方法立即返回。// 等待begin.await();  Thread.sleep((long) (Math.random() * 10000));  System.out.println("No." + NO + " arrived");  } catch (InterruptedException e) {  } finally {  // 每个选手到达终点时,end就减一end.countDown();}  }  };  exec.submit(run);}  System.out.println("Game Start");  // begin减一,开始游戏begin.countDown();  // 等待end变为0,即所有选手到达终点end.await();  System.out.println("Game Over");  exec.shutdown();  }
}
Game Start
No.9 arrived
No.6 arrived
No.8 arrived
No.7 arrived
No.10 arrived
No.1 arrived
No.5 arrived
No.4 arrived
No.2 arrived
No.3 arrived
Game Over

CyclicBarrier

CyclicBarrier和CountDownLatch一样,都是关于线程的计数器。

用法略有不同:并发执行一组任务,它们并行地执行工作然后再进行下一个步骤前等待, 直到所有的任务都完成。它使得所有的并行任务都将在栅栏处排队,因此可以一致地向前移动。它与CountDownLatch的最主要区别是前者可以多次重用,而后者只能触发一次。

 public class TestCyclicBarrier {2 //并发线程数3     private static final int THREAD_NUM = 5;4     5     public static class WorkerThread implements Runnable{6 7         CyclicBarrier barrier;8         9         public WorkerThread(CyclicBarrier b){
10             this.barrier = b;
11         }
12
13         @Override
14         public void run() {
15             // TODO Auto-generated method stub
16             try{
17                 System.out.println("Worker's waiting");
18                 //线程在这里等待,直到所有线程都到达barrier。
19                 barrier.await();
20                 System.out.println("ID:"+Thread.currentThread().getId()+" Working");
21             }catch(Exception e){
22                 e.printStackTrace();
23             }
24         }
25
26     }
27
28     /**
29      * @param args
30      */
31     public static void main(String[] args) {
32         // TODO Auto-generated method stub
33         CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable() {
34             //当所有线程到达barrier时执行
35             @Override
36             public void run() {
37                 // TODO Auto-generated method stub
38                 System.out.println("Inside Barrier");
39
40             }
41         });
42
43         for(int i=0;i<THREAD_NUM;i++){
44             new Thread(new WorkerThread(cb)).start();
45         }
46     }
47
48 }
//输出:
51 Worker's waiting
52 Worker's waiting
53 Worker's waiting
54 Worker's waiting
55 Worker's waiting
56 Inside Barrier
57 ID:12 Working
58 ID:8 Working
59 ID:11 Working
60 ID:9 Working
61 ID:10 Working

CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。
CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。
CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

DelayQueue

1、DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么不会有任何元素,并且poll()将返回null。(正是因为这样,你不能将null放置到这种个队列中。)

2、Delayed

种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。

此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

3、DelayQueue队列中保存的是实现了Delayed接口的实现类,里面必须实现getDelay()和compareTo()方法,前者用于取DelayQueue里面的元素时判断是否到了延时时间,否则不予获取,是则获取。 compareTo()方法用于进行队列内部的排序

getDelay(TimeUnit unit){

return unit.convert(time - now(),TimeUnit.NANOSECONDES);//time为设定的间隔时间

}

   compareTo(Object object){if(object instanceof SchuduledTask){SchuduledTask task = (SchuduledTask) object ;long l = this.time - task.time;if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0else if(l < 0 ) return -1;else return 0;}}

下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最“紧急”
的任务(到期时间最长的任务)从队列中取出,然后运行它。这样DelayQueue就成了优先级队列的一种变体。

class DelayedTask implements Runnable, Delayed {private static int counter  = 0;private final int id = counter++;private final int delta;private final long trigger;protected static List<DelayedTask> sequence = new ArrayList<>();public DelayedTask(int delayMilliseconds) {delta = delayMilliseconds;trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);sequence.add(this);} public long getDelay(TimeUnit unit) {return unit.convert(trigger - System.nanoTime(), NANOSECONDS);}public int compareTo(Delayed arg) {DelayedTask that = (DelayedTask)arg;if(trigger < that.trigger) {return -1;}if(trigger > that.trigger) {return 1;}return 0;}public void run() {system.out.print(this + " ");}public String toString() {return String.format("[%1$-4d]", delta) + " Task " + id;}public String summary() {return "(" + id + ":" + delta + ")";}public static class EndSentinel extends EdlayedTask {private ExecutorService exec;public EndSentinel(int delay, ExecutorService e) {super(delay);exec = e;}public void run() {for(DelayTask pt : sequence) {System.out.print(pt.summary() + " "); }System.out.print();System.out.print(this + "Calling shutdownNow()");exec.shutdownNow();}}
}class DelayedTaskConsumer implements Runnable {private DelayQueue<DelayedTaks> q;public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {this.q = q;}public void run() {try {while(!Thread.interrupted()) {q.take().run();} } catch(InterruptedException e) {}System.out.print("Finished DelayedTaskConsumer");}
}public class DelayQueueDemo {public static void main(String[] args) {Random random = new Random(47);ExecutorService exec = Executors.newCachedThreadPool();DelayQueue<DelayTask> queue = new DelayQueue<>();for(int i = 0 ; i < 20; ++i) {queue.put(new DelayTask(random.nextInt(5000)));}queue.add(new DelayedTask.EndSentinel(5000,exec));exec.execute(new DelayedTaskConsumer(queue)); }
}
//输出
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [535 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207 ] Task 9 [1693 ] Task 2 [1809 ] Task 14 [1861 ] Task 3 [2278] Task 15 [3288 ] Task 10 [3551 ] Task 12 [4258 ] Task 0 [4258 ] Task 19 [4522 ] Task 8 [4589 ] Task 13 [4861 ] Task 17 [4868 ] Task 6
(0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer

DelayTask包含一个被称为sequence的List<DelayedTask>,他保存了任务创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的。

Delay接口有一个方法名为getDelay(),它可以用来告知延迟到期有多长时间,或者延迟在多长时间之前已经到期。这个方法将强制我们去使用TimeUnit类,因为这就是参数类型。这回产生一个非常方便的类,因为你可以很容易地转换单位而无需任何声明。例如,delta的值是以毫秒为单位存储的,但是Java SE5的方法System.nanoTime()产生的时间则是以纳秒为单位的。你可以转换delta的值,方法是声明它的单位以及你希望以什么单位来表示:

NANOSECONDS.convert(delta, MILLISECONDS);

在getDelay()中,希望使用的单位是作为unit参数传递进来的,你使用它将当前与处罚时间之间的差转换为调用者要求的单位,而无需知道这些单位是什么。

注意,因为DelayedTaskConsumer自身是一个任务,所以它有自己的Thread,它可以使用这个县城来运行从队列中获取的所有任务。由于任务是按照队列优先级的顺序执行的,因此在本例中不需要启动任何单独的线程来运行DelayedTask。

从输出中可以看到,任务创建的顺序没有任何影响,任务是按照所期望的延迟顺序执行的。

PriorityBlockingQueue

这是一个很基础的优先级队列,它具有可阻塞的读取操作。这面这个示例演示了PriorityBlockingQueue的用法,其中在优先级队列中的对象是按照优先级顺序从队列中出现的任务。PrioritizedTask被赋予了一个优先级数字,以此来提供这种顺序:

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {private Random random = new Random(47);private static int counter = 0;private final int id = counter++;private final int priority;protected static List<PrioritizedTask> sequence = new ArrayList<>();public PrioritizedTask(int priority) {this.priority = priority;sequence.add(this);}public int compareTo(PriorityTask arg) {return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0);}public void run() {try {TimeUnit.MILLISECOND.sleep(rand.nextInt(250));} catch(InterruptedException e) {}System.out.print(this);}public String toString() {return String.format("[%1$-3d]", priority) + " Task " + id;}public String summary() {return "(" + id + ":" + priority + ")";}public static class EndSentinel extends PrioritizedTask {private ExecutorService exec;public EndSentinel(ExecutorService e) {super(-1);exec = e;}public void run() {int count = 0;for(PrioritizedTask pt : sequence) {System.out.print(pt.summary());if(++count % 5 == 0) {System.out.print();} System.out.print();System.out.print(this + " Calling shutdownNow()");exec.shutdownNow();} }}
}public PriorityTaskProducer implements Runnable {private Random random = new Random(47);private Queue<Runnable> queue;private ExecutorService exec;public PriorityTaskProducer(Queue<Runnable> q, ExecutorService e) {this.queue = q;this.exec = e;}public void run() {for(int i = 0; i < 20; ++i) {queue.add(new PrioritizedTask(random.nextInt(10)));Thread.yield();}try {for(int i = 0; i < 10; ++i) {TimeUnit.MILLISECONDS.sleep(250);queue.add(new PriorityTask(10));}for(int i = 0; i < 10; ++i) {queue.add(new PriorityTask(i));}queue.add(new PrioritizedTask.EndSentinel(exec));} catch(InterruptedException e) {}System.out.print("Finished PrioritizedTaskProducer");}
}class PrioritizedTaskConsumer implements Runnable {private PriorityBlockingQueue<Runnable> q;public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {this.q = q;}public void run() {try {while(!Thread.interrupted()) {q.take().run();}} catch(InterruptedException e) {}System.out.print("Finished PrioritizedTaskConsumer");}}public class PriorityBlockingQueueDemo {public static void main(String[] args) {Random random = new Random(47);ExecutorService exec = Executors.newCachedThreadPool();PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();exec.execute(new PrioritizedTaskProducer(queue, exec));exec.execute(new PrioritizedTaskConsumer(queue));}
}

这与前一个示例相同,PrioritizedTask对象的创建序列被记录在sequence List中,用于和实际的执行顺序比较。run()方法将休眠一小段时间,然后打印对象。

在这里,不需要任何显式同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

Semaphore-信号量

正常的锁(如synchronized或Lock)在任何时刻都只允许一个任务访问同一项资源,而Semaphore(计数信号量)允许n个任务同时访问这个资源。你还可以将信号量看作是在向外分发使用资源的“许可证”,尽管实际上没有任何许可证对象。

下面演示了信号量的使用。并用到了一个概念——对象池,它管理这个数量有限的对象,当要使用对象时可以签出它们,而在用户使用完毕时,可以将它们签回。这种功能被封装在一个泛型类中:

public class Pool<T> {private int size;private List<T> items = new ArrayList<T>();private volatile boolean[] checkOut;private Semapore available;public Pool(Class<T> classObject, int size) {this.size = size;this.checkOut = new boolean[size];this.available = new Semaphore(size, true);for(int i = 0; i < size; ++i) {try {items.add(classObject.newInstance());} catch(Exception e) {throw new RuntimeExecption(e);}}}public T checkOut() throws InterruptedException {available.acquire();return getItem();}public void checkIn(T x) {if(releaseItem(x)) {available.release();}}private synchronized T getItem() {for(int i = 0; i < size; ++i) {if(!checkOut[i]) {checkOut[i] = true;return items.get(i);}return null;}}private synchronized boolean releaseItem(T item) {int index = items.indexOf(item);if(index == -1) {return false;}if(checkOut[index]) {checkOut[index] = false;return true;}return false;}
}

在Pool中,构造方法使用newInstance把对象加载到池中,如果需要一个新的对象,可以调用checkOut(),并在使用完后,嫁给checkIn()。

在checkOut()中,如果没有任何信号量许可证可用——在池中没有更多的对象了,available将阻塞调用过程。在checkIn()中,如果被签入的对象有效,则会向信号量返回一个许可证。

下面使用Fat对象作为示例——Fat类的构造器执行起来很耗时:

public class Fat {private volatile double d;private static int counter = 0;private final int id = counter++;public Fat() {for(int i = 0;i < 10000; ++i) {d += (Math.PI + Math.E) / (double)i;}}public void operation() {System.out.println(this);}public String toString() {return "Fat id: " + id;}
}

我们可以使用Pool在管理这个创建耗时的Fat对象,该任务将签出Fat对象,持有一段时间之后再将它们签入,以此来测试Pool这个类:

class CheckoutTask<T> implements Runnable {private static int counter = 0;private final int id = counter++;private Pool<T> pool;public CheckoutTask(Pool<T> pool) {this.pool = pool;} public void run() {T item = pool.checkOut();System.out.print(this + "checked out " + item);TimeUnit.SECONDS.sleep(1);System.out.print(this + "checking in " + item);pool.checkIn(item);} catch(InterruptedException e) {}public String toString() {return "CheckoutTask " + id + " "; }}public class SemaphoreDemo {final static int SIZE = 25;public static void main(String[] args) {final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);ExecutorService exec = Executors.newCachedThreadPool();for(int i = 0;i < SIZE; ++i) {exec.execute(new CheckoutTask<Fat>(pool));}System.out.print("All CheckoutTasks created");List<Fat> list = new ArrayList<>();for(int i = 0;i < SIZE; ++i) {Fat f = pool.checkOut();System.out.print(i + ": main() thread checked out ");f.operation();list.add(f);}Future<?> bolcked = exec.submit(new Runnable() {public void run() {try {pool.checkOut();} catch(InterruptedException e) {System.out.print("checkOut() Interrupted");}}});TimeUnit.SECONDS.sleep(2);blocked.cancel(true);System.out.print("Checking in objects in " + list);for(Fat f : list) {pool.checkIn(f);}for(Fat f : list) {pool.checkIn(f);}exec.shutdown();}
}

这个示例依赖于Pool客户端严格地并愿意签入所持有的的对象, 当其工作时,这是最简单的解决方案。

Java编程思想-并发(5)相关推荐

  1. java 编程思想 并发_java编程思想-java中的并发(一)

    一.基本的线程机制 并发编程使我们可以将程序划分为多个分离的.独立运行的任务.通过使用多线程机制,这些独立任务中的每一个都将由执行线程来驱动. 线程模型为编程带来了便利,它简化了在单一程序中同时jia ...

  2. Java编程思想-并发

    21 并发 21.1 并发的多面性 操作系统通常会将进程互相隔离开,使得进程之前相互不干涉.但是操作系统对于进程通常会有数量和开销的限制,导致进程不能无限创建. Java中线程调度采用抢占式,表示调度 ...

  3. Java编程思想读书笔记一:并发

    1. Thread.yield( )方法 当调用yield()时,即在建议具有相同优先级的其他线程可以运行了,但是注意的是,仅仅是建议,没有任何机制保证你这个建议会被采纳 .一般情况下,对于任何重要的 ...

  4. 《JAVA编程思想》学习笔记:第21章(并发)

    目录 Java编程思想(一)第1~4章:概述 Java编程思想(二)第5章:初始化和清理 Java编程思想(三)第6章:访问权限 Java编程思想(四)第7章:复用类 Java编程思想(五)第8章:多 ...

  5. java 四舍五入_《JAVA编程思想》5分钟速成:1-4章:概述

    前言: 1.面向对象的特征有哪些方面? 2.Math.round(11.5) 等于多少? Math.round(-11.5)等于多少? 3.float f=3.4;是否正确? 4.short s1 = ...

  6. Java编程思想 (1~10)

    [注:此博客旨在从<Java编程思想>这本书的目录结构上来检验自己的Java基础知识,只为笔记之用] 第一章 对象导论 1.万物皆对象 2.程序就是对象的集合 3.每个对象都是由其它对象所 ...

  7. java编程思想怎么样_读完java编程思想后的思考?

    谢邀,这本书真的给我带来很多思考. 我的java入门不是java编程思想,学校的教材是一本紫色的书,已经忘了叫什么名字了,里面内容倒挺新还讲了些javafx.但那本书实在是太浅并且结构混乱,以至于我和 ...

  8. Java编程思想(第4版)(评注版)

    传世经典书丛  Java编程思想(第4版)(评注版)  (美)埃克尔(Eckel, B.)著 刘中兵评注 ISBN 978-7-121-13521-7 2011年6月出版 定    价:108.00元 ...

  9. 《Java编程思想》读书笔记

    前言:三年之前就买了<Java编程思想>这本书,但是到现在为止都还没有好好看过这本书,这次希望能够坚持通读完整本书并整理好自己的读书笔记,上一篇文章是记录的第十七章到第十八章的内容,这一次 ...

  10. 【java】《java编程思想》 读书笔记

    之前主要用的C++的比较多,之前花了快2个月的实际认真系统全面的学习了以下java的基础语法,<java编程思想>这本书翻译水平确实不是很好,很多话读着会比较拗口.推荐读之前,先去网上搜索 ...

最新文章

  1. 深度学习、人工智能领域顶级书籍推荐
  2. 一个高并发请求的算法
  3. 织梦dedecms移动版设置二级域名的方法 织梦如何设置m.开头的域名
  4. 在线学习(Online Learning)
  5. 复杂网络研究:让世界变得简单
  6. 店铺如何用视觉走出差异化?
  7. 03-树3 Tree Traversals Again
  8. Flash 已死,Deno 当立?
  9. Kail Linux渗透测试教程之ARP侦查Netdiscover端口扫描Zenmap与黑暗搜索引擎Shodan
  10. JAVA网络编程-TCP客户端与服务器端连接
  11. matlab电场线公式,matlab画电场线
  12. 超宽带 DWM1000模块 简介
  13. Sigmoid函数求导
  14. STM32入门之GPIO详解
  15. windows下安装redis并设置自启动
  16. 任务栏右键工具栏里的语言栏没有的修复.reg
  17. 「ZJOI2009」多米诺骨牌
  18. 数学向量 java,数学向量和旋转(Topdown java game dev – physics problem)
  19. 常用工具类之jwt的学习使用
  20. Win10怎么卸载.net framework?

热门文章

  1. Java基础-构造函数
  2. unity webgl 手机端微信直接打开链接
  3. VIM插件管理:管理插件的插件pathogen
  4. JavaScript swiper
  5. Apple account使用不同账号续费问题
  6. 第七颗头骨 忘魂花 凤凰
  7. 调整bandizip压缩参数获得最大压缩效果
  8. sPortfolio: Stratified Visual Analysis of Stock Portfolios
  9. 葫芦娃游戏维护服务器怎么办,葫芦娃一直进不去 无法进入游戏解决方法
  10. ssh-keygen命令详解