一、概述

类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。

本文主要转载了2篇文章,做了一些自己的批注。

原文链接:

http://www.cnblogs.com/waterystone/p/4920797.html

https://www.jianshu.com/p/0da2939391cf

二、框架

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

三、源码详解

本节开始讲解AQS的源码实现。依照acquire-release、acquireShared-releaseShared的次序来。

3.1 acquire(int)

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

public final void acquire(int arg) {

2     if (!tryAcquire(arg) &&

3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

4         selfInterrupt();

5 }

函数流程如下:

  • tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

3.1.1 tryAcquire(int)

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

1     protected boolean tryAcquire(int arg) {

2         throw new UnsupportedOperationException();

3     }

什么?直接throw异常?说好的功能呢?好吧,还记得概述里讲的AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现吗?就是这里了!!!AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。

3.1.2 addWaiter(Node)

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。还是上源码吧:

private Node addWaiter(Node mode) {

2     //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)

3     Node node = new Node(Thread.currentThread(), mode);

4

5     //尝试快速方式直接放到队尾。

6     Node pred = tail;

7     if (pred != null) {

8         node.prev = pred;

9         if (compareAndSetTail(pred, node)) {

10             pred.next = node;

11             return node;

12         }

13     }

14

15     //上一步失败则通过enq入队。

16     enq(node);

17     return node;

18 }

不用再说了,直接看注释吧。这里我们说下Node。Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列,结点的线程等待在Condition,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
  • 0状态:值为0,代表初始化状态。

AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

3.1.2.1 enq(Node)

  此方法用于将node加入队尾直到成功为止。源码如下:

private Node enq(final Node node) {

2     //CAS"自旋",直到成功加入队尾

3     for (;;) {

4         Node t = tail;

5         if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。

6             if (compareAndSetHead(new Node()))

7                 tail = head;

8         } else {//正常流程,放入队尾

9             node.prev = t;

10             if (compareAndSetTail(t, node)) {

11                 t.next = node;

12                 return t;

13             }

14         }

15     }

16 }

如果你看过AtomicInteger.getAndIncrement()函数源码,那么相信你一眼便看出这段代码的精华。CAS自旋volatile变量,是一种很经典的用法。还不太了解的,自己去百度一下吧。

3.1.3 acquireQueued(Node, int)

OK,通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键,还是上源码吧:

final boolean acquireQueued(final Node node, int arg) {

2     boolean failed = true;//标记是否成功拿到资源

3     try {

4         boolean interrupted = false;//标记等待过程中是否被中断过

5

6         //又是一个自旋

7         for (;;) {

8             final Node p = node.predecessor();//拿到前驱(第一次调用或线程被唤醒时会位于此处!)

9             //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。

10             if (p == head && tryAcquire(arg)) {

11                 setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。

12                 p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!

13                 failed = false;

14                 return interrupted;//返回等待过程中是否被中断过

15             }

16

17             //如果自己可以休息了,就进入waiting状态,直到被unpark()

18             if (shouldParkAfterFailedAcquire(p, node) &&

19                 parkAndCheckInterrupt())

20                 interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true

21         }

22     } finally {

23         if (failed)

24             cancelAcquire(node);

25     }

26 }

到这里了,我们先不急着总结acquireQueued()的函数流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具体干些什么。

3.1.3.1 shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于检查状态,看看自己是否真的可以去休息了,万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧!

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

2     int ws = pred.waitStatus;//拿到前驱的状态

3     if (ws == Node.SIGNAL)

4         //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了

5         return true;

6     if (ws > 0) {

7         /*

8          * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。

9          * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!

10          */

11         do {

12             node.prev = pred = pred.prev;

13         } while (pred.waitStatus > 0);

14         pred.next = node;

15     } else {

16          //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!

17         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

18     }

19     return false;

20 }

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

3.1.3.2 parkAndCheckInterrupt()

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

private final boolean parkAndCheckInterrupt() {

2     LockSupport.park(this);//调用park()使线程进入waiting状态

3     return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。

4 }

  park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。(再说一句,如果线程状态转换不熟,可以参考本人写的Thread详解)。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

3.1.3.3 小结

OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),现在让我们再回到acquireQueued(),总结下该函数的具体流程:

  • 结点进入队尾后,检查状态,找到安全休息点;
  • 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
  • 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

3.1.4 小结

OKOK,acquireQueued()分析完之后,我们接下来再回到acquire()!再贴上它的源码吧:

public final void acquire(int arg) {

2     if (!tryAcquire(arg) &&

3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

4         selfInterrupt();

5 }

再来总结下它的流程吧:

  • 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

由于此函数是重中之重,我再用流程图总结一下:

至此,acquire()的流程终于算是告一段落了。这也就是ReentrantLock.lock()的流程,不信你去看其lock()源码吧,整个函数就是一条acquire(1)!!!

3.2 release(int)

  上一小节已经把acquire()说完了,这一小节就来讲讲它的反操作release()吧。此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

public final boolean release(int arg) {

2     if (tryRelease(arg)) {

3         Node h = head;//找到头结点

4         if (h != null && h.waitStatus != 0)

5             unparkSuccessor(h);//唤醒等待队列里的下一个线程

6         return true;

7     }

8     return false;

9 }

逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!

3.2.1 tryRelease(int)

此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

protected boolean tryRelease(int arg) {

2     throw new UnsupportedOperationException();

3 }

跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

3.2.2 unparkSuccessor(Node)

此方法用于唤醒等待队列中下一个线程。下面是源码:

private void unparkSuccessor(Node node) {

2     //这里,node一般为当前线程所在的结点。

3     int ws = node.waitStatus;

4     if (ws < 0)//置零当前线程所在的结点状态,允许失败。

5         compareAndSetWaitStatus(node, ws, 0);

6

7     Node s = node.next;//找到下一个需要唤醒的结点s

8     if (s == null || s.waitStatus > 0) {//如果为空或已取消

9         s = null;

10         for (Node t = tail; t != null && t != node; t = t.prev)

11             if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。

12                 s = t;

13     }

14     if (s != null)

15         LockSupport.unpark(s.thread);//只会唤醒一个线程!

16 }

这个函数并不复杂。一句话概括:unpark()唤醒等待队列中最前边的那个未放弃线程(只唤醒一个线程!),这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!!And then, DO what you WANT!

3.2.3 小结

release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

3.3共享锁 acquireShared(int)

共享锁与独占锁相比,共享锁可能被多个线程共同持有

此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:

public final void acquireShared(int arg) {

2     if (tryAcquireShared(arg) < 0)

3         doAcquireShared(arg);

4 }

这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  • tryAcquireShared()尝试获取资源,成功则直接返回;
  • 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回

3.3.1 doAcquireShared(int)

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。下面是doAcquireShared()的源码:

private void doAcquireShared(int arg) {

2     final Node node = addWaiter(Node.SHARED);//加入队列尾部

3     boolean failed = true;//是否成功标志

4     try {

5         boolean interrupted = false;//等待过程中是否被中断过的标志

6         for (;;) {

7             final Node p = node.predecessor();//前驱

8             if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的

9                 int r = tryAcquireShared(arg);//尝试获取资源

10                 if (r >= 0) {//成功

11                     setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程

12                     p.next = null; // help GC

13                     if (interrupted)//如果等待过程中被打断过,此时将中断补上。

14                         selfInterrupt();

15                     failed = false;

16                     return;

17                 }

18             }

19

20             //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()

21             if (shouldParkAfterFailedAcquire(p, node) &&

22                 parkAndCheckInterrupt())

23                 interrupted = true;

24         }

25     } finally {

26         if (failed)

27             cancelAcquire(node);

28     }

29 }

有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样,不知道Doug Lea是怎么想的。

跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

3.3.1.1 setHeadAndPropagate(Node, int)

private void setHeadAndPropagate(Node node, int propagate) {

2     Node h = head;

3     setHead(node);//head指向自己

4      //如果还有剩余量,继续唤醒下一个邻居线程

5     if (propagate > 0 || h == null || h.waitStatus < 0) {

6         Node s = node.next;

7         if (s == null || s.isShared())

8             doReleaseShared();

9     }

10 }

此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!

doReleaseShared()我们留着下一小节的releaseShared()里来讲。

3.3.2 小结

OK,至此,acquireShared()也要告一段落了。让我们再梳理一下它的流程:

  • tryAcquireShared()尝试获取资源,成功则直接返回;
  • 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)

3.4 releaseShared()

上一小节已经把acquireShared()说完了,这一小节就来讲讲它的反操作releaseShared()吧。此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

public final boolean releaseShared(int arg) {

2     if (tryReleaseShared(arg)) {//尝试释放资源

3         doReleaseShared();//唤醒后继结点

4         return true;

5     }

6     return false;

7 }

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

3.4.1 doReleaseShared()

此方法主要用于唤醒后继。下面是它的源码:

private void doReleaseShared() {

2     for (;;) {

3         Node h = head;

4         if (h != null && h != tail) {

5             int ws = h.waitStatus;

6             if (ws == Node.SIGNAL) {

7                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

8                     continue;

9                 unparkSuccessor(h);//唤醒后继

10             }

11             else if (ws == 0 &&

12                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

13                 continue;

14         }

15         if (h == head)// head发生变化

16             break;

17     }

18 }

4. Condition条件

Condition是为了实现线程之间相互等待的问题。注意Condition对象只能在独占锁中才能使用。

那么在AQS中Condition条件又是如何实现的呢?

  • 首先内部存在一个Condition队列,存储着所有在此Condition条件等待的线程。
  • await系列方法:让当前持有锁的线程释放锁,并唤醒一个在CLH队列上等待锁的线程,再为当前线程创建一个node节点,插入到Condition队列(注意不是插入到CLH队列中)
  • signal系列方法:其实这里没有唤醒任何线程,而是将Condition队列上的等待节点插入到CLH队列中,所以当持有锁的线程执行完毕释放锁时,就会唤醒CLH队列中的一个线程,这个时候才会唤醒线程。

4.1 await系列方法

4.1.1 await方法

/**

* 让当前持有锁的线程阻塞等待,并释放锁。如果有中断请求,则抛出InterruptedException异常

* @throws InterruptedException

*/

public final void await() throws InterruptedException {

// 如果当前线程中断标志位是true,就抛出InterruptedException异常

if (Thread.interrupted())

throw new InterruptedException();

// 为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中了

Node node = addConditionWaiter();

// 释放当前线程占有的锁,并唤醒CLH队列一个等待线程

int savedState = fullyRelease(node);

int interruptMode = 0;

// 如果节点node不在同步队列中(注意不是Condition队列)

while (!isOnSyncQueue(node)) {

// 阻塞当前线程,那么怎么唤醒这个线程呢?

// 首先我们必须调用signal或者signalAll将这个节点node加入到同步队列。

// 只有这样unparkSuccessor(Node node)方法,才有可能唤醒被阻塞的线程

LockSupport.park(this);

// 如果当前线程产生中断请求,就跳出循环

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

// 如果节点node已经在同步队列中了,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

// 清除Condition队列中状态不是Node.CONDITION的节点

if (node.nextWaiter != null)

unlinkCancelledWaiters();

// 是否要抛出异常,或者发出中断请求

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

方法流程:

  • addConditionWaiter方法:为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中了
  • fullyRelease方法:释放当前线程占有的锁,并唤醒CLH队列一个等待线程
  • isOnSyncQueue 方法:如果返回false,表示节点node不在CLH队列中,即没有调用过 signal系列方法,所以调用LockSupport.park(this)方法阻塞当前线程。
  • 如果跳出while循环,表示节点node已经在CLH队列中,那么调用acquireQueued方法去获取锁。
  • 清除Condition队列中状态不是Node.CONDITION的节点

4.1.2 addConditionWaiter方法

为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中了

private Node addConditionWaiter() {

Node t = lastWaiter;

// 如果Condition队列尾节点的状态不是Node.CONDITION

if (t != null && t.waitStatus != Node.CONDITION) {

// 清除Condition队列中,状态不是Node.CONDITION的节点,

// 并且可能会重新设置firstWaiter和lastWaiter

unlinkCancelledWaiters();

// 重新将Condition队列尾赋值给t

t = lastWaiter;

}

// 为当前线程创建一个状态为Node.CONDITION的节点

Node node = new Node(Thread.currentThread(), Node.CONDITION);

// 如果t为null,表示Condition队列为空,将node节点赋值给链表头

if (t == null)

firstWaiter = node;

else

// 将新节点node插入到Condition队列尾

t.nextWaiter = node;

// 将新节点node设置为新的Condition队列尾

lastWaiter = node;

return node;

}

4.1.3 fullyRelease方法

释放当前线程占有的锁,并唤醒CLH队列一个等待线程

/**

* 释放当前线程占有的锁,并唤醒CLH队列一个等待线程

* 如果失败就抛出异常,设置node节点的状态是Node.CANCELLED

* @return

*/

final int fullyRelease(Node node) {

boolean failed = true;

try {

int savedState = getState();

// 释放当前线程占有的锁

if (release(savedState)) {

failed = false;

return savedState;

else {

throw new IllegalMonitorStateException();

}

finally {

if (failed)

node.waitStatus = Node.CANCELLED;

}

}

4.1.4 isOnSyncQueue

节点node是不是在CLH队列中

// 节点node是不是在CLH队列中

final boolean isOnSyncQueue(Node node) {

// 如果node的状态是Node.CONDITION,或者node没有前一个节点prev,

// 那么返回false,节点node不在同步队列中

if (node.waitStatus == Node.CONDITION || node.prev == null)

return false;

// 如果node有下一个节点next,那么它一定在同步队列中

if (node.next != null) // If has successor, it must be on queue

return true;

// 从同步队列中查找节点node

return findNodeFromTail(node);

}

// 在同步队列中从后向前查找节点node,如果找到返回true,否则返回false

private boolean findNodeFromTail(Node node) {

Node t = tail;

for (;;) {

if (t == node)

return true;

if (t == null)

return false;

t = t.prev;

}

}

5. 小结

本节我们详解了独占和共享两种模式下获取-释放资源(acquire-release、acquireShared-releaseShared)的源码,相信大家都有一定认识了。值得注意的是,acquire()和acquireSahred()两种方法下,线程在等待队列中都是忽略中断的。AQS也支持响应中断的,acquireInterruptibly()/acquireSharedInterruptibly()即是,这里相应的源码跟acquire()和acquireSahred()差不多,这里就不再详解了。

Java同步框架AQS原文分析相关推荐

  1. 深入理解Java并发框架AQS系列(四):共享锁(Shared Lock)

    深入理解Java并发框架AQS系列(一):线程 深入理解Java并发框架AQS系列(二):AQS框架简介及锁概念 深入理解Java并发框架AQS系列(三):独占锁(Exclusive Lock) 深入 ...

  2. Java集合类框架源码分析 之 LinkedList源码解析 【4】

    上一篇介绍了ArrayList的源码分析[点击看文章],既然ArrayList都已经做了介绍,那么作为他同胞兄弟的LinkedList,当然必须也配拥有姓名! Talk is cheap,show m ...

  3. java上传ddi_Android平台dalvik模式下java Hook框架ddi的分析(2)--dex文件的注入和调用...

    前面的博客<Android平台dalvik模式下java Hook框架 ddi 的分析(1)>中,已经分析了dalvik模式下 ddi 框架Hook java方法的原理和流程,这里来学习一 ...

  4. Android平台dalvik模式下java Hook框架ddi的分析(2)--dex文件的注入和调用

    本文博客地址:http://blog.csdn.net/qq1084283172/article/details/77942585 前面的博客<Android平台dalvik模式下java Ho ...

  5. Android平台dalvik模式下java Hook框架ddi的分析(1)

    本文博客地址:http://blog.csdn.net/qq1084283172/article/details/75710411 一.前 言 在前面的博客中已经学习了作者crmulliner编写的, ...

  6. Java并发框架——AQS之怎样使用AQS构建同步器

    AQS的设计思想是通过继承的方式提供一个模板让大家能够非常easy依据不同场景实现一个富有个性化的同步器.同步器的核心是要管理一个共享状态,通过对状态的控制即能够实现不同的锁机制. AQS的设计必须考 ...

  7. Java并发编程—AQS原理分析

    目录 一.AQS原理简述 二.自定义独占锁及共享锁 三.锁的可重入性 四.锁的公平性 五.惊群效应 AQS全称AbstractQueuedSynchronizer,它是实现 JCU包中几乎所有的锁.多 ...

  8. java.util.concurrent同步框架(AQS论文中文翻译)

    java.util.concurrent同步框架 摘要 目录和主题描述 一般条款 关键字 1.介绍: 需求 设计实现 4.使用方式 5.性能 6.结论 7. 致谢 Doug Lea SUNY Oswe ...

  9. Java并发之AQS详解(文章里包含了两片文章结合着看后边文章不清楚,请看原文)

          AQS全称抽象队列同步器(AbstractQuenedSynchronizer),它是一个可以用来实现线程同步的基础框架.当然,它不是我们理解的Spring这种框架,它是一个类,类名就是A ...

最新文章

  1. python学习手册中文版免费下载-Python学习手册
  2. APScheduler 浅析
  3. 树莓派GPIO口的使用(外设相关开发WringPi库的使用,超声波、继电器)
  4. jhsdb:JDK 9的新工具
  5. 微信公众平台应用开发实战
  6. antlr 可以用java写吗_java – 我们可以用ANTLR定义一个非上下文语法吗?
  7. 你要练神功,就应该先自宫,对不?
  8. 19年北理考研计算机复试分数多少钱,2019年北京理工大学考研复试分数线已出现...
  9. GitHub 40000星!收下这份宇宙最强「程序员装备指南」
  10. 嵌入式和单片机的区别到底在哪?
  11. matlab max函数 最大值好几个,matlab中的最大值和最小值
  12. oracle中字符串长度计算,根据 oracle 标准计算超长字符串的长度
  13. Windows搭建cloudever对接OneDrive教程(新版)
  14. 并发——锁升级(偏向锁,轻量级锁,重量级锁,及常见锁)
  15. 电视root工具_TapTap | 无需Root,成功移植 IOS14,拿下!!!
  16. 使用组件不渲染 Unknown custom element: <xxx> - did you register the component correctly? For recursiv
  17. 最新计算机学术研讨会,TC预告|CCF 2020第十届全国文字与计算学术研讨会
  18. Vue源码解读一:Vue数据响应式原理
  19. 使用top做sql分页
  20. 专项测试之Web测试

热门文章

  1. dpvs中fdir与sa_pool介绍
  2. anaconda如何查看库,安装库和更新库
  3. 数字孪生实战案例——智慧寺院解决方案
  4. Calendar获取本周一与周日
  5. 分享几个免费的专利检索网站
  6. SQL Server聚集索引和非聚集索引
  7. 百度地图的常用事件和方法
  8. 2020年计算机最新硬件,2020年3月份,主流电脑配置分享,盘点当下极具性价比的主机硬件...
  9. 【无标题】单例模式的两种创建方式:饿汉式和懒汉式
  10. Android入门之APP启动流程