文章目录

  • 简介
  • 例子
  • 实现原理
  • 小结

简介

从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier 的状态后它可以被重用。之所以叫作屏障是因为线程调用await 方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。

CyclicBarrier是一种同步辅助工具,允许一组线程相互等待,直到达到共同的障碍点.

经常用于一组固定数量的线程必须相互等待的程序.

假如计数器值为N,那么随后调用await方法的N-1个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为 0了,这时候第N个线程才会发出通知唤醒前面的N-1个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行。

线程进入屏障通过CyclicBarrier的await()方法。

CyclicBarrier实例是可重复使用的:所有等待线程被唤醒的时候,任何线程再次执行CyclicBarrier.await()又会被暂停,直到这些线程中的最后一个线程执行了CyclicBarrier.await().

例子

如下例子,新建10个线程,直到10个线程都调用了await方法,即都到达屏障点后,就调用CyclicBarrier初始化时定义的方法(召唤神龙).

public static void main(String[] args) throws InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {System.out.println("召唤神龙");});for (int i = 0; i < 10; i++) {new Thread(()->{try {System.out.println(Thread.currentThread().getName()+"收集到龙珠");cyclicBarrier.await(); //等待其他线程执行完自己的操作,当等待线程数量达到10时,会召唤神龙} catch (Exception e) {e.printStackTrace();}}, Thread.currentThread().getName()+":"+i).start();}

如下例子:假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行。该例子利用了CyclicBarrier的可复用性.

 public static void main(String[] args) throws Exception {CyclicBarrier cyclicBarrier = new CyclicBarrier(3);ExecutorService executorService = Executors.newFixedThreadPool(3);for (int i = 0; i < 3; i++) {executorService.submit(() -> {try {System.out.println(Thread.currentThread().getName() + " step1");cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + " step2");cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + " step3");} catch (Exception e) {}});}executorService.shutdown();
}输出结果:
pool-1-thread-1 step1
pool-1-thread-3 step1
pool-1-thread-2 step1
pool-1-thread-2 step2
pool-1-thread-1 step2
pool-1-thread-3 step2
pool-1-thread-3 step3
pool-1-thread-1 step3
pool-1-thread-2 step3

在如上代码中,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段 1后才会开始执行阶段2。然后在阶段 2后面调用了await方法,这保证了所有线程都完成了阶段2后 ,才能开始阶段3的执行。这个功能使用单个CountDownLatch是无法完成的 。

实现原理

private static class Generation {boolean broken = false;
}/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();/*** Number of parties still waiting. Counts down from parties to 0* on each generation.  It is reset to parties on each new* generation or when broken.*/
private int count;

CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。

parties用来记录线程个数,这里表示多少线程调用await后,所有线程才会冲破屏障继续往下运行。而count一开始等于parties,每当有线程调用await方法就递减1,当count为0时就表示所有线程都到了屏障点。

你可能会疑惑,为何维护parties和count两个变量,只使用
count不就可以了?另外别忘了CyclieBarrier是可以被复用的,使用两个变量的原因是,parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,从而进行复用。这两个变量是在构造CyclicBarrier对象时传递的.如下所示:

public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}

还有一个变量barrierCommand也通过构造函数传递,这是一个任务,这个任务的执行时机是当所有线程都到达屏障点后。使用lock首先保证了更新计数器count的原子性。另外使用lock 的条件变量trip支持线程间使用await和signal操作进行同步。

最后,在变量generation内部有一个变量broken,其用来记录当前屏障是否被打破。注意,这里的broken并没有被声明为volatile的,因为是在锁内使用变量,所以不需要声明。

private static class Generation {boolean broken = false;
}

几个重要方法

  1. int await()方法

当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足 下面条件之一才会返回:

  • parties个线程都调用了await()方法,也就是线程都到了屏障点;
  • 其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException异常而返回; - - 与当前屏障点关联的Generation对象的broken标志被设置为 true时,会抛出BrokenBarrierException异常,然后返回。

由如下代码可知,在内部调用了dowait方法。第一个参数为false,则说明不设置超时时间,这时候第二个参数没有意义。

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}
  1. int dowait(boolean timed, long nanos)方法

该方法实现了CyclicBarrier的核心功能,其代码如下:

private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}//(1)如果index==O则说明所有线程都到了屏障点,此时执行初始化时传递的任务int index = --count;if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;//(2)执行任务if (command != null)command.run();ranAction = true;//(3)激活其他因调用await方法而被阻塞的线程,并重置CyclieBarriernextGeneration();//返回return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out//(4)如果index不为0for (;;) {try {//没有设置超时时间if (!timed)trip.await();//设置了超时时间else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}private void nextGeneration() {// signal completion of last generation//(7)唤醒条件队列里面阻塞线程trip.signalAll();// set up next generation//重置CyclicBarriercount = parties;generation = new Generation();
}

当一个线程调用了dowait方法后,首先会获取独占锁lock,如果创建CycleBarrier时传递的参数为10,那么后面9个调用钱程会被阻塞。然后当前获取到锁的线程会对计数器count进行递减操作,递减后count=index=9,因为index!=O所以当前线程会执行代码(4)。如果当前线程调用的是无参数的await() 方法 ,则这里timed=false,所以当前线程会被放入条件变量 的trip的条件阻塞队列,当前线程会被挂起并释放获取的lock 锁。如果调用的是有参数的await方法则timed=true,然后当前线程也会被放入条件变量的条件队列并释放锁资源,不同的是当前线程会在指定时间超时后自动被激活。

当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的9个线程中有一个会竞争到lock锁,然后执行与第一个线程同样的操作,直到最后一个线程获取到lock锁,此时己经有9个线程被放入了条件变量trip的条件队列里面。最后count=index等于 0,所以执行代码(2),如果创建CyclicBarrier时传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他9个线程,并重置 CyclicBarrier,然后这 10个线程就可以继续向下运行了。

小结

CycleBarrier与CountDownLatch的不同在于,前者是可以复用 的,并且前者特别适合分段任务有序执行的场景。

CycleBarrier其底层通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。CyclicBarrier内部使用了一个条件变量trip来实现等待/通知.使用了分代(Generation)的概念用于表示CyclicBarrier实例是可以重复使用的.

多线程-并发工具类之CyclicBarrier详解相关推荐

  1. java多线程aqs实现工具类_Java并发多线程 - 并发工具类JUC

    (adsbygoogle = window.adsbygoogle || []).push({}); 安全共享对象策略 1.线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改 ...

  2. java semaphore 等待时间_一个java同步工具类Semaphore的详解

    Semaphore是java并发包里面的一个工具类,我们限制可以访问某些资源的线程数目就可以使用Semaphore了.这篇文章将对Semaphore的概念和使用进行一个详解. 一.概念理解 官方是这样 ...

  3. JUC 工具类之 Exchanger 详解

    一 Exchanger 简介 线程间的数据共享除了定义一个共享数据然后各个线程去访问这种方式外,还可以使用 Exchanger 交换数据. Exchanger--交换器,是 JDK1.5 时引入的一个 ...

  4. Java并发编程系列之CyclicBarrier详解

    简介 jdk原文 A synchronization aid that allows a set of threads to all wait for each other to reach a co ...

  5. java dateutil 获取时间戳_java DateUtil工具类时间戳类型转换详解

    本文实例为大家分享了DateUtil工具类时间戳类型转换的具体代码,供大家参考,具体内容如下 package com.sinosoft.media.sms.util; import java.text ...

  6. java rowmapper 通用实现_springmvc工具类封装RowMapper详解

    springmvc通常是先写实体,在数据库查询,最后增删改差,最感觉代码很冗余,自己在封装了一下. 常见的结构是: entity:如 package com.liuxinquan.entiry; /* ...

  7. 死磕Java并发:J.U.C之并发工具类:CyclicBarrier

    作者:chenssy 来源:Java技术栈公众号 CyclicBarrier,一个同步辅助类,在API中是这么介绍的: 它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier ...

  8. java线程同步barrier_Java多线程同步工具类之CyclicBarrier

    一.CyclicBarrier使用 CyclicBarrier从字面上可以直接理解为线程运行的屏障,它可以让一组线程执行到一个共同的屏障点时被阻塞,直到最后一个线程执行到指定位置,你设置的执行线程就会 ...

  9. idea 断点线程_在IntelliJ IDEA中多线程并发代码的调试方法详解

    通常来说,多线程的并发及条件断点的debug是很难完成的,或许本篇文章会给你提供一个友好的调试方法.让你在多线程开发过程中的调试更加的有的放矢. 我们将通过一个例子来学习.在这里,我编写了一个多线程程 ...

最新文章

  1. 云栖社区 正文 永久免费SSL安全证书Letsencrypt安装使用方法
  2. python实验过程心得体会_20192416 实验四《Python程序设计》综合实践报告
  3. 复制SQLSERVER数据库文件
  4. poj 2886Who Gets the Most Candies?
  5. (四)Neo4j删除数据需要注意的问题
  6. 物联网、工业互联网大数据的特点
  7. OAuth2.0_JWT令牌-生成令牌和校验令牌_Spring Security OAuth2.0认证授权---springcloud工作笔记148
  8. 你如何删除ActiveRecord对象?
  9. 在售后技术服务里,Kubernetes到底是什么? | 凌云时刻
  10. MySQL5.7.32 64位解压缩版 windows操作系统安装教程图解
  11. 债券久期为什么难理解
  12. linux在用户登陆界面出现不断循环解决方法
  13. 服务器端查看图片库 eog
  14. 论文整理:Probabilistic Logic Neural Networks for Reasoning
  15. shader拖尾_拖尾效果 - LouisSong - 博客园
  16. 安装lux:推荐一款网页视频下载工具。并简单使用。(win)
  17. 运行django项目报错Couldn‘t import Django
  18. android声音播放函数双声道合并,Android音频编辑之音频合成功能
  19. 推荐几本这个系列封面的编程书,涉及Python、计算机图形学、Linux
  20. 【友情链接NO.0000?】大佬们的博客(°ー°〃)

热门文章

  1. 赠与大学毕业生_如何出售或赠与您的Kindle
  2. js+html 实现关系拓扑图
  3. SAP中图文展示分摊和分配的区别
  4. uniapp 使用pdf.js 加载本地pdf文件报错问题
  5. 华为2020软件精英挑战赛初赛、复赛、决赛代码+心得分享
  6. 人脸识别 人脸实名认证
  7. 易企秀 背景音乐 下载
  8. 用中文把玩Google开源的Deep-Learning项目word2vec
  9. Java WebSocket 基础 建立端点
  10. 利用HTML5新特性实现拖拽交换表格单元格元素