目录

一、使用

1、基本概念

2、实现CountDownLatch的效果

3、实现CyclicBarrier的效果

4、灵活调整parties

5、onAdvance

6、父子Phaser

二、源码解析

1、定义

2、构造方法

3、register / bulkRegister

4、arrive / arriveAndDeregister / arriveAndAwaitAdvance

5、awaitAdvance / awaitAdvanceInterruptibly / awaitAdvanceInterruptibly

6、forceTermination / isTerminated

7、getArrivedParties / getUnarrivedParties / getRegisteredParties / getPhase


Phaser 是Java7引入一个用于控制任务阶段执行的可重复使用的同步器,包含了CountDownLatch和CyclicBarrier的功能,比他们更加灵活,更加强大,本篇博客就详细探讨该类的使用和

一、使用

1、基本概念

parties:参与线程的个数,跟CountDownLatch或者CyclicBarrier的构造方法的参数的含义是一样的,不同的是这两个只能在构造方法中指定,不能调整,而Phaser提供了调整的方法。

register / deregister : register就是通知Phaser参与等待的线程数增加了,deregister就是通知Phaser参与等待的线程数减少了,然后相应调整parties

arrive / advance:arrive跟CyclicBarrier中到达栅栏是一个意思,当所有parties个线程都arrive了,则触发advance,默认实现下如果此时parties是0,则会终止Phaser,否则将phase加1,同时将未到达线程数从0恢复至parties

phase:表示执行任务的阶段,初始值是0,每一次advance都会将该值加1,最大值是Integer.MAX_VALUE;如果Phaser被终止了,则该值为负数,此时所有的register,arrive或者await操作都会立即返回。

父子Phaser:父子Phaser一方面可以避免parties线程过多时导致cas修改state容易失败,另一方面可以基于父子Phaser实现复杂的执行任务的阶段控制。子Phaser的parties线程可以有多个,但是对于父Phaser相当于只有一个,只有子Phaser所有的parties线程都到达的时候才通知父Phaser当前子Phaser已到达,只有子Phaser所有的parties线程都被注销(deregister)了才会向父Phaser注销当前子Phaser。另外在多级父子Phaser下,子Phaser的phase永远以上一级的Phaser的phase为准,如果不一致则修改成一致,并且所有的Phaser的root属性都指向同一个祖先Phaser,调用internalAwaitAdvance方法时也是在该Phaser上调用,即所有的子Phaser都共享祖先Phaser的等待线程链表,从而实现最后一个到达的子Phaser可以唤醒其他子Phaser关联的等待线程。

2、实现CountDownLatch的效果

测试用例如下:

 @Testpublic void test() throws Exception {int num=6;Phaser phaser=new Phaser(num);Random random=new Random();Runnable task=new Runnable() {@Overridepublic void run() {try {Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());//表示当前线程已到达 phaser.arrive();} catch (InterruptedException e) {e.printStackTrace();}}};for(int i=0;i<num;i++){new Thread(task).start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());//等待其他线程都到达phaser.awaitAdvance(phaser.getPhase());System.out.println("main thread end,time->"+System.currentTimeMillis());}@Testpublic void test2() throws Exception {int num=6;CountDownLatch countDownLatch=new CountDownLatch(num);Random random=new Random();Runnable task=new Runnable() {@Overridepublic void run() {try {Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}};for(int i=0;i<num;i++){new Thread(task).start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());countDownLatch.await();System.out.println("main thread end,time->"+System.currentTimeMillis());}

上述测试用例的输出是一样的,主线程等待5个子线程执行完任务然后退出,如下:

3、实现CyclicBarrier的效果

测试用例如下:

@Testpublic void test5() throws Exception {int num=6;Phaser phaser=new Phaser(num);Random random=new Random();Runnable task=new Runnable() {@Overridepublic void run() {try {//到达并等待其他线程到达phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+" start,time->"+System.currentTimeMillis());Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());//通知当前线程已到达phaser.arrive();} catch (InterruptedException e) {e.printStackTrace();}}};for(int i=0;i<num-1;i++){new Thread(task).start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("all thread start,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("main thread end,time->"+System.currentTimeMillis());}@Testpublic void test6() throws Exception {int num=6;CyclicBarrier cyclicBarrier=new CyclicBarrier(num);Random random=new Random();Runnable task=new Runnable() {@Overridepublic void run() {try {cyclicBarrier.await();System.out.println(Thread.currentThread().getName()+" start,time->"+System.currentTimeMillis());Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}}};for(int i=0;i<num-1;i++){new Thread(task).start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());cyclicBarrier.await();System.out.println("all thread start,time->"+System.currentTimeMillis());cyclicBarrier.await();System.out.println("main thread end,time->"+System.currentTimeMillis());}

输出如下:

CyclicBarrier的构造函数还支持传入一个Runnable,最后一个到达的线程会负责执行该Runnable,Phaser也可实现类似的功能,测试用例如下:

@Testpublic void test7() throws Exception {int num=6;CyclicBarrier cyclicBarrier=new CyclicBarrier(num, new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+" last arrive,time->"+System.currentTimeMillis());}});Random random=new Random();Runnable task=new Runnable() {@Overridepublic void run() {try {cyclicBarrier.await();System.out.println(Thread.currentThread().getName()+" start,time->"+System.currentTimeMillis());Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}}};for(int i=0;i<num-1;i++){new Thread(task).start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());cyclicBarrier.await();System.out.println("all thread start,time->"+System.currentTimeMillis());cyclicBarrier.await();System.out.println("main thread end,time->"+System.currentTimeMillis());}@Testpublic void test8() throws Exception {int num=6;Phaser phaser=new Phaser(num);Random random=new Random();Runnable task=new Runnable() {@Overridepublic void run() {try {//getUnarrivedParties等于1时,当前线程就是最后一个达到的线程if(phaser.getUnarrivedParties()==1){System.out.println(Thread.currentThread().getName()+" last arrive,time->"+System.currentTimeMillis());}phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+" start,time->"+System.currentTimeMillis());Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());if(phaser.getUnarrivedParties()==1){System.out.println(Thread.currentThread().getName()+" last arrive,time->"+System.currentTimeMillis());}phaser.arrive();} catch (InterruptedException e) {e.printStackTrace();}}};for(int i=0;i<num;i++){new Thread(task).start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("all thread start,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("main thread end,time->"+System.currentTimeMillis());}

输出如下:

4、灵活调整parties

上述用例中CountDownLatch、CyclicBarrier和Phaser在构造时都指定了等待的线程数,如果需要等待的线程数发生变更,则需要重新创建一个新的CountDownLatch或者CyclicBarrier实例,但是如果使用Phaser,则可以通过register方法将等待的线程数加1,通过bulkRegister方法将等待的线程数加上指定的值,通过arriveAndDeregister方法将等待的线程数减1,测试用例如下:

@Testpublic void test4() throws Exception {int num=6;Phaser phaser=new Phaser(num);Random random=new Random();Runnable task=new Runnable() {@Overridepublic void run() {try {Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());phaser.arrive();} catch (InterruptedException e) {e.printStackTrace();}}};for(int i=0;i<num;i++){new Thread(task).start();}System.out.println("main thread start Job1 await,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("main thread doJob1 end,time->"+System.currentTimeMillis());//增加两个等待的线程数phaser.bulkRegister(2);for(int i=0;i<num+2;i++){new Thread(task).start();}System.out.println("main thread start Job2 await,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("main thread doJob2 end,time->"+System.currentTimeMillis());//减少四个等待的线程数phaser.arriveAndDeregister();phaser.arriveAndDeregister();phaser.arriveAndDeregister();phaser.arriveAndDeregister();for(int i=0;i<num+2-4;i++){new Thread(task).start();}System.out.println("main thread start Job3 await,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("main thread doJob3 end,time->"+System.currentTimeMillis());}

上述用例的输出如下:

还可以在构造函数中不指定等待的线程数,根据实际执行任务的线程数动态调整,测试用例如下:

class Task implements Runnable{Random random=new Random();Phaser phaser;public Task(Phaser phaser) {this.phaser = phaser;phaser.register();}@Overridepublic void run() {try {//到达并等待其他线程到达phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+" start,time->"+System.currentTimeMillis());Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());//到达然后解除注册phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}}@Testpublic void test3() throws Exception {Phaser phaser=new Phaser(){@Override//改写此方法避免parties变成0后被终止了protected boolean onAdvance(int phase, int registeredParties) {return false;}};//线程数可以是任意个for(int i=0;i<6;i++){Thread thread=new Thread(new Task(phaser));thread.start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("all thread start,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("main thread end,time->"+System.currentTimeMillis());System.out.println("=================");//前后的线程数可以不一致,不需要重新创建Phaser实例for(int i=0;i<4;i++){Thread thread=new Thread(new Task(phaser));thread.start();}System.out.println("main thread start await,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());System.out.println("all thread start,time->"+System.currentTimeMillis());phaser.awaitAdvance(phaser.getPhase());}

上述用例的输出如下:

5、onAdvance

onAdvance方法是protected方法,子类可以覆写,该方法是最后一个到达的线程执行的,如果返回true表示需要终止Phaser,否则继续下一轮的phase,因此可以借助该方法实现CyclicBarrier的回调函数功能,也可以控制Phaser的阶段数,测试用例如下:

class Task implements Runnable{Random random=new Random();Phaser phaser;public Task(Phaser phaser) {this.phaser = phaser;phaser.register();}@Overridepublic void run() {try {for(int i=0;i<5;i++) {phaser.arriveAndAwaitAdvance();if(phaser.isTerminated()){return;}System.out.println(Thread.currentThread().getName() + " start,time->" + System.currentTimeMillis());Thread.sleep(random.nextInt(1000));}} catch (InterruptedException e) {e.printStackTrace();}}}@Testpublic void test9() throws Exception {Phaser phaser=new Phaser(){@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println(Thread.currentThread().getName()+" last arrive,time->"+System.currentTimeMillis());if(phase>3){return true;}return false;}};//线程数可以是任意个for(int i=0;i<3;i++){Thread thread=new Thread(new Task(phaser));thread.start();}while (!phaser.isTerminated()){int phase=phaser.getPhase();System.out.println("main thread start await,time->"+System.currentTimeMillis()+",phase->"+phase);phaser.awaitAdvance(phase);}System.out.println("main thread exit");}

其输出如下:

phase为3时,最后一个到达的线程将phase改成4,再下一次循环时,最后一个到达的线程发现phase为4了,终止Phaser,同时唤醒等待的所有线程,判断Phaser终止后就直接退出了。

6、父子Phaser

测试用例如下:

class Job implements Runnable{Random random=new Random();Phaser phaser;String name;public Job(Phaser phaser,String name) {this.phaser = phaser;this.name=name;phaser.register();}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() + " start,time->" + System.currentTimeMillis()+",do job:"+name+",phaser->"+phaser.hashCode());Thread.sleep(random.nextInt(1000));phaser.arrive();} catch (InterruptedException e) {e.printStackTrace();}}}@Testpublic void test10() throws Exception {Phaser root=new Phaser();Phaser phaser=null;for(int i=0;i<13;i++){if(i%4==0){ //每4个线程共用一个Phaser实例phaser=new Phaser(root);}new Thread(new Job(phaser,"Job_"+i)).start();}root.awaitAdvance(root.getPhase());System.out.println("main thread exit");}

输出如下,必须等待所有的子Phaser任务都执行完成:

二、源码解析

1、定义

Phaser中包含的实例属性如下:

    //父节点private final Phaser parent;//根节点,如果parent为null,则指向自己,否则指向parent节点的root属性private final Phaser root;//两个等待队列,当phase改变时就使用另外一个队列,避免新增节点和释放节点在同一个队列上操作private final AtomicReference<QNode> evenQ;private final AtomicReference<QNode> oddQ;//保存Phaser的状态,高32位表示phase,低16位表示注册的parties,最后的16位表示未到达的线程数private volatile long state;

包含的静态属性如下:

其中QNode表示等待队列中的一个节点,其实现如下:

static final class QNode implements ForkJoinPool.ManagedBlocker {final Phaser phaser;final int phase; //等待的phasefinal boolean interruptible; //是否响应中断final boolean timed;  //是否等待指定的时间boolean wasInterrupted; //是否被中断了long nanos; //等待的时间final long deadline; //等待的终止时间volatile Thread thread; // 关联的线程QNode next; //下一个节点QNode(Phaser phaser, int phase, boolean interruptible,boolean timed, long nanos) {this.phaser = phaser;this.phase = phase;this.interruptible = interruptible;this.nanos = nanos;this.timed = timed;//计算终止时间this.deadline = timed ? System.nanoTime() + nanos : 0L;thread = Thread.currentThread();}//返回true表示终止等待public boolean isReleasable() {if (thread == null)return true;if (phaser.getPhase() != phase) { //phase改变了,所有线程都到达了thread = null;return true;}if (Thread.interrupted()) //线程被中断了wasInterrupted = true;if (wasInterrupted && interruptible) { //如果线程被中断了且需要响应线程中断thread = null;return true;}if (timed) {if (nanos > 0L) {//计算剩余的等待时间nanos = deadline - System.nanoTime();}if (nanos <= 0L) { //等待超时thread = null;return true;}}return false;}//调用此方法将线程阻塞掉public boolean block() {if (isReleasable()) //终止等待return true;else if (!timed)LockSupport.park(this); //无期限等待else if (nanos > 0L)LockSupport.parkNanos(this, nanos);//等待指定的时间//线程被唤醒,判断是否可以终止等待    return isReleasable();}}

Phaser类定义了多个常量,如下:

    //parties属性的最大值,即2^16,65536,private static final int  MAX_PARTIES     = 0xffff;//MAX_PARTIES 取Log2 的结果private static final int  PARTIES_SHIFT   = 16;//phase的最大值,使用long的高32位保存,其最大值就是int的最大值private static final int  MAX_PHASE       = Integer.MAX_VALUE;//计算state时使用,将int类型的phase左移32位,放在long类型的state的高32位保存private static final int  PHASE_SHIFT     = 32;//用来获取低16位的未到达的线程数private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints//获取parties,适用于跟long类型state求且private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs//获取低32位的值private static final long COUNTS_MASK     = 0xffffffffL;//最高位为1,表示phaser需要被终止private static final long TERMINATION_BIT = 1L << 63;//表示某个线程到达了,将state减去此值private static final int  ONE_ARRIVAL     = 1;private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;//表示某个线程到达了且需要解除注册,将state减去此值private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;//parties为0时的state值private static final int  EMPTY           = 1;//CPU的核数private static final int NCPU = Runtime.getRuntime().availableProcessors();//自旋等待的次数static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;

2、构造方法

public Phaser() {this(null, 0);}public Phaser(int parties) {this(null, parties);}public Phaser(Phaser parent) {this(parent, 0);}public Phaser(Phaser parent, int parties) {//PARTIES_SHIFT的值是16,此处是无符号右移16位,即parties不能大于2^16,即65536if (parties >>> PARTIES_SHIFT != 0)throw new IllegalArgumentException("Illegal number of parties");int phase = 0;this.parent = parent;if (parent != null) {//获取根节点final Phaser root = parent.root;this.root = root;//保存根节点的evenQ和oddQthis.evenQ = root.evenQ;this.oddQ = root.oddQ;if (parties != 0)phase = parent.doRegister(1);}else {//parent为nullthis.root = this;this.evenQ = new AtomicReference<QNode>();this.oddQ = new AtomicReference<QNode>();}//计算state,注意state是一个long类型,64位//通过下面的运算可知,其高32位为原来的phase,两个低16位都是partiesthis.state = (parties == 0) ? (long)EMPTY :((long)phase << PHASE_SHIFT) |((long)parties << PARTIES_SHIFT) |((long)parties);}

3、register / bulkRegister

//parties加1
public int register() {return doRegister(1);}//将parties增加指定值
public int bulkRegister(int parties) {if (parties < 0)throw new IllegalArgumentException();if (parties == 0) //不增加参与的线程数,直接返回当前的phasereturn getPhase();return doRegister(parties);}//返回当前的phase
public final int getPhase() {return (int)(root.state >>> PHASE_SHIFT);}//将parties增加指定值,返回当前的phase
private int doRegister(int registrations) {//PARTIES_SHIFT的值是16long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;final Phaser parent = this.parent;int phase;for (;;) {//parent不为null调用后面的reconcileStatelong s = (parent == null) ? state : reconcileState();//long类型转换成int,获取long的低32位int counts = (int)s;//无符号右移16位,获取partiesint parties = counts >>> PARTIES_SHIFT;//获取最低的16位的值,即未到达的线程数int unarrived = counts & UNARRIVED_MASK;//如果超过最大值了,抛出异常if (registrations > MAX_PARTIES - parties)throw new IllegalStateException(badRegister(s));//PHASE_SHIFT是32,获取phasephase = (int)(s >>> PHASE_SHIFT);if (phase < 0) //phaser已终止,终止循环,直接返回break;if (counts != EMPTY) {                  //原parties不为0//parent为null,或者parent不为null,reconcileState() == s,即父节点的phase没有改变,如果变了则重新读取if (parent == null || reconcileState() == s) { if (unarrived == 0)             //未到达的线程数为0,所有线程都到达了,正在advance的过程中,通过internalAwaitAdvance自旋等待advance结束//然后unarrived就恢复成原来的parties了,再走下面的else if分支修改stateroot.internalAwaitAdvance(phase, null);//unarrived不等于0,cas修改state    else if (UNSAFE.compareAndSwapLong(this, stateOffset,s, s + adjust)) //s + adjust是s的低32位同adjust相加,可以理解为两者的两个低16位相加break;}}//counts等于EMPTY,parent也为null,此时是第一次注册else if (parent == null) {              // 1st root registration//计算方式同构造函数long next = ((long)phase << PHASE_SHIFT) | adjust;if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) //cas修改state,修改成功终止循环break;}else {//counts等于EMPTY,parent不为nullsynchronized (this) {               // 1st sub registrationif (state == s) {               //再一次检查state没有改变//注册到父节点,无论registrations为多少,此处都是1phase = parent.doRegister(1);if (phase < 0) //父节点已终止break;//cas修改state,如果修改失败进入while循环while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,((long)phase << PHASE_SHIFT) | adjust)) {//重新读取state和phases = state;phase = (int)(root.state >>> PHASE_SHIFT);// assert (int)s == EMPTY;}//cas修改成功,终止循环break;}}}} //for循环结束return phase;}//父节点不为空时调用此方法
private long reconcileState() {final Phaser root = this.root;long s = state;if (root != this) { //root等于this时,parent为nullint phase, p;// CAS to root phase with current parties, tripping unarrivedwhile ((phase = (int)(root.state >>> PHASE_SHIFT)) !=(int)(s >>> PHASE_SHIFT) &&    //如果root节点和当前节点phase不一致,则尝试修改当前节点的state,更新phase,如果cas修改失败则while循环重置!UNSAFE.compareAndSwapLong   (this, stateOffset, s,//phase是根节点的,如果小于0,父节点被终止了,则取 (long)phase << PHASE_SHIFT) | (s & COUNTS_MASK)//如果大于0,如果当前节点的partis为0,则取 (long)phase << PHASE_SHIFT) | EMPTY,如果partis不为0,//则取 (long)phase << PHASE_SHIFT) | (s & PARTIES_MASK) | p)s = (((long)phase << PHASE_SHIFT) |((phase < 0) ? (s & COUNTS_MASK) ://获取state的parties(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :((s & PARTIES_MASK) | p))))))s = state;}return s;}private String badRegister(long s) {return "Attempt to register more than " +MAX_PARTIES + " parties for " + stateToString(s);}private String stateToString(long s) {return super.toString() +"[phase = " + phaseOf(s) +" parties = " + partiesOf(s) +" arrived = " + arrivedOf(s) + "]";}//转成int,取低32位,再右移16位,取原来的parties
private static int partiesOf(long s) {return (int)s >>> PARTIES_SHIFT;}//右移32位获取phase
private static int phaseOf(long s) {return (int)(s >>> PHASE_SHIFT);}//获取已到达线程数,用parties减去未到达的线程数
private static int arrivedOf(long s) {int counts = (int)s;return (counts == EMPTY) ? 0 :(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);}

4、arrive / arriveAndDeregister / arriveAndAwaitAdvance

//通知Phaser已到达
public int arrive() {return doArrive(ONE_ARRIVAL);}//通知Phaser已到达并注销
public int arriveAndDeregister() {return doArrive(ONE_DEREGISTER);}//到达并等待
public int arriveAndAwaitAdvance() {// Specialization of doArrive+awaitAdvance eliminating some reads/pathsfinal Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();//获取当前的phaseint phase = (int)(s >>> PHASE_SHIFT);if (phase < 0) //如果已终止,则退出return phase;//获取未到达的线程数    int counts = (int)s;int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);if (unarrived <= 0) //unarrived不合法抛出异常throw new IllegalStateException(badArrive(s));if (UNSAFE.compareAndSwapLong(this, stateOffset, s,s -= ONE_ARRIVAL)) { //cas修改state,修改失败则重试if (unarrived > 1) //如果其他未达到的线程,则让当前线程休眠,被唤醒后返回当前的phasereturn root.internalAwaitAdvance(phase, null);//unarrived等于1,当前线程是最后一个到达的线程  if (root != this) //如果存在父节点,通知父节点子节点的任务已完成return parent.arriveAndAwaitAdvance();//unarrived等于1,没有父节点//获取parties,即参数下一个phase的线程数,此时s中未到达线程数为0long n = s & PARTIES_MASK;  // base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (onAdvance(phase, nextUnarrived)) //如果需要被终止,默认实现是如果nextUnarrived等于0了就返回truen |= TERMINATION_BIT;else if (nextUnarrived == 0) //如果到达的线程都注销了n |= EMPTY;elsen |= nextUnarrived;//设置下一个phase的未到达线程数//增加phase    int nextPhase = (phase + 1) & MAX_PHASE;n |= (long)nextPhase << PHASE_SHIFT;//cas修改stateif (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))return (int)(state >>> PHASE_SHIFT); //如果修改失败,肯定是被终止了,因为此时参数phase的线程都处于休眠或者自旋等待的过程中//唤醒等待的线程    releaseWaiters(phase);return nextPhase;}}}private int doArrive(int adjust) {final Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();//获取phaseint phase = (int)(s >>> PHASE_SHIFT);if (phase < 0) //小于0,表示已终止return phase;//获取未到达的线程数    int counts = (int)s;int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);if (unarrived <= 0) //参数非法throw new IllegalStateException(badArrive(s));if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { //修改state成功,修改失败则重试if (unarrived == 1) { //未到达的线程数为1,即当前线程是最后一个到达的//获取partieslong n = s & PARTIES_MASK;  // base of next state//nextUnarrived就等于partiesint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (root == this) { //如果没有父节点 if (onAdvance(phase, nextUnarrived)) //需要被终止,默认实现下nextUnarrived等于0则返回truen |= TERMINATION_BIT;else if (nextUnarrived == 0) //原有到达的线程都注销了n |= EMPTY;elsen |= nextUnarrived;//计算下一个phase    int nextPhase = (phase + 1) & MAX_PHASE;//写入phasen |= (long)nextPhase << PHASE_SHIFT;//修改stateUNSAFE.compareAndSwapLong(this, stateOffset, s, n);//唤醒之前已到达然后等待的线程releaseWaiters(phase);}//如果有父节点else if (nextUnarrived == 0) { //有到达的线程都注销了,从父节点注销phase = parent.doArrive(ONE_DEREGISTER);//更新state,将parties置为0UNSAFE.compareAndSwapLong(this, stateOffset,s, s | EMPTY);}else//nextUnarrived不等于0,有未注销的线程,通知父节点已到达phase = parent.doArrive(ONE_ARRIVAL);}//if结束//如果不是最后到达的线程则直接返回return phase;}}}private String badArrive(long s) {return "Attempted arrival of unregistered party for " +stateToString(s);}//子类可以改写此方法,返回true表示终止Phaser,返回false表示继续下一阶段的任务
protected boolean onAdvance(int phase, int registeredParties) {return registeredParties == 0;}//执行releaseWaiters前会将phase加1,所以QNode中的phase就与当前的phase不一样了
private void releaseWaiters(int phase) {QNode q;   // first element of queueThread t;  // its threadAtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { //QNode中的phase与当前节点的phase不一致if (head.compareAndSet(q, q.next) && //将q从链表移除,如果遍历完成head.get()返回null,终止循环(t = q.thread) != null) { //thread不为null,则唤醒该线程,为null则继续处理下一个节点q.thread = null;LockSupport.unpark(t);}}}

5、awaitAdvance / awaitAdvanceInterruptibly / awaitAdvanceInterruptibly

//无期限等待
public int awaitAdvance(int phase) {final Phaser root = this.root;//获取phaselong s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0) //被终止了,直接返回 return phase;if (p == phase) //跟当前phase一致,则阻塞当前线程return root.internalAwaitAdvance(phase, null);return p;}//无期限等待,如果被中断则抛出异常
public int awaitAdvanceInterruptibly(int phase)throws InterruptedException {final Phaser root = this.root;//获取phaselong s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0) //被终止了,直接返回 return phase;if (p == phase) { //跟当前phase一致QNode node = new QNode(this, phase, true, false, 0L);//阻塞当前线程p = root.internalAwaitAdvance(phase, node);if (node.wasInterrupted) //如果被中断则抛出异常throw new InterruptedException();}return p;}//等待指定的时间,如果被中断或者等待超时则抛出异常
public int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit)throws InterruptedException, TimeoutException {long nanos = unit.toNanos(timeout);final Phaser root = this.root;//获取phaselong s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0) //被终止了,直接返回 return phase;if (p == phase) {QNode node = new QNode(this, phase, true, true, nanos);//阻塞当前线程p = root.internalAwaitAdvance(phase, node);if (node.wasInterrupted) //如果被中断则抛出异常throw new InterruptedException();else if (p == phase) //等待超时throw new TimeoutException();}return p;}
//让当前线程自旋或者休眠等待,直到phase发生改变或者响应中断或者等待超时,返回当前的phase
private int internalAwaitAdvance(int phase, QNode node) {// assert root == this;releaseWaiters(phase-1);          //将前一阶段的等待队列中的线程都唤醒boolean queued = false;           // true when node is enqueuedint lastUnarrived = 0;            // to increase spins upon changeint spins = SPINS_PER_ARRIVAL;long s;int p;//当前phase还是指定值,会一直循环直到phase发生改变,即所有线程都到达了触发advancewhile ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {if (node == null) {           //无期限等待//获取未到达的线程数int unarrived = (int)s & UNARRIVED_MASK;if (unarrived != lastUnarrived &&  //某个线程到达了,更新lastUnarrived,如果剩余未达到的线程数小于NCPU则增加自旋的次数(lastUnarrived = unarrived) < NCPU)spins += SPINS_PER_ARRIVAL;boolean interrupted = Thread.interrupted();if (interrupted || --spins < 0) { //如果被中断了或者自旋结束,则创建一个新QNodenode = new QNode(this, phase, false, false, 0L);node.wasInterrupted = interrupted;}}//node不为null,判断是否需要终止等待,如果线程被中断且需要响应中断或者等待超时则可以通过此分支终止while循环else if (node.isReleasable()) // done or abortedbreak;//node不为null,需要阻塞else if (!queued) {  //该Node未入队AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;//原head作为node的next节点QNode q = node.next = head.get();if ((q == null || q.phase == phase) && //校验链表中节点的phase和当前节点一致(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq//加入到队列中,如果失败则while循环重试queued = head.compareAndSet(q, node);}else {//node已经加入到队列中try {//会调用node的block方法将当前线程阻塞,直到node的isReleasable方法返回true,线程被唤醒则继续下一个while循环ForkJoinPool.managedBlock(node);} catch (InterruptedException ie) {node.wasInterrupted = true;}}}//phase发生改变,或者phase没变,但是线程被中断了或者等待超时if (node != null) {if (node.thread != null)node.thread = null;       //避免releaseWaiters时被再次调用unparkif (node.wasInterrupted && !node.interruptible) //如果线程被中断且node不需要响应中断,则将当前线程标记为已中断Thread.currentThread().interrupt();//phase未改变,说明是等待超时或者响应中断    if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)return abortWait(phase); //其实现和releaseWaiters类似,都是清理等待链表中跟当前phase不一致的节点,将其对应的线程唤醒}//尝试唤醒等待的线程,如果都唤醒了则直接返回releaseWaiters(phase);return p;}public static void managedBlock(ManagedBlocker blocker)throws InterruptedException {ForkJoinPool p;ForkJoinWorkerThread wt;Thread t = Thread.currentThread();//如果当前线程是ForkJoinWorkerThreadif ((t instanceof ForkJoinWorkerThread) &&(p = (wt = (ForkJoinWorkerThread)t).pool) != null) {WorkQueue w = wt.workQueue;while (!blocker.isReleasable()) {if (p.tryCompensate(w)) {try {do {} while (!blocker.isReleasable() &&!blocker.block());} finally {U.getAndAddLong(p, CTL, AC_UNIT);}break;}}}else {//如果是普通的Thread//不断循环直到可以终止等待do {} while (!blocker.isReleasable() &&!blocker.block());}}//将与当前phase不一致的节点从等待链表中移除
private int abortWait(int phase) {AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;for (;;) {Thread t;QNode q = head.get();int p = (int)(root.state >>> PHASE_SHIFT);//如果q对应的phase跟当前phase一样,则返回pif (q == null || ((t = q.thread) != null && q.phase == p))return p;//如果q对应的phase跟当前phase不一样,将其从等待链表移除,并唤醒对应的线程    if (head.compareAndSet(q, q.next) && t != null) {q.thread = null;LockSupport.unpark(t);}}}

6、forceTermination / isTerminated

//强制终止Phaser
public void forceTermination() {// Only need to change root statefinal Phaser root = this.root;long s;while ((s = root.state) >= 0) {if (UNSAFE.compareAndSwapLong(root, stateOffset,s, s | TERMINATION_BIT)) { //cas修改state,将state的最高位变成1,再获取phase时,phase就小于0了                          //两个队列中的等待线程,被唤醒后发现phase小于0,则会终止releaseWaiters(0); // Waiters on evenQreleaseWaiters(1); // Waiters on oddQreturn;}}}//返回phaser是否已终止public boolean isTerminated() {return root.state < 0L;}

7、getArrivedParties / getUnarrivedParties / getRegisteredParties / getPhase

这四个方法用于获取当前Phaser的状态,getArrivedParties获取已经到达的线程数,getUnarrivedParties获取未到达的线程数,这两个相加应该等于getUnarrivedParties返回的注册的parties,getPhase获取当前的phase。

    //获取当前的phasepublic final int getPhase() {return (int)(root.state >>> PHASE_SHIFT);}//获取注册的partiespublic int getRegisteredParties() {return partiesOf(state);}//获取已到达线程数public int getArrivedParties() {return arrivedOf(reconcileState());}//获取未达到的线程数public int getUnarrivedParties() {return unarrivedOf(reconcileState());}private static int partiesOf(long s) {return (int)s >>> PARTIES_SHIFT;}private static int unarrivedOf(long s) {int counts = (int)s;return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);}private static int arrivedOf(long s) {int counts = (int)s;return (counts == EMPTY) ? 0 :(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);}

Java8 Phaser 源码解析相关推荐

  1. phaser java_死磕 java同步系列之Phaser源码解析

    问题 (1)Phaser是什么? (2)Phaser具有哪些特性? (3)Phaser相对于CyclicBarrier和CountDownLatch的优势? 简介 Phaser,翻译为阶段,它适用于这 ...

  2. Java8 ArrayBlockingQueue 源码解析

    目录 1.定义 2.构造方法 3.add / offer / put 4.poll / take / peek 5.remove / clear /drainTo 6.iterator / Itr / ...

  3. Java8 ForkJoinPool(一) 源码解析

    目录 一.ForkJoinWorkerThread 1.定义 2.run / getPoolIndex 二.InnocuousForkJoinWorkerThread 三.ForkJoinWorker ...

  4. Java8 ConcurrentLinkedQueue和LinkedTransferQueue 源码解析

    目录 一.ConcurrentLinkedQueue 1.定义 2.构造方法 3.add / offer / addAll 4. peek / poll / remove 5.iterator / I ...

  5. Java8 Semaphore与Exchanger 源码解析

    目录 一.Semaphore 1.使用 2.定义 3.acquire / acquireUninterruptibly / acquireUninterruptibly / tryAcquire 4. ...

  6. Java8 HashMap源码分析

    前言 今天,我们主要来研究一下在Java8中HashMap的数据结构及一些重要方法的具体实现.       研究HashMap的源代码之前,我们首先来研究一下常用的三种数据结构:数组.链表和红黑树. ...

  7. 面试官系统精讲Java源码及大厂真题 - 08 HashMap 源码解析

    08 HashMap 源码解析 自信和希望是青年的特权. --大仲马 引导语 HashMap 源码很长,面试的问题也非常多,但这些面试问题,基本都是从源码中衍生出来的,所以我们只需要弄清楚其底层实现原 ...

  8. 面试官系统精讲Java源码及大厂真题 - 04 Arrays、Collections、Objects 常用方法源码解析

    04 Arrays.Collections.Objects 常用方法源码解析 读一本好书,就是和许多高尚的人谈话. --歌德 引导语 我们在工作中都会写工具类,但如何才能使写出来的工具类更好用,也是有 ...

  9. 和我一起读Java8 LinkedList源码

    书接上一篇ArrayList源码解析,这一节继续分析LinkedList在Java8中的实现,它同样实现了List接口,不过由名字就可以知道,内部实现是基于链表的,而且是双向链表,所以Linked L ...

  10. hashmap删除指定key_Java集合之HashMap源码解析(JDK8)

    哈希表(hash table)也叫散列表,是一种非常重要的数据结构,应用场景非常丰富,许多缓存技术(比如memcached)的核心其实就是在内存中维护一张大的哈希表,而HashMap的实现原理也常常出 ...

最新文章

  1. 多线程(十、AQS原理-ReentrantLock公平锁)
  2. 想和产品大咖一对一沟通吗?
  3. OpenCV Tracker简介
  4. yolov4论文_YOLOv4论文详细解读
  5. oracle行的唯一标识符,Oracle 10g SELECT 语句
  6. Linux下搭建 kafka集群 + zookeeper集群部署 安装、启动、停止
  7. vue 后台返回的文件流进行预览_vue实现下载文件流完整前后端代码
  8. php图片中不显示文字内容,水印效果 只有图片,文字不显示
  9. 四、Linux用户管理
  10. Vue2.x总结(1)
  11. 小说搜索站快速搭建:2.内容页解析
  12. oracle dg巡检,oracle dg状态检查及相关命令
  13. fopen()及相关函数使用
  14. jquery的一点点认识
  15. 视频图像处理芯片排名_关于图像处理芯片(DSP)
  16. android 单手模式开发,单手操作毫无压力 安卓单指缩放技巧
  17. python orange3_Anaconda中安装Orange3脚本-完整版
  18. PHP的性能演进(从PHP5.0到PHP7.1的性能全评测)
  19. ROCKCHIP PWM模块开发指南
  20. Hack The Box - Catch 利用let chat API查询信息,Cachet配置泄露漏洞获取ssh登录密码,apk代码注入漏洞利用获取root权限

热门文章

  1. 三角肌前束(02):哑铃交替前举
  2. hadoop集群搭建详述
  3. 手机如何把图片转Word文档?使用这种方法非常方便
  4. 前端关于Base64编码的一些技术分析
  5. 自信转运--《奇迹男孩》
  6. 通信协议之序列化——TLV详解
  7. Mac Ps cc2017 下载
  8. 一文搞懂MySQL索引(清晰明了)
  9. java搜索页面历史记录,页面缓存的操作(搜索历史记录),页面搜索功能实现...
  10. luogu 2411 白银莲花池 luogu 1606 Lilypad Pond