目录

  • 一、等待/通知机制与Condition接口
    • 1.1 等待/通知机制
    • 1.2 Condition接口
  • 二、AQS的具体实现
    • 2.1 ConditionObject
    • 2.2 等待机制
    • 2.3 通知机制
    • 2.4 响应中断
      • 2.4.1 THROW_IE模式
      • 2.4.2 REINTERRUPT模式
      • 2.4.3 同步队列中发生中断
    • 2.5 总结

一、等待/通知机制与Condition接口

1.1 等待/通知机制

等待/通知机制在本质上属于线程间通信方式的一种,经典范式:

  • 等待方(消费者):获取同步状态——>条件判断,不满足则进入等待——>被通知后退出等待状态——>重新进行条件判断,不满足继续进入等待
  • 通知方(生产者):获取同步状态——>改变条件——>通知等待线程

Java有内置的等待/通知机制解决方案,其相关方法被定义在所有对象的超类Object上。本文主要是剖析JUC中对等待/通知机制的另一种实现:Condition接口。

1.2 Condition接口

Condition是JUC.locks包下定义的一个接口,提供了类似Object的监视器方法,用于与Lock配合以实现等待/通知机制

  • Lock接口中定义了newCondition()方法,即在锁的实现中,可以通过调用newCondition()创建一个与锁关联的Condition对象,从而实现等待/通知机制
  • Condition中定义了等待/通知两种类型的方法
    • await()方法:持有同步状态的线程释放同步状态进入等待状态,直到被通知或中断,如果当前等待线程从await()方法返回,那表明该线程已经获取了同步状态
    • awaitUninterruptly():与await()相对应,不响应中断
    • signal()/signalAll():持有同步状态的线程唤醒一个/所有等待队列中的线程

二、AQS的具体实现

2.1 ConditionObject

ConditionObject是AQS的内部类,其具体实现了Condition接口的等待/通知机制,每个ConditionObject对象都维护一个等待队列,该队列是实现等待/通知机制的关键。

  • ConditionObject中的等待队列是FIFO队列,节点类型与同步队列节点类型相同(静态内部类AQS.Node),实例变量firstWaiter和lastWaiter分别指向了等待队列的头结点和尾结点

  • 调用Condition.await(),将会以当前线程构造节点从尾部加入等待队列。新增节点只需将原有尾结点nextWaiter指向新节点,并且更新尾结点即可,调用await()方法的线程必定是获取了同步状态的线程,因此这个过程不需要CAS保证(这也是Node节点中的实例变量nextWaiter没有用volatile修饰的原因)

  • Obejct的监视器模型中,一个对象拥有一个同步队列和一个等待队列,而AQS在维护一个同步队列的同时支持创建多个等待队列

    • 如阻塞队列的具体实现:

2.2 等待机制

持有同步状态的线程,可以通过调用await()方法释放同步状态并进入等待队列。源码如下:

public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter(); //1.构造Node节点加入等待队列int savedState = fullyRelease(node);//2.释放同步状态int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this); //3.阻塞当前线程if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//4.被唤醒后开始尝试获取同步状态if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
  • 假设当前持有同步状态的线程为T:

  • 此时线程T可以调用CO1.await()方法进入等待状态,同步器的状态发生变化:

    • 线程T释放同步状态,并唤醒T1,T1线程从阻塞处恢复,重新尝试获取同步状态
    • 线程T构造Node节点加入到CO1对应的等待队列尾部
    • 线程T挂起,进入阻塞状态

  • 通知机制,会在下一节剖析,但通过分析await()方法的源码可以看出:

    • 被唤醒的线程从LockSupport.park(this)这行代码恢复之后,需要重新获取同步状态
    • 只有从acquireQueued(node, savedState)方法退出,即获取到同步状态之后,才会从await()方法返回
    • 总结一下:调用await()的前提是当前线程获取了同步状态,调用await()后会释放同步状态进入阻塞态,当重新获取到同步状态后,才会从await()返回

2.3 通知机制

持有同步状态的线程,可以通过调用signal()/signalAll()方法,将等待队列中的节点移动到同步队列中,signal()方法处理的是等待队列中的首个节点,signalAll()处理的是等待队列中的全部节点。以signal()方法为例:

public final void signal() {if (!isHeldExclusively()) //1.确定当前线程是持有同步状态的线程throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}private void doSignal(Node first) {do {//2.1首个节点出队,while循环是为了排除已取消的节点if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}final boolean transferForSignal(Node node) {//3.1更改node的状态为0(同时跳过已取消的节点)if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node); //3.2 将节点添加到同步队列尾部(CAS)int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//3.3如果前驱节点取消了,需要主动唤醒(这种思想在cancleAcquire方法解析的博文中已经做了详细剖析)LockSupport.unpark(node.thread);return true;
}
  • 假设上一节中T1线程获取到同步状态后,调用了CO1.signal()方法:

    • 等待队列中首个节点T3出队
    • 将T3线程对应的节点状态由-2(CONDITION)置为0,添加到同步队列尾部

  • 可以看出,T1线程调用signal()方法之后,T3线程并没有从阻塞态退出(按signal()的字面意思理解只是发出一个信号),仍依赖于同步队列的前驱节点(即T2线程)唤醒

  • 总结一下:调用signal()的前提是当前线程获取了同步状态,调用signal()后会将等待队列中的首个节点移动到同步队列尾部,但并不会直接唤醒该节点的阻塞态,仍依赖于同步队列中的前驱节点唤醒

  • 可以看出,调用signal()和await()方法的前提是必须持有同步状态,signal()方法中是有判断逻辑(调用isHeldExclusively()方法),那await()方法中是如何处理这个逻辑的呢?

    • 假设一个并没有持有同步状态的线程调用了await()方法,其实是可以正常添加节点到等待队列中的
    • 但在释放同步状态时,会释放失败,此时会将刚才进入等待队列的Node状态改为CANCLED
    • 参考上面signal()源码中的注释,这种节点会在signal()时,被清理出去
    • 总结一下:当没有获取同步状态的线程调用await()方法时,会正常构造节点添加到等待队列中,同时会抛出IllegalMonitorStateException,等待队列中的节点会在其他线程调用signal()时清理掉

2.4 响应中断

上述讨论的await()方法是能够响应中断的,分析源码可以看出,局部变量interruptMode用于记录中断事件,该变量有三个值:

  • 0 : 代表整个过程中一直没有中断发生,即2.2和2,3节分析的正常等待/通知流程
  • THROW_IE : 表示退出await()方法时需要抛出InterruptedException,这种模式用于中断发生在节点被signal之前
  • REINTERRUPT : 表示退出await()方法时只需要再自我中断一下,这种模式对应于中断发生在节点被signal之后

涉及到的源码:

private int checkInterruptWhileWaiting(Node node) {//如果被中断,判断具体的中断模式return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;
}//判断线程中断之前当前节点是否已经被signal(根据ws的状态值)
final boolean transferAfterCancelledWait(Node node) {if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//没有被signal过enq(node); //添加到同步队列return true;}while (!isOnSyncQueue(node))Thread.yield();return false; //已经被signal
}//对不同中断模式的处理逻辑
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();
}

2.4.1 THROW_IE模式

该模式对应的场景是,线程在被中断时当前节点仍在等待队列中,没有被signal。结合await()的源码分析,其流程如下:

  • 线程因为中断,从挂起的地方(LockSupport.park(this);)被唤醒
  • checkInterruptWhileWaiting()中,判断没有signal过,将当前节点状态置为0,并添加到同步队列尾部,返回的中断模式为THROW_IE
  • 由于加入了同步队列,跳出循环(!isOnSyncQueue(node)=false)
  • 接下来线程将在同步队列中以阻塞的方式获取同步状态(acquireQueued(),如果获取不到锁,将会被再次挂起)
  • 获取到同步状态后,由于还没有将当前节点从等待队列中移除(node.nextWaiter != null),需要调用unlinkCancelledWaiters()将当前节点从条件队列中移除,同时顺便移除其他取消等待的节点
  • 最后通过reportInterruptAfterWait抛出了InterruptedException

总结一下:

  • await()中挂起的线程在被中断后不会立即抛出InterruptedException,而是会被添加到同步队列中去获取同步状态,如果争不到,还是会被挂起
  • 只有获取到同步状态之后,该线程才得以从同步队列和条件队列中移除,并最后在退出await()方法之前抛出InterruptedException
  • 这个层面上看,中断和signal的效果其实很像(将它从条件队列中移除,加入到同步队列),不同的是,会抛出InterruptedException异常以结束await()方法,表示节点是由于中断进而停止等待并进入到同步队列
  • 可以看出,signal()以及transferAfterCancelledWait()都有compareAndSetWaitStatus(node, Node.CONDITION, 0)的逻辑,即哪里cas成功了,就执行当前节点从等待队列出队以及同步队列入队的操作

2.4.2 REINTERRUPT模式

该模式对应的场景是,线程在被中断时当前节点已经被signal,即已经从等待队列出队进入同步队列了。结合await()的源码分析,其流程如下:

  • 线程因为中断,从挂起的地方(LockSupport.park(this);)被唤醒
  • transferAfterCancelledWait()中,由于节点已经被signal,cas失败,最后返回的中断模式为REINTERRUPT
    • 分析下transferAfterCancelledWait中的一个细节:

      如果cas失败(即已经被signal过),这个代码逻辑是为了等待执行signal逻辑的线程完成enq操作,即节点添加到同步队列
  • 由于执行signal逻辑的线程将当前节点加入了同步队列,因此能够跳出循环(!isOnSyncQueue(node)=false)
  • 接下来线程将在同步队列中以阻塞的方式获取同步状态(acquireQueued(),如果获取不到锁,将会被再次挂起)
  • 获取到同步状态后,在reportInterruptAfterWait中自我中断一下,存下中断标记
    • 这里可能会存在疑惑,既然已经中断了为什么还要多此一举?
      这是因为checkInterruptWhileWaiting中执行Thread.interrupted()时,已经将中断标记清除(参考Thread类的api注释,这里不赘述),所以需要最后重新标记一下。至于为什么搞这么复杂,个人理解作者的意图是保证在THROW_IE模式下,抛出InterruptedException后线程中断标识被清除。

2.4.3 同步队列中发生中断

await()方法中还有一个比较隐蔽的细节:

执行到acquireQueued()这里说明当前线程已经退出了等待状态(可能是正常被signal,也可能是被中断),acquireQueued()用于不响应中断地获取同步状态,其返回值即为阻塞过程中是否发生中断。

结合后面的判断interruptMode != THROW_IE,这里分析几种场景:

  • 在同步队列中未发生中断,保持之前分析的逻辑不变,不改变中断模式

  • 在同步队列中发生了中断,且等待过程中的中断模式不为THROW_IE,具体展开:

    • 等待过程未发生中断
    • 等待过程发生了中断,且是在被signal之后发生了中断

    上述情况都需要按 REINTERRUPT的模式处理,即await()结束前自我中断,保证线程中断标记为true

  • 在同步队列中发生了中断,且等待过程中的中断模式为THROW_IE,这种场景仍需要保持中断模式为THROW_IE,结束前抛出InterruptedException,以告知调用线程是由于被中断才停止了等待。

2.5 总结

  • AQS的内部类ConditionObject通过维护一个单向队列,并基于同步器的状态管理和同步队列管理能力,实现了等待/通知机制
  • 区别于Obejct的监视器模型,AQS能够同时维护多个等待队列
  • await()方法是能够响应中断的,对应不响应中断的等待能力是awaitUninterruptibly()实现的(中断的场景只是简单记了中断标识)
  • await()对响应的中断并不是直接从await()方法退出,而是退出阻塞态,从等待队列移动到同步队列,在最后退出await()时才抛出InterruptedException

Java并发编程——详解AQS对Condition接口的具体实现相关推荐

  1. java并发编程详解,Java架构师成长路线

    美团一面: 中间省略掉大概几个问题,因为我不记得了,下面记得的基本都是我没怎么答好的. 了解SOA,微服务吗? 分布式系统如何负载均衡?如何确定访问的资源在哪个服务器上? 一.轮询.二.随机.三.最小 ...

  2. Java JUC并发编程详解

    Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...

  3. Java高并发编程详解系列-Java线程入门

    根据自己学的知识加上从各个网站上收集的资料分享一下关于java高并发编程的知识点.对于代码示例会以Maven工程的形式分享到个人的GitHub上面.   首先介绍一下这个系列的东西是什么,这个系列自己 ...

  4. java IO编程详解

    java IO编程详解 一.Socket 1. Sock概述 Socket,套接字就是两台主机之间逻辑连接的端点.TCP/IP协议是传输层协议,主要解决数据如何在网络中传输,而HTTP协议是应用层协议 ...

  5. 5W字高质量java并发系列详解教程(上)-附PDF下载

    文章目录 第一章 java.util.concurrent简介 主要的组件 Executor ExecutorService ScheduledExecutorService Future Count ...

  6. java并发编程——九 AbstractQueuedSynchronizer AQS详解

    文章目录 AbstractQueuedSynchronizer概述 AbstractQueuedSynchronizer的使用 AQS实现分析 同步队列 独占锁的获取与释放 独占式超时获取 共享式锁的 ...

  7. aqs clh java_【Java并发编程实战】—– AQS(四):CLH同步队列

    在[Java并发编程实战]-–"J.U.C":CLH队列锁提过,AQS里面的CLH队列是CLH同步锁的一种变形. 其主要从双方面进行了改造:节点的结构与节点等待机制.在结构上引入了 ...

  8. JAVA并发编程: CAS和AQS

    说起JAVA并发编程,就不得不聊聊CAS(Compare And Swap)和AQS了(AbstractQueuedSynchronizer). CAS(Compare And Swap) 什么是CA ...

  9. 6. Java并发编程-并发包-Lock和Condition

    前文介绍了java语言本身通过synchronized, wait, notify实现了管程,解决了并发编程两大难题:互斥和同步. 这两大问题并发包中也得到了相应的实现,分别时Lock和Conditi ...

最新文章

  1. PHP+MySQL手工注入问题及修复
  2. 【收藏】Oracle存储过程读写文件
  3. qt 编译mysql wince_Qt4.8.6开发WinCE 5.0环境搭建
  4. python 将指定路径(目录)下的图片或文本文件按给定序号重新排序,并批量重命名 (yolo、tensorflow数据集批量处理)
  5. 编程方法学21:监听器和迭代器回顾
  6. dedecms使用php语法,dedecms中使用php语句指南,dedecmsphp语句指南_PHP教程
  7. CF1066F-Yet another 2D Walking【贪心】
  8. 50行以上c语言程序代码,C语言非常简单的字符统计程序50行
  9. 在VC++中访问和修改系统注册表
  10. Python使用集合运算检测密码字符串的安全强度
  11. 使用POI导入导出Excel2003、2007示例
  12. 【裂缝识别】基于matlab GUI无人机裂缝图像处理系统(带面板)【含Matlab源码 1727期】
  13. 字符编码——简体中文编码中区位码、国标码、内码、外码、字形码的区别及关系
  14. 利用IDEA模板快速生成swagger注解
  15. 关于买鸡的问题,5文钱可以买一只公鸡,3文钱可以买一只母鸡,1文钱可以买3只雏鸡.现在用100文钱买100只鸡,那么各有公鸡、母鸡、雏鸡多少只?
  16. 机器学习系列(一), 监督学习和无监督学习
  17. python_爬虫_豆瓣TOP250_url
  18. python日志处理(logging模块)
  19. Unity 简单随机创建玩家游戏名
  20. QPS达到30万的elasticsearch架设之道

热门文章

  1. win7默认关闭802.1X身份验证选项
  2. 【C语言】这些经典题型大家都掌握了吗?一文学会这些题
  3. svm车牌定位matlab,基于SVM的车牌区域定位系统研究
  4. 【翻译】Full-System Power Analysis and Modeling for Server Environments【part1】
  5. 什么是SSL证书它有什么作用?
  6. linphone - Network is unreachable (真的时网络不可达)
  7. BSA-Xylan 牛血清白蛋白-木聚糖,血清白蛋白HSA/卵清白蛋白OVA/乳清白蛋白偶联糖
  8. elsevier投稿的一些事情
  9. 小A的柱状图(单调栈+前缀)+最大子矩阵
  10. X光平板探测器(探测卡/采集卡)