Phaser并发阶段器

Phaser由JDK1.7提出,是一个复杂强大的同步辅助类,是对同步工具类CountDownLatch和CyclicBarrier的综合升级,能够支持分阶段实现等待的业务场景。

我们可以回忆下CountDownLatch讲的是先指定N个线程,在N个线程干完活之前,其它线程都需要等待(导游等待旅游团所有人上车才能开车),而CyclicBarrier讲的是先指定N个线程。等N个线程到齐了大家同时干活(多个驴友相约去旅游,先到的需要等待后来的),而Phaser是两者的结合,可以理解为先指定N个线程,等N个线程到齐后开始干第一阶段的活,等第一阶段所有的线程都干完活了,接着N个线程开始干第二阶段的活,直到所有的阶段完成工作,程序结束,当然需要注意的是每个阶段可以根据业务需要新增或者删除一些线程,并不是开始指定多少个线程每个阶段就必须有多少个线程。

入门体验

看了概念可能不容易理解,从一个小demo入手体验下

public class PhaserDemo1 {// 指定随机种子private static Random random = new Random(System.currentTimeMillis());public static void main(String[] args) {Phaser phaser = new Phaser();// 将线程注册到phaserphaser.register();for (int i = 0; i <5 ; i++) {Task task = new Task(phaser);task.start();}phaser.arriveAndAwaitAdvance();System.out.println("all task execute close");}static class Task extends Thread{Phaser phaser;public Task(Phaser phaser){this.phaser = phaser;this.phaser.register();}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName()+"开始执行");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName()+"执行完毕");// 类似CountDownLatch中的 awaitphaser.arriveAndAwaitAdvance();} catch (InterruptedException e) {e.printStackTrace();}}}
}

不知道有没有这样的疑惑,phaser.register是向phaser去注册这个线程,那么为什么主线程也需要注册呢?

其实很简单主线程需要等待所有子线程执行完毕才能继续往下面执行所以必须要phaser.arriveAndAwaitAdvance();阻塞等待,而这个语句是意思当前线程已经到达屏障,在此等待一段时间等条件满足后需要向下一个屏障继续执行,如果没有主线程的phaser.register,直接调用phaser.arriveAndAwaitAdvance,在源码中提到可能会有异常,所以必须在主程序中注册phaser.register();

/* <p>It is a usage error for an unregistered party to invoke this
* method.  However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*/
译:
未注册方调用此函数是一个使用错误方法。但是,这个错误可能会导致
{@codeIllegalStateException}仅在一些后续操作这个相位器,如果有的话。

Phaser解决分科考试问题

从体验的示例中其实没看出其优势在哪里,上诉场景完全可以采用CountDownLatch,所以现在换一种场景来说明Phaser的优势。

假设某校举行期末考试,有三门考试语文、数学、英语,每门课允许学生提前交卷,只有当所有学生完成考试后才能举行下一次的考试,这就是典型的分阶段任务处理,示例图如下。

将上诉场景语义化如下

public class PhaserExam {public static Random random = new Random(System.currentTimeMillis());public static void main(String[] args) {// 一次初始化2个 相当于两次registerPhaser phaser = new Phaser(2);for (int i = 0; i <2 ; i++) {Exam exam = new Exam(phaser,random.nextLong());exam.start();}}static class Exam extends Thread{Phaser phaser;Long id;public Exam(Phaser phaser,Long id){this.phaser = phaser;this.id = id;}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName()+"===开始语文考试");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName()+"===结束语文考试");phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+"===开始数学考试");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName()+"===结束数学考试");phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+"===开始英语考试");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName()+"===结束英语考试");phaser.arriveAndAwaitAdvance();} catch (InterruptedException e) {e.printStackTrace();}}}
}

代码执行结果如下,可以看到三个阶段都是等待所有线程执行完毕后才往下执行,相当于多个栅栏。

到这里请注意,通过Phaser类的构造方法构建的party数,也就是线程数需要和循环的次数对应,不然可能影响后续阶段器的正常运行。

两个重要状态

在Phaser内有2个重要状态,分别是phase和party,乍一看很难理解,他们的定义如下。

phase就是阶段,如上面提到的语文、数学、英语考试这每个考试对应一个阶段,不过phase是从0开始的,当所有任务执行完毕,准备进入下一个阶段时phase就会加一。

party对应注册到Phaser线程数,party初始值有两种形式

  • 方法一就是通过Phaser的有参构造初始化party值。

  • 方法二采用动态注册方法phaser.register()或phaser.bulkRegister(线程数)指定线程数,注销线程调用phaser.arriveAndDeregister()方法party值会减一。

Phaser常用API

Phaser常用API总结如下所示

// 获取Phaser阶段数,默认0
public final int getPhase();
// 向Phaser注册一个线程
public int register();
// 向Phaser注册多个线程
public int bulkRegister(int parties);
// 获取已经注册的线程数,也就是重要状态party的值
public int getRegisteredParties();
// 到达并且等待其它线程到达
public int arriveAndAwaitAdvance();
// 到达后注销不等待其它线程,继续往下执行
public int arriveAndDeregister();
// 已到达线程数
public int getArrivedParties();
// 未到达线程数
public int getUnarrivedParties();
// Phaser是否结束 只有当party的数量是0或者调用方法forceTermination时才会结束
public boolean isTerminated();
// 结束Phaser
public void forceTermination();

代码演示如下

public class PhaserApiTest {public static void main(String[] args) throws InterruptedException {Phaser phaser = new Phaser(5);System.out.println("当前阶段"+phaser.getPhase());System.out.println("注册线程数==="+phaser.getRegisteredParties());// 向phaser注册一个线程phaser.register();System.out.println("注册线程数==="+phaser.getRegisteredParties());// 向phaser注册多个线程,批量注册phaser.bulkRegister(4);System.out.println("注册线程数==="+phaser.getRegisteredParties());new Thread(()->{// 到达且等待phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+"===执行1");}).start();new Thread(()->{// 到达不等待,从phaser中注销一个线程phaser.arriveAndDeregister();System.out.println(Thread.currentThread().getName()+"===执行2");}).start();TimeUnit.SECONDS.sleep(3);System.out.println("已到达线程数==="+phaser.getArrivedParties());System.out.println("未到达线程数==="+phaser.getUnarrivedParties());System.out.println("Phaser是否结束"+phaser.isTerminated());phaser.forceTermination();System.out.println("Phaser是否结束"+phaser.isTerminated());}
}

执行结果如下所示

arriveAndAwaitAdvance解析

arriveAndAwaitAdvance是Phaser中一个重要实现阻塞的API,其实arriveAndAwaitAdvance是由arrive方法和awaitAdvance方法合并而来,两个方法的作用分别为

  • arrive:到达屏障但不阻塞,返回值为到达的阶段号。

  • awaitAdvance(int):接收一个 int 值的阶段号,在指定的屏障处阻塞。

测试代码如下

public class PhaserTestArrive {public static Random random = new Random(System.currentTimeMillis());public static void main(String[] args) {Phaser phaser = new Phaser(5);for (int i = 0; i <5 ; i++) {new Task(i,phaser).start();}phaser.register();// 主线程需要调用arrive的原因是主线程注册的第六个线程还未到达,需要手动到达,才能调用awaitAdvance阻塞屏障phaser.arrive();// 因为Phaser线程数为6,所以即使5个线程已经到达,但是还差主线程的一个,目前阶段数就是0phaser.awaitAdvance(0);System.out.println("all task is end");}static class Task extends Thread{Phaser phaser;public Task(int num,Phaser phaser){super("Thread--"+String.valueOf(num));this.phaser = phaser;}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName()+"===task1 is start");TimeUnit.SECONDS.sleep(random.nextInt(3));System.out.println(Thread.currentThread().getName()+"===task1 is end");// 到达且不等待phaser.arrive();System.out.println(Thread.currentThread().getName()+"===task2 is start");TimeUnit.SECONDS.sleep(random.nextInt(3));System.out.println(Thread.currentThread().getName()+"===task2 is end");} catch (InterruptedException e) {e.printStackTrace();}}}
}

中断响应

我们需要特别注意的就是Phaser所有API中只有awaitAdvanceInterruptibly是响应中断的,其余全部不会响应中断所以不需要对其进行异常处理,演示如下

public static void main(String[] args) {Phaser phaser = new Phaser(3);Thread T1 = new Thread(()->{try {phaser.awaitAdvanceInterruptibly(phaser.getPhase());} catch (InterruptedException e) {System.out.println("中断异常");e.printStackTrace();}//phaser.arriveAndAwaitAdvance();});T1.start();T1.interrupt();phaser.arriveAndAwaitAdvance();}

Phaser并发阶段器相关推荐

  1. EBS并发管理器启动失败,系统暂挂,在重置计数器之前修复管理程序

    今天EBS安装补丁之后,因为停并发管理器的时候,因为关闭EBS应用时,并发管理器没有在前台停止,就直接停了应用服务,导致启动时,并发管理器直接起不来了,使用adcmctl.sh也没有办法启动. 进入系 ...

  2. python编写下载器可暂停_python 并发下载器实现方法示例

    本文实例讲述了python 并发下载器实现方法.分享给大家供大家参考,具体如下: 并发下载器 并发下载原理 from gevent import monkey import gevent import ...

  3. EBS并发管理器请求汇总(按照并发消耗时间,等待时间,平均等待事件等汇总)...

    此数据集用于确定正在使用中并发管理器,并可与实际的在启动时分配的并发管理器.而且考虑完成状态为 正常/警告 的请求. select q.concurrent_queue_name,count(*) c ...

  4. 12 Go 并发调度器模型

    一.聊聊并发这件事 在基础系列我们学习了Go的并发编程,对并发的概念已经有了一定的了解.在各种现代高级语言中,对并发的支持已经是标配,但Go的并发无论在开发效率还是在性能上都有相当的优越性.Go有什么 ...

  5. linux调度器并发,12 Go 并发调度器模型

    一.聊聊并发这件事 在基础系列我们学习了Go的并发编程,对并发的概念已经有了一定的了解.在各种现代高级语言中,对并发的支持已经是标配,但Go的并发无论在开发效率还是在性能上都有相当的优越性.Go有什么 ...

  6. java G1(并发)垃圾收集器(二)

    官方文档:https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/g1_gc_tuning.html#sthref50 G ...

  7. Go 1.5正式发布:实现自举、引入并发垃圾收集器

    在经历了几个Beta版本和一个rc版本之后,Go 1.5终于正式发布了.这是一个很重要的版本,实现方面有很多变化.因为这个版本仍然遵循Go 1兼容性承诺,开发团队预计,所有的Go程序应该都可以和以前一 ...

  8. python 并发下载器

    目标:使用协程实现网络图片下载 思路: 1.根据url地址请求网络资源 2.在本地创建文件准备保存 3.读取网络资源数据 4.把读取的网络资源写入到本地文件中 5.做异常捕获 代码: 单线程下载: i ...

  9. 15-3 并发调度器

最新文章

  1. Tensroflow随笔-测试集
  2. wxWidgets:wxSearchCtrl类用法
  3. 在ASP.NET MVC 中获取当前URL、controller、action
  4. python练手经典100例微盘_Python练手项目实例汇总(附源码下载)
  5. springboot获取客户端发来的数据
  6. 第五篇:关于MVPArms打包混淆及报错解决
  7. 人工智能、机器学习、神经网络、深度学习之间的关系
  8. 字谜游戏(b)C语言
  9. 怎样用360改计算机名称,360随身wifi网络名称怎么修改
  10. sam格式的结构和意义_NGS数据格式02-SAM/BAM最详细解读
  11. python pandas excle 把两列合并新的一列
  12. 2019年第八届java B组蓝桥杯省赛真题
  13. Java后端开发工程师简历加分项:个人在线简历的搭建
  14. Django REST framework学习笔记
  15. 搭建私有云盘ownCloud
  16. (25):SPA单页面的理解
  17. 软件测试--两个星期的工作经历
  18. 手把手教你用Python爬中国电影票房数据
  19. xlwing 边框_Python与Excel交互——Xlwings实战
  20. 基于微信小程序的宠物寄养平台小程序

热门文章

  1. MT6752/MT6755处理器功能介绍,MT6752/MT6755芯片资料分享
  2. 电信 802.1p 设置_电信VoLTE免费开通
  3. 全球人口突破80亿!免费分享全球人口分布数据
  4. python 判断excel单元格为空_用python检测空白Excel单元格
  5. 误删除系统libselinux.so.1之后
  6. 不一样的视角来学习Spring源码之AOP---上
  7. 统计学习导论(ISLR)(三):线性回归(超详细介绍)
  8. 用pygame实现一个简单的垃圾分类小游戏(已获校级二等奖)
  9. IC卡读卡器卡号输出格式
  10. DID-双重差分模型