一、CountDownLatch介绍

CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。await()方法阻塞,直到由于调用countDown()方法,当前count值达到0,之后所有等待线程被释放,而任何后续await()方法的调用会立即返回。这个是只有一次的现场,即count值无法被重设。如果你需要一个能够重设count值的版本,不妨考虑使用CyclicBarrier。

二、CountDownLatch应用

CountDownLatch是一个通用的同步工具,可用于许多目的。一个用count值为1来初始化的CountDownLatch可用作一个开关或者门闩:所有的线程调用await()方法等待一个线程调用countDown()方法后把门打开。一个用count值为N来初始化的CountDownLatch可用作使得一个线程等待,直到N个线程完成各自的事情,或者一些action被完成N次等。CountDownLatch一个有用的特性是:它不需要线程调用countDown()方法等待计数达到零在继续之前,它只是阻止任何线程继续过去一个等待,直到所有线程可以通过。

1、示例应用一

这里有一对类,一组工作线程使用两个countdown latches的示例:

第一个是启动信号,阻止worker线程工作直到driver准备好;

第二个是完成信号,允许driver等到所有的workers工作完成。

代码如下:

package com.pengli.jdk;import java.util.concurrent.CountDownLatch;public class TestCountDownLatch {class Driver {void main() throws InterruptedException {CountDownLatch startSignal = new CountDownLatch(1);CountDownLatch doneSignal = new CountDownLatch(20);for (int i = 0; i < 20; ++i) // create and start threadsnew Thread(new Worker(startSignal, doneSignal)).start();doSomethingElse(); // don't let run yetstartSignal.countDown(); // let all threads proceeddoSomethingElse();doneSignal.await(); // wait for all to finish}void doSomethingElse() {// ...}}class Worker implements Runnable {private final CountDownLatch startSignal;private final CountDownLatch doneSignal;Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {this.startSignal = startSignal;this.doneSignal = doneSignal;}public void run() {try {startSignal.await();doWork();doneSignal.countDown();} catch (InterruptedException ex) {} // return;}void doWork() {// ...}}
}

2、示例应用二

另一个典型用法是将一个问题分成n份,在一个线程中定义并执行一份,并在latch中count down,然后将所有的线程放入一个队列。当所有的部分完成,协调线程将会通过await()方法,继续处理。当线程必须以这种方式反复count down时,使用CyclicBarrier。

代码如下:

package com.pengli.jdk;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;public class TestCountDownLatch2 {class Driver2 { // ...void main() throws InterruptedException {CountDownLatch doneSignal = new CountDownLatch(20);Executor e = Executors.newFixedThreadPool(20);for (int i = 0; i < 20; ++i) // create and start threadse.execute(new WorkerRunnable(doneSignal, i));doneSignal.await(); // wait for all to finish}}class WorkerRunnable implements Runnable {private final CountDownLatch doneSignal;private final int i;WorkerRunnable(CountDownLatch doneSignal, int i) {this.doneSignal = doneSignal;this.i = i;}public void run() {doWork(i);doneSignal.countDown();}void doWork(int i) {// ...}}
}

三、CountDownLatch实现分析

1、Sync

在CountDownLatch内部,有一个Sync的同步器,它继承自java.util.concurrent包中各种同步工具共用的AbstractQueuedSynchronizer,其实现如下:

    /*** Synchronization control For CountDownLatch.* Uses AQS state to represent count.*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}

关于AbstractQueuedSynchronizer,有其它的文章进行专门的介绍。这里只分析下Sync的实现。其有一个需要入参int count的构造函数,设置AbstractQueuedSynchronizer的state。并覆写了tryAcquireShared()和tryReleaseShared()方法,其中tryReleaseShared()方法用于CountDownLatch的countDown()方法,这个tryReleaseShared()方法的逻辑如下:

在一个for循环内,首先通过getState()获取state值,如果为0,直接返回false,否则取state-1,并尝试CAS操作,修改state状态,并且state等于0,返回true,否则返回false。

tryAcquireShared()方法更简单,判断state(即count值),如果等于0,返回1,否则返回-1.

2、countDown()

countDown()方法的实现很简单,如下:

    /*** Decrements the count of the latch, releasing all waiting threads if* the count reaches zero.** <p>If the current count is greater than zero then it is decremented.* If the new count is zero then all waiting threads are re-enabled for* thread scheduling purposes.** <p>If the current count equals zero then nothing happens.*/public void countDown() {sync.releaseShared(1);}

其核心处理就是减少latch中count值,如果cout值为0,释放所有的等待线程。它调用的是sync的releaseShared()方法,而这个方法是在AbstractQueuedSynchronizer中实现的,如下:

    /*** Releases in shared mode.  Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument.  This value is conveyed to*        {@link #tryReleaseShared} but is otherwise uninterpreted*        and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

先调用tryReleaseShared()方法,即上述Sync的同名方法,并且如果返回true的话,继续调用doReleaseShared()方法,返回true,否则返回false。即如果修改后state(即count)值为正,不做其他处理,否则调用doReleaseShared()方法。

3、await()

await()方法的核心作用是,让当前线程阻塞,直到latch的count值更改为0,或者当前线程被interrupted。如果count值为0,则await()方法直接返回。代码如下:

    /*** Causes the current thread to wait until the latch has counted down to* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.** <p>If the current count is zero then this method returns immediately.** <p>If the current count is greater than zero then the current* thread becomes disabled for thread scheduling purposes and lies* dormant until one of two things happen:* <ul>* <li>The count reaches zero due to invocations of the* {@link #countDown} method; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*         while waiting*/public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}

还是借助的sync,调用的其acquireSharedInterruptibly()方法,这个方法是在Sync的父类中实现的,代码如下:

    /*** Acquires in shared mode, aborting if interrupted.  Implemented* by first checking interrupt status, then invoking at least once* {@link #tryAcquireShared}, returning on success.  Otherwise the* thread is queued, possibly repeatedly blocking and unblocking,* invoking {@link #tryAcquireShared} until success or the thread* is interrupted.* @param arg the acquire argument* This value is conveyed to {@link #tryAcquireShared} but is* otherwise uninterpreted and can represent anything* you like.* @throws InterruptedException if the current thread is interrupted*/public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

先调用Sync的tryAcquireShared()方法,如果返回值为负值,则调用doAcquireSharedInterruptibly()方法。上面讲到了,如果state(即count)值为0,则返回1,方法直接返回,否则进入doAcquireSharedInterruptibly()方法,实现阻塞。

doReleaseShared()和doAcquireSharedInterruptibly()方法的介绍参见AbstractQueuedSynchronizer的分析文章。

Java源码分析之CountDownLatch相关推荐

  1. 【Java源码分析】Java8的HashMap源码分析

    Java8中的HashMap源码分析 源码分析 HashMap的定义 字段属性 构造函数 hash函数 comparableClassFor,compareComparables函数 tableSiz ...

  2. 【Java源码分析】LinkedHashSet和HashSet源码分析

    类的定义 public class HashSet<E> extends AbstractSet<E> implements Set<E>, Cloneable, ...

  3. java 源码分析_Java 源代码编译成 Class 文件的过程分析

    原标题:Java 源代码编译成 Class 文件的过程分析 在上篇文章< Java三种编译方式:前端编译 JIT编译 AOT编译 >中了解到了它们各有什么优点和缺点,以及前端编译+JIT编 ...

  4. Java源码分析之HashMap(JDK1.8)

    一.HashMap概述 HashMap是常用的Java集合之一,是基于哈希表的Map接口的实现.与HashTable主要区别为不支持同步和允许null作为key和value.由于HashMap不是线程 ...

  5. 坦克大战java源码分析(上)

    坦克大战源码分析 一.mytank07.java文件分析 注:至上而下将不懂的语句.结构体含义.代码作用等作出解释: 1.包的使用 package com.haiding.tank_7; 包语句的语法 ...

  6. 【Java源码分析】Java8的ArrayList源码分析

    Java8的ArrayList源码分析 源码分析 ArrayList类的定义 字段属性 构造函数 trimToSize()函数 Capacity容量相关的函数,比如扩容 List大小和是否为空 con ...

  7. python打蛇_hprose for java源码分析-1 初识

    1.1初识 hprose是个开源RPC框架.支持语言 JAVA, C#,C++,Python,PHP等一系列语言.这里着重分析下hprose for java源码. 可以到https://github ...

  8. Java源码分析 AbstractList的迭代器实现(Itr和ListItr)

    Itr和ListItr的简介 AbstractList的有两个内部类实现了迭代器,一个内部类Itr实现了Iterator接口,一个内部类ListItr实现了ListIterator接口,后者其实就是前 ...

  9. java源码分析之ArrayList

    ArrayList就是传说中的动态数组,就是Array的复杂版本,它提供了如下一些好处:动态的增加和减少元素.灵活的设置数组的大小...... 认真阅读本文,我相信一定会对你有帮助.比如为什么Arra ...

  10. Java源码分析:深入探讨Iterator模式

    http://tech.ccidnet.com/art/3539/20060712/618391_1.html java.util包中包含了一系列重要的集合类.本文将从分析源码入手,深入研究一个集合类 ...

最新文章

  1. 在CentOS 6.6 x86_64上安装SystemTap/Perf+FlameGraph玩转火焰图实录
  2. Python学习小甲鱼视频003
  3. Facebook使用机器学习手段来自动优化其系统性能
  4. 利用CodeIgniter中的Email类发邮件
  5. 04-JDBC连接MySQL数据库【修改数据】
  6. 从啤酒尿布到自动驾驶,零售行业如何再创营销神话?
  7. 找出矩阵中绝对值最大的元素及其位置_线性代数之——矩阵范数和条件数
  8. 1381. 设计一个支持增量操作的栈
  9. 最详细的YOLOv3论文笔记
  10. 计算机技术在工业的应用,计算机技术在工业自动化控制的应用
  11. 万能手机解锁工具v1.0绿色加强版
  12. Tomcat——通过.bat批处理程序重启Tomcat
  13. 一文搞懂HTTPProxy丨含基础、高级路由、服务韧性
  14. dns劫持是什么 dns被劫持了怎么办、dns被劫持怎么解决
  15. python-数据分析-pandas
  16. 他看了几千份技术简历,愿意把技术简历的秘籍传授给你
  17. python十六进制转换成二进制_python - 将十六进制转换为二进制
  18. 网络和http协议理论
  19. 中等职业学校计算机课程标准,全市中等职业学校信息技术课程标准内涵解析与教学设计培训会议成功举办...
  20. 《TT语音》用户体验分析报告

热门文章

  1. cpu频率_CPU频率的提升到底会产生哪些影响?
  2. webuploader上传多张照片的基本功能
  3. 海边二本计算机学校,读二本不丢人!这些二本大学实力都很强,就业率也不差!...
  4. mysql触发器编程_mysql之触发器trigger详解
  5. apt安装openjdk8
  6. configure: error: readline library not found/libreadline.so: undefined reference to tputs
  7. AndroidStudio是最难用的IDE,没有之一
  8. 解决办法:atoi不能将CString 转化为char *
  9. SHELL中如何对一个变量进行算术操作(加减)
  10. WINDOWS下,找包含特殊字串的文件的解决办法