AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时利用 FIFO 队列实现线程间的锁竞争,将基础的同步相关抽象细节放在 AQS,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。

AQS 结构剖析

AQS 就是建立在 CAS 的基础之上,增加了大量的实现细节,例如获取同步状态、FIFO 同步队列,独占式锁和共享式锁的获取和释放等等,这些都是 AQS 类对于同步操作抽离出来的一些通用方法,这么做也是为了对实现的一个同步类屏蔽了大量的细节,大大降低了实现同步工具的工作量,这也是为什么 AQS 是其它许多同步类的基类的原因。

现在我们来直接定位到类 java.util.concurrent.locks.AbstractQueuedSynchronizer,下面是 AQS 类的几个重要字段与方法列出来:


public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {private transient volatile Node head;private transient volatile Node tail;private volatile int state;protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}// ...}

1.head 字段为等待队列的头节点,表示当前正在执行的节点;2.tail 字段为等待队列的尾节点;3.state 字段为同步状态,其中 state > 0 为有锁状态,每次加锁就在原有 state 基础上加 1,即代表当前持有锁的线程加了 state 次锁,反之解锁时每次减一,当 statte = 0 为无锁状态;4.通过 compareAndSetState 方法操作 CAS 更改 state 状态,保证 state 的原子性。

有没有发现,这几个字段都用 volatile 关键字进行修饰,以确保多线程间保证字段的可见性。

AQS 提供了两种锁,分别是独占锁和共享锁,独占锁指的是操作被认作一种独占操作,比如 ReentrantLock,它实现了独占锁的方法,而共享锁则指的是一个非独占操作,比如一些同步工具 CountDownLatch 和 Semaphore 等同步工具,下面是 AQS 对这两种锁提供的抽象方法。

独占锁:

// 获取锁方法protected boolean tryAcquire(int arg) {  throw new UnsupportedOperationException();}// 释放锁方法protected boolean tryRelease(int arg) {  throw new UnsupportedOperationException();}

共享锁:

// 获取锁方法protected int tryAcquireShared(int arg) {  throw new UnsupportedOperationException();}// 释放锁方法protected boolean tryReleaseShared(int arg) {  throw new UnsupportedOperationException();}

在我们平时开发中,基本不用直接使用 AQS,我们平时都是直接使用 JDK 自带的同步类工具,如 ReentrantLock、CountDownLatch 和 Semaphore 等,它们已经可以满足绝大部分的需求了,后面会抽几篇文章单独讲一下这些同步类工具是如何使用 AQS 的,这对于我们如何构建自定义的同步工具,有很大的帮助。

下面是同步队列节点的结构:

用大神的注释来形象地描述一下队列的模型:

/**  * <pre>  *      +------+  prev +-----+       +-----+  * head |      | <---- |     | <---- |     |  tail  *      +------+       +-----+       +-----+  * </pre>  */

这是一个普通双向链表的节点结构,多了 thread 字段用于存储当前线程对象,同时每个节点都有一个 waitStatus 等待状态,一共有四种状态:

1.CANCELLED(1):取消状态,如果当前线程的前置节点状态为 CANCELLED,则表明前置节点已经等待超时或者已经被中断了,这时需要将其从等待队列中删除。2.SIGNAL(-1):等待触发状态,如果当前线程的前置节点状态为 SIGNAL,则表明当前线程需要阻塞。3.CONDITION(-2):等待条件状态,表示当前节点在等待 condition,即在 condition 队列中。4.PROPAGATE(-3):状态需要向后传播,表示 releaseShared 需要被传播给后续节点,仅在共享锁模式下使用。

可以这么理解:head 节点可以表示成当前持有锁的线程的节点,其余线程竞争锁失败后,会加入到队尾,tail 始终指向队列的最后一个节点。

AQS 的结构大概可总结为以下 3 部分:

1.用 volatile 修饰的整数类型的 state 状态,用于表示同步状态,提供 getState 和 setState 来操作同步状态;2.提供了一个 FIFO 等待队列,实现线程间的竞争和等待,这是 AQS 的核心;3.AQS 内部提供了各种基于 CAS 原子操作方法,如 compareAndSetState 方法,并且提供了锁操作的acquire和release方法。

独占锁

独占锁的原理是如果有线程获取到锁,那么其它线程只能是获取锁失败,然后进入等待队列中等待被唤醒。

获取锁

获取独占锁方法:

public final void acquire(int arg) {  if (!tryAcquire(arg) &&      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    selfInterrupt();}

源码解读:

1.通过 tryAcquire(arg) 方法尝试获取锁,这个方法需要实现类自己实现获取锁的逻辑,获取锁成功后则不执行后面加入等待队列的逻辑了;2.如果尝试获取锁失败后,则执行 addWaiter(Node.EXCLUSIVE) 方法将当前线程封装成一个 Node 节点对象,并加入队列尾部;3.把当前线程执行封装成 Node 节点后,继续执行 acquireQueued 的逻辑,该逻辑主要是判断当前节点的前置节点是否是头节点,来尝试获取锁,如果获取锁成功,则当前节点就会成为新的头节点,这也是获取锁的核心逻辑。

基于上面源码的步骤分析后,我们一步步往下看源码具体实现:

private Node addWaiter(Node mode) {  // 创建一个基于当前线程的节点,该节点是 Node.EXCLUSIVE 独占式类型  Node node = new Node(Thread.currentThread(), mode);  // Try the fast path of enq; backup to full enq on failure  Node pred = tail;  // 这里先判断队尾是否为空,如果不为空则直接将节点加入队尾  if (pred != null) {    node.prev = pred;    // 采取 CAS 操作,将当前节点设置为队尾节点,由于采用了 CAS 原子操作,无论并发怎么修改,都有且只有一条线程可以修改成功,其余都将执行后面的enq方法    if (compareAndSetTail(pred, node)) {      pred.next = node;      return node;    }  }  enq(node);  return node;}

简单来说 addWaiter(Node mode) 方法做了以下事情:

1.创建基于当前线程的独占式类型的节点;2.利用 CAS 原子操作,将节点加入队尾。

我们继续看 enq(Node node) 方法:

private Node enq(final Node node) {  // 自旋操作  for (;;) {    Node t = tail;    // 如果队尾节点为空,那么进行CAS操作初始化队列    if (t == null) {      // 这里很关键,即如果队列为空,那么此时必须初始化队列,初始化一个空的节点表示队列头,用于表示当前正在执行的节点,头节点即表示当前正在运行的节点      if (compareAndSetHead(new Node()))        tail = head;    } else {      node.prev = t;      // 这一步也是采取CAS操作,将当前节点加入队尾,如果失败的话,自旋继续修改直到成功为止      if (compareAndSetTail(t, node)) {        t.next = node;        return t;      }    }  }}

enq(final Node node) 方法主要做了以下事情:

1.采用自旋机制,这是 aqs 里面很重要的一个机制;2.如果队尾节点为空,则初始化队列,将头节点设置为空节点,头节点即表示当前正在运行的节点;3.如果队尾节点不为空,则继续采取 CAS 操作,将当前节点加入队尾,不成功则继续自旋,直到成功为止;

对比了上面两段代码,不难看出,首先是判断队尾是否为空,先进行一次 CAS 入队操作,如果失败则进入 enq(final Node node) 方法执行完整的入队操作。

完整的入队操作简单来说就是:如果队列为空,初始化队列,并将头节点设为空节点,表示当前正在运行的节点,然后再将当前线程的节点加入到队列尾部。

关于队列的初始化与入队,务必理解透彻。

经过上面 CAS 不断尝试,这时当前节点已经成功加入到队尾了,接下来就到了acquireQueued 的逻辑,我们继续往下看源码:

final boolean acquireQueued(final Node node, int arg) {  boolean failed = true;  try {    // 线程中断标记字段    boolean interrupted = false;    for (;;) {      // 获取当前节点的 pred 节点      final Node p = node.predecessor();      // 如果 pred 节点为 head 节点,那么再次尝试获取锁      if (p == head && tryAcquire(arg)) {        // 获取锁之后,那么当前节点也就成为了 head 节点        setHead(node);        p.next = null; // help GC        failed = false;        // 不需要挂起,返回 false        return interrupted;      }      // 获取锁失败,则进入挂起逻辑      if (shouldParkAfterFailedAcquire(p, node) &&          parkAndCheckInterrupt())        interrupted = true;    }  } finally {    if (failed)      cancelAcquire(node);  }}

这一步 acquireQueued(final Node node, int arg) 方法主要做了以下事情:

1.判断当前节点的 pred 节点是否为 head 节点,如果是,则尝试获取锁;2.获取锁失败后,进入挂起逻辑。

提醒一点:我们上面也说过,head 节点代表当前持有锁的线程,那么如果当前节点的 pred 节点是 head 节点,很可能此时 head 节点已经释放锁了,所以此时需要再次尝试获取锁。

接下来继续看挂起逻辑源码:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  int ws = pred.waitStatus;  if (ws == Node.SIGNAL)    // 如果 pred 节点为 SIGNAL 状态,返回true,说明当前节点需要挂起    return true;  // 如果ws > 0,说明节点状态为CANCELLED,需要从队列中删除  if (ws > 0) {    do {      node.prev = pred = pred.prev;    } while (pred.waitStatus > 0);    pred.next = node;  } else {    // 如果是其它状态,则操作CAS统一改成SIGNAL状态    // 由于这里waitStatus的值只能是0或者PROPAGATE,所以我们将节点设置为SIGNAL,从新循环一次判断    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  }  return false;}

这一步 shouldParkAfterFailedAcquire(Node pred, Node node) 方法主要做了以下事情:

1.判断 pred 节点状态,如果为 SIGNAL 状态,则直接返回 true 执行挂起;2.删除状态为 CANCELLED 的节点;3.若 pred 节点状态为 0 或者 PROPAGATE,则将其设置为为 SIGNAL,再从 acquireQueued 方法自旋操作从新循环一次判断。

通俗来说就是:根据 pred 节点状态来判断当前节点是否可以挂起,如果该方法返回 false,那么挂起条件还没准备好,就会重新进入 acquireQueued(final Node node, int arg) 的自旋体,重新进行判断。如果返回 true,那就说明当前线程可以进行挂起操作了,那么就会继续执行挂起。

这里需要注意的时候,节点的初始值为 0,因此如果获取锁失败,会尝试将节点设置为 SIGNAL。

继续看挂起逻辑:

private final boolean parkAndCheckInterrupt() {  LockSupport.park(this);  return Thread.interrupted();}

LockSupport 是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport 提供 park() 和 unpark() 方法实现阻塞线程和解除线程阻塞。release 释放锁方法逻辑会调用 LockSupport.unPark 方法来唤醒后继节点。

获取独占锁流程图:

释放锁

释放锁方法:

public final boolean release(int arg) {  if (tryRelease(arg)) {    Node h = head;    if (h != null && h.waitStatus != 0)      unparkSuccessor(h);    return true;  }  return false;}

释放锁的方法源码就很好理解,通过 tryRelease(arg) 方法尝试释放锁,这个方法需要实现类自己实现释放锁的逻辑,释放锁成功后则执行后面的唤醒后续节点的逻辑了,然后判断 head 节点不为空并且 head 节点状态不为 0,因为 addWaiter 方法默认的节点状态为 0,此时节点还没有进入就绪状态。

继续往下看源码:

private void unparkSuccessor(Node node) {  int ws = node.waitStatus;  if (ws < 0)    // 将头节点的状态设置为0    // 这里会尝试清除头节点的状态,改为初始状态    compareAndSetWaitStatus(node, ws, 0);
  // 后继节点  Node s = node.next;  // 如果后继节点为null,或者已经被取消了  if (s == null || s.waitStatus > 0) {    s = null;    // for循环从队列尾部一直往前找可以唤醒的节点    for (Node t = tail; t != null && t != node; t = t.prev)      if (t.waitStatus <= 0)        s = t;  }  if (s != null)    // 唤醒后继节点    LockSupport.unpark(s.thread);}

从源码可看出:释放锁主要是将头节点的后继节点唤醒,如果后继节点不符合唤醒条件,则从队尾一直往前找,直到找到符合条件的节点为止

总结

这篇文章主要讲述了 AQS 的内部结构和它的同步实现原理,并从源码的角度深度剖析了AQS 独占锁模式下的获取锁与释放锁的逻辑,并且从源码中我们得出:在独占锁模式下,用 state 值表示锁并且 0 表示无锁状态,0 -> 1 表示从无锁到有锁,仅允许一条线程持有锁,其余的线程会被包装成一个 Node 节点放到队列中进行挂起,队列中的头节点表示当前正在执行的线程,当头节点释放后会唤醒后继节点,从而印证了 AQS 的队列是一个 FIFO 同步队列。

推荐阅读:

从源码的角度解析线程池运行原理

关于线程池你不得不知道的一些设置

你都理解创建线程池的参数吗?

Java深海拾遗系列(10)--- Java并发之AQS源码分析相关推荐

  1. Java并发之AQS源码分析ReentranLock、ReentrantReadWriteLock、Condition

    基于AQS的独享锁和共享锁的源码分析 基本概念说明 锁的基本原理思考 测试环境 实现方案1 实现方案2 独占锁:ReentrantLock源码分析 类依赖和类成员变量说明 加锁过程,入口方法:lock ...

  2. 【Java集合学习系列】HashMap实现原理及源码分析

    HashMap特性 hashMap是基于哈希表的Map接口的非同步实现,继承自AbstractMap接口,实现了Map接口(HashTable跟HashMap很像,HashTable中的方法是线程安全 ...

  3. 一步步实现windows版ijkplayer系列文章之三——Ijkplayer播放器源码分析之音视频输出——音频篇

    https://www.cnblogs.com/harlanc/p/9693983.html 目录 OpenSL ES & AudioTrack 源码分析 创建播放器音频输出对象 配置并创建音 ...

  4. Java高并发之CountDownLatch源码分析

    概述 CountDownLatch 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助.简单来说,就是 CountDownLatch 内部维护了一个计数器,每个线程完成自己的操作之后都 ...

  5. Java并发包中Semaphore的工作原理、源码分析及使用示例

    简介: 在多线程程序设计中有三个同步工具需要我们掌握,分别是Semaphore(信号量),countDownLatch(倒计数门闸锁),CyclicBarrier(可重用栅栏) 欢迎探讨,如有错误敬请 ...

  6. java 向上取整方法 Math.ceil() 用法、源码分析

    刷题用到了,正好好好看看源码. 用法 Math.ceil() 返回值.参数均为double类型, 如果参数为int类型,idea不会报错,但是方法同时不会向上取整. 参数为int类型时,Math.ce ...

  7. bytearrayinputstream java_java io系列02之 ByteArrayInputStream的简介,源码分析和示例(包括InputStream)...

    我们以ByteArrayInputStream,拉开对字节类型的"输入流"的学习序幕. 本章,我们会先对ByteArrayInputStream进行介绍,然后深入了解一下它的源码, ...

  8. producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)

    有序消息 消息有序指的是可以按照消息的发送顺序来消费. RocketMQ可以严格的保证消息有序.但这个顺序,不是全局顺序,只是分区(queue)顺序. 顺序消息生产者 public static vo ...

  9. Java深海拾遗系列(9)--- 关于Java序列化的10个面试问题

    大多数商业项目使用数据库或内存映射文件或只是普通文件, 来满足持久性要求, 只有很少的项目依赖于 Java 中的序列化过程.无论如何,这篇文章不是 Java 序列化教程或如何序列化在 Java 的对象 ...

最新文章

  1. mapgis矢量化怎么打分数_mapgis矢量化的详细工作流程
  2. SQL SERVER 2008 R2 SP1更新时,遇上共享功能更新失败解决方案
  3. ubuntu16.04下安装mysql详细步骤
  4. 扩大Eclipse的内存
  5. 布同:pdf自定义分割(断章)
  6. java复制文件的命名_java-复制文件时在文件名扩展名前附加“复...
  7. SystemTap Errors Introduce
  8. server-sent events
  9. boost升压电路解析
  10. python2.7 一个莫名其妙的错误
  11. 奇异谱分析(SSA)的matlab实现
  12. 04- kubeadm init流程
  13. 三维重建(1):坐标系之间的变换
  14. c语言与多字节编码,什么是单字节,双字节和多字节编码
  15. linux mint 安装ubuntu软件中心,Ubuntu和Linux Mint:安装Pinta 1.6工具
  16. 瑞星wifi二代来袭,wifi,U盘两用
  17. 市场分析-全球与中国纳米复合太阳能电池市场现状及未来发展趋势
  18. onenote使用记录(1):新建与删除笔记本
  19. 视频教程-【CVPR2018】A Closer Look at Spatiotemporal Convolu-计算机视觉
  20. 中文版3ds Max 2012完全自学教程pdf

热门文章

  1. Altium Designer三维模型的应用
  2. 华硕X550高性价比
  3. 自动播放视频并录屏保存的python实现
  4. H5导出HTML文件,H5 导出文件的解决办法
  5. RFIC(AD9361)助力无线通信
  6. pps服务器未响应_PPS影音黄金VIP会员后看节目出现”服务器未响应,停止播放”怎么办...
  7. Vstar项目练手——学习笔记(2)
  8. 真正搞懂hashCode和hash算法
  9. 国外虚拟主机域名绑定方法,及Addon Domain/Subdomains/Parked domain详解
  10. 计算机办公软件应用三套题目,Office办公软件初级应用第三套试卷100分.doc