AQS队列同步器学习

在学习并发的时候,我们一定会接触到 JUC 当中的工具,JUC 当中为我们准备了很多在并发中需要用到的东西,但是它们都是基于AQS(AbstractQueuedSynchronizer)队列同步器来实现的,也就是我们如果能够去梳理清楚AQS当中的知识点,对我们以后了解其他并发功能键有很大的帮助。

CLH队列

队列同步器(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组件的基础框架,它使用了一个int变量来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者Doug Lea期望她能够成为实现大部分同步需求的基础。

而这个内置的队列就是CLH双向队列,当前线程如果获取锁失败的时候,会将当前线程、状态等信息封装成一个Node节点添加到CLH队列当中去--也就是一个Node节点其实就是一个线程,而当有线程释放时,会唤醒CLH队列并取其首节点进行再次获取:

  static final class Node {/** Marker to indicate a node is waiting in shared mode *///共享模式节点static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode *///独占模式节点static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled *///处于取消的等待状态/* 因为超时或中断就会处于该状态,并且处于该状态的节点不会转变为其他状态处于该状态的节点不会再次被阻塞*/static final int CANCELLED =  1;/** waitStatus value to indicate successors thread needs unparking *///等待状态/*  表示后继节点是否需要被唤醒 */static final int SIGNAL    = -1;/** waitStatus value to indicate thread is waiting on condition *//* 该节点处于条件队列当中,该节点不会用作同步队列直到设置状态0用来传输时才会移到同步队列当中,并且加入对同步状态的获取 */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*//* 表示下一次共享式同步状态获取将会无条件地传播下去 */static final int PROPAGATE = -3;​//线程等待状态volatile int waitStatus;​//当前节点的前置节点volatile Node prev;​//当前节点的后置节点volatile Node next;​//节点所在的线程volatile Thread thread;​//条件队列当中的下一个等待节点Node nextWaiter;​/*** 判断节点是否共享模式*/final boolean isShared() {return nextWaiter == SHARED;}​/*** 获取前置节点*/final Node predecessor() throws NullPointerException {Node p = prev;  //获取前置节点if (p == null)  //为空则抛空指针异常throw new NullPointerException();elsereturn p;}​Node() {    // Used to establish initial head or SHARED marker}​Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}​Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}
复制代码

通过上面对Node节点的源代码进解说,我想对于之后的内容会有很大的帮助的,因为后面的方法当中会有特别多的状态判断。

当我们重写同步器的时候,需要使用同步器的3个方法来访问和修改同步的状态。分别是:

  • getState():获取当前同步状态

  • setState(int newState):设置当前同步状态

  • compareAndSetState(int expect, int update):通过CAS来设置当前状态,该方法可以保证设置状态操作的原子性

入列

我们在上面既然已经讲到了AQS当中维护着的是CLH双向队列,并且是FIFO,既然是队列,那肯定就存在着入列和出列的操作,我们来先从入列看起:

acquire(int arg)方法

该方法是独占模式下线程获取同步状态的入口,如果当前线程获取同步状态成功,则由该方法返回,如获取不成功将会进入CLH队列当中进行等待。

在该方法当中会调用重写的tryAcquire(int arg)方法。

  public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
复制代码
  • tryAcquire(int arg)

    很多人刚看到这个方法的时候,会不会有种一脸懵逼的感觉,方法体居然只是返回一个异常而已,说好的业务逻辑代码呢?

    回到我们一开始说的,AQS实际上只是作为一个同步组件的基础框架,具体的实现要交由自定义的同步器去自己实现,所以该方法当中只有一句异常。

此方法由用户自定义的同步器去实现,尝试获取独占资源,如果成功则返回true,如果失败则返回false

      protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
复制代码
  • addWaiter(Node mode)

    将当前线程添加到CLH队列的队尾,并且指定独占模式。

    Node有两种模式,分别是独占模式和共享模式,也就是Node.EXCLUSIVENode.SHARED

    private Node addWaiter(Node mode) {//将当前线程以指定模式来创建Node节点Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;  //获取队列尾部给变量predif (pred != null) {  //若队尾不为空node.prev = pred;  //将当前节点的前置节点指向原来的tailif (compareAndSetTail(pred, node)) {  //通过CAS将tail设置为Node/**如果设置成功,表示此操作没有别的线程执行成功*/ pred.next = node;  //将原来tail节点的后置节点指向node节点return node;  //返回node节点}}enq(node);return node;}
复制代码
  • enq(Node )

    该方法是将节点插入到CLH队列的尾部,并且通过自旋(死循环)来保证Node节点的正确添加

      private Node enq(final Node node) {for (;;) {  //自旋--死循环添加节点Node t = tail;  //获取原来tial节点至t变量if (t == null) { // Must initialize  队列为空if (compareAndSetHead(new Node()))  //设置一个空节点作为head节点tail = head;  //head和tail是同一个节点} else {  //队列不为空的正常情况node.prev = t;  //设置当前节点的前置节点为原tail节点if (compareAndSetTail(t, node)) {  //通过CAS设置当前节点为tail节点t.next = node;  //原tail节点后置节点是当前节点return t;  //返回原tail节点结束循环}}}}
复制代码
  • acquireQueued(final Node node, int arg)

    来到这个方法,证明已经通过tryAcquire获取同步状态失败了,并且调用了addWaiter方法将当前线程添加至CLH队列的尾部了,剩下的就是在等待状态当中等其他线程来唤醒自己去获取同步状态了。

    对于已经处于CLH队列当中的线程,是以独占并且不可中断的模式去获取同步状态。

      final boolean acquireQueued(final Node node, int arg) {boolean failed = true;  //成功获取资源的状态try {boolean interrupted = false;  //是否被中断的状态for (;;) { //自旋--死循环final Node p = node.predecessor();  //获取当前节点的前置节点//如果前置节点是首节点,并且已经成功获取同步状态if (p == head && tryAcquire(arg)) { setHead(node);  //设置当前节点为head节点,并且将当前node节点的前置节点置nullp.next = null; //设置原head节点的后置节点为null,方便GC回收原来的head节点failed = false; return interrupted; //返回是否被中断}//获取同步状态失败后,判断是否需要阻塞或中断if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;  //如果被中断过,设置标记为true}} finally {if (failed)cancelAcquire(node);  //取消当前节点继续获取同步状态的尝试}}
复制代码
  • shouldParkAfterFailedAcquire(Node pred, Node node)

    对于获取状态失败的节点,检查并更新其状态,如果线程阻塞就返回true,这是所有获取状态循环的信号控制方法。

    要求pred == node.prev

实际上除非锁获取成功,要不然都会被阻塞起来

      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;  //获取前驱节点的状态//状态为-1,表示后继节点已经处于waiting等待状态,等该节点释放或取消,就会通知后继节点if (ws == Node.SIGNAL) return true;//如果状态大于0--取消状态,就跳过该节点循环往前找,找到一个非cancel状态的节点if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);//赋值pred的后继节点为node节点pred.next = node;} else {  //如果状态小于0//必须是PROPAGATE或者0--表示无状态,当是-2的时候,在condition queue队列当中//通过CAS设置pred节点状态为signalcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
复制代码
  • parkAndCheckInterrupt()

    还有当该节点的前驱节点状态为signal时,才可以将该节点所在线程pack起来,否则无法将线程pack。

      private final boolean parkAndCheckInterrupt() {//通过LockSupport工具阻塞当前线程LockSupport.park(this);return Thread.interrupted();  //清除中断标识,返回清除前的标识}
复制代码
  • cancelAcquire(Node node)

    该方法是取消节点所在线程对同步状态的获取,那说白了就是将节点的状态改为cancelled.

      private void cancelAcquire(Node node) {// Ignore if node doesnt existif (node == null)  //节点为空则返回return;​node.thread = null;  //节点所在线程设为null​// Skip cancelled predecessors//获取node节点的前驱节点Node pred = node.prev;//循环获取前驱节点的状态,找到第一个状态不为cancelled的前驱节点while (pred.waitStatus > 0)node.prev = pred = pred.prev;​// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.//获取pred节点的后继节点Node predNext = pred.next;//设置node节点状态为CANCELLEDnode.waitStatus = Node.CANCELLED;​//如果node节点是tail节点,通过CAS设置tail节点为predif (node == tail && compareAndSetTail(node, pred)) {//通过CAS将pred节点的next节点设置nullcompareAndSetNext(pred, predNext, null);} else {  //如果不是tail节点​int ws;  //初始化node节点状态变量/**如果pred不是head节点,并且状态是SIGNAL或者状态小于0并且设置pred*状态为SIGNAL成功,。并且pred所封装的线程不为空*/if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {//获取node节点的后继节点Node next = node.next;//如果后继节点部位null并且状态不为cancelledif (next != null && next.waitStatus <= 0)//设置pred的后继节点为next,也就是将pred的后继节点不再是nodecompareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);  //释放后继节点}​node.next = node; // help GC}}
复制代码
  • unparkSuccessor(Node node)
      private void unparkSuccessor(Node node) {//获取node节点的状态int ws = node.waitStatus;if (ws < 0)  //如果状态小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3//通过CAS将node节点状态设置为0compareAndSetWaitStatus(node, ws, 0);​//获取node节点的后继节点 Node s = node.next;//如果后继节点为空或者状态大于0--cancelledif (s == null || s.waitStatus > 0) {//后继节点置为空s = null;//从tail节点开始往前遍历for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)  //判断状态小于等于0,就是为了找到状态不为cancelled的节点s = t;  //找到最前的状态小于等于0的节点}if (s != null)  //如果由以上方法找到的节点不为空//通过LockSupport工具释放s节点封装的线程LockSupport.unpark(s.thread);}
复制代码

经过了以上的分析,我想我们对入列的代码也有了一个比较好的了解吧,那我们也可以尝试画一下入列的流程图。

出列

出列的操作相对于入列来说就真的是简单的多了,毕竟入列的时候需要考虑的因素太多,要考虑前驱和后继节点,还要考虑节点的状态等等一堆因素,而出列就是指CLH队列的头部节点,所以麻烦的因素就会少了很多。

release(int arg)

我们废话都不多说了,直接上代码吧。

这也是以独占模式来释放对象

  public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;  //获取head节点//如果head节点不为空并且状态不为0,也就是初始节点if (h != null && h.waitStatus != 0) unparkSuccessor(h);  //唤醒后继节点return true;}return false;}
复制代码
  • tryRelease(int arg)

    这个方法与入列的tryAcquire一样,是只有一个异常的,也就是证明这个方法也是由自定义的同步组件自己去实现,在AQS同步器当中只是定义一个方法而已。

      protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}
复制代码
  • unparkSuccessor(Node node)

    这个方法实际在入列的时候已经讲过了,我直接搬上面的代码解释下来。

      private void unparkSuccessor(Node node) {//获取node节点的状态int ws = node.waitStatus;if (ws < 0)  //如果状态小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3//通过CAS将node节点状态设置为0compareAndSetWaitStatus(node, ws, 0);​//获取node节点的后继节点 Node s = node.next;//如果后继节点为空或者状态大于0--cancelledif (s == null || s.waitStatus > 0) {//后继节点置为空s = null;//从tail节点开始往前遍历for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)  //判断状态小于等于0,就是为了找到状态不为cancelled的节点s = t;  //找到最前的状态小于等于0的节点}if (s != null)  //如果由以上方法找到的节点不为空//通过LockSupport工具释放s节点封装的线程LockSupport.unpark(s.thread);}
复制代码

这上面就是出列也就是释放的代码了,其实看起来不是很难。

小结

花了整整3天左右的时间去看了一下AQS的源码,会去看也纯属是想要把自己的并发方面的知识能够丰富起来,但是这次看源码也还是不太顺利,因为很多代码或者方法,单独分开来看的时候或许能理解,感觉方法的作用也的确是那么回事,但是当一整个流程串起来的时候也还是不太明白这样做的具体作用,以及整个的执行流程。更加没办法理解那些自旋里的代码,每一次执行会出现怎样的结果,对CLH队列的影响。

不过,自己也是有收获的,至少相较于一开始来说,自己对AQS有了一点皮毛的理解,不至于以后闻起来完完全全是一问三不知的状态。

同时也希望我这篇文章能够对想要了解AQS的程序猿能够起一点作用,以后自己也还是将自己的一些学习心得或者资料共享出来。

参考资料

方腾飞:《Java并发编程的艺术》

如需转载,请务必注明出处,毕竟一块块搬砖也不是容易的事情。

Java并发之AQS同步器学习相关推荐

  1. Java并发之AQS详解(文章里包含了两片文章结合着看后边文章不清楚,请看原文)

          AQS全称抽象队列同步器(AbstractQuenedSynchronizer),它是一个可以用来实现线程同步的基础框架.当然,它不是我们理解的Spring这种框架,它是一个类,类名就是A ...

  2. Java并发之AQS详解

    一.概述 谈到并发,不得不谈ReentrantLock:而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)! 类如其名,抽象的队列式的同步器,AQ ...

  3. Java 并发之 AQS 详解(上)

    Java 并发之 AQS 详解 前言 Java SDK 为什么要设计 Lock 死锁问题 synchronized 的局限性 显式锁 Lock Lock 使用范式 Lock 是怎样起到锁的作用呢? 队 ...

  4. 你真的弄明白了吗?Java并发之AQS详解

    你真的弄明白了吗?Java并发之AQS详解 带着问题阅读 1.什么是AQS,它有什么作用,核心思想是什么 2.AQS中的独占锁和共享锁原理是什么,AQS提供的锁机制是公平锁还是非公平锁 3.AQS在J ...

  5. aqs java 简书,Java并发之AQS原理

    一.总体框架 AQS是指AbstractQueuedSynchronizer.它是一个抽象类,java并发包里的ReentrantLock.CountDownLatch和Semaphroe等重要的工具 ...

  6. Java深海拾遗系列(10)--- Java并发之AQS源码分析

    AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时利用 FIFO 队列实现线程间的锁竞争, ...

  7. Java并发之AQS源码分析ReentranLock、ReentrantReadWriteLock、Condition

    基于AQS的独享锁和共享锁的源码分析 基本概念说明 锁的基本原理思考 测试环境 实现方案1 实现方案2 独占锁:ReentrantLock源码分析 类依赖和类成员变量说明 加锁过程,入口方法:lock ...

  8. Java 并发之线程池学习

    2019独角兽企业重金招聘Python工程师标准>>> 创建 通过ThreadPoolExecutor来创建一个线程池 new ThreadPoolExecutor(corePool ...

  9. 面试:你说你精通Java并发,给我讲讲Java并发之J.U.C

    转载自 面试:你说你精通Java并发,给我讲讲Java并发之J.U.C J.U.C J.U.C即java.util.concurrent包,为我们提供了很多高性能的并发类,可以说是java并发的核心. ...

最新文章

  1. 微生态、生信和植物领域最新资讯合集,不看你就亏大啦!!!
  2. CSS修改tr边框属性
  3. 大数据开发你需要知道的十个技术
  4. java 根据类名示例化类_Java即时类| minusNanos()方法与示例
  5. 点击率预估与冷启动(一)
  6. python opencv —— 背景提取(MOG、KNN)、识别与检测(Haar Cascade)
  7. 主题背景_游戏背景音乐的种类—主题曲
  8. S5PV210体系结构与接口01:ARM体系结构概述
  9. DHTMLX Suite 7.1.10 Crack
  10. Database—DML
  11. 华为HCNE题库大全(第一部)
  12. 8.磁盘存储器的管理
  13. mangos新手教程 - 服务器配置文件中文说明
  14. 2019软件工程第三次作业
  15. 三问新能源车险:亲自下场卖保险,意欲何为?
  16. 【最近抖音上元宇宙虚拟项目七国争霸,直播互动游戏源码解析】
  17. matlab 将子文件下同名称文件移植到同一文件夹下并按序排列重命名n.bmp,n=1,2,3...
  18. android开发打开第三方库,Android开发NDK调用三方so库
  19. 用CSS实现圆角图片
  20. 计算机网络系统拓扑图

热门文章

  1. mysql联合索引测试
  2. java 中 如何sum 乘法_java 加法 乘法问题
  3. php后端接收数据,后端如何接收fetch方式发送的数据?
  4. python面向对象的含义_Python面向对象(一)
  5. 使用 Preload/Prefetch 优化
  6. matlab怎么利用圆形度提取园,基于Matlab+GUI图像处理的物料粒度与圆形度测试.pdf...
  7. python根据频率画出词云_利用pandas+python制作100G亚马逊用户评论数据词云
  8. java后台接收参数_java 后台如何 接收 uploader UploadFileOptions 参数
  9. java中while空循环_java – 实现空while循环以保持控制的更好方法
  10. python json.dumps参数_json.dumps参数之解