转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533995.html

CyclicBarrier简介

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

注意比较CountDownLatch和CyclicBarrier:
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

CyclicBarrier函数列表

CyclicBarrier(int parties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting()
返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken()
查询此屏障是否处于损坏状态。
void reset()
将屏障重置为其初始状态。

CyclicBarrier数据结构

CyclicBarrier的UML类图如下:

CyclicBarrier是包含了"ReentrantLock对象lock"和"Condition对象trip",它是通过独占锁实现的。下面通过源码去分析到底是如何实现的。

CyclicBarrier源码分析(基于JDK1.7.0_40)

CyclicBarrier完整源码(基于JDK1.7.0_40)

 View Code

CyclicBarrier是通过ReentrantLock(独占锁)和Condition来实现的。下面,我们分析CyclicBarrier中3个核心函数: 构造函数, await()作出分析。

1. 构造函数

CyclicBarrier的构造函数共2个:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1个构造函数是调用第2个构造函数来实现的,下面第2个构造函数的源码。

public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();// parties表示“必须同时到达barrier的线程个数”。this.parties = parties;// count表示“处在等待状态的线程个数”。this.count = parties;// barrierCommand表示“parties个线程到达barrier时,会执行的动作”。this.barrierCommand = barrierAction;
}

2. 等待函数

CyclicBarrier.java中await()方法如下:

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen;}
}

说明:await()是通过dowait()实现的。

private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;// 获取“独占锁(lock)”lock.lock();try {// 保存“当前的generation”final Generation g = generation;// 若“当前generation已损坏”,则抛出异常。if (g.broken)throw new BrokenBarrierException();// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 将“count计数器”-1int index = --count;// 如果index=0,则意味着“有parties个线程到达barrier”。if (index == 0) {  // trippedboolean ranAction = false;try {// 如果barrierCommand不为null,则执行该动作。final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 唤醒所有等待线程,并更新generation。nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,// 当前线程才继续执行。for (;;) {try {// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 如果等待过程中,线程被中断,则执行下面的函数。if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}// 如果“当前generation已经损坏”,则抛出异常。if (g.broken)throw new BrokenBarrierException();// 如果“generation已经换代”,则返回index。if (g != generation)return index;// 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 释放“独占锁(lock)”lock.unlock();}
}

说明:dowait()的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。
(01) generation是CyclicBarrier的一个成员遍历,它的定义如下:

private Generation generation = new Generation();private static class Generation {boolean broken = false;
}

在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。
当有parties个线程到达barrier,generation就会被更新换代。

(02) 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。breakBarrier()的源码如下:

private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();
}

breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。

(03) 将“count计数器”-1,即--count;然后判断是不是“有parties个线程到达barrier”,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作,nextGeneration()的源码如下:

private void nextGeneration() {trip.signalAll();count = parties;generation = new Generation();
}

首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。

(04) 在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。

CyclicBarrier的使用示例

示例1
新建5个线程,这5个线程达到一定的条件时,它们才继续往后运行。

 1 import java.util.concurrent.CyclicBarrier;2 import java.util.concurrent.BrokenBarrierException;3 4 public class CyclicBarrierTest1 {5 6     private static int SIZE = 5;7     private static CyclicBarrier cb;8     public static void main(String[] args) {9
10         cb = new CyclicBarrier(SIZE);
11
12         // 新建5个任务
13         for(int i=0; i<SIZE; i++)
14             new InnerThread().start();
15     }
16
17     static class InnerThread extends Thread{
18         public void run() {
19             try {
20                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
21
22                 // 将cb的参与者数量加1
23                 cb.await();
24
25                 // cb的参与者数量等于5时,才继续往后执行
26                 System.out.println(Thread.currentThread().getName() + " continued.");
27             } catch (BrokenBarrierException e) {
28                 e.printStackTrace();
29             } catch (InterruptedException e) {
30                 e.printStackTrace();
31             }
32         }
33     }
34 }

运行结果

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,这些线程才继续运行!

示例2

新建5个线程,当这5个线程达到一定的条件时,执行某项任务。

 1 import java.util.concurrent.CyclicBarrier;2 import java.util.concurrent.BrokenBarrierException;3 4 public class CyclicBarrierTest2 {5 6     private static int SIZE = 5;7     private static CyclicBarrier cb;8     public static void main(String[] args) {9
10         cb = new CyclicBarrier(SIZE, new Runnable () {
11             public void run() {
12                 System.out.println("CyclicBarrier's parties is: "+ cb.getParties());
13             }
14         });
15
16         // 新建5个任务
17         for(int i=0; i<SIZE; i++)
18             new InnerThread().start();
19     }
20
21     static class InnerThread extends Thread{
22         public void run() {
23             try {
24                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
25
26                 // 将cb的参与者数量加1
27                 cb.await();
28
29                 // cb的参与者数量等于5时,才继续往后执行
30                 System.out.println(Thread.currentThread().getName() + " continued.");
31             } catch (BrokenBarrierException e) {
32                 e.printStackTrace();
33             } catch (InterruptedException e) {
34                 e.printStackTrace();
35             }
36         }
37     }
38 }

运行结果

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,执行新建cb时注册的Runnable任务。

转载于:https://www.cnblogs.com/kexianting/p/8552578.html

Java锁--CyclicBarrier相关推荐

  1. 面试必会系列 - 1.5 Java 锁机制

    本文已收录至 github,完整图文:https://github.com/HanquanHq/MD-Notes 面试必会系列专栏:https://blog.csdn.net/sinat_424833 ...

  2. java锁实现_Java锁实现

    java锁实现 我们都将第三方库用作开发的正常部分. 通常,我们无法控制其内部. JDK随附的库是一个典型示例. 这些库中的许多库都使用锁来管理争用. JDK锁具有两种实现. 一个使用原子CAS样式指 ...

  3. Java锁-Synchronized深层剖析

    Java锁-Synchronized深层剖析 前言 Java锁的问题,可以说是每个JavaCoder绕不开的一道坎.如果只是粗浅地了解Synchronized等锁的简单应用,那么就没什么谈的了,也不建 ...

  4. java锁的种类以及辨析(转载)

    java锁的种类以及辨析(一):自旋锁 锁作为并发共享数据,保证一致性的工具,在JAVA平台有多种实现(如 synchronized 和 ReentrantLock等等 ) .这些已经写好提供的锁为我 ...

  5. Java锁详解:“独享锁/共享锁+公平锁/非公平锁+乐观锁/悲观锁+线程锁”

    在Java并发场景中,会涉及到各种各样的锁如公平锁,乐观锁,悲观锁等等,这篇文章介绍各种锁的分类: 公平锁/非公平锁 可重入锁 独享锁/共享锁 乐观锁/悲观锁 分段锁 自旋锁 线程锁 乐观锁 VS 悲 ...

  6. 「基本功」不可不说的Java“锁”事

    并发编程是Java程序员必备基本功,今天"基本功"专栏向大家推荐一篇深入解析Java锁机制的文章.Enjoy! 前言 Java提供了种类丰富的锁,每种锁因其特性的不同,在适当的场景 ...

  7. 深入理解 Java 锁与线程阻塞

    相信大家对线程锁和线程阻塞都很了解,无非就是 synchronized, wait/notify 等, 但是你有仔细想过 Java 虚拟机是如何实现锁和阻塞的呢?它们之间又有哪些联系呢?如果感兴趣的话 ...

  8. 转 : 深入解析Java锁机制

    深入解析Java锁机制 https://mp.weixin.qq.com/s?__biz=MzU0OTE4MzYzMw%3D%3D&mid=2247485524&idx=1&s ...

  9. java 锁的类型_Java锁的种类 - shawnplaying的个人页面 - OSCHINA - 中文开源技术交流社区...

    Java锁和并发需要结合在一块了理解,涉及到了多个话题. 本文主要参考了 http://ifeve.com/java_lock_see1/ 但是我认为原文中有某些错误,我在下面的代码中做了修改. 公平 ...

  10. Java锁之可重入锁和递归锁

    Java锁之可重入锁和递归锁 目录 Java锁之可重入锁和递归锁基本概念 Java锁之可重入锁和递归锁代码验证 小结 理论,代码,小结,学习三板斧. 1. Java锁之可重入锁和递归锁基本概念 可重入 ...

最新文章

  1. NR 5G UE初始接入流程
  2. 如何动态调用WebServices
  3. java windows系统监控_Windows资源监控工具大全
  4. 集合计数 二项式反演_对计数数据使用负二项式
  5. 在c语言中a 这条语句的作用,C语言复习第二章
  6. C# WebService 基础实例
  7. webflow ajax,java开发之spring webflow实现上传单个文件及多个文件功能实例
  8. mysqlin索引失效的情况
  9. linux 键盘 键值0x1e,Linux文本处理三剑客之awk学习笔记11:选项、内置变量和内置函数...
  10. window oracle 命令,windows下Oracle命令
  11. gephi使用教程pdf
  12. 【R语言】R语言在安装与下载时遇到的问题与解决方法
  13. Java Web开发后端常用技术汇总
  14. 【JAVA程序设计】(C00019)javaweb高校社团管理系统
  15. 《三级医院评审标准(2020年版)》及解读:医疗机构要不断加强信息化建设
  16. php 微信模拟登陆给用户发送消息(文字,图片,图文)
  17. 固件-驱动-软件 区别
  18. 数据结构 - 主席树
  19. qq邮件中插入html,qq邮箱如何添加标签
  20. Vue-routers(步骤)

热门文章

  1. 文字描边加粗_这些PPT描边字,效果好到没朋友~
  2. mySQL常用操作及基础知识
  3. android studio for android learning (五) 最新Activity理解与其生命周期
  4. 在linux centos中加入中文输入法
  5. WebShell脚本检测机器学习二
  6. myeclipse安装maven
  7. Raki的读paper小记:Kernel Continual Learning
  8. Raki的读paper小记:BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding
  9. 数据库进行大数据量插入/更新操作
  10. OO第二次博客——电梯系列总结