AQS是AbstractQueuedSynchronizer的缩写,AQS是Java并包里大部分同步器的基础构件,利用AQS可以很方便的创建锁和同步器。它封装了一个状态,提供了一系列的获取和释放操作,这些获取和释放操作都是基于状态的。它的基本思想是由AQS负责管理同步器类中的状态,其他的同步器比如可重入锁ReentrantLock, 信号量Semaphore基于各自的特点来调用AQS提供了基础能力进行状态的同步。

在AQS的Javadoc里面提到它是CLHLock的变种,在聊聊高并发(八)实现几种自旋锁(三) 这篇文章中我们说了如何利用CLH锁来构件自旋锁,回顾一下CLHLock的一些基本特点:

1. CLHLock是一种队列自旋锁的实现,提供了FIFO先来先服务的公平性

2. 利用一个原子变量AtomicReference tail的CAS操作来构件一个虚拟的链式结构

3. 节点Node维护一个volatile状态,维护一个prev指针指向前一个节点,获取锁时每个线程在prev节点的状态上自旋

4. 当线程释放锁时,只需要修改自身状态即可,后续节点会观察到volatile状态的改动而获取锁

AQS既然是CLHLock的一种变种,那么

1. 也维护以了一个基本的队列结构

2. 也是提供了一个Tail指针从队尾通过CAS操作入队列。

3. 提供了一个volatile类型的int值来维护状态

 
  1. public abstract class AbstractQueuedSynchronizer

  2. extends AbstractOwnableSynchronizer

  3. implements java.io.Serializable {

  4.  private transient volatile Node head;

  5.     private transient volatile Node tail;

  6.     private volatile int state;

  7.     protected final int getState() {

  8.         return state;

  9.     }

  10.   

  11.     protected final void setState(int newState) {

  12.         state = newState;

  13.     }

  14.     protected final boolean compareAndSetState(int expect, int update) {

  15.         // See below for intrinsics setup to support this

  16.         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

  17.     }

  18. ..................

  19. }

与标准CLHLock实现不同的是,AQS不是一个自旋锁,它提供了更加丰富的语意:

1. 提供了独享(exclusive)方式和共享(share)方式来获取/释放,比如锁是独占方式的,信号量semaphore是共享方式的,可以有多个线程进入临界区

2. 支持可中断和不可中断的获取/释放

3. 支持普通的和具有时间限制的获取/释放

4. 提供了自旋和阻塞的切换,可以先自旋,如果等待时间长,可以阻塞

 
  1. /**

  2. * The number of nanoseconds for which it is faster to spin

  3. * rather than to use timed park. A rough estimate suffices

  4. * to improve responsiveness with very short timeouts.

  5. */

  6. static final long spinForTimeoutThreshold = 1000L;

AQS定义了两个内部类来辅助它的实现,一个是Node定义了队列中的节点,另一个是ConditionObject,是Condition接口的实现类,负责管理条件队列。关于条件队列更多内容可以看这篇 聊聊高并发(十四)理解Java中的管程,条件队列,Condition以及实现一个阻塞队列

先看下Node类,它比CLHLock中的Node有更多属性,除了完成基本的队列功能,还维护了是独享还是共享的模式信息

1. 维护了一个Node SHARED引用表示共享模式

2. 维护了一个Node EXCLUSIVE引用表示独占模式

3. 维护了几种节点等待的状态 waitStatus, 其中CANCELLED = 1是正数,表示取消状态,SIGNAL = -1,CONDITION = -2, PROPAGATE = -3都是负数,表示节点在条件队列的某个状态,SIGNAL表示后续节点需要被唤醒

4. 维护了Node prev引用,指向队列中的前一个节点,通过Tail的CAS操作来创建

5. 维护了Node next引用,指向队列中的下一个节点,也是在通过Tail入队列的时候设置的,这样就维护了一个双向队列

6. 维护了一个volatile的Thread引用,把一个节点关联到一个线程

7. 维护了Node nextWaiter引用,指向在条件队列中的下一个正在等待的节点,是给条件队列使用的。值得注意的是条件队列只有在独享状态下才使用

 
  1. static final class Node {

  2. /** Marker to indicate a node is waiting in shared mode */

  3. static final Node SHARED = new Node();

  4. /** Marker to indicate a node is waiting in exclusive mode */

  5. static final Node EXCLUSIVE = null;

  6. /** waitStatus value to indicate thread has cancelled */

  7. static final int CANCELLED = 1;

  8. /** waitStatus value to indicate successor's thread needs unparking */

  9. static final int SIGNAL = -1;

  10. /** waitStatus value to indicate thread is waiting on condition */

  11. static final int CONDITION = -2;

  12. /**

  13. * waitStatus value to indicate the next acquireShared should

  14. * unconditionally propagate

  15. */

  16. static final int PROPAGATE = -3;

  17. volatile int waitStatus;

  18. volatile Node prev;

  19. volatile Node next;

  20. volatile Thread thread;

  21. Node nextWaiter;

  22. final boolean isShared() {

  23. return nextWaiter == SHARED;

  24. }

  25. final Node predecessor() throws NullPointerException {

  26. Node p = prev;

  27. if (p == null)

  28. throw new NullPointerException();

  29. else

  30. return p;

  31. }

  32. Node() { // Used to establish initial head or SHARED marker

  33. }

  34. Node(Thread thread, Node mode) { // Used by addWaiter

  35. this.nextWaiter = mode;

  36. this.thread = thread;

  37. }

  38. Node(Thread thread, int waitStatus) { // Used by Condition

  39. this.waitStatus = waitStatus;

  40. this.thread = thread;

  41. }

  42. }

再看一下ConditionObject,它是条件Condition接口的具体实现,维护了一个条件队列,条件队列是通过Node来构件的一个单向链表结构。底层的条件操作(等待和唤醒)使用LockSupport类来实现,在这篇中我们说了LockSupport底层使用sun.misc.Unsafe来提供条件队列的park和unpark操作。聊聊高并发(十七)解析java.util.concurrent各个组件(一) 了解sun.misc.Unsafe类

1. 维护了一个Node firstWaiter引用指向条件队列的队首节点

2. 维护了一个Node lastWaiter引用指向条件队列的队尾节点

3. 条件队列支持节点的取消退出机制,CANCELLED节点来表示这种取消状态

4. 支持限时等待机制

5. 支持可中断和不可中断的等待

我们来看几个典型的条件队列的操作实现

往条件队列里面加入一个等待节点,这个是await()方法的基本操作

1. 判断尾节点的状态是不是等待某个条件的状态(CONDITION),如果不是,就把CANCELLED节点从队列中踢出,然后把自己标记为尾节点

 
  1. public class ConditionObject implements Condition, java.io.Serializable {

  2. /** First node of condition queue. */

  3. private transient Node firstWaiter;

  4. /** Last node of condition queue. */

  5. private transient Node lastWaiter;

  6. /**

  7.          * Adds a new waiter to wait queue.

  8.          * @return its new wait node

  9.          */

  10.         private Node addConditionWaiter() {

  11.             Node t = lastWaiter;

  12.             // If lastWaiter is cancelled, clean out.

  13.             if (t != null && t.waitStatus != Node.CONDITION) {

  14.                 unlinkCancelledWaiters();

  15.                 t = lastWaiter;

  16.             }

  17.             Node node = new Node(Thread.currentThread(), Node.CONDITION);

  18.             if (t == null)

  19.                 firstWaiter = node;

  20.             else

  21.                 t.nextWaiter = node;

  22.             lastWaiter = node;

  23.             return node;

  24.         }

  25. private void unlinkCancelledWaiters() {

  26.             Node t = firstWaiter;

  27.             Node trail = null;

  28.             while (t != null) {

  29.                 Node next = t.nextWaiter;

  30.                 if (t.waitStatus != Node.CONDITION) {

  31.                     t.nextWaiter = null;

  32.                     if (trail == null)

  33.                         firstWaiter = next;

  34.                     else

  35.                         trail.nextWaiter = next;

  36.                     if (next == null)

  37.                         lastWaiter = trail;

  38.                 }

  39.                 else

  40.                     trail = t;

  41.                 t = next;

  42.             }

  43.         }

  44. .................

  45. }

从条件队列中唤醒一个节点,实际上doSignal只是把一个节点从条件队列中移除,然后加入到同步队列,并设置它在同步队列的前置节点的waitStatus = SIGNAL, 如果设置失败或者取消在条件队列等待,直接把这个节点的线程unpark唤醒,需要注意的是unpark操作只是把线程从等待状态转化为可运行状态,并不直接获得锁。

 
  1.  public final void signal() {

  2.             if (!isHeldExclusively())

  3.                 throw new IllegalMonitorStateException();

  4.             Node first = firstWaiter;

  5.             if (first != null)

  6.                 doSignal(first);

  7.         }

  8. /**

  9. * Removes and transfers nodes until hit non-cancelled one or

  10. * null. Split out from signal in part to encourage compilers

  11. * to inline the case of no waiters.

  12. * @param first (non-null) the first node on condition queue

  13. */

  14. private void doSignal(Node first) {

  15. do {

  16. if ( (firstWaiter = first.nextWaiter) == null)

  17. lastWaiter = null;

  18. first.nextWaiter = null;

  19. } while (!transferForSignal(first) &&

  20. (first = firstWaiter) != null);

  21. }

  22.    final boolean transferForSignal(Node node) {

  23.         /*

  24.          * If cannot change waitStatus, the node has been cancelled.

  25.          */

  26.         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

  27.             return false;

  28.         /*

  29.          * Splice onto queue and try to set waitStatus of predecessor to

  30.          * indicate that thread is (probably) waiting. If cancelled or

  31.          * attempt to set waitStatus fails, wake up to resync (in which

  32.          * case the waitStatus can be transiently and harmlessly wrong).

  33.          */

  34.         Node p = enq(node);

  35.         int ws = p.waitStatus;

  36.         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

  37.             LockSupport.unpark(node.thread);

  38.         return true;

  39.     }

Java线程的几种状态如下

支持中断的等待操作, 主要做了两个事情:新建一个Node进入条件队列等待被唤醒;从同步队列中移除并释放锁。它会相应线程的中断抛出中断异常,并且记录中断状态

 
  1. public final void await() throws InterruptedException {

  2. if (Thread.interrupted())

  3. throw new InterruptedException();

  4. Node node = addConditionWaiter();

  5. int savedState = fullyRelease(node);

  6. int interruptMode = 0;

  7. while (!isOnSyncQueue(node)) {

  8. LockSupport.park(this);

  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  10. break;

  11. }

  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  13. interruptMode = REINTERRUPT;

  14. if (node.nextWaiter != null) // clean up if cancelled

  15. unlinkCancelledWaiters();

  16. if (interruptMode != 0)

  17. reportInterruptAfterWait(interruptMode);

  18. }

不可中断的等待,也是先进入条件队列等待,并从同步队列出队列,释放锁。但是它不相应线程中断状态

 
  1. public final void awaitUninterruptibly() {

  2. Node node = addConditionWaiter();

  3. int savedState = fullyRelease(node);

  4. boolean interrupted = false;

  5. while (!isOnSyncQueue(node)) {

  6. LockSupport.park(this);

  7. if (Thread.interrupted())

  8. interrupted = true;

  9. }

  10. if (acquireQueued(node, savedState) || interrupted)

  11. selfInterrupt();

  12. }

限时等待,也是先进入条件队列等待,然后释放锁。轮询等待时间,当超时后再次进入同步队列,等待获得锁。如果获得了锁,就返回false. 如果在等待时被唤醒,就进入同步队列,等待获得锁,如果获得锁就返回true

 
  1. public final boolean await(long time, TimeUnit unit)

  2. throws InterruptedException {

  3. if (unit == null)

  4. throw new NullPointerException();

  5. long nanosTimeout = unit.toNanos(time);

  6. if (Thread.interrupted())

  7. throw new InterruptedException();

  8. Node node = addConditionWaiter();

  9. int savedState = fullyRelease(node);

  10. long lastTime = System.nanoTime();

  11. boolean timedout = false;

  12. int interruptMode = 0;

  13. while (!isOnSyncQueue(node)) {

  14. if (nanosTimeout <= 0L) {

  15. timedout = transferAfterCancelledWait(node);

  16. break;

  17. }

  18. if (nanosTimeout >= spinForTimeoutThreshold)

  19. LockSupport.parkNanos(this, nanosTimeout);

  20. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  21. break;

  22. long now = System.nanoTime();

  23. nanosTimeout -= now - lastTime;

  24. lastTime = now;

  25. }

  26. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  27. interruptMode = REINTERRUPT;

  28. if (node.nextWaiter != null)

  29. unlinkCancelledWaiters();

  30. if (interruptMode != 0)

  31. reportInterruptAfterWait(interruptMode);

  32. return !timedout;

  33. }

AQS使用了Unsafe直接操作内存来对字段进行CAS操作和设置值。

 
  1. private static final Unsafe unsafe = Unsafe.getUnsafe();

  2. private static final long stateOffset;

  3. private static final long headOffset;

  4. private static final long tailOffset;

  5. private static final long waitStatusOffset;

  6. private static final long nextOffset;

  7. static {

  8. try {

  9. stateOffset = unsafe.objectFieldOffset

  10. (AbstractQueuedSynchronizer.class.getDeclaredField("state"));

  11. headOffset = unsafe.objectFieldOffset

  12. (AbstractQueuedSynchronizer.class.getDeclaredField("head"));

  13. tailOffset = unsafe.objectFieldOffset

  14. (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));

  15. waitStatusOffset = unsafe.objectFieldOffset

  16. (Node.class.getDeclaredField("waitStatus"));

  17. nextOffset = unsafe.objectFieldOffset

  18. (Node.class.getDeclaredField("next"));

  19. } catch (Exception ex) { throw new Error(ex); }

  20. }

聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)相关推荐

  1. 聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)

    上一篇介绍了AQS的基本设计思路以及两个内部类Node和ConditionObject的实现 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一) 这篇 ...

  2. 聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

    上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何 ...

  3. 聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁...

    上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和基本的方法,显示了怎样 ...

  4. 聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

    前几篇分析了一下AQS的原理和实现,这篇拿Semaphore信号量做例子看看AQS实际是如何使用的. Semaphore表示了一种可以同时有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据, ...

  5. 聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

    这篇说说java.util.concurrent.atomic包里的类,总共12个,网上有很多文章解析这几个类,这里挑些重点说说. 这12个类可以分为三组: 1. 普通类型的原子变量 2. 数组类型的 ...

  6. 聊聊高并发(十七)解析java.util.concurrent各个组件(一) 了解sun.misc.Unsafe类

    了解了并发编程中锁的基本原理之后,接下来看看Java是如何利用这些原理来实现各种锁,原子变量,同步组件的.在开始分析java.util.concurrent的源代码直接,首先要了解的就是sun.mis ...

  7. 聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁

    这篇讲讲ReentrantReadWriteLock可重入读写锁,它不仅是读写锁的实现,并且支持可重入性. 聊聊高并发(十五)实现一个简单的读-写锁(共享-排他锁) 这篇讲了如何模拟一个读写锁. 可重 ...

  8. 聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

    这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互 ...

  9. 聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器

    这篇讲讲Exchanger交互器,它是一种比较特殊的两方(Two-Party)栅栏,可以理解成Exchanger是一个栅栏,两边一方是生产者,一方是消费者, 1. 生产者和消费者各自维护了一个容器,生 ...

最新文章

  1. 醉没醉,带上智能手机走两步就知道
  2. 深浅拷贝、函数、内置函数、文件处理、三元运算、递归
  3. warning C4251 needs to have dll-interface解决办法
  4. 全卷积网络(FCN)与图像分割
  5. 【工具使用系列】关于 MATLAB 电路与系统分析,你需要知道的事
  6. java源程序加密解决方案(基于Classloader解密)
  7. 在FFMPEG中使用libRTMP的经验
  8. 计算机java二级_关于Java计算机二级考试内容。
  9. Win10 输入法工具栏抽风,无法调整水平垂直。
  10. Android7.0的xposed框架,Android 7.x 安装Xposed框架
  11. FTP 连接超时解决办法
  12. 《烈烈先秦》7、大秦的克星——侠将公子信陵君
  13. mysql 将中文转换成拼音_mysql 如何将中文转拼音
  14. 树莓派3B和3B+的串口使用(附图)
  15. 陈省身文集51——闭黎曼流形高斯-博内公式的一个简单的内蕴证明
  16. linux 远程扫描仪,技术|如何在ubuntu桌面配置一个网络打印机和扫描仪
  17. 综合架构web服务之nginx详解
  18. Java打包后运行jar包报错Caused by: org.springframework.beans.factory.BeanCreationException: Error creating be
  19. 工作流(三)_什么是工作流管理系统
  20. 使用Xshell部署网页

热门文章

  1. excel 电阻并联计算_电阻器的构成及取代原则
  2. python连接kafka-python连接kafka生产者,消费者脚本
  3. C语言 计算结构体大小
  4. FatFs源码剖析(1)
  5. C/C++内存分配与Linux内存管理进程所涉及到的五个数据段 .
  6. java下拉菜单_薪资对比:Java开发和web前端薪资哪个好
  7. 类的继承和派生java_类的继承和派生
  8. java 矩阵题目_一些数学分析不错的题目
  9. java 状态机_基于 RAFT 一致性算法的 Java 实现 SOFAJRaft
  10. 气温常年在25度的地方_最低调的海滨城市,物价便宜,常年25度,沙滩细白,不输三亚!...