1.简介

在分析完AbstractQueuedSynchronizer(以下简称 AQS)和ReentrantLock的原理后,本文将分析 java.util.concurrent 包下的两个线程同步组件CountDownLatchCyclicBarrier。这两个同步组件比较常用,也经常被放在一起对比。通过分析这两个同步组件,可使我们对 Java 线程间协同有更深入的了解。同时通过分析其原理,也可使我们做到知其然,并知其所以然。

这里首先来介绍一下 CountDownLatch 的用途,CountDownLatch 允许一个或一组线程等待其他线程完成后再恢复运行。线程可通过调用await方法进入等待状态,在其他线程调用countDown方法将计数器减为0后,处于等待状态的线程即可恢复运行。CyclicBarrier (可循环使用的屏障)则与此不同,CyclicBarrier 允许一组线程到达屏障后阻塞住,直到最后一个线程进入到达屏障,所有线程才恢复运行。它们之间主要的区别在于唤醒等待线程的时机。CountDownLatch 是在计数器减为0后,唤醒等待线程。CyclicBarrier 是在计数器(等待线程数)增长到指定数量后,再唤醒等待线程。除此之外,两种之间还有一些其他的差异,这个将会在后面进行说明。

在下一章中,我将会介绍一下两者的实现原理,继续往下看吧。

2.原理

2.1 CountDownLatch 的实现原理

CountDownLatch 的同步功能是基于 AQS 实现的,CountDownLatch 使用 AQS 中的 state 成员变量作为计数器。在 state 不为0的情况下,凡是调用 await 方法的线程将会被阻塞,并被放入 AQS 所维护的同步队列中进行等待。大致示意图如下:

每个阻塞的线程都会被封装成节点对象,节点之间通过 prev 和 next 指针形成同步队列。初始情况下,队列的头结点是一个虚拟节点。该节点仅是一个占位符,没什么特别的意义。每当有一个线程调用 countDown 方法,就将计数器 state–。当 state 被减至0时,队列中的节点就会按照 FIFO 顺序被唤醒,被阻塞的线程即可恢复运行。

CountDownLatch 本身的原理并不难理解,不过如果大家想深入理解 CountDownLatch 的实现细节,那么需要先去学习一下 AQS 的相关原理。CountDownLatch 是基于 AQS 实现的,所以理解 AQS 是学习 CountDownLatch 的前置条件。我在之前写过一篇关于 AQS 的文章 Java 重入锁 ReentrantLock 原理分析,有兴趣的朋友可以去读一读。

2.2 CyclicBarrier 的实现原理

与 CountDownLatch 的实现方式不同,CyclicBarrier 并没有直接通过 AQS 实现同步功能,而是在重入锁 ReentrantLock 的基础上实现的。在 CyclicBarrier 中,线程访问 await 方法需先获取锁才能访问。在最后一个线程访问 await 方法前,其他线程进入 await 方法中后,会调用 Condition 的 await 方法进入等待状态。在最后一个线程进入 CyclicBarrier await 方法后,该线程将会调用 Condition 的 signalAll 方法唤醒所有处于等待状态中的线程。同时,最后一个进入 await 的线程还会重置 CyclicBarrier 的状态,使其可以重复使用。

在创建 CyclicBarrier 对象时,需要转入一个值,用于初始化 CyclicBarrier 的成员变量 parties,该成员变量表示屏障拦截的线程数。当到达屏障的线程数小于 parties 时,这些线程都会被阻塞住。当最后一个线程到达屏障后,此前被阻塞的线程才会被唤醒。

3.源码分析

通过前面简单的分析,相信大家对 CountDownLatch 和 CyclicBarrier 的原理有一定的了解了。那么接下来趁热打铁,我们一起探索一下这两个同步组件的具体实现吧。

3.1 CountDownLatch 源码分析

CountDownLatch 的原理不是很复杂,所以在具体的实现上,也不是很复杂。当然,前面说过 CountDownLatch 是基于 AQS 实现的,AQS 的实现则要复杂的多。不过这里仅要求大家掌握 AQS 的基本原理,知道它内部维护了一个同步队列,同步队列中的线程会按照 FIFO 依次获取同步状态就行了。好了,下面我们一起去看一下 CountDownLatch 的源码吧。

3.1.1 源码结构

CountDownLatch 的代码量不大,加上注释也不过300多行,所以它的代码结构也会比较简单。如下:

如上图,CountDownLatch 源码包含一个构造方法和一个私有成员变量,以及数个普通方法和一个重要的静态内部类 Sync。CountDownLatch 的主要逻辑都是封装在 Sync 和其父类 AQS 里的。所以分析 CountDownLatch 的源码,本质上是分析 Sync 和 AQS 的原理。相关的分析,将会在下一节中展开,本节先说到这。

3.1.2 构造方法及成员变量

本节来分析一下 CountDownLatch 的构造方法和其 Sync 类型的成员变量实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class CountDownLatch {private final Sync sync;/** CountDownLatch 的构造方法,该方法要求传入大于0的整型数值作为计数器 */public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");// 初始化 Syncthis.sync = new Sync(count);}/** CountDownLatch 的同步控制器,继承自 AQS */private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {// 设置 AQS statesetState(count);}int getCount() {return getState();}/** 尝试在共享状态下获取同步状态,该方法在 AQS 中是抽象方法,这里进行了覆写 */protected int tryAcquireShared(int acquires) {/** 如果 state = 0,则返回1,表明可获取同步状态,* 此时线程调用 await 方法时就不会被阻塞。*/ return (getState() == 0) ? 1 : -1;}/** 尝试在共享状态下释放同步状态,该方法在 AQS 中也是抽象方法 */protected boolean tryReleaseShared(int releases) {/** 下面的逻辑是将 state--,state 减至0时,调用 await 等待的线程会被唤醒。* 这里使用循环 + CAS,表明会存在竞争的情况,也就是多个线程可能会同时调用 * countDown 方法。在 state 不为0的情况下,线程调用 countDown 是必须要完* 成 state-- 这个操作。所以这里使用了循环 + CAS,确保 countDown 方法可正* 常运行。*/for (;;) {// 获取 stateint c = getState();if (c == 0)return false;int nextc = c-1;// 使用 CAS 设置新的 state 值if (compareAndSetState(c, nextc))return nextc == 0;}}}
}

需要说明的是,Sync 中的 tryAcquireShared 和 tryReleaseShared 方法并不是直接给 await 和 countDown 方法调用了的,这两个方法以“try”开头的方法最终会在 AQS 中被调用。

3.1.3 await

CountDownLatch中有两个版本的 await 方法,一个响应中断,另一个在此基础上增加了超时功能。本节将分析无超时功能的 await,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/** * 该方法会使线程进入等待状态,直到计数器减至0,或者线程被中断。当计数器为0时,调用* 此方法将会立即返回,不会被阻塞住。*/
public void await() throws InterruptedException {// 调用 AQS 中的 acquireSharedInterruptibly 方法sync.acquireSharedInterruptibly(1);
}/** 带有超时功能的 await */
public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}+--- AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 若线程被中断,则直接抛出中断异常if (Thread.interrupted())throw new InterruptedException();// 调用 Sync 中覆写的 tryAcquireShared 方法,尝试获取同步状态if (tryAcquireShared(arg) < 0)/** 若 tryAcquireShared 小于0,则表示获取同步状态失败,* 此时将线程放入 AQS 的同步队列中进行等待。*/ doAcquireSharedInterruptibly(arg);
}

从上面的代码中可以看出,CountDownLatch await 方法实际上调用的是 AQS 的 acquireSharedInterruptibly 方法。该方法会在内部调用 Sync 所覆写的 tryAcquireShared 方法。在 state != 0时,tryAcquireShared 返回值 -1。此时线程将进入 doAcquireSharedInterruptibly 方法中,在此方法中,线程会被放入同步队列中进行等待。若 state = 0,此时 tryAcquireShared 返回1,acquireSharedInterruptibly 会直接返回。此时调用 await 的线程也不会被阻塞住。

3.1.4 countDown

与 await 方法一样,countDown 实际上也是对 AQS 方法的一层封装。具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** 该方法的作用是将计数器进行自减操作,当计数器为0时,唤醒正在同步队列中等待的线程 */
public void countDown() {// 调用 AQS 中的 releaseShared 方法sync.releaseShared(1);
}+--- AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {// 调用 Sync 中的 tryReleaseShared 尝试释放同步状态if (tryReleaseShared(arg)) {/** tryReleaseShared 返回 true 时,表明 state = 0,即计数器为0。此时调用* doReleaseShared 方法唤醒正在同步队列中等待的线程*/ doReleaseShared();return true;}return false;
}

以上就是 countDown 的源码分析,不是很难懂,这里就不啰嗦了。

3.2 CyclicBarrier 源码分析

3.2.1 源码结构

如前面所说,CyclicBarrier 是基于重入锁 ReentrantLock 实现相关逻辑的。所以要弄懂 CyclicBarrier 的源码,仅需有 ReentrantLock 相关的背景知识即可。关于重入锁 ReentrantLock 方面的知识,有兴趣的朋友可以参考我之前写的文章 Java 重入锁 ReentrantLock 原理分析。下面看一下 CyclicBarrier 的代码结构吧,如下:

从上图可以看出,CyclicBarrier 包含了一个静态内部类Generation、数个方法和一些成员变量。结构上比 CountDownLatch 略为复杂一些,但总体仍比较简单。好了,接下来进入源码分析部分吧。

3.2.2 构造方法及成员变量

CyclicBarrier 包含两个有参构造方法,分别如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** 创建一个允许 parties 个线程通行的屏障 */
public CyclicBarrier(int parties) {this(parties, null);
}/** * 创建一个允许 parties 个线程通行的屏障,若 barrierAction 回调对象不为 null,* 则在最后一个线程到达屏障后,执行相应的回调逻辑*/
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}

上面的第二个构造方法初始化了一些成员变量,下面我们就来说明一下这些成员变量的作用。

成员变量 作用
parties 线程数,即当 parties 个线程到达屏障后,屏障才会放行
count 计数器,当 count > 0 时,到达屏障的线程会进入等待状态。当最后一个线程到达屏障后,count 自减至0。最后一个到达的线程会执行回调方法,并唤醒其他处于等待状态中的线程。
barrierCommand 回调对象,如果不为 null,会在第 parties 个线程到达屏障后被执行

除了上面几个成员变量,还有一个成员变量需要说明一下,如下:

1
2
3
4
5
6
7
8
9
10
11
/** * CyclicBarrier 是可循环使用的屏障,这里使用 Generation 记录当前轮次 CyclicBarrier * 的运行状态。当所有线程到达屏障后,generation 将会被更新,表示 CyclicBarrier 进入新一* 轮的运行轮次中。
*/
private Generation generation = new Generation();private static class Generation {// 用于记录屏障有没有被破坏boolean broken = false;
}

3.2.3 await

上一节所提到的几个成员变量,在 await 方法中将会悉数登场。下面就来分析一下 await 方法的试下,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public int await() throws InterruptedException, BrokenBarrierException {try {// await 的逻辑封装在 dowait 中return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {final Generation g = generation;// 如果 g.broken = true,表明屏障被破坏了,这里直接抛出异常if (g.broken)throw new BrokenBarrierException();// 如果线程中断,则调用 breakBarrier 破坏屏障if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}/** index 表示线程到达屏障的顺序,index = parties - 1 表明当前线程是第一个* 到达屏障的。index = 0,表明当前线程是最有一个到达屏障的。*/ int index = --count;// 当 index = 0 时,唤醒所有处于等待状态的线程if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;// 如果回调对象不为 null,则执行回调if (command != null)command.run();ranAction = true;// 重置屏障状态,使其进入新一轮的运行过程中nextGeneration();return 0;} finally {// 若执行回调的过程中发生异常,此时调用 breakBarrier 破坏屏障if (!ranAction)breakBarrier();}}// 线程运行到此处的线程都会被屏障挡住,并进入等待状态。for (;;) {try {if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {/** 若下面的条件成立,则表明本轮运行还未结束。此时调用 breakBarrier * 破坏屏障,唤醒其他线程,并抛出异常*/ if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {/** 若上面的条件不成立,则有两种可能:* 1. g != generation*     此种情况下,表明循环屏障的第 g 轮次的运行已经结束,屏障已经*     进入了新的一轮运行轮次中。当前线程在稍后返回 到达屏障 的顺序即可*     * 2. g = generation 但 g.broken = true*     此种情况下,表明已经有线程执行过 breakBarrier 方法了,当前*     线程则会在稍后抛出 BrokenBarrierException*/Thread.currentThread().interrupt();}}// 屏障被破坏,则抛出 BrokenBarrierException 异常if (g.broken)throw new BrokenBarrierException();// 屏障进入新的运行轮次,此时返回线程在上一轮次到达屏障的顺序if (g != generation)return index;// 超时判断if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}/** 开启新的一轮运行过程 */
private void nextGeneration() {// 唤醒所有处于等待状态中的线程trip.signalAll();// 重置 countcount = parties;// 重新创建 Generation,表明进入循环屏障进入新的一轮运行轮次中generation = new Generation();
}/** 破坏屏障 */
private void breakBarrier() {// 设置屏障是否被破坏标志generation.broken = true;// 重置 countcount = parties;// 唤醒所有处于等待状态中的线程trip.signalAll();
}

3.2.4 reset

reset 方法用于强制重置屏障,使屏障进入新一轮的运行过程中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {// 破坏屏障breakBarrier();   // break the current generation// 开启新一轮的运行过程nextGeneration(); // start a new generation} finally {lock.unlock();}
}

reset 方法并不复杂,没什么好讲的。CyclicBarrier 中还有其他一些方法,均不复杂,这里就不一一分析了。

4.两者区别

看完上面的分析,相信大家对着两个同步组件有了更深入的认识。那么下面趁热打铁,简单对比一下两者之间的区别。这里用一个表格列举一下:

差异点 CountDownLatch CyclicBarrier
是否可循环使用
是否可设置回调

除了上面列举的差异点,还有一些其他方面的差异,这里就不一一列举了。

5.总结

分析完 CountDownLatch 和 CyclicBarrier,不知道大家有什么感觉。我个人的感觉是这两个类的源码并不复杂,比较好理解。当然,前提是建立在对 AQS 以及 ReentrantLock 有较深的理解之上。所以在学习这两个类的源码时,还是建议大家先看看前置知识。

好了,本文到这里就结束了。谢谢阅读,再见。

  • 本文链接: https://www.tianxiaobo.com/2018/05/10/Java-线程同步组件-CountDownLatch-与-CyclicBarrier-原理分析/

from: http://www.tianxiaobo.com/2018/05/10/Java-%E7%BA%BF%E7%A8%8B%E5%90%8C%E6%AD%A5%E7%BB%84%E4%BB%B6-CountDownLatch-%E4%B8%8E-CyclicBarrier-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90/

Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析相关推荐

  1. Java并发包源码学习系列:同步组件CountDownLatch源码解析

    文章目录 CountDownLatch概述 使用案例与基本思路 类图与基本结构 void await() boolean await(long timeout, TimeUnit unit) void ...

  2. java线程同步原理

    一. java线程同步原理 java会为每个object对象分配一个monitor,当某个对象的同步方法(synchronized methods)被多个线程调用时,该对象的monitor将负责处理这 ...

  3. Java线程池,从使用到原理

    转载自  Java线程池,从使用到原理 线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象 ...

  4. Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

    2019独角兽企业重金招聘Python工程师标准>>> Java并发编程:CountDownLatch.CyclicBarrier和Semaphore 在java 1.5中,提供了一 ...

  5. Java线程同步的几种方式

    Java线程同步的几种方式 1.使用synchronized关键字  它的工作是对同步的代码加锁,使得每一次只能有一个线程进入同步块,从而保证线程间的安全性.  synchronized关键字的用法: ...

  6. java线程 同步与异步 线程池

    1)多线程并发时,多个线程同时请求同一个资源,必然导致此资源的数据不安全,A线程修改了B线 程的处理的数据,而B线程又修改了A线程处理的数理.显然这是由于全局资源造成的,有时为了解 决此问题,优先考虑 ...

  7. (转) Java线程同步阻塞, sleep(), suspend(), resume(), yield(), wait(), notify()

    为了解决对共享存储区的访问冲突,Java 引入了同步机制.但显然不够,因为在任意时刻所要求的资源不一定已经准备好了被访问,反过来,同一时刻准备好了的资源也可能不止一个. 为解决访问控制问题,Java ...

  8. 关于java线程同步的笔记_线程同步(JAVA笔记-线程基础篇)

    在多线程应用程序中经常会遇到线程同步的问题.比如:两个线程A.线程B可能会 "同时" 执行同一段代码,或修改同一个变量.而很多时候我们是不希望这样的. 这时候,就需要用到线程同步. ...

  9. java线程同步——条件对象+synchronized 关键字

    [0]README 0.1) 本文描述转自 core java volume 1, 源代码为原创,旨在理解 java线程同步--条件对象+synchronized 关键字 的相关知识: 0.2)for ...

最新文章

  1. python中字典格式_如何在Python中使用带有字典的格式函数和整数键
  2. Oracle怎么查外键建在哪个表上
  3. LVS技术浅析-proc参数
  4. 通过MATLAB将数据转化为mif文件,供Quartusii软件的ROM核读取调用
  5. SDN教育城域网解决方案
  6. solaris查看主机信息
  7. 微观计量经济学_微观经济学与数据科学
  8. 系列 | 高性能存储-MySQL数据库之存储过程揭秘
  9. JS之Window对象
  10. h5下划线怎么设置_【Word技巧】毕业论文封面那条永远对不齐的下划线?
  11. vsftp 配置参数详解
  12. python csv wordpress xmlrpc_wordpress_xmlrpc模块批量自动发布文章到wordpress - 老牛博客...
  13. 软考高项-项目知识管理体系
  14. Word打字很卡顿 Office打字时反应慢 延迟 Excel输入迟钝 PPT卡死的终极解决办法大全(24种方法)
  15. 2019千股跌停情况是什么?千股跌停原因都有哪些?
  16. 《计算机组成原理》课程学习(7)——第3篇 中央处理器——第7章 指令系统
  17. Symantec Backup Exec 2010 安装报 bad ELF interpreter: No such file or directory
  18. dnf服务器炸团门票怎么找回,dnf补票小技巧 再也不怕掉线炸团制裁
  19. 云计算课程week5
  20. display:Wayland Architecture

热门文章

  1. 历经8年双11流量洗礼,淘宝开放平台如何攻克技术难关?--转
  2. java.sql.SQLException: Lock wait timeout exceeded --转
  3. 著名投资人Chris Dixon:计算的下一波浪潮是什么?
  4. 互联网到了什么程度?
  5. MySQL-CentOS7通过YUM安装MySQL5.7.29
  6. 实战SSM_O2O商铺_02数据模型设计及实体类的创建
  7. MySQL 实现一个字段赋值给另一个字段
  8. 个人博客系统的设计与实现_一个 Go 开发的快速、简洁、美观、前后端分离的个人博客系统...
  9. python args kwargs_Python中的args和kwargs
  10. c语言goto语句用法_C语言中的goto语句该不该使用?