概述

JDK中提供了一些用于线程之间协同等待的工具类,CountDownLatch和CyclicBarrier就是最典型的两个线程同步辅助类。下面分别详细介绍这两个类,以及他们之间的异同点。

CyclicBarrier类

CyclicBarrier翻译过来就是:循环的屏障。什么是循环?可以重复利用呗,对这个类就是一个可以重复利用的屏障类。CyclicBarrier主要用于一组固定大小的线程之间,各个线程之间相互等待,当所有线程都完成某项任务之后,才能执行之后的任务。
如下场景:

有若干个线程都需要向一个数据库写数据,但是必须要所有的线程都讲数据写入完毕他们才能继续做之后的事情。

分析一下这个场景的特征:

  • 各个线程都必须完成某项任务(写数据)才能继续做后续的任务;
  • 各个线程需要相互等待,不能独善其身。

这种场景便可以利用CyclicBarrier来完美解决。

常用函数

本节介绍CyclicBarrier的基本操作函数。

构造函数

有两种类型的构造函数,函数签名分别如下:

1
2
public CyclicBarrier(int parties, Runnable barrierAction)
public CyclicBarrier(int parties)

参数parties表示一共有多少线程参与这次“活动”,参数barrierAction是可选的,用来指定当所有线程都完成这些必须的“神秘任务”之后需要干的事情,所以barrierAction这里的动作在一个相互等待的循环内只会执行一次。

getParties函数

getParties用来获取当前的CyclicBarrier一共有多少线程参数与,函数签名如下:

1
public int getParties()

返回参与“活动”的线程个数。

await函数

await函数用来执行等待操作,有两种类型的函数签名:

1
2
3
4
5
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException

第一个函数是一个无参函数,第二个函数可以指定等待的超时时间。它们的作用是:一直等待知道所有参与“活动”的线程都调用过await函数,如果当前线程不是即将调用await函数的的最后一个线程,当前线程将会被挂起,直到下列某一种情况发生:

  • 最后一个线程调用了await函数;
  • 某个线程打断了当前线程;
  • 某个线程打断了其他某个正在等待的线程;
  • 其他某个线程等待时间超过给定的超时时间;
  • 其他某个线程调用了reset函数。

如果等待过程中线程被打断了,则会抛出InterruptedException异常;
如果等待过程中出现下列情况中的某一种情况,则会抛出BrokenBarrierException异常:

  • 其他线程被打断了;
  • 当前线程等待超时了;
  • 当前CyclicBarrier被reset了;
  • 等待过程中CyclicBarrier损坏了;
  • 构造函数中指定的barrierAction在执行过程中发生了异常。

如果等待时间超过给定的最大等待时间,则会抛出TimeoutException异常,并且这个时候其他已经嗲用过await函数的线程将会继续后续的动作。

返回值:返回当前线程在调用过await函数的所以线程中的编号,编号为parties-1的表示第一个调用await函数,编号为0表示是最后一个调用await函数。

isBroken函数

给函数用来判断barrier是否已经损坏,函数签名如下:

1
public boolean isBroken()

如果因为任何原因被损坏返回true,否则返回false

reset函数

顾名思义,这个函数用来重置barrier,函数签名如下:

1
public void reset()

如果调用了该函数,则在等待的线程将会抛出BrokenBarrierException异常。

getNumberWaiting函数

该函数用来获取当前正在等待该barrier的线程数,函数签名如下:

1
public int getNumberWaiting()

模拟实验

下面用代码实现下面的场景:

有5个线程都需要向一个数据库写数据,但是必须要所有的线程都讲数据写入完毕他们才能继续做之后的事情。

一般情况

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.winwill.test;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @author qifuguang
 * @date 15/8/25 00:34
 */
public class TestCyclicBarrier {    private static final int THREAD_NUMBER = 5;
    private static final Random RANDOM = new Random();

    public static void main(String[] args) {        CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {            public void run() {                System.out.println(Thread.currentThread().getId() + ":我宣布,所有小伙伴写入数据完毕");
            }
        });
        for (int i = 0; i < THREAD_NUMBER; i++) {            Thread t = new Thread(new Worker(barrier));
            t.start();
        }
    }

    static class Worker implements Runnable {        private CyclicBarrier barrier;

        public Worker(CyclicBarrier barrier) {            this.barrier = barrier;
        }

        public void run() {            int time = RANDOM.nextInt(1000);
            System.out.println(Thread.currentThread().getId() + ":我需要" + time + "毫秒时间写入数据.");
            try {                Thread.sleep(time);
            } catch (InterruptedException e) {                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":写入数据完毕,等待其他小伙伴...");
            try {                barrier.await(); // 等待所有线程都调用过此函数才能进行后续动作
            } catch (InterruptedException e) {                e.printStackTrace();
            } catch (BrokenBarrierException e) {                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":所有线程都写入数据完毕,继续干活...");
        }
    }
}

运行结果如下:

10:我需要16毫秒时间写入数据.
11:我需要353毫秒时间写入数据.
12:我需要101毫秒时间写入数据.
13:我需要744毫秒时间写入数据.
14:我需要51毫秒时间写入数据.
10:写入数据完毕,等待其他小伙伴…
14:写入数据完毕,等待其他小伙伴…
12:写入数据完毕,等待其他小伙伴…
11:写入数据完毕,等待其他小伙伴…
13:写入数据完毕,等待其他小伙伴…
13:我宣布,所有小伙伴写入数据完毕
13:所有线程都写入数据完毕,继续干活…
10:所有线程都写入数据完毕,继续干活…
12:所有线程都写入数据完毕,继续干活…
14:所有线程都写入数据完毕,继续干活…
11:所有线程都写入数据完毕,继续干活…

可以看到,线程小伙伴们非常团结,写完自己的数据之后都在等待其他小伙伴,所有小伙伴都完成之后才继续后续的动作。

重复使用

上面的例子并没有体现CyclicBarrier可以循环使用的特点,所以有如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.winwill.test;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @author qifuguang
 * @date 15/8/25 00:34
 */
public class TestCyclicBarrier {    private static final int THREAD_NUMBER = 5;
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {        CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {            public void run() {                System.out.println(Thread.currentThread().getId() + ":我宣布,所有小伙伴写入数据完毕");
            }
        });
        for (int i = 0; i < THREAD_NUMBER; i++) {            Thread t = new Thread(new Worker(barrier));
            t.start();
        }
        Thread.sleep(10000);
        System.out.println("================barrier重用==========================");
        for (int i = 0; i < THREAD_NUMBER; i++) {            Thread t = new Thread(new Worker(barrier));
            t.start();
        }
    }

    static class Worker implements Runnable {        private CyclicBarrier barrier;

        public Worker(CyclicBarrier barrier) {            this.barrier = barrier;
        }

        public void run() {            int time = RANDOM.nextInt(1000);
            System.out.println(Thread.currentThread().getId() + ":我需要" + time + "毫秒时间写入数据.");
            try {                Thread.sleep(time);
            } catch (InterruptedException e) {                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":写入数据完毕,等待其他小伙伴...");
            try {                barrier.await(); // 等待所有线程都调用过此函数才能进行后续动作
            } catch (InterruptedException e) {                e.printStackTrace();
            } catch (BrokenBarrierException e) {                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":所有线程都写入数据完毕,继续干活...");
        }
    }
}

运行结果:

10:我需要228毫秒时间写入数据.
11:我需要312毫秒时间写入数据.
12:我需要521毫秒时间写入数据.
13:我需要720毫秒时间写入数据.
14:我需要377毫秒时间写入数据.
10:写入数据完毕,等待其他小伙伴…
11:写入数据完毕,等待其他小伙伴…
14:写入数据完毕,等待其他小伙伴…
12:写入数据完毕,等待其他小伙伴…
13:写入数据完毕,等待其他小伙伴…
13:我宣布,所有小伙伴写入数据完毕
13:所有线程都写入数据完毕,继续干活…
10:所有线程都写入数据完毕,继续干活…
11:所有线程都写入数据完毕,继续干活…
14:所有线程都写入数据完毕,继续干活…
12:所有线程都写入数据完毕,继续干活…
================barrier重用==========================
15:我需要212毫秒时间写入数据.
16:我需要691毫秒时间写入数据.
17:我需要530毫秒时间写入数据.
18:我需要758毫秒时间写入数据.
19:我需要604毫秒时间写入数据.
15:写入数据完毕,等待其他小伙伴…
17:写入数据完毕,等待其他小伙伴…
19:写入数据完毕,等待其他小伙伴…
16:写入数据完毕,等待其他小伙伴…
18:写入数据完毕,等待其他小伙伴…
18:我宣布,所有小伙伴写入数据完毕
18:所有线程都写入数据完毕,继续干活…
15:所有线程都写入数据完毕,继续干活…
19:所有线程都写入数据完毕,继续干活…
16:所有线程都写入数据完毕,继续干活…
17:所有线程都写入数据完毕,继续干活…

可以看到,barrier的确是重用了。

等待超时

如果await的时候设置了一个最长等待时间,并且等待超时,则会怎么样呢?下面的例子故意让一个线程延迟一段时间才开始写数据,这样就会出现先等待的线程等待最后一个线程抛出等待超时异常的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.winwill.test;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author qifuguang
 * @date 15/8/25 00:34
 */
public class TestCyclicBarrier {    private static final int THREAD_NUMBER = 5;
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {        CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {            public void run() {                System.out.println(Thread.currentThread().getId() + ":我宣布,所有小伙伴写入数据完毕");
            }
        });
        for (int i = 0; i < THREAD_NUMBER; i++) {            if (i < THREAD_NUMBER - 1) {                Thread t = new Thread(new Worker(barrier));
                t.start();
            } else {  //最后一个线程故意延迟3s创建。
                Thread.sleep(3000);
                Thread t = new Thread(new Worker(barrier));
                t.start();
            }
        }
    }

    static class Worker implements Runnable {        private CyclicBarrier barrier;

        public Worker(CyclicBarrier barrier) {            this.barrier = barrier;
        }

        public void run() {            int time = RANDOM.nextInt(1000);
            System.out.println(Thread.currentThread().getId() + ":我需要" + time + "毫秒时间写入数据.");
            try {                Thread.sleep(time);
            } catch (InterruptedException e) {                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":写入数据完毕,等待其他小伙伴...");
            try {                barrier.await(2000, TimeUnit.MILLISECONDS); // 只等待2s,必然会等待最后一个线程超时
            } catch (InterruptedException e) {                e.printStackTrace();
            } catch (BrokenBarrierException e) {                e.printStackTrace();
            } catch (TimeoutException e) {                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":所有线程都写入数据完毕,继续干活...");
        }
    }
}

运行结果:

10:我需要820毫秒时间写入数据.
11:我需要140毫秒时间写入数据.
12:我需要640毫秒时间写入数据.
13:我需要460毫秒时间写入数据.
11:写入数据完毕,等待其他小伙伴…
13:写入数据完毕,等待其他小伙伴…
12:写入数据完毕,等待其他小伙伴…
10:写入数据完毕,等待其他小伙伴…
Java.util.concurrent.BrokenBarrierException
12:所有线程都写入数据完毕,继续干活…
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
13:所有线程都写入数据完毕,继续干活…
11:所有线程都写入数据完毕,继续干活…
10:所有线程都写入数据完毕,继续干活…
at com.winwill.test.TestCyclicBarrier$Worker.run(TestCyclicBarrier.java:52)
at java.lang.Thread.run(Thread.java:744)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.winwill.test.TestCyclicBarrier$Worker.run(TestCyclicBarrier.java:52)
at java.lang.Thread.run(Thread.java:744)
java.util.concurrent.TimeoutException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.winwill.test.TestCyclicBarrier$Worker.run(TestCyclicBarrier.java:52)
at java.lang.Thread.run(Thread.java:744)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.winwill.test.TestCyclicBarrier$Worker.run(TestCyclicBarrier.java:52)
at java.lang.Thread.run(Thread.java:744)
14:我需要850毫秒时间写入数据.
java.util.concurrent.BrokenBarrierException
14:写入数据完毕,等待其他小伙伴…
14:所有线程都写入数据完毕,继续干活…
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.winwill.test.TestCyclicBarrier$Worker.run(TestCyclicBarrier.java:52)
at java.lang.Thread.run(Thread.java:744)

可以看到,前面四个线程等待最后一个线程超时了,这个时候他们不再等待最后这个小伙伴了,而是抛出异常并都继续后续的动作。最后这个线程屁颠屁颠地完成写入数据操作之后也继续了后续的动作。需要说明的是,发生了超时异常时候,还没有完成“神秘任务”的线程在完成任务之后不会做任何等待,而是会直接执行后续的操作。

总结

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

  • CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
  • CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
  • CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

Java多线程系列-CyclicBarrier相关推荐

  1. Java多线程系列---“JUC锁”01之 框架

    本章,我们介绍锁的架构:后面的章节将会对它们逐个进行分析介绍.目录如下: 01. Java多线程系列--"JUC锁"01之 框架 02. Java多线程系列--"JUC锁 ...

  2. Java多线程系列(九):CountDownLatch、Semaphore等4大并发工具类详解

    之前谈过高并发编程系列:4种常用Java线程锁的特点,性能比较.使用场景 ,以及高并发编程系列:ConcurrentHashMap的实现原理(JDK1.7和JDK1.8) 今天主要介绍concurre ...

  3. Java多线程系列(十):源码剖析AQS的实现原理

    在并发编程领域,AQS号称是并发同步组件的基石,很多并发同步组件都是基于AQS实现,所以想掌握好高并发编程,你需要掌握好AQS. 本篇主要通过对AQS的实现原理.数据模型.资源共享方式.获取锁的过程, ...

  4. Java多线程系列--“JUC锁”03之 公平锁(一)

    概要 本章对"公平锁"的获取锁机制进行介绍(本文的公平锁指的是互斥锁的公平锁),内容包括: 基本概念 ReentrantLock数据结构 参考代码 获取公平锁(基于JDK1.7.0 ...

  5. Java多线程系列--AQS的原理

    原文网址:Java多线程系列--AQS的原理_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Java中的AQS的原理. Java的AQS是JDK自带的锁机制,是JUC(java.util.concu ...

  6. java多线程系列(四)---ReentrantLock的使用

    Lock的使用 前言:本系列将从零开始讲解java多线程相关的技术,内容参考于<java多线程核心技术>与<java并发编程实战>等相关资料,希望站在巨人的肩膀上,再通过我的理 ...

  7. Java多线程系列--“JUC原子类”01之 框架

    2019独角兽企业重金招聘Python工程师标准>>> Java多线程系列--"JUC原子类"01之 框架 根据修改的数据类型,可以将JUC包中的原子操作类可以分 ...

  8. Java多线程系列--“JUC原子类”03之 AtomicLongArray原子类

    概要 AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray这3个数组类型的原子类的原理和用法相似.本章以AtomicLongArray对数 ...

  9. Java多线程系列--“JUC线程池”06之 Callable和Future

    转载自  Java多线程系列--"JUC线程池"06之 Callable和Future Callable 和 Future 简介 Callable 和 Future 是比较有趣的一 ...

最新文章

  1. Python os模块常用命令
  2. SharePoint服务器如果需要安装杀毒软件, 需要注意什么?
  3. ubuntu 突然不能 sudo成功,报错su: Authentication failure
  4. 【学习笔记】之多项式使人头秃
  5. python16进制字节序_第 1 章 套接字、IPv4和简单的客户端/服务器编程
  6. php 简单的解密和加密
  7. 代表中国为世界做出探索,杭州城市大脑获IDC亚太区智慧城市大奖
  8. Linux下文件描述符
  9. Elasticsearch:用于内容丰富的文本分析
  10. C++ 定义 string
  11. 对java中接口的简单理解
  12. 用C#生成随机中文汉字验证码
  13. yum 安装oraclejdk_kubernetes-16:制作oraclejdk镜像
  14. Linux操作系统下进程讲解(史上最强总结)
  15. googletest,笔记20190821
  16. Mysql-slowlog
  17. 更新了 pe_xscan 和 ClosePc
  18. linux V4L2子系统——v4l2架构(5)之v4l2_device与v4l2_subdev异步机制
  19. AI微信小程序源码下载人脸照片AI转换动漫照片全新源码安装简单无需服务器域名
  20. 计算机网络_实验5_集线器与交换机对比

热门文章

  1. 【独立站运营】营销邮件被判定为垃圾邮件?四个方法教你避开
  2. GPL和LGPL协议
  3. 从达沃斯世界经济论坛,看区块链和数字货币的三大发展趋势
  4. js 正则例子 验证美国电话号码
  5. about hashCode again understand
  6. CSDN发表文章数量限制的缺陷
  7. Siege 简单教程
  8. mb_detect_encoding php,php mb_detect_encoding检测字符串编码有误的问题
  9. 哪些云主机值得推荐?
  10. android设备打开5555远程连接端口