若有不正之处请多多谅解,并欢迎批评指正。

请尊重作者劳动成果,转载请标明原文链接:

http://www.cnblogs.com/go2sea/p/5623218.html

我们已经了解了AQS的大致工作流程,接下来看下AQS的一个应用——CountDownLatch。

我们已经知道,AQS提供了两种模式:独占模式&共享模式。CountDownLatch就是一个使用共享模式的自定义同步器实现的共享锁。

源代码:

/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************//******* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;/*** A synchronization aid that allows one or more threads to wait until* a set of operations being performed in other threads completes.** <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.* The {@link #await await} methods block until the current count reaches* zero due to invocations of the {@link #countDown} method, after which* all waiting threads are released and any subsequent invocations of* {@link #await await} return immediately.  This is a one-shot phenomenon* -- the count cannot be reset.  If you need a version that resets the* count, consider using a {@link CyclicBarrier}.** <p>A {@code CountDownLatch} is a versatile synchronization tool* and can be used for a number of purposes.  A* {@code CountDownLatch} initialized with a count of one serves as a* simple on/off latch, or gate: all threads invoking {@link #await await}* wait at the gate until it is opened by a thread invoking {@link* #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>* can be used to make one thread wait until <em>N</em> threads have* completed some action, or some action has been completed N times.** <p>A useful property of a {@code CountDownLatch} is that it* doesn't require that threads calling {@code countDown} wait for* the count to reach zero before proceeding, it simply prevents any* thread from proceeding past an {@link #await await} until all* threads could pass.** <p><b>Sample usage:</b> Here is a pair of classes in which a group* of worker threads use two countdown latches:* <ul>* <li>The first is a start signal that prevents any worker from proceeding* until the driver is ready for them to proceed;* <li>The second is a completion signal that allows the driver to wait* until all workers have completed.* </ul>** <pre>* class Driver { // ...*   void main() throws InterruptedException {*     CountDownLatch startSignal = new CountDownLatch(1);*     CountDownLatch doneSignal = new CountDownLatch(N);**     for (int i = 0; i < N; ++i) // create and start threads*       new Thread(new Worker(startSignal, doneSignal)).start();**     doSomethingElse();            // don't let run yet*     startSignal.countDown();      // let all threads proceed*     doSomethingElse();*     doneSignal.await();           // wait for all to finish*   }* }** class Worker implements Runnable {*   private final CountDownLatch startSignal;*   private final CountDownLatch doneSignal;*   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {*      this.startSignal = startSignal;*      this.doneSignal = doneSignal;*   }*   public void run() {*      try {*        startSignal.await();*        doWork();*        doneSignal.countDown();*      } catch (InterruptedException ex) {} // return;*   }**   void doWork() { ... }* }** </pre>** <p>Another typical usage would be to divide a problem into N parts,* describe each part with a Runnable that executes that portion and* counts down on the latch, and queue all the Runnables to an* Executor.  When all sub-parts are complete, the coordinating thread* will be able to pass through await. (When threads must repeatedly* count down in this way, instead use a {@link CyclicBarrier}.)** <pre>* class Driver2 { // ...*   void main() throws InterruptedException {*     CountDownLatch doneSignal = new CountDownLatch(N);*     Executor e = ...**     for (int i = 0; i < N; ++i) // create and start threads*       e.execute(new WorkerRunnable(doneSignal, i));**     doneSignal.await();           // wait for all to finish*   }* }** class WorkerRunnable implements Runnable {*   private final CountDownLatch doneSignal;*   private final int i;*   WorkerRunnable(CountDownLatch doneSignal, int i) {*      this.doneSignal = doneSignal;*      this.i = i;*   }*   public void run() {*      try {*        doWork(i);*        doneSignal.countDown();*      } catch (InterruptedException ex) {} // return;*   }**   void doWork() { ... }* }** </pre>** <p>Memory consistency effects: Until the count reaches* zero, actions in a thread prior to calling* {@code countDown()}* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>* actions following a successful return from a corresponding* {@code await()} in another thread.** @since 1.5* @author Doug Lea*/
public class CountDownLatch {/*** Synchronization control For CountDownLatch.* Uses AQS state to represent count.*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}private final Sync sync;/*** Constructs a {@code CountDownLatch} initialized with the given count.** @param count the number of times {@link #countDown} must be invoked*        before threads can pass through {@link #await}* @throws IllegalArgumentException if {@code count} is negative*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}/*** Causes the current thread to wait until the latch has counted down to* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.** <p>If the current count is zero then this method returns immediately.** <p>If the current count is greater than zero then the current* thread becomes disabled for thread scheduling purposes and lies* dormant until one of two things happen:* <ul>* <li>The count reaches zero due to invocations of the* {@link #countDown} method; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*         while waiting*/public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}/*** Causes the current thread to wait until the latch has counted down to* zero, unless the thread is {@linkplain Thread#interrupt interrupted},* or the specified waiting time elapses.** <p>If the current count is zero then this method returns immediately* with the value {@code true}.** <p>If the current count is greater than zero then the current* thread becomes disabled for thread scheduling purposes and lies* dormant until one of three things happen:* <ul>* <li>The count reaches zero due to invocations of the* {@link #countDown} method; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or* <li>The specified waiting time elapses.* </ul>** <p>If the count reaches zero then the method returns with the* value {@code true}.** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>If the specified waiting time elapses then the value {@code false}* is returned.  If the time is less than or equal to zero, the method* will not wait at all.** @param timeout the maximum time to wait* @param unit the time unit of the {@code timeout} argument* @return {@code true} if the count reached zero and {@code false}*         if the waiting time elapsed before the count reached zero* @throws InterruptedException if the current thread is interrupted*         while waiting*/public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}/*** Decrements the count of the latch, releasing all waiting threads if* the count reaches zero.** <p>If the current count is greater than zero then it is decremented.* If the new count is zero then all waiting threads are re-enabled for* thread scheduling purposes.** <p>If the current count equals zero then nothing happens.*/public void countDown() {sync.releaseShared(1);}/*** Returns the current count.** <p>This method is typically used for debugging and testing purposes.** @return the current count*/public long getCount() {return sync.getCount();}/*** Returns a string identifying this latch, as well as its state.* The state, in brackets, includes the String {@code "Count ="}* followed by the current count.** @return a string identifying this latch, as well as its state*/public String toString() {return super.toString() + "[Count = " + sync.getCount() + "]";}
}

View Code

可以看到,CountDownLatch只有一个成员变量,是它自定义的同步器:

private final Sync sync;

一、await方法

    public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}

CountDownLatch提供了两种await方法:有等待时长限制和一直等待。两种均能响应中断(归根到底是UNSAFE.park可响应中断。但是如果是定时的park,则不能判断被唤醒的原因是超时还是被中断,因此需要isInterrupted判断下,而此方法会清除中断标志,因此如果是延迟处理要“补上”)。

await()方法调用了同步器的acquireSharedInterruptibly方法,这个方法由上层AQS提供,它调用了我们重写的tryAcquireShared方法而封装了排队等待、唤醒、响应中断的细节,我们只关注自定义同步器中的tryAcquireShared方法即可:

        protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}

注意,tryAcquireShared方法的返回值的意义在AQS是这样规定的:负值代表获取资源失败,非负值代表成功获取资源后剩余资源的数量。而这里当getState返回值为0的时候,我们却总是返回1,表示仍有剩余资源。这看上去并不合理,但这确实是正确的:因为可能有多个线程调用了await,同时在队列中等待资源,CountDownLatch的语义要求我们在倒计时结束有唤醒所有等待线程。因此我们在成功获取资源后,总是要告诉AQS“还有剩余”,这样AQS便会继续唤醒队列中的其他等待线程(由AQS中的setHeadAndPropagate方法调用doReleaseShared来唤醒)。一句话:成功获取总返回1是为了保证唤醒的“延续性”。

有等待时长限制的await(long, TimeUnit)方法调用了同步器的tryAcquireSharedNanos方法:

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}

这个方法首先检测中断,然后试图获取,失败后进入“自旋-等待”阶段,直到成功获取或被中断。这是AQS的内容,不再赘述。

二、countDown方法

    public void countDown() {sync.releaseShared(1);}

countDown方法调用releaseShared释放资源:

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

releaseShared会调用tryReleaseShared方法:

        protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}

方法一直自旋,直到成功释放或倒计时完毕。因为可能有超过count的线程调用countDown,因此releaseShared是可能失败的。当然在释放过程中也可能发生竞争,CAS自旋保证竞争发生时的正确执行。

三、总结

CountDownLatch是一个共享锁,但有些特别:他在初始化的时候锁住了所有共享资源,任何线程都可以调用countDown方法释放一个资源,当所有资源都被释放后,所有等待线程被唤醒。从而实现了倒计时的效果。

CountDownLatch是一次性的,计数值不可恢复。

转载于:https://www.cnblogs.com/go2sea/p/5623218.html

Java多线程之JUC包:CountDownLatch源码学习笔记相关推荐

  1. Java多线程之JUC包:Semaphore源码学习笔记

    若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/go2sea/p/5625536.html Semaphore是JUC ...

  2. java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw

    java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw 本源码技术栈: 项目架构:B/S架构 开 ...

  3. java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试 本源码技术栈 ...

  4. java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试 本源码技术栈 ...

  5. java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试 本源 ...

  6. Apache log4j-1.2.17源码学习笔记

    (1)Apache log4j-1.2.17源码学习笔记 http://blog.csdn.net/zilong_zilong/article/details/78715500 (2)Apache l ...

  7. Vuex 4源码学习笔记 - 通过Vuex源码学习E2E测试(十一)

    在上一篇笔记中:Vuex 4源码学习笔记 - 做好changelog更新日志很重要(十) 我们学到了通过conventional-changelog来生成项目的Changelog更新日志,通过更新日志 ...

  8. 雷神FFMpeg源码学习笔记

    雷神FFMpeg源码学习笔记 文章目录 雷神FFMpeg源码学习笔记 读取编码并依据编码初始化内容结构 每一帧的视频解码处理 读取编码并依据编码初始化内容结构 在开始编解码视频的时候首先第一步需要注册 ...

  9. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

  10. Vuex 4源码学习笔记 - Vuex是怎么与Vue结合?(三)

    在上一篇笔记中:Vuex源码学习笔记 - Vuex开发运行流程(二) 我们通过运行npm run dev命令来启动webpack,来开发Vuex,并在Vuex的createStore函数中添加了第一个 ...

最新文章

  1. 闻声识人时代将至?多家企业争相布局
  2. spring boot实战(第六篇)加载application资源文件源码分析
  3. mongodb--find高级用法
  4. [已解决] java 增加 ALPN支持
  5. 深度评测阿里云、百度云、腾讯云和华为云
  6. OpenCV之模板匹配案例
  7. mysql5.7主从复制_MySQL 5.7.18的安装及主从复制(主从同步)
  8. “猜猜红桃A在哪里”——android小游戏(入门学习必备)
  9. VS2012/VS2013安装教程
  10. 【测试沉思录】3. 如何测试微信公众号?
  11. jquery漂浮广告代码
  12. 阿里云访问控制——OSS——STS
  13. 在centos下安装pycrypto报错 RuntimeError: autoconf error
  14. JavasScript 第一天课 课后笔记 2022.3.21
  15. 独行侠作风之CRM实施阻力
  16. Stream报错:stream has already been operated upon or closed
  17. python爬取美女_知乎大神用Python爬取高颜值美女(Python爬虫+人脸检测+颜值检测)...
  18. 计算机组成原理之计算机最基本的工作原理
  19. 巴西龟饲养日志----七月底巴西龟状况
  20. 台式电脑计算机怎么添加任务栏,如何将我的电脑添加到任务栏中(win7)?

热门文章

  1. 没事不要在for循环期间增减迭代序列的成员
  2. 第三代搜索推出的专题是什么?
  3. 数据仓库和数据库有什么区别
  4. Error accessing PRODUCT_USER_PROFILE?
  5. 【Vegas原创】红烧肉的做法
  6. Week2——XML
  7. SQL中 UNION 和 UNION ALL 操作符小结
  8. Mac 安装 Grunt
  9. 《管理3.0》读书笔记
  10. “带锁的门”问题,并有c语言和python代码运行效率对比