CyclicBarrier源码解读
目录
- `CyclicBarrier` 概述
- `CyclicBarrier` 源码
- `CyclicBarrier` 类的属性
- `CyclicBarrier` 构造函数
- `await()` 方法
- `dowait()` 方法
- `CyclicBarrier` 使用示例
- `CountDownLatch` 和 `CyclicBarrier` 区别
- 共同点
- 不同点
CyclicBarrier
概述
CyclicBarrier
来自于jdk 1.5
的JUC
包,也被称作循环屏障,同步屏障,循环屏障,被作为一种多线程并发控制工具来使用CyclicBarrier
是使用了ReentrantLock
和Condition
来完成屏障效果,本质上底层还是基于AQS
的CyclicBarrier
:利用CyclicBarrier
类可以实现一组线程相互等待
,当所有线程都到达某个屏障点后再进行后续的操作。也就是让一组线程到达一个同步点后再一起继续运行,在其中任意一个线程未达到同步点,其他到达的线程均会被阻塞CyclicBarrier
字面意思是“可重复使用的栅栏”,CyclicBarrier
相比CountDownLatch
来说要简单很多,它是ReentrantLock
和Condition
的组合使用CyclicBarrier
和CountDownLatch
是不是很像,只是CyclicBarrier
可以有不止一个栅栏,因为它的栅栏(Barrier
)可以重复使用(Cyclic
)
CyclicBarrier
源码
CyclicBarrier
类的属性
// 同步操作锁
private final ReentrantLock lock = new ReentrantLock();// 达到屏障并且不能放行的线程在trip条件变量上等待
private final Condition trip = lock.newCondition();// 每次拦截的线程数
private final int parties;// 换代前执行的任务
private final Runnable barrierCommand;// 表示栅栏的当前代
private Generation generation = new Generation();/*** count表示打破一次屏障还需要的线程数量,初始化值等于parties* 每当有一个线程到达屏障调用await方法之后,count就就递减1* 当count 为0 时,表示所需要的所有线程都到了屏障,此时屏障可以被打破* <p>* 变量parties始终用来记录所需总线程个数,而当count 值变为0后,又会将parties 的值赋给count,从而进行复用。* 使用两个变量的原因就是为了实现CyclicBarrier 的可复用性*/
private int count;// 静态内部类Generation
private static class Generation {boolean broken = false;
}
CyclicBarrier
构造函数
// 创建拦截指定线程数的CyclicBarrier对象,并可以指定在所有线程都越过栅栏后的执行动作
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;// 设置所有线程到达栅栏后,将执行的操作this.barrierCommand = barrierAction;
}public CyclicBarrier(int parties) {this(parties, null);
}
parties
表示每次拦截的线程数,该值在构造时进行赋值count
是内部计数器,它的初始值和parties
相同,以后随着每次await()
方法的调用而减1
,直到减为0
就将所有线程唤醒
CyclicBarrier
有一个静态内部类 Generation
,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。barrierCommand
表示换代前执行的任务,当 count
减为 0
时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定 barrierCommand
来执行自己的任务
await()
方法
CyclicBarrier
类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待
// 非定时等待
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}
}// 定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {return dowait(true, unit.toNanos(timeout));
}
dowait()
方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;// 检查当前栅栏是否被打翻if (g.broken)throw new BrokenBarrierException();// 检查当前线程是否被中断if (Thread.interrupted()) {// 如果当前线程被中断会做以下三件事// 1.打翻当前栅栏// 2.唤醒拦截的所有线程// 3.抛出中断异常breakBarrier();throw new InterruptedException();}// 每次都将计数器的值减 1int index = --count;// 计数器的值减为 0 ,表示当前线程是最后一个达到屏障的线程,所需要的所有线程都到达了屏障点,则需唤醒所有线程并转换到下一代if (index == 0) { // 任务是否被执行标志boolean ranAction = false;try {// 获取所有线程到达栅栏后要执行的任务final Runnable command = barrierCommand;if (command != null)// 执行 command的run方法command.run();ranAction = true;// 更新栅栏的状态并唤醒所有线程nextGeneration();return 0;} finally {// 如果执行栅栏任务失败了if (!ranAction)// 设置当前代的 broken 状态为 true,唤醒所有线程breakBarrier();}}// 如果计数器不为 0 则执行此循环,那么表示当前线程不是最后一个达到屏障的线程,可能需要等待for (;;) {try {// 如果是非超时等待if (!timed)// 当前线程在 trip 条件变量上等待,直到被中断或者被唤醒trip.await();// 否则,就是超时等待,如果超时时间大于0 else if (nanos > 0L)// 当前线程在trip条件变量上超时等待最多nanos纳秒,直到被中断或者被唤醒或者超时等待完毕// 返回nanos,表示剩余超时等待时间nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 若当前线程在等待期间被中断则打翻栅栏唤醒其他线程if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// 若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作Thread.currentThread().interrupt();}}// 如果线程因为打翻栅栏操作而被唤醒则抛出异常if (g.broken)throw new BrokenBarrierException();// 如果线程因为换代操作而被唤醒则返回计数器的值if (g != generation)return index;// 如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}
- 在
dowait
方法中每次都将count
减1
,减完后立马进行判断看看是否等于0
- 如果等于
0
的话就会先去执行之前指定好的任务,执行完之后再调用nextGeneration
方法将栅栏转到下一代,在该方法中会将所有线程唤醒,将计数器的值重新设为parties
,最后会重新设置栅栏代次,在执行完nextGeneration
方法之后就意味着游戏进入下一局 - 如果计数器此时还不等于
0
的话就进入for
循环,根据参数来决定是调trip.awaitNanos(nanos)
还是trip.await()
方法,这两方法对应着定时和非定时等待 - 如果在等待过程中当前线程被中断就会执行
breakBarrier
方法,该方法叫做打破栅栏,意味着游戏在中途被掐断,设置generation
的broken
状态为true
并唤醒所有线程。同时这也说明在等待过程中有任何一个线程被中断整盘游戏就结束,所有之前被阻塞的线程都会被唤醒 - 线程醒来后会执行下面三个判断,看看是否因为调用
breakBarrier
方法而被唤醒,如果是则抛出异常;看看是否是正常的换代操作而被唤醒,如果是则返回计数器的值;看看是否因为超时而被唤醒,如果是的话就调用breakBarrier
打破栅栏并抛出异常。注意,如果其中有任何一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒
CyclicBarrier
使用示例
public class CyclicBarrierTest {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new YourThread());ExecutorService executor = Executors.newFixedThreadPool(3);executor.execute(new MyThread(cyclicBarrier, "test1", 1));executor.execute(new MyThread(cyclicBarrier, "test2", 3));executor.execute(new MyThread(cyclicBarrier, "test3", 6));executor.shutdown();}public static class MyThread implements Runnable {private final CyclicBarrier cyclicBarrier;private final String name;private final long timeOut;public MyThread(CyclicBarrier cyclicBarrier, String name, long timeOut) {this.cyclicBarrier = cyclicBarrier;this.name = name;this.timeOut = timeOut;}@Overridepublic void run() {try {Thread.sleep(timeOut * 1000);System.out.println(printDate() + " " + name + " come in!");cyclicBarrier.await();System.out.println(printDate() + " " + name + " ending!");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}public static class YourThread implements Runnable {@Overridepublic void run() {System.out.println(printDate() + " " + "dangdang!");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}static String printDate() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");return sdf.format(new Date());}
}结果:
2021-05-25 00:49:02 826 test1 come in!
2021-05-25 00:49:04 786 test2 come in!
2021-05-25 00:49:07 776 test3 come in!2021-05-25 00:49:07 778 dangdang!2021-05-25 00:49:09 793 test1 ending!
2021-05-25 00:49:09 793 test2 ending!
2021-05-25 00:49:09 793 test3 ending!
CountDownLatch
和 CyclicBarrier
区别
共同点
二者都可以实现一组线程在到达某个条件之前进行等待,它们内部都有一个计数器,当计数器的值不断的减为 0
的时候所有阻塞的线程将会被唤醒
CountDownLatch
:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行CyclicBarrier
:多个线程互相等待
,直到到达同一个同步点,再继续一起执行
不同点
- 对于
CountDownLatch
来说,重点是一个或者多个线程等待,而其他的N
个线程在完成某件事情之后,可以终止,也可以等待。而对于CyclicBarrier
,重点是多个线程,在任意一个线程没有完成,所有的线程都必须互相等待,然后继续一起执行 CountDownLatch
是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier
更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行CountdownLatch
利用继承AQS
的共享锁来实现的。CyclicBarrier
则利用ReentrantLock
的Condition
来阻塞和通知线程。但是追根溯源,这两个组件也是依赖AQS
实现的CountDownLatch
的计数器count
只能使用一次,而CyclicBarrier
的屏障可以使用reset()
方法重置,也会自动重置,所以CyclicBarrier
能处理更为复杂的业务场景
CyclicBarrier源码解读相关推荐
- Bert系列(二)——源码解读之模型主体
本篇文章主要是解读模型主体代码modeling.py.在阅读这篇文章之前希望读者们对bert的相关理论有一定的了解,尤其是transformer的结构原理,网上的资料很多,本文内容对原理部分就不做过多 ...
- Bert系列(三)——源码解读之Pre-train
https://www.jianshu.com/p/22e462f01d8c pre-train是迁移学习的基础,虽然Google已经发布了各种预训练好的模型,而且因为资源消耗巨大,自己再预训练也不现 ...
- linux下free源码,linux命令free源码解读:Procps free.c
linux命令free源码解读 linux命令free源码解读:Procps free.c 作者:isayme 发布时间:September 26, 2011 分类:Linux 我们讨论的是linux ...
- nodeJS之eventproxy源码解读
1.源码缩影 !(function (name, definition) { var hasDefine = typeof define === 'function', //检查上下文环境是否为AMD ...
- PyTorch 源码解读之即时编译篇
点击上方"AI遇见机器学习",选择"星标"公众号 重磅干货,第一时间送达 作者丨OpenMMLab 来源丨https://zhuanlan.zhihu.com/ ...
- Alamofire源码解读系列(九)之响应封装(Response)
本篇主要带来Alamofire中Response的解读 前言 在每篇文章的前言部分,我都会把我认为的本篇最重要的内容提前讲一下.我更想同大家分享这些顶级框架在设计和编码层次究竟有哪些过人的地方?当然, ...
- Feflow 源码解读
Feflow 源码解读 Feflow(Front-end flow)是腾讯IVWEB团队的前端工程化解决方案,致力于改善多类型项目的开发流程中的规范和非业务相关的问题,可以让开发者将绝大部分精力集中在 ...
- spring-session源码解读 sesion
2019独角兽企业重金招聘Python工程师标准>>> spring-session源码解读 sesion 博客分类: java spring 摘要: session通用策略 Ses ...
- 前端日报-20160527-underscore 源码解读
underscore 源码解读 API文档浏览器 JavaScript 中加号操作符细节 抛弃 jQuery,拥抱原生 JS 从 0 开始学习 GitHub 系列之「加入 GitHub」 js实现克隆 ...
- php service locator,Yii源码解读-服务定位器(ServiceLocator)
SL的目的也是解耦,并且非常适合基于服务和组件的应用. Service Locator充当了一个运行时的链接器的角色,可以在运行时动态地修改一个类所要选用的服务, 而不必对类作任何的修改. 一个类可以 ...
最新文章
- 注册博客第一天,有些激动
- 未捕获ReferenceError:未定义$?
- SSRS:服务器更名后,ReportingService无法使用和登录的解决办法
- 用iframe实现局部刷新的各种跳转方法(网上总结)
- 在CentOS下安装crontab服务
- Python的Mixins机制
- boost::mp11::mp_similar相关用法的测试程序
- excel数据库_EXCEL憋出大招,逆袭大数据的黑马出现了
- SpringMVC 单文件上传与多文件上传
- js值发送给php,将JSON数据从Javascript发送到PHP?
- 如何理解halcon 算子get_grayval 、set_grayval 逐行读取和逐行写入
- Prototype使用$F()函数
- C++制作鼠标连点器
- 在学习时,遇到in module ssbuild. File is included in 4 contexts
- httpd三种MPM的原理剖析
- linux怎么停止ping命令
- android 字体外发光,CSS3 霓虹外发光字体效果
- 关于虚拟机中IPI中断的思考
- 【linux技术】记一次虚拟机vmware里 Centos7开机故障:sd 0:0:0:0: [sda] Assuming drive cache: write through
- GUID 分区表详解