CyclicBarrier详解
CyclicBarrier介绍
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier),类似于CountDownLatch也是个计数器,不同的是CyclicBarrier要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的(reset()方法重置屏障点)。
CyclicBarrier,让一组线程到达一个同步点后再一起继续运行,在其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。
CyclicBarrier是一种同步机制允许一组线程相互等待,等到所有线程都到达一个屏障点才退出await方法,它没有直接实现AQS而是借助ReentrantLock来实现的同步机制。它是可循环使用的,而CountDownLatch是一次性的,另外它体现的语义也跟CountDownLatch不同,CountDownLatch减少计数到达条件采用的是release方式,而CyclicBarrier走向屏障点(await)采用的是Acquire方式,Acquire是会阻塞的,这也实现了CyclicBarrier的另外一个特点,只要有一个线程中断那么屏障点就被打破,所有线程都将被唤醒(CyclicBarrier自己负责这部分实现,不是由AQS调度的),这样也避免了因为一个线程中断引起永远不能到达屏障点而导致其他线程一直等待。屏障点被打破的CyclicBarrier将不可再使用(会抛出BrokenBarrierException)除非执行reset操作。
CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行
CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行
CyclicBarrier示例
package main.java.study;import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {public static class Soldier implements Runnable{private String soldier;private final CyclicBarrier cyclic;public Soldier(CyclicBarrier cyclic ,String soldier) {this.cyclic=cyclic;this.soldier = soldier;}@Overridepublic void run() {try {cyclic.await();doWork();cyclic.await();} catch (InterruptedException | BrokenBarrierException e) {// TODO: handle exceptione.printStackTrace();}}void doWork(){try {Thread.sleep(new Random().nextInt(10) * 1000);System.out.println(soldier + " done!");} catch (InterruptedException e) {// TODO: handle exceptione.printStackTrace();}}}public static class BarrierRun implements Runnable{boolean flag = false;int N;public BarrierRun(boolean flag,int n) {this.flag=flag;this.N =n;}@Overridepublic void run() {if (flag){System.out.println("soldier" + N + " done!");}else {System.out.println("soldier" + N + " collected!");flag = true;}}}public static void main (String[] args){final int N = 10;Thread[] all= new Thread[N];boolean flag=false;CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));System.out.println("begin gather:");for(int i = 0;i< N ;i++){System.out.println("soldier:" + i + " coming.");all[i] = new Thread( new Soldier(cyclic, "solder" + i));all[i].start();}}
}
执行结果:
begin gather:
soldier:0 coming.
soldier:1 coming.
soldier:2 coming.
soldier:3 coming.
soldier:4 coming.
soldier:5 coming.
soldier:6 coming.
soldier:7 coming.
soldier:8 coming.
soldier:9 coming.
soldier10 collected!
solder3 done!
solder9 done!
solder0 done!
solder7 done!
solder5 done!
solder4 done!
solder2 done!
solder1 done!
solder8 done!
solder6 done!
soldier10 done!
Soldier执行了2次 await()方法,第1次被唤醒,执行doWork(),然后再次await(),
CyclicBarrier源码
CyclicBarrier方法
//默认构造方法,参数表示拦截的线程数量
public CyclicBarrier(int parties) {this(parties, null);
}
//用于在线程到达同步点时,优先执行线程barrierAction
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}
//可中断等待
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}
//可超时等待
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));
}
//打破屏障
private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();
}//屏障是否被打破
public boolean isBroken() {final ReentrantLock lock = this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}
}
//重置
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier(); // break the current generationnextGeneration(); // start a new generation} finally {lock.unlock();}
}
//获取正在barrir处等待数,即已经到达数
public int getNumberWaiting() {final ReentrantLock lock = this.lock;lock.lock();try {return parties - count;} finally {lock.unlock();}
}
CyclicBarrier的数据结构
//用于标记每次屏障private static class Generation {boolean broken = false;}/** The lock for guarding barrier entry */private final ReentrantLock lock = new ReentrantLock();/** 等待直到全部到达 */private final Condition trip = lock.newCondition();/** The number of parties */private final int parties;/* The command to run when tripped */private final Runnable barrierCommand;/** The current generation */private Generation generation = new Generation();//仍然在被等待数,即未到达barrir处数private int count;
CyclicBarrier等待与唤醒
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;//如果屏障已被打破,则退出if (g.broken)throw new BrokenBarrierException();//如果发生中断,则打破屏障if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}//未到达数减1int index = --count;if (index == 0) { // tripped//全部都已到达,则继续boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run(); //如果有优先执行的命令,则运行ranAction = true; //Action已执行nextGeneration(); //准备下一次设置屏障return 0;} finally {if (!ranAction) //如果Action执行失败,则打破屏障breakBarrier();}}// 在屏障处等待............for (;;) {try {if (!timed) //未设置超时trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos); //设置超时} catch (InterruptedException ie) {if (g == generation && ! g.broken) { //仍是本次barrir,并且屏障未打破,则打破屏障。breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation) //不是本次barrir,则返回return index;if (timed && nanos <= 0L) { //超时则打破屏障breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
ReentrantLock参考: ConditionObject源码
CyclicBarrier详解相关推荐
- Java并发编程之CyclicBarrier详解
简介 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生.栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,而栅栏用于等待其他线程. CyclicBarrier ...
- Java并发工具类--CyclicBarrier详解
CyclicBarrier允许一组线程在到达某个栅栏点(common barrier point)互相等待,直到最后一个线程到达栅栏点,栅栏才会打开,处于阻塞状态的线程恢复继续执行. 举例 举个例子来 ...
- Java并发编程的艺术笔记(七)——CountDownLatch、CyclicBarrier详解
一.等待多线程完成的CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作,像加强版的join.(t.join()是等待t线程完成) 例: (1)开启多个线程 ...
- Java并发编程系列之CyclicBarrier详解
简介 jdk原文 A synchronization aid that allows a set of threads to all wait for each other to reach a co ...
- 多线程-并发工具类之CyclicBarrier详解
文章目录 简介 例子 实现原理 小结 简介 从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行.这里之所以叫作回环是因为当所有等待线程执行完毕 ...
- JUC系列之CyclicBarrier详解
最近又在重读CyclicBarrier源码,并进行了深入分析,重点源码也自己跟过并做了一些注释,仅供大家参考. CyclicBarrier:回环栅栏(有人也称之为循环屏障),通过他可以让一组线程等待至 ...
- 40000+字超强总结?阿里P8把Java全栈知识体系详解整理成这份PDF
40000 +字长文总结,已将此文整理成PDF文档了,需要的见文后下载获取方式. 全栈知识体系总览 Java入门与进阶面向对象与Java基础 Java 基础 - 面向对象 Java 基础 - 知识点 ...
- CyclicBarrier 用法详解
CyclicBarrier 用法详解 CyclicBarrier使用场景 用于协调多个线程同步执行操作的场合,所有线程等待完成,然后一起做事情( 相互之间都准备好,然后一起做事情 ) 例如百米赛跑,必 ...
- java多线程学习-java.util.concurrent详解
http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...
最新文章
- 深入浅出:对MySQL主从配置的一些总结
- properties文件不能输入中文
- openstack——使用命令行删除租户所有信息
- java warning 编译_关于性能:Java编译器警告会影响编译时间吗?
- 什么叫「人的格局」?是否有必要培养大的格局或怎么培养?
- Memcached 与 PHP 结合使用
- 理解新增贷款、M2、社会融资总量之间的关系
- 手动发布证书吊销列表
- 图论 —— 2-SAT 问题
- [Vue.js] 基础 -- Vue实例
- echarts js 删除框选数据_ECharts进行区域选择
- 华为在剑桥建芯片厂;小米公布出货量反驳调研机构; 中移动否认限制号 | 极客头条...
- python如何加密_Python如何玩转数据加密?
- javase二维数组笔记
- 2016计算机二级c语言题库,计算机二级c语言题库2016精选
- OpenSolaris系列文章之----投影仪设置
- 百度富媒体编辑器 使用
- LGP970刷机心得
- An operation on a socket could not be performed because the system lacked sufficient buffer space or
- 黎想深度访谈腾讯顶级产品经理的进阶之路——第五篇《匠心》