CountDownLatch 是一个同步工具类,它允许一个线程或多个线程一直等待,知道其他线程的操作执行完毕再执行。从命名可以解读到 countDown 是倒数的意思,类似于我们倒计时的概念。

CountDownLatch提供了两个方法:await()和countDown();countDownLatch 初始化的时候需要传入一个整数,在这个整数倒数到 0 之前,调用了 await 方法的程序都必须要等待,然后通过 countDown 来倒数。

一、Demo

public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch=new CountDownLatch(3);new Thread(()->{System.out.println(Thread.currentThread().getName()+"->begin");countDownLatch.countDown(); //初始值-1 =3-1=2;System.out.println(Thread.currentThread().getName()+"->end");},"t1").start();new Thread(()->{System.out.println(Thread.currentThread().getName()+"->begin");countDownLatch.countDown(); //2-1=1;System.out.println(Thread.currentThread().getName()+"->end");},"t2").start();new Thread(()->{System.out.println(Thread.currentThread().getName()+"->begin");countDownLatch.countDown(); //1-1=1;System.out.println(Thread.currentThread().getName()+"->end");},"t3").start();countDownLatch.await(); //阻塞Main线程}

从代码的实现来看,有点类似 join 的功能,但是比 join 更加灵活。CountDownLatch 构造函数会接收一个 int 类型的参数作为计数器的初始值,当调用 CountDownLatch 的countDown方法时,这个计数器就会减1。

二、CountDownLatch源码分析

  • CountDownLatch内部依赖Sync实现,而Sync继承AQS。
  • countDown()方法每次调用都会将 state 减 1,直到state 的值为 0。而 await 是一个阻塞方法,当 state 减为 0 的时候, await 方法才会返回。
  • await 可以被多个线程调用, 大家在这个时候脑子里要有个图:所有调用了await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过

2.1 await()方法的实现 

当前线程计数器为0之前一致等待,除非线程被中断

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}

1.sync#acquireSharedInterruptibly方法,await方法调用了它,继续往下看:

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();/*state如果不为0,说明当前线程需要加入到共享队列中*/if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

这块代码主要是判断当前线程是否获取到了共享锁 ; ( 在CountDownLatch 中 , 使 用 的 是 共 享 锁 机 制 , 因 为CountDownLatch 并不需要实现互斥的特性)。

可以看到acquireSharedInterruptibly()方法里,会相继调用:tryAcquireShared()、doAcquireSharedInterruptibly()

2.tryAcquireShared()方法

protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}

可以看出来当state=0,则表示获取到了锁,返回1,否则返回-1。

3.doAcquireSharedInterruptibly()方法

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {新建节点加入阻塞队列,可以看到Node类型为SHARED,推测出是后面用到的是共享锁final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获得当前节点pre节点final Node p = node.predecessor();// 如果获取到的 prev 是 head,也就是队列中第一个等待线程if (p == head) {// 再次尝试申请 反应到 CountDownLatch 就是查看是否还有线程需要等待(state是否为0)int r = tryAcquireShared(arg);//r>=0 表示获取到了执行权限,这个时候因为 state!=0,所以不会执行这段代码if (r >= 0) {  //尝试将第一个线程关联的节点设置为 head setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//重组双向链表,清空无效节点,挂起当前线程//经过自旋tryAcquireShared后,state还不为0,就会到这里,第一次的时候,waitStatus是0,那么node的waitStatus就会被置为SIGNAL,第二次再走到这里,就会用LockSupport的park方法把当前线程阻塞住if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

该方法为一个自旋方法会尝试一直去获取同步状态。

  1. addWaiter设置为shared
  2. 判断tryAcquireShared方法,前面说过当state=0,则表示获取到了锁,返回1,否则返回-1
  3. 当获取到了锁,在 判 断 前 驱 节 点 是 头 节 点 后 , 调 用 了setHeadAndPropagate 方法,而不是简单的更新一下头节点。

doAcquireSharedInterruptibly的逻辑和独占功能的acquireQueued基本相同,阻塞线程的过程是一样的。不同之处:

  1. 创建的Node是定义成共享的(Node.SHARED);
  2. 被唤醒后重新尝试获取锁,不只设置自己为head,还需要通知其他等待的线程。(重点看后文释放操作里的setHeadAndPropagate)

4.addWaiter方法

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 尝试快速入队操作,因为大多数时候尾节点不为 nullif (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//如果尾节点为空(也就是队列为空) 或者尝试CAS入队失败(由于并发原因),进入enq方法enq(node);return node;}

上面是向等待队列中添加等待者(waiter)的方法。首先构造一个 Node 实体,参数为当前线程和一个mode,这个mode有两种形式,一个是 SHARED ,一个是 EXCLUSIVE,请看上面的代码。然后执行下面的入队操作 addWaiter,和 enq() 方法的 else 分支操作是一样的,这里的操作如果成功了,就不用再进到 enq() 方法的循环中去了,可以提高性能。如果没有成功,再调用 enq() 方法。

5.enq方法

private Node enq(final Node node) {// 死循环+CAS保证所有节点都入队for (;;) {Node t = tail;// 如果队列为空 设置一个空节点作为 headif (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {//加入队尾node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

说明:循环加 CAS 操作是实现乐观锁的标准方式,CAS 是为了实现原子操作而出现的,所谓的原子操作指操作执行期间,不会受其他线程的干扰。Java 实现的 CAS 是调用 unsafe 类提供的方法,底层是调用 c++ 方法,直接操作内存,在 cpu 层面加锁,直接对内存进行操作。

上面是 AQS 等待队列入队方法,操作在无限循环中进行,如果入队成功则返回新的队尾节点,否则一直自旋,直到入队成功。假设入队的节点为 node ,上来直接进入循环,在循环中,先拿到尾节点。

  1. if 分支,如果尾节点为 null,说明现在队列中还没有等待线程,则尝试 CAS 操作将头节点初始化,然后将尾节点也设置为头节点,因为初始化的时候头尾是同一个,这和 AQS 的设计实现有关, AQS 默认要有一个虚拟节点。此时,尾节点不在为空,循环继续,进入 else 分支;
  2. else 分支,如果尾节点不为 null, node.prev = t ,也就是将当前尾节点设置为待入队节点的前置节点。然后又是利用 CAS 操作,将待入队的节点设置为队列的尾节点,如果 CAS 返回 false,表示未设置成功,继续循环设置,直到设置成功,接着将之前的尾节点(也就是倒数第二个节点)的 next 属性设置为当前尾节点,对应 t.next = node 语句,然后返回当前尾节点,退出循环。

6.setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {//备份现在的 headNode h = head; // Record old head for check below//抢到锁的线程被唤醒 将这个节点设置为headsetHead(node);// propagate 一般都会大于0 或者存在可被唤醒的线程if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 只有一个节点 或者是共享模式 释放所有等待线程 各自尝试抢占锁if (s == null || s.isShared())doReleaseShared();}}

Node 对象中有一个属性是 waitStatus ,它有四种状态,分别是:

//线程已被 cancelled ,这种状态的节点将会被忽略,并移出队列
static final int CANCELLED =  1;
// 表示当前线程已被挂起,并且后继节点可以尝试抢占锁
static final int SIGNAL    = -1;
//线程正在等待某些条件
static final int CONDITION = -2;
//共享模式下 无条件所有等待线程尝试抢占锁
static final int PROPAGATE = -3;

2.1.2 图解分析

加入这个时候有 3 个线程调用了 await 方法, 由于这个时候 state 的值还不为 0,所以这三个线程都会加入到 AQS队列中。并且三个线程都处于阻塞状态。

2.2 countDown()方法的实现 

当执行 CountDownLatch 的 countDown()方法,将计数器减一,也就是state减一,当减到0的时候,等待队列中的线程被释放。

public void countDown() {sync.releaseShared(1);}

是调用 AQS 的 releaseShared 方法来实现的,下面代码中的方法是按顺序调用的,摘到了一起,方便查看:

1.releaseShared方法

// AQS类
public final boolean releaseShared(int arg) {// arg 为固定值 1// 如果计数器state 为0 返回true,前提是调用 countDown() 之前不能已经为0if (tryReleaseShared(arg)) {// 唤醒等待队列的线程doReleaseShared();return true;}return false;}

由于线程被 await 方法阻塞了,所以只有等到countdown 方法使得 state=0 的时候才会被唤醒,我们来看看 countdown 做了什么:

  1. 只有当 state 减为 0 的时候, tryReleaseShared 才返回 true, 否则只是简单的 state = state - 1。
  2. 如果 state=0, 则调用 doReleaseShared,唤醒处于 await 状态下的线程

2.tryReleaseShared() 

// CountDownLatch 重写的方法
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zero// 依然是循环+CAS配合 实现计数器减1for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}

3.doReleaseShared()方法

/ AQS类
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果节点状态为SIGNAL,则他的next节点也可以尝试被唤醒if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}// 将节点状态设置为PROPAGATE,表示要向下传播,依次唤醒//#1else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}//#2if (h == head)                   // loop if head changedbreak;}}

共享锁的释放和独占锁的释放有一定的差别前面唤醒锁的逻辑和独占锁是一样,先判断头结点是不是SIGNAL 状态,如果是,则修改为 0,并且唤醒头结点的下一个节点。

ROPAGATE: 标识为 PROPAGATE 状态的节点,是共享锁模式下的节点状态,处于这个状态下的节点,会对线程的唤醒进行传播

#1在什么情况下会发生呢?
——这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1。

#2在什么情况下会发生呢?
——如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环;通过检查头节点是否改变了,如果改变了就继续循环。

说明 :

  • h == head:说明头节点还没有被刚刚用unparkSuccessor 唤醒的线程(这里可以理解为ThreadB)占有,此时 break 退出循环。
  • h != head:头节点被刚刚唤醒的线程(这里可以理解为ThreadB)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 ThreadB )。我们知道,等到ThreadB 被唤醒后,其实是会主动唤醒 ThreadC...

因为这是共享型的,当计数器为 0 后,会唤醒等待队列里的所有线程,所有调用了 await() 方法的线程都被唤醒,并发执行。这种情况对应到的场景是,有多个线程需要等待一些动作完成,比如一个线程完成初始化动作,其他5个线程都需要用到初始化的结果,那么在初始化线程调用 countDown 之前,其他5个线程都处在等待状态。一旦初始化线程调用了 countDown ,其他5个线程都被唤醒,开始执行。

三、再回到await方法

3.1 doAcquireSharedInterruptibly方法

一旦 ThreadA 被唤醒,代码又会继续回到doAcquireSharedInterruptibly 中来执行。 如果当前 state满足=0 的条件,则会执行 setHeadAndPropagate 方法。

3.2 setHeadAndPropagate

前面已经分析过,这个方法的主要作用是把被唤醒的节点,设置成 head 节点。 然后继续唤醒队列中的其他线程。
由于现在队列中有 3 个线程处于阻塞状态,一旦 ThreadA被唤醒,并且设置为 head 之后,会继续唤醒后续的ThreadB。

四、图解分析整个过程

五、总结

  1. AQS 分为独占模式和共享模式,CountDownLatch 使用了它的共享模式。
  2. AQS 当第一个等待线程(被包装为 Node)要入队的时候,要保证存在一个 head 节点,这个 head 节点不关联线程,也就是一个虚节点。
  3. 当队列中的等待节点(关联线程的,非 head 节点)抢到锁,将这个节点设置为 head 节点。
  4. 第一次自旋抢锁失败后,waitStatus 会被设置为 -1(SIGNAL),第二次再失败,就会被 LockSupport 阻塞挂起。
  5. 如果一个节点的前置节点为 SIGNAL 状态,则这个节点可以尝试抢占锁。

CountDownLatch分析(AQS共享锁)相关推荐

  1. CountDownLatch分析

    文章目录 (一)概念简介 (二)使用场景 (三)特点 (四)CountDownLatch源码分析 (1)构造函数 (2)await方法(核心) (3)countDown方法(核心) (一)概念简介 C ...

  2. AbstractQueuedSynchronizer 源码分析(共享锁)

    为什么80%的码农都做不了架构师?>>>    源码看之前的问题 race condition如何避免? 工作流程是怎么样的? 使用什么方式实现的? 使用到的其他类说明和资料 Loc ...

  3. 万字长文分析AQS原理以及应用

    1 引言 本文可能又臭又长,希望可以尽量将AQS相关的内容叙述清楚(个人能力有限),不喜勿喷. AQS,即juc并发包下的AbstractQueuedSynchronizer,我们也可以叫做抽象队列同 ...

  4. AQS共享锁的实现原理

    一.AQS共享锁的实现原理 前面的文章Lock的实现中分析了AQS独占锁的实现原理,那么接下来就分析下AQS是如何实现共享锁的. 共享锁的介绍 共享锁:同一时刻有多个线程能够获取到同步状态. 那么它是 ...

  5. 面经手册 · 第18篇《AQS 共享锁,Semaphore、CountDownLatch,听说数据库连接池可以用到!》

    作者:小傅哥 博客:https://bugstack.cn Github:https://github.com/fuzhengwei/CodeGuide/wiki 沉淀.分享.成长,让自己和他人都能有 ...

  6. 图解Semaphore信号量之AQS共享锁-非公平模式

    介绍 之前我们已经讲解过关于AQS的独占锁,这一章节主要讲解AQS的共享锁,以Semaphore信号量来进行讲解,相信通过看了本章节内容的同学可以对AQS的共享模式有一个了解,Semaphore信号量 ...

  7. JUC之CountDownLatch的源码和使用场景分析

    最近工作不饱和,写写文章充充电.何以解忧,唯有Coding.后续更新的文章涉及的方向有:ThreadPoolExecutor.Spring.MyBatis.ReentrantLock.CyclicBa ...

  8. CountDownLatch 源码分析

    1. 类介绍 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待.用给定的计数 初始化 CountDownLatch.由于调用了 countDown() 方法,所以在 ...

  9. 多线程—AQS独占锁与共享锁原理

    java.util.concurrent.locks包下,包含了多种锁,ReentrantLock独占锁.ReentrantReadWriteLock读写锁等,还有java.util.concurre ...

  10. 抽象同步器AQS应用之-- Semaphore、CountDownLatch、CyclicBarrier的介绍

    文章目录 1. Semaphore 2. CountDownLatch 3. CyclicBarrier 1. Semaphore Semaphore字面意思是信号量,作用是控制访问特定资源的线程数目 ...

最新文章

  1. Gitlab服务器搭建
  2. 光缆衰减标准(待补充)
  3. 理解正确的日志输出级别
  4. JeeWx 微信开发公开课(Jeewx-API 专题),今晚8点不见不散
  5. 怎么用计算机弹是你,抖音带你去旅行怎么用计算器弹奏_抖音带你去旅行计算器乐谱_管理资源吧...
  6. win32开发(图形绘制)
  7. 【报告分享】2021年小红书美妆护肤洞察报告.pdf(附下载链接)
  8. 机器视觉:PC式视觉系统与嵌入式视觉系统区别
  9. [导入]New ASP.NET Charting Control: asp:chart runat=server/
  10. 每个人都应该了解的HTTPS知识
  11. Jquery Sparklines ref
  12. mysql基础之忘掉密码解决办法及恢复root最高权限办法
  13. PWM波、方波的输出与捕获
  14. 自动阅读专业版第九次更新---原薅羊毛专业版(最后一次源代码分享)
  15. html5毕业论文总结,毕业论文总结4
  16. 个人网站性能优化经历(6)网站安全方面优化
  17. C-11 Problem H: 开宝箱2
  18. 好东西大家分享: 怎么画数据流图
  19. 纯C语言INI文件解析
  20. 单片机带掉电保护c语言,基于LM358的单片机掉电保护电路

热门文章

  1. python lasso回归分析_解析python实现Lasso回归
  2. 贝叶斯概率推断(一):贝叶斯思维
  3. 翻译: Octave 入门教程
  4. 极客大学产品经理训练营 产品思维和产品意识 作业2
  5. java过载保护_微服务过载保护原理与实战
  6. android勾选控件_【Android 开发】:UI控件之复选框控件 CheckBox 的使用方法
  7. 2008服务器系统只有回收站,清除Windows Server 2008 R2中所有用户的回收站
  8. 2021-09-14基于用 户 行为 序列建模的推荐算法研究
  9. 102 二叉树层序遍历Binary Tree Level Order Traversal @ Python
  10. 广义注意力- saliency map 关注图、gaze、Att