CyclicBarrier简介

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

注意比较CountDownLatch和CyclicBarrier:
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

CyclicBarrier函数列表

CyclicBarrier(int parties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting()
返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken()
查询此屏障是否处于损坏状态。
void reset()
将屏障重置为其初始状态。

CyclicBarrier数据结构

CyclicBarrier的UML类图如下:

CyclicBarrier是包含了"ReentrantLock对象lock"和"Condition对象trip",它是通过独占锁实现的。下面通过源码去分析到底是如何实现的。

CyclicBarrier源码分析(基于JDK1.7.0_40)

CyclicBarrier完整源码(基于JDK1.7.0_40)

 View Code

CyclicBarrier是通过ReentrantLock(独占锁)和Condition来实现的。下面,我们分析CyclicBarrier中3个核心函数: 构造函数, await()作出分析。

1. 构造函数

CyclicBarrier的构造函数共2个:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1个构造函数是调用第2个构造函数来实现的,下面第2个构造函数的源码。

public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();// parties表示“必须同时到达barrier的线程个数”。this.parties = parties;// count表示“处在等待状态的线程个数”。this.count = parties;// barrierCommand表示“parties个线程到达barrier时,会执行的动作”。this.barrierCommand = barrierAction;
}

2. 等待函数

CyclicBarrier.java中await()方法如下:

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen;}
}

说明:await()是通过dowait()实现的。

private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;// 获取“独占锁(lock)”lock.lock();try {// 保存“当前的generation”final Generation g = generation;// 若“当前generation已损坏”,则抛出异常。if (g.broken)throw new BrokenBarrierException();// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 将“count计数器”-1int index = --count;// 如果index=0,则意味着“有parties个线程到达barrier”。if (index == 0) {  // trippedboolean ranAction = false;try {// 如果barrierCommand不为null,则执行该动作。final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 唤醒所有等待线程,并更新generation。nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,// 当前线程才继续执行。for (;;) {try {// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 如果等待过程中,线程被中断,则执行下面的函数。if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}// 如果“当前generation已经损坏”,则抛出异常。if (g.broken)throw new BrokenBarrierException();// 如果“generation已经换代”,则返回index。if (g != generation)return index;// 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 释放“独占锁(lock)”lock.unlock();}
}

说明:dowait()的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。
(01) generation是CyclicBarrier的一个成员遍历,它的定义如下:

private Generation generation = new Generation();private static class Generation {boolean broken = false;
}

在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。
当有parties个线程到达barrier,generation就会被更新换代。

(02) 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。breakBarrier()的源码如下:

private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();
}

breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。

(03) 将“count计数器”-1,即--count;然后判断是不是“有parties个线程到达barrier”,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作,nextGeneration()的源码如下:

private void nextGeneration() {trip.signalAll();count = parties;generation = new Generation();
}

首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。

(04) 在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。

CyclicBarrier的使用示例

示例1
新建5个线程,这5个线程达到一定的条件时,它们才继续往后运行。

 1 import java.util.concurrent.CyclicBarrier;2 import java.util.concurrent.BrokenBarrierException;3 4 public class CyclicBarrierTest1 {5 6     private static int SIZE = 5;7     private static CyclicBarrier cb;8     public static void main(String[] args) {9
10         cb = new CyclicBarrier(SIZE);
11
12         // 新建5个任务
13         for(int i=0; i<SIZE; i++)
14             new InnerThread().start();
15     }
16
17     static class InnerThread extends Thread{
18         public void run() {
19             try {
20                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
21
22                 // 将cb的参与者数量加1
23                 cb.await();
24
25                 // cb的参与者数量等于5时,才继续往后执行
26                 System.out.println(Thread.currentThread().getName() + " continued.");
27             } catch (BrokenBarrierException e) {
28                 e.printStackTrace();
29             } catch (InterruptedException e) {
30                 e.printStackTrace();
31             }
32         }
33     }
34 }

运行结果

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,这些线程才继续运行!

示例2

新建5个线程,当这5个线程达到一定的条件时,执行某项任务。

 1 import java.util.concurrent.CyclicBarrier;2 import java.util.concurrent.BrokenBarrierException;3 4 public class CyclicBarrierTest2 {5 6     private static int SIZE = 5;7     private static CyclicBarrier cb;8     public static void main(String[] args) {9
10         cb = new CyclicBarrier(SIZE, new Runnable () {
11             public void run() {
12                 System.out.println("CyclicBarrier's parties is: "+ cb.getParties());
13             }
14         });
15
16         // 新建5个任务
17         for(int i=0; i<SIZE; i++)
18             new InnerThread().start();
19     }
20
21     static class InnerThread extends Thread{
22         public void run() {
23             try {
24                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
25
26                 // 将cb的参与者数量加1
27                 cb.await();
28
29                 // cb的参与者数量等于5时,才继续往后执行
30                 System.out.println(Thread.currentThread().getName() + " continued.");
31             } catch (BrokenBarrierException e) {
32                 e.printStackTrace();
33             } catch (InterruptedException e) {
34                 e.printStackTrace();
35             }
36         }
37     }
38 }

运行结果

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,执行新建cb时注册的Runnable任务。

转载于:https://www.cnblogs.com/duanxz/p/6063707.html

Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例相关推荐

  1. Java多线程系列---“JUC锁”01之 框架

    本章,我们介绍锁的架构:后面的章节将会对它们逐个进行分析介绍.目录如下: 01. Java多线程系列--"JUC锁"01之 框架 02. Java多线程系列--"JUC锁 ...

  2. Java多线程系列--“JUC锁”03之 公平锁(一)

    概要 本章对"公平锁"的获取锁机制进行介绍(本文的公平锁指的是互斥锁的公平锁),内容包括: 基本概念 ReentrantLock数据结构 参考代码 获取公平锁(基于JDK1.7.0 ...

  3. Java多线程系列--“JUC锁”05之 非公平锁

    转载自:http://www.cnblogs.com/skywang12345/p/3496651.html点击打开链接 概要 前面两章分析了"公平锁的获取和释放机制",这一章开始 ...

  4. Java多线程系列--“JUC锁”07之 LockSupport

    LockSupport介绍 LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语.java锁和同步器框架的核心 AQS: AbstractQueuedSynchr ...

  5. Java多线程系列--“JUC原子类”03之 AtomicLongArray原子类

    概要 AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray这3个数组类型的原子类的原理和用法相似.本章以AtomicLongArray对数 ...

  6. Java多线程系列--“JUC原子类”01之 框架

    2019独角兽企业重金招聘Python工程师标准>>> Java多线程系列--"JUC原子类"01之 框架 根据修改的数据类型,可以将JUC包中的原子操作类可以分 ...

  7. Java多线程系列--“JUC线程池”06之 Callable和Future

    转载自  Java多线程系列--"JUC线程池"06之 Callable和Future Callable 和 Future 简介 Callable 和 Future 是比较有趣的一 ...

  8. Java多线程系列(七):并发容器的原理,7大并发容器详解、及使用场景

    之前谈过高并发编程系列: 高并发编程系列:4种常用Java线程锁的特点,性能比较.使用场景 高并发编程系列:CountDownLatch.Semaphore等4大并发工具类详解 高并发编程系列:4大J ...

  9. Java多线程系列(八):ConcurrentHashMap的实现原理(JDK1.7和JDK1.8)

    HashMap.CurrentHashMap 的实现原理基本都是BAT面试必考内容,阿里P8架构师谈:深入探讨HashMap的底层结构.原理.扩容机制深入谈过hashmap的实现原理以及在JDK 1. ...

最新文章

  1. android的xml置底_Android布局之xml设置
  2. 从五个方面做IT职业规划
  3. 网管软件——Net Meter V3.3中文版
  4. c执行cmd pdf2swf_PDF2SWF简单使用
  5. labelme数据增强_NO. 21 标注工具 amp; 合成数据生成工具
  6. VTK:隐式函数之ImplicitQuadric
  7. python数据挖掘学习笔记】十.Pandas、Matplotlib、PCA绘图实用代码补充
  8. Time complexity analysis of algorithms
  9. postgresql是如何求年龄的_负债累累如何度过难关?她依靠此法三年还清300多万债务!...
  10. 2019.11.27 阵列信号处理
  11. FPGA 20个例程篇:12.千兆网口实现MDIO接口读写
  12. 论BOM管理的若干重要问题
  13. NetWare网络操作系统
  14. 通向实在之路暂记002:毕达哥拉斯定理与平行公设
  15. 应用分发平台之苹果超级签名流程分析及API错误
  16. 【转载】经典SQL语句大全(绝对的经典)
  17. ISO8583报文(一)
  18. Part3-4-1 搭建自己的SSR
  19. HTML文字左侧留白,DIV CSS padding内补白(内边距)left right top bottom
  20. 数字VR虚拟博物馆的功能介绍

热门文章

  1. 喜欢宅在家里的人,有什么合适的工作做?
  2. 女方家长和男方家长第一次见面是怎么样的?
  3. 无需充电的太阳能汽车有多强?约17万起,续航高达1609km
  4. 企业主要培养三大方向的思考习惯
  5. 工作与生活如何平衡?
  6. 揭秘合伙创业做生意成功的密码?
  7. TrueBit白皮书解读
  8. SQL Server 2016 SP1中的新功能和增强功能
  9. 索引sql server_SQL Server报告– SQL Server索引利用率
  10. sql2018 ssas_如何使用SQL Server Analysis Services(SSAS)从头开始构建多维数据集