前言

  JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
  CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。

简介

  CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

构造方法摘要

方法名称 说明
CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

方法摘要

方法名称 说明
public int await() throws
InterruptedException, BrokenBarrierException
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。

返回:

到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,

零指示最后一个到达的线程.
public int await(long timeout,TimeUnit unit) throws
InterruptedException,BrokenBarrierException,
ITimeoutException
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
public void reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在由于其他原因造成损坏之后,实行重置可能会变得很复杂;
public boolean isBroken() 查询此屏障是否处于损坏状态。
public int getNumberWaiting() 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
public int getParties() 返回要求启动此 barrier 的参与者数目。

注意:

  • 对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:

    如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

  • 内存一致性效果:

    线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。

@ Example1 屏障操作的例子
public static void main(String[] args) {//设置5个屏障,并且有屏障操作CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {@Overridepublic void run() {System.out.println("线程"+Thread.currentThread().getName()+"执行了屏障操作");        }});for(int i=0;i<5;i++){//创建5个线程Thread thread = new Thread(new MyRunable(barrier),"thread_"+i);thread.start();}
}
复制代码
class MyRunable implements Runnable{CyclicBarrier barrier;public MyRunable(CyclicBarrier barrier ){this.barrier = barrier; }@Overridepublic void run() {//一系列操作...System.out.println("线程 "+Thread.currentThread().getName()+" 到达了屏障点!");try {int index = barrier.await();if(index== (barrier.getParties()-1)){//第一个到达屏障点的线程,执行特殊操作....System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是第一个到达屏障点");}else if(index == 0){//最后一个到达屏障点的线程System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是最后一个到达屏障点");}else{System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!");}} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}
}
复制代码

运行结果:

线程 thread_1 到达了屏障点!
线程 thread_4 到达了屏障点!
线程 thread_3 到达了屏障点!
线程 thread_0 到达了屏障点!
线程 thread_2 到达了屏障点!
线程thread_3执行了屏障操作
所有线程到达屏障点,线程 thread_3 被唤醒!!此线程是最后一个到达屏障点
所有线程到达屏障点,线程 thread_0 被唤醒!!
所有线程到达屏障点,线程 thread_4 被唤醒!!
所有线程到达屏障点,线程 thread_1 被唤醒!!此线程是第一个到达屏障点
所有线程到达屏障点,线程 thread_2 被唤醒!!
复制代码

 上面的例子,使用了传入屏障操作的Runable参数的构造方法,

屏障操作是由最后一个到达屏障点的线程执行的,这是不可以改变的

。然而,在实际使用中,

可能会出现由第n个到达屏障点的线程执行特殊的操作(或者说 屏障操作),那么就可以使用 CyclicBarrier.await()进行判断

,如上面的例子,第一个和最后一个到达屏障点的线程都执行特殊的操作。

   顺便说一下,可能会对本例子中前5个输出的顺序 有所疑惑:thread_3 通过awiat()方法返回的索引值,可知 thread_3 是最后一个到达屏障点的,但为什么输出的顺序却是第三个,而不是最后一个;在这就要真正理解CyclicBarrier,CyclicBarrier 本质上是一把锁,多个线程在使用CyclicBarrier 对象时,是需要先获取锁,即需要互斥访问,所以调用await( )方法不一定能够马上获取锁。上面的例子,是先打印输出,再去获取锁,所以输出顺序不是到达屏障点的顺序。

@ Example2 应用场景

   下面的例子是:CyclicBarrier用于多线程计算数据,最后合并计算结果的场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

public class BankWaterService implements Runnable {//创建4个屏障,处理完后执行当前类的run方法private CyclicBarrier barrier = new CyclicBarrier(4,this);//假设只有4个sheet,所以只启动4个线程private Executor excutor = Executors.newFixedThreadPool(4);//保存每个sheet计算出的结果private ConcurrentHashMap< String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();private void count(){for(int i=0;i<4;i++){excutor.execute(new Runnable() {@Overridepublic void run() {//计算过程.....//存储计算结果sheetBankWaterCount.put(Thread.currentThread().getName(), 1);try {//计算完成,插入屏障barrier.await();//后续操作,将会使用到四个线程的运行结果....System.out.println("线程"+Thread.currentThread().getName()+"运行结束,最终的计算结果:"+sheetBankWaterCount.get("result"));} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});}}@Overridepublic void run() {int result = 0;for(Entry<String, Integer> item : sheetBankWaterCount.entrySet()){result += item.getValue();}sheetBankWaterCount.put("result", result);}public static void main(String[] args) {BankWaterService bankWaterService = new BankWaterService();bankWaterService.count();}
}
复制代码

运行结果:

线程pool-1-thread-4运行结束,最终的计算结果:4
线程pool-1-thread-2运行结束,最终的计算结果:4
线程pool-1-thread-1运行结束,最终的计算结果:4
线程pool-1-thread-3运行结束,最终的计算结果:4
复制代码

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch: 一个线程(或者多个线程), 等待另外N个线程完成某个事情之后才能执行。而这N个线程通过调用CountDownLatch.countDown()方法 来告知“某件事件”完成,即计数减一。而一个线程(或者多个线程)则通过CountDownLatch.awiat( ) 进入等待状态,直到 CountDownLatch的计数为0时,才会全部被唤醒
  • CyclicBarrier : N个线程相互等待,任何一个线程完成某个事情之前,所有的线程都必须等待。
    CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.
    而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
  • CountDownLatch只能使用一次,CyclicBarrier则可以通过reset( )方法重置后,重新使用。所以
    CyclicBarrier可以用于更复杂的业务场景。

    例如:计算错误,可以重置计数器,并让线程重新执行一次。

文章源地址:https://www.cnblogs.com/jinggod/p/8494193.html

并发工具类(二)同步屏障CyclicBarrier相关推荐

  1. 并发工具类:CountDownLatch、CyclicBarrier、Semaphore

    在多线程的场景下,有些并发流程需要人为来控制,在JDK的并发包里提供了几个并发工具类:CountDownLatch.CyclicBarrier.Semaphore. 一.CountDownLatch ...

  2. 【重难点】【JUC 02】volitale 常用模式 、JUC 下有哪些内容 、并发工具类

    [重难点][JUC 02]volitale 常用模式 .JUC 下有哪些内容 .并发工具类 文章目录 [重难点][JUC 02]volitale 常用模式 .JUC 下有哪些内容 .并发工具类 一.v ...

  3. 《Java并发编程的艺术》读书笔记 - 第八章 - Java中的并发工具类

    目录 前言 等待多线程完成的 CountDownLatch 示例 同步屏障 CyclicBarrier 示例 CyclicBarrier 和 CountDownLatch 的区别 控制并发线程数量的 ...

  4. 并发工具类(四)线程间的交换数据 Exchanger

    前言   JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch.CyclicBarrier.Semphore.Exchanger.Ph ...

  5. 彻底理解Java并发:Java并发工具类

    本篇内容包括:Java 并发工具类的介绍.使用方式与 Demo,包括了 CountDownLatch(线程计数器).CyclicBarrier(回环栅栏).Semaphore(信号量) 以及 Exch ...

  6. Java并发指南9:AQS共享模式与并发工具类的实现

    一行一行源码分析清楚 AbstractQueuedSynchronizer (三) 转自:https://javadoop.com/post/AbstractQueuedSynchronizer-3 ...

  7. Java高并发编程(十):Java并发工具类

    1. 等待多线程完成的CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作. 1.1 应用场景 假如有这样一个需求:我们需要解析一个Excel里多个shee ...

  8. java并发的艺术-读书笔记-第八章常用的并发工具类

    jdk中提供了几个非常有用的工具类,分别是CountDownLatch,CyclicBarrier和semaphore exchanger CountDownLatch:允许一个或者多个线程等待其他线 ...

  9. Java 中的并发工具类

    From: https://blog.wuwii.com/juc-utils.html java.util.concurrent 下提供了一些辅助类来帮助我们在并发编程的设计. 学习了 AQS 后再了 ...

最新文章

  1. Ironic 裸金属实例的部署流程
  2. 数组中两数相加等于特定值,以字符串的形式输出两数角标
  3. python中的if not语句_python中使用if not x 语句用法
  4. EIGRP个人学习笔记
  5. 汇编语言(二十一)之数值交换与自增
  6. 百万人学AI 万人在线大会, 15+ 场直播抢先看!
  7. plsql查询不显示结果_管理NVivo的查询结果
  8. 怎么用python找因子_python找出因数与质因数的方法
  9. mac上iphone4刷机与越狱(二)
  10. 知识图谱构建通俗理解
  11. Arduino开发板使用TFT LCD液晶显示屏的终极新手入门指南
  12. Spring 框架学习—控制反转(IOC)
  13. 听说crmeb多商户增加了种草功能?
  14. 【归档】Kata Containers 2.0 介绍
  15. 微信小程序和uniapp开发工具
  16. c语言提供了三种预处理命令,9、C语言之预处理命令
  17. 网易云音乐.uc格式的缓存文件转.mp3
  18. github 遇到的问题
  19. YUV420P,YUV420,YUV420SP区别
  20. 线性回归与岭回归以及LASSO回归结果比较

热门文章

  1. 看门狗(独立看门狗)与窗口看门狗的区别!
  2. 线程:suspend与resume方法
  3. oracle中把函数的执行权限赋个某个用户
  4. Greenplum添加mirror步骤
  5. 呼叫中心团队管理浅谈
  6. 初学Node(五)文件I/O
  7. 制作一本《First Love, Last Rites》之二
  8. 将spfile从ASM里迁移到文件系统
  9. 解决selenium用cookies时候报错selenium.common.exceptions.InvalidArgumentException: Message: invalid argument
  10. FCKeditor 2.6 精简版