Java中CountDownLatch和CyclicBarrier都是用来做多线程同步的。下面分析一下他们功能的异同。

CountDownLatch

CountDownLatch基于AQS(同步器AbstractQueuedSynchronizer浅析),CountDownLatch中有一个内部类Sync,Sync继承自AbstractQueuedSynchronizer。 我们先看一个CountDownLatch的例子,然后再具体分析源码。

一个CountDownLatch例子

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CountDownLatchExample {private ExecutorService executorService;private CountDownLatch countDownLatch;private int parties;public static void main(String[] args){CountDownLatchExample countDownLatchExample = new CountDownLatchExample(10);try {countDownLatchExample.example();} catch (InterruptedException e) {e.printStackTrace();}}public CountDownLatchExample(int parties) {executorService = Executors.newFixedThreadPool(parties);countDownLatch = new CountDownLatch(parties);this.parties = parties;}public void example() throws InterruptedException {for (int i = 0; i < parties; i++) {executorService.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " gets job done");countDownLatch.countDown(); // 线程完成任务之后countDown});}// 等待所有的任务完成countDownLatch.await();System.out.println(Thread.currentThread().getName() + " reach barrier");executorService.shutdown();}
}复制代码

上面是一个CountDownLatch的例子,CountDownLatch的API还是很简单的,主要就是countDown和await两个方法。CountDownLatch实例化时将count设置为AQS的state,每次countDown时CAS将state设置为state - 1,await时首先会检查当前state是否为0,如果为0则代表所有的任务完成了,await结束,否则主线程将循环重试,直到线程被中断或者任务完成或者等待超时。下面具体看看CountDownLatch的源码。

CountDownLatch源码分析

首先看一下CountDownLatch的构造函数和Sync的代码:

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count); // 设置Sync(AQS)的state为count
}private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count); // 设置AQS的state为count}int getCount() {return getState();}// protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1; // 重写了AQS的方法}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) { // 循环CAS重试int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
}
复制代码

CountDownLatch构造参数count代表当前参与同步的线程数目,然后设置当前AQS的状态为count。Sync覆写了tryAcquireShared和tryReleaseShared方法。tryAcquireShared方法中会判断当前AQS的state是否为0,如果是0才能获取成功(返回1),否则,获取失败(返回-1)。tryReleaseShared通过for循环进行CAS设置状态。 CountDownLatch中最主要的两个方法是:countDown和await:

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}public void countDown() {sync.releaseShared(1);
}
复制代码

每次一个任务完成之后,调用CountDownLatch的countDown方法,将当前AQS的state减1(AQS初始state为count)。countDown的逻辑其实比较简单,就是通过重试CAS设置当前AQS的state为state - 1。这里着重看一下await方法,await方法体中调用AQS的acquireSharedInterruptibly或者tryAcquireSharedNanos方法。看一下AQS的acquireSharedInterruptibly方法:

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0) // tryAcquireShared这里已经在Sync中重写了doAcquireSharedInterruptibly(arg);
}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) { // 循环重试final Node p = node.predecessor(); // 对于CountDownLatch来说,等待队列中其实只有一个main线程在等待,因此这里第一次就应该判断条件`p == head`成立if (p == head) {int r = tryAcquireShared(arg); // 方法已经在CountDownLatch$Sync中重写了if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 如果当前失败,则挂起线程,循环重试if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
复制代码

前面已经分析过AQS了,这里对AQS部分不多做解释了。主要是这里的tryAcquireShared方法,CountDownLatch的Sync内部类中重写了该方法。如果当前state等于0才能获取成功,因此只有当所有任务完成,此时AQS的state为0,await方法才会返回(不考虑中断和超时)。

CyclicBarrier

CyclicBarrier较CountDownLatch而言主要多了两个功能:

  1. 支持重置状态,达到循环利用的目的。这也是Cyclic的由来。CyclicBarrier中有一个内部类Generation,代表当前的同步处于哪一个阶段。当最后一个任务完成,执行任务的线程会通过nextGeneration方法来重置Generation。也可以通过CyclicBarrier的reset方法来重置Generation。
  2. 支持barrierCommand,当最后一个任务运行完成,执行任务的线程会检查CyclicBarrier的barrierCommand是否为null,如果不为null,则运行该任务。

一个CyclicBarrier例子

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CyclicBarrierExample {private ExecutorService executorService;private CyclicBarrier cyclicBarrier;private int parties;public CyclicBarrierExample(int parties) {executorService = Executors.newFixedThreadPool(parties);cyclicBarrier = new CyclicBarrier(parties, () -> System.out.println(Thread.currentThread().getName() + " gets barrierCommand done"));this.parties = parties;}public static void main(String[] args) {CyclicBarrierExample cyclicBarrierExample = new CyclicBarrierExample(10);cyclicBarrierExample.example();}public void example() {for (int i = 0; i < parties; i++) {executorService.submit(() -> {try {Thread.sleep(1000);cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " gets job done");});}executorService.shutdown();}
}复制代码

可以看到这里CyclicBarrier主要的API就是await方法,每个任务最后调用这个方法等待最后一个任务完成,在这之前所有的线程都会等待。

CyclicBarrier源码分析

CyclicBarrier主要有以下属性:

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock(); // 锁
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition(); // 锁关联的Condition,用于线程同步
/** The number of parties */
private final int parties; // 多少个任务参与同步
/* The command to run when tripped */
private final Runnable barrierCommand; // 最后一个任务运行线程应该执行的command
/** The current generation */
private Generation generation = new Generation(); // 当前CyclicBarrier所处的Generation/*** Number of parties still waiting. Counts down from parties to 0* on each generation.  It is reset to parties on each new* generation or when broken.*/
private int count; // 剩余的等待的任务数目,取值范围:[0-parties]
复制代码

CyclicBarrier主要的API就是await方法和reset方法。

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L); // 等待所有任务完成} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier();   // break the current generation// 开启下一个Generation,达到循环使用的目的nextGeneration(); // start a new generation} finally {lock.unlock();}
}
复制代码

在每一个任务结束时我们调用CyclicBarrier的await方法,所有的线程都会等待最后一个任务完成才会退出。注意CountDownLatch中任务执行线程调用完了countDown方法之后就会退出,不会等待最后一个任务完成才会退出,这也是CountDownLatch和CyclicBarrier的一个区别。 我们主要看一下dowait方法:

private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock; // 加锁lock.lock();try {final Generation g = generation; // 当前CyclicBarrier所处的Generationif (g.broken) // 检查Generation的broken标志位throw new BrokenBarrierException(); // 被中断if (Thread.interrupted()) { // 检查当前线程是否被中断,响应中断breakBarrier();throw new InterruptedException();}int index = --count; // 剩余等待任务数if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null) // 如果当前任务是最后一个任务,则执行任务线程运行barrierCommandcommand.run();ranAction = true;nextGeneration(); // 重置Generationreturn 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {// 阻塞当前线程,等待最后一个任务完成,然后线程会通过nextGeneration方法调用trip.signallAll唤醒等待线程。注意当线程进入WAITING(调用await)状态或者TIMED_WAITING(awaitNanos)状态后会让出锁。if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken) // 响应中断throw new BrokenBarrierException();if (g != generation) // 已经是下一个generation,说明最后一个任务已经执行完成,返回return index;if (timed && nanos <= 0L) { // 检查是否超时breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}
复制代码
  1. 首先会检查当前Generation的broken标志位,如果为true,则抛出异常。再响应中断请求。
  2. 检查当前是否为最后一个任务,如果是则检查barrierCommand是否为null,如果不为null则执行任务。如果不是最后一个任务则到步骤3。
  3. 判断是否需要阻塞当前线程,如果需要则通过trip的await或者awaitNanos方法使其进入休眠,注意休眠后线程会让出锁。如果休眠超时则抛出TimeoutException异常。休眠完成后(要么休眠超时要么被最有一个任务执行线程唤醒)会响应中断请求,再判断当前generation是否改变(最后一个执行任务线程会通过nextGeneration方法改变generation),如果改变直接返回。

其流程图如下:

总结

  1. CountDownLatch和CyclicBarrier都是用作多线程同步,CountDownLatch基于AQS,CyclicBarrier基于ReentrantLock。
  2. CyclicBarrier支持复用和barrierCommand,但是CountDownLatch不支持。
  3. CyclicBarrier会阻塞线程,在最后一个任务执行线程完成之前,其余线程都必须等待,而线程在调用CountDownLatch的countDown方法之后就会结束。

谈谈CountDownLatch和CyclicBarrier相关推荐

  1. Java并发编程之CountDownLatch、CyclicBarrier和Semaphore

    前言 本文为对CountDownLatch.CyclicBarrier.Semaphore的整理使用 CountDownLatch CountDownLatch类位于java.util.concurr ...

  2. Java的CountDownLatch和CyclicBarrier的理解和区别

    CountDownLatch和CyclicBarrier的功能看起来很相似,不易区分,有一种谜之的神秘.本文将通过通俗的例子并结合代码讲解两者的使用方法和区别. CountDownLatch和Cycl ...

  3. 使用Java辅助类(CountDownLatch、CyclicBarrier、Semaphore)并发编程

    在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法 一.C ...

  4. Java并发编程:CountDownLatch、CyclicBarrier和 Semaphore

    2019独角兽企业重金招聘Python工程师标准>>> 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarr ...

  5. 秒懂 CountDownLatch 与 CyclicBarrier 使用场景

    作者 | pony-zi 来源 | https://blog.csdn.net/zzg1229059735/article/details/61191679 相信每个想深入了解多线程开发的Java开发 ...

  6. 并发工具类:CountDownLatch、CyclicBarrier、Semaphore

    在多线程的场景下,有些并发流程需要人为来控制,在JDK的并发包里提供了几个并发工具类:CountDownLatch.CyclicBarrier.Semaphore. 一.CountDownLatch ...

  7. Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析

    1.简介 在分析完AbstractQueuedSynchronizer(以下简称 AQS)和ReentrantLock的原理后,本文将分析 java.util.concurrent 包下的两个线程同步 ...

  8. JUC 中的多线程协作工具类:CountDownLatch 和 CyclicBarrier

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:干掉 Navicat:这个 IDEA 的兄弟真香!个人原创100W+访问量博客:点击前往,查看更多 最近在学习 ...

  9. CountDownLatch、CyclicBarrier、Semaphore的区别,你知道吗?

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:是时候扔掉Postman了,又一个被低估的IDEA插件出来了...个人原创+1博客:点击前往,查看更多 链接:h ...

最新文章

  1. AAAI 2021 最「严」一届发榜,1692 篇论文中选,录取率仅为 21%
  2. win10安装程序无法将配置为在此计算机,Win10安装会遇到的问题汇总及解决方法...
  3. Yann LeCun最新文章:自监督学习的统一框架
  4. des vue 加密解密_vue DES 加密
  5. 装饰器模式(讲解+应用)
  6. Google Desktop 果然
  7. CentOS自动打开网络连接
  8. 【报告分享】2020年抖音美妆直播报告.pdf(附下载链接)
  9. Linux 下 Git 的源码安装
  10. dfmea文件_DFMEA范本.doc
  11. jsp购物车加mysql_网上购物车(jsp+servlet+mysql)
  12. 完整的Java软件开发学习路线
  13. Nr,GenBank, RefSeq, UniProt 数据库的异同
  14. case when 多条件查询
  15. css前端日记之盒子模型-----一起去未来
  16. 黑苹果万能驱动神器 Hackintool 3.8.4中文版
  17. wex5 实战 常用代码模型集合
  18. java lisp_AI编程:5种最流行的人工智能编程语言!
  19. 聊聊云原生时代湖仓一体建设
  20. verilog计数器

热门文章

  1. 机器学习实战(五)支持向量机SVM(Support Vector Machine)
  2. 新书问答:Agile Management
  3. 【六】Jmeter:断言
  4. java学生通讯录_Java实现XML文件学生通讯录
  5. python抓取汤不热视频_你们想要的 Tumblr 爬虫
  6. linux 修改bios信息,一种基于Linux系统通过IPMI工具修改BIOS选项的测试方法及系统_2017103901253_权利要求书_专利查询_专利网_钻瓜专利网...
  7. matlab求灰度图像梯度,[求助]如何求图像的梯度
  8. es管理器免root_OPPO手机免ROOT更换系统字体教程-适合大部分OPPO机型
  9. 2 华为云闪付_教你区分信用卡刷卡、挥卡、插卡、云闪付等支付方式!
  10. linux里面的perl脚本怎么调用函数,如何在我的Perl脚本中包含另一个文件的函数?...