Java多线程之JUC包:CountDownLatch源码学习笔记
若有不正之处请多多谅解,并欢迎批评指正。
请尊重作者劳动成果,转载请标明原文链接:
http://www.cnblogs.com/go2sea/p/5623218.html
我们已经了解了AQS的大致工作流程,接下来看下AQS的一个应用——CountDownLatch。
我们已经知道,AQS提供了两种模式:独占模式&共享模式。CountDownLatch就是一个使用共享模式的自定义同步器实现的共享锁。
源代码:
/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************//******* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*;/*** A synchronization aid that allows one or more threads to wait until* a set of operations being performed in other threads completes.** <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.* The {@link #await await} methods block until the current count reaches* zero due to invocations of the {@link #countDown} method, after which* all waiting threads are released and any subsequent invocations of* {@link #await await} return immediately. This is a one-shot phenomenon* -- the count cannot be reset. If you need a version that resets the* count, consider using a {@link CyclicBarrier}.** <p>A {@code CountDownLatch} is a versatile synchronization tool* and can be used for a number of purposes. A* {@code CountDownLatch} initialized with a count of one serves as a* simple on/off latch, or gate: all threads invoking {@link #await await}* wait at the gate until it is opened by a thread invoking {@link* #countDown}. A {@code CountDownLatch} initialized to <em>N</em>* can be used to make one thread wait until <em>N</em> threads have* completed some action, or some action has been completed N times.** <p>A useful property of a {@code CountDownLatch} is that it* doesn't require that threads calling {@code countDown} wait for* the count to reach zero before proceeding, it simply prevents any* thread from proceeding past an {@link #await await} until all* threads could pass.** <p><b>Sample usage:</b> Here is a pair of classes in which a group* of worker threads use two countdown latches:* <ul>* <li>The first is a start signal that prevents any worker from proceeding* until the driver is ready for them to proceed;* <li>The second is a completion signal that allows the driver to wait* until all workers have completed.* </ul>** <pre>* class Driver { // ...* void main() throws InterruptedException {* CountDownLatch startSignal = new CountDownLatch(1);* CountDownLatch doneSignal = new CountDownLatch(N);** for (int i = 0; i < N; ++i) // create and start threads* new Thread(new Worker(startSignal, doneSignal)).start();** doSomethingElse(); // don't let run yet* startSignal.countDown(); // let all threads proceed* doSomethingElse();* doneSignal.await(); // wait for all to finish* }* }** class Worker implements Runnable {* private final CountDownLatch startSignal;* private final CountDownLatch doneSignal;* Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {* this.startSignal = startSignal;* this.doneSignal = doneSignal;* }* public void run() {* try {* startSignal.await();* doWork();* doneSignal.countDown();* } catch (InterruptedException ex) {} // return;* }** void doWork() { ... }* }** </pre>** <p>Another typical usage would be to divide a problem into N parts,* describe each part with a Runnable that executes that portion and* counts down on the latch, and queue all the Runnables to an* Executor. When all sub-parts are complete, the coordinating thread* will be able to pass through await. (When threads must repeatedly* count down in this way, instead use a {@link CyclicBarrier}.)** <pre>* class Driver2 { // ...* void main() throws InterruptedException {* CountDownLatch doneSignal = new CountDownLatch(N);* Executor e = ...** for (int i = 0; i < N; ++i) // create and start threads* e.execute(new WorkerRunnable(doneSignal, i));** doneSignal.await(); // wait for all to finish* }* }** class WorkerRunnable implements Runnable {* private final CountDownLatch doneSignal;* private final int i;* WorkerRunnable(CountDownLatch doneSignal, int i) {* this.doneSignal = doneSignal;* this.i = i;* }* public void run() {* try {* doWork(i);* doneSignal.countDown();* } catch (InterruptedException ex) {} // return;* }** void doWork() { ... }* }** </pre>** <p>Memory consistency effects: Until the count reaches* zero, actions in a thread prior to calling* {@code countDown()}* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>* actions following a successful return from a corresponding* {@code await()} in another thread.** @since 1.5* @author Doug Lea*/ public class CountDownLatch {/*** Synchronization control For CountDownLatch.* Uses AQS state to represent count.*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}private final Sync sync;/*** Constructs a {@code CountDownLatch} initialized with the given count.** @param count the number of times {@link #countDown} must be invoked* before threads can pass through {@link #await}* @throws IllegalArgumentException if {@code count} is negative*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}/*** Causes the current thread to wait until the latch has counted down to* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.** <p>If the current count is zero then this method returns immediately.** <p>If the current count is greater than zero then the current* thread becomes disabled for thread scheduling purposes and lies* dormant until one of two things happen:* <ul>* <li>The count reaches zero due to invocations of the* {@link #countDown} method; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted* while waiting*/public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}/*** Causes the current thread to wait until the latch has counted down to* zero, unless the thread is {@linkplain Thread#interrupt interrupted},* or the specified waiting time elapses.** <p>If the current count is zero then this method returns immediately* with the value {@code true}.** <p>If the current count is greater than zero then the current* thread becomes disabled for thread scheduling purposes and lies* dormant until one of three things happen:* <ul>* <li>The count reaches zero due to invocations of the* {@link #countDown} method; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or* <li>The specified waiting time elapses.* </ul>** <p>If the count reaches zero then the method returns with the* value {@code true}.** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>If the specified waiting time elapses then the value {@code false}* is returned. If the time is less than or equal to zero, the method* will not wait at all.** @param timeout the maximum time to wait* @param unit the time unit of the {@code timeout} argument* @return {@code true} if the count reached zero and {@code false}* if the waiting time elapsed before the count reached zero* @throws InterruptedException if the current thread is interrupted* while waiting*/public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}/*** Decrements the count of the latch, releasing all waiting threads if* the count reaches zero.** <p>If the current count is greater than zero then it is decremented.* If the new count is zero then all waiting threads are re-enabled for* thread scheduling purposes.** <p>If the current count equals zero then nothing happens.*/public void countDown() {sync.releaseShared(1);}/*** Returns the current count.** <p>This method is typically used for debugging and testing purposes.** @return the current count*/public long getCount() {return sync.getCount();}/*** Returns a string identifying this latch, as well as its state.* The state, in brackets, includes the String {@code "Count ="}* followed by the current count.** @return a string identifying this latch, as well as its state*/public String toString() {return super.toString() + "[Count = " + sync.getCount() + "]";} }
View Code
可以看到,CountDownLatch只有一个成员变量,是它自定义的同步器:
private final Sync sync;
一、await方法
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}
CountDownLatch提供了两种await方法:有等待时长限制和一直等待。两种均能响应中断(归根到底是UNSAFE.park可响应中断。但是如果是定时的park,则不能判断被唤醒的原因是超时还是被中断,因此需要isInterrupted判断下,而此方法会清除中断标志,因此如果是延迟处理要“补上”)。
await()方法调用了同步器的acquireSharedInterruptibly方法,这个方法由上层AQS提供,它调用了我们重写的tryAcquireShared方法而封装了排队等待、唤醒、响应中断的细节,我们只关注自定义同步器中的tryAcquireShared方法即可:
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
注意,tryAcquireShared方法的返回值的意义在AQS是这样规定的:负值代表获取资源失败,非负值代表成功获取资源后剩余资源的数量。而这里当getState返回值为0的时候,我们却总是返回1,表示仍有剩余资源。这看上去并不合理,但这确实是正确的:因为可能有多个线程调用了await,同时在队列中等待资源,CountDownLatch的语义要求我们在倒计时结束有唤醒所有等待线程。因此我们在成功获取资源后,总是要告诉AQS“还有剩余”,这样AQS便会继续唤醒队列中的其他等待线程(由AQS中的setHeadAndPropagate方法调用doReleaseShared来唤醒)。一句话:成功获取总返回1是为了保证唤醒的“延续性”。
有等待时长限制的await(long, TimeUnit)方法调用了同步器的tryAcquireSharedNanos方法:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}
这个方法首先检测中断,然后试图获取,失败后进入“自旋-等待”阶段,直到成功获取或被中断。这是AQS的内容,不再赘述。
二、countDown方法
public void countDown() {sync.releaseShared(1);}
countDown方法调用releaseShared释放资源:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
releaseShared会调用tryReleaseShared方法:
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
方法一直自旋,直到成功释放或倒计时完毕。因为可能有超过count的线程调用countDown,因此releaseShared是可能失败的。当然在释放过程中也可能发生竞争,CAS自旋保证竞争发生时的正确执行。
三、总结
CountDownLatch是一个共享锁,但有些特别:他在初始化的时候锁住了所有共享资源,任何线程都可以调用countDown方法释放一个资源,当所有资源都被释放后,所有等待线程被唤醒。从而实现了倒计时的效果。
CountDownLatch是一次性的,计数值不可恢复。
转载于:https://www.cnblogs.com/go2sea/p/5623218.html
Java多线程之JUC包:CountDownLatch源码学习笔记相关推荐
- Java多线程之JUC包:Semaphore源码学习笔记
若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/go2sea/p/5625536.html Semaphore是JUC ...
- java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw
java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw 本源码技术栈: 项目架构:B/S架构 开 ...
- java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试
java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试 本源码技术栈 ...
- java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试
java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试 本源码技术栈 ...
- java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试
java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试 本源 ...
- Apache log4j-1.2.17源码学习笔记
(1)Apache log4j-1.2.17源码学习笔记 http://blog.csdn.net/zilong_zilong/article/details/78715500 (2)Apache l ...
- Vuex 4源码学习笔记 - 通过Vuex源码学习E2E测试(十一)
在上一篇笔记中:Vuex 4源码学习笔记 - 做好changelog更新日志很重要(十) 我们学到了通过conventional-changelog来生成项目的Changelog更新日志,通过更新日志 ...
- 雷神FFMpeg源码学习笔记
雷神FFMpeg源码学习笔记 文章目录 雷神FFMpeg源码学习笔记 读取编码并依据编码初始化内容结构 每一帧的视频解码处理 读取编码并依据编码初始化内容结构 在开始编解码视频的时候首先第一步需要注册 ...
- RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?
RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...
- Vuex 4源码学习笔记 - Vuex是怎么与Vue结合?(三)
在上一篇笔记中:Vuex源码学习笔记 - Vuex开发运行流程(二) 我们通过运行npm run dev命令来启动webpack,来开发Vuex,并在Vuex的createStore函数中添加了第一个 ...
最新文章
- 闻声识人时代将至?多家企业争相布局
- spring boot实战(第六篇)加载application资源文件源码分析
- mongodb--find高级用法
- [已解决] java 增加 ALPN支持
- 深度评测阿里云、百度云、腾讯云和华为云
- OpenCV之模板匹配案例
- mysql5.7主从复制_MySQL 5.7.18的安装及主从复制(主从同步)
- “猜猜红桃A在哪里”——android小游戏(入门学习必备)
- VS2012/VS2013安装教程
- 【测试沉思录】3. 如何测试微信公众号?
- jquery漂浮广告代码
- 阿里云访问控制——OSS——STS
- 在centos下安装pycrypto报错 RuntimeError: autoconf error
- JavasScript 第一天课 课后笔记 2022.3.21
- 独行侠作风之CRM实施阻力
- Stream报错:stream has already been operated upon or closed
- python爬取美女_知乎大神用Python爬取高颜值美女(Python爬虫+人脸检测+颜值检测)...
- 计算机组成原理之计算机最基本的工作原理
- 巴西龟饲养日志----七月底巴西龟状况
- 台式电脑计算机怎么添加任务栏,如何将我的电脑添加到任务栏中(win7)?