Condition队列原理分析

  • 前言
  • 初识Condition
  • Condition使用示例
  • Condition原理分析
  • condition.wait()源码解读
    • AQS#await()
      • AQS#addConditionWaiter()
        • AQS#unlinkCancelledWaiters()
      • AQS#fullyRelease(Node)
      • AQS#isOnSyncQueue(Node)
        • AQS#findNodeFromTail(Node)
  • condition.signal()源码解读
    • AQS#signal()
      • AQS#doSignal(Node)
        • AQS#transferForSignal(Node)
    • 回到AQS#await()
      • AQS#checkInterruptWhileWaiting(Node)
        • AQS#checkInterruptWhileWaiting(Node)
    • 继续回到AQS#await()
  • 总结

前言

每一个Java对象都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、 wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现线程之间的通信(等待/通知)机制。

在前一篇文章中我们介绍了Lock对象的实现类ReentrantLock和AQS队列实现原理,而Lock也有自己对应的等待/通知机制Condition队列,Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,主要通过方法await()和singal()实现。

在学习本篇文章之前,建议先去学一下上一篇文章介绍的ReentrantLock和AQS队列实现原理。因为本文的内容也离不开AQS和Node对象。

初识Condition

Condition和Lock一样,也是JUC内的一个接口。Condition接口定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。

Condition的实现类ConditionObject也是AQS类中的一个内内部类,也依赖于Node对象。

Condition使用示例

Condition的使用也非常简单,下面是一个简单的使用示例:

package com.zwx.concurrent.lock;import java.util.Locale;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class LockConditionDemo {public static void main(String[] args) throws InterruptedException {Lock lock = new ReentrantLock();Condition condition = lock.newCondition();new Thread(new ConditionAwait(lock,condition)).start();Thread.sleep(1000);new Thread(new ConditionSingal(lock,condition)).start();}
}class ConditionAwait implements Runnable{private Lock lock;private Condition condition;public ConditionAwait(Lock lock, Condition condition) {this.lock = lock;this.condition = condition;}@Overridepublic void run() {System.out.println("await begin");try {lock.lock();condition.await();}catch (InterruptedException e){e.printStackTrace();}finally {lock.unlock();}System.out.println("await end");}
}class ConditionSingal implements Runnable{private Lock lock;private Condition condition;public ConditionSingal(Lock lock, Condition condition) {this.lock = lock;this.condition = condition;}@Overridepublic void run() {System.out.println("signal begin");try {lock.lock();condition.signal();}finally {lock.unlock();}System.out.println("signal end");}
}

运行之后,输出结果为:

这个效果就是和wait(),nodity()一样的,那么Condition中的等待通知机制是如何实现的呢?

Condition原理分析

Condition接口的实现类ConditionObject是一个多线程协调通信的工具类,可以让线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。
和上一篇文章介绍的AQS同步队列类似,Condition也是一个依赖Node对象构建的FIFO队列。
Condition队列,称之为等待队列,和AQS队列不同的是,Condition等待队列不会维护prev和next,维护的只是一个单项列表,通过firstWaiter和lastWaiter实现头尾节点,然后除了lastWaiter节点,其余每个节点会有一个nextWaiter指向下一个节点,Condition队列大致示意图如下:

condition.wait()源码解读

接下来让我们进入源码层面开始剖析condition的实现原理。上文的示例中,当我们调用condition.wait()时,我们进入AbstractQueuedSynchronizer类中的await()方法。

AQS#await()


第一步是检测是否被中断,这个就不用多说,我们看下面的addConditionWaiter()方法:

AQS#addConditionWaiter()


为了便于理解,我们还是把Node对象贴出来看一看:

static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED =  1;//表示当前线程状态是取消的static final int SIGNAL    = -1;//表示当前线程正在等待锁static final int CONDITION = -2;//Condition队列初始化Node节点时的默认状态static final int PROPAGATE = -3;//CountDownLatch等工具中使用到,暂时用不到volatile int waitStatus;//Node节点中线程的状态,AQS队列中默认为0volatile Node prev;//当前节点的前一个节点volatile Node next;//当前节点的后一个节点volatile Thread thread;//当前节点封装的线程信息Node nextWaiter;//Condition队列维护final boolean isShared() {//暂时用不到return nextWaiter == SHARED;}final Node predecessor() throws NullPointerException {//获取当前节点的上一个节点Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() {}Node(Thread thread, Node mode) {//构造一个节点:addWaiter方法中会使用,此时waitStatus默认等于0this.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { //构造一个节点:Condition中会使用this.waitStatus = waitStatus;this.thread = thread;}}

需要说明的是,AQS队列中初始化Node节点的时候不会传入状态,所以默认为0,然后我们之前分析的时候知道,中途会被改为1,然后线程异常时候会有3出现,所以AQS队列中的Node节点实际上只会有-1,0,1三种状态,而Condition队列,初始化的时候调用的是另一个构造器,直接传入了-2状态,所以不会有0这个默认状态,故而Condition队列中只会有-2和1两种状态。

这里删除无效节点的方法我们后面再分析,我们现在假设有线程A和线程B,线程A进来的时候因为Condition队列还没被初始化,所以执行的是1879和1882两行代码,这时候就构建出了这样的一个Condition队列:

这时候因为只有一个节点,所以firstWaiter和lastWaiter都是指向同一个节点,而ThreadA节点中这时候nextWaiter是空的,因为这时候还没有下一个节点。

这时候线程B也进来,那么就会加入到已经构建好的对象(注意,这两个线程必须共用一个Lock对象,否则会构建不同的Condition队列),ThreadB进来就会执行1881和1882两行代码,最终得到下面的Condition队列:

Condition构建好了,先不管Node节点状态是怎么变成1(cancle)的,我们假如线程B的节点状态变成1了,然后进入unlinkCancelledWaiters()方法看看是怎么移除无效节点的,当ThreadB状态为1,得到如下Condition队列(和上图唯一的区别就是ThreadB所在Node状态变成了1):

AQS#unlinkCancelledWaiters()

这个方法的逻辑也不算难,只要记住两个属性:
一个是t,t是需要循环的节点,第一次是firstwaiter,循环完了之后就会把nextWaiter赋值给t继续循环(1933和1945两行代码);
另一个是trail,用来记录已经循环过的节点,循环的时候如果没有取消的节点,那就是把t循环完之后赋值给trail,然后继续循环

这里我们还是继续演示一下,第一次循环肯定肯定走的是1944行代码和1945行代码,因为firstWaiter肯定不为空,状态也等于Node.CONDITION,循环结束之后会得到如下结果:t=ThreadB,trail=firstWaiter;

然后继续循环,这时候因为t状态是1,所以if条件成立,进入1935行开始执行清除无效节点的逻辑,t.nextWaiter = null;因为当前ThreadB是尾节点,所以这种情况这句话是不起什么作用的,针对非尾节点,才会有作用。

又因为trail=firstWaiter不等于null,所以会执行1939行代码(else分支),这时候因为ThreadB线程已经没有下一个节点了,所以1939行相当于:trail.nextWaiter = null;因为trail=firstWaiter,所以等价于:firstWaiter.nextWaiter=null,于是得到下面的最新Condition队列:

然后执行lastWaiter = trail;等价于lastWaiter = firster;得到如下Condition队列:

可以看到ThreadB这个无效节点已经被清除了。

忘掉这个清除无效节点逻辑,回到我们的正常逻辑,队列构建完成之后,await()方法会继续往下面执行:

接下来回去执行释放锁fullyRelease(Node)的逻辑,因为线程await()方法本来就是要把当前锁让给另一个线程,所以肯定要释放锁,要不然其他线程不可能获得锁。

AQS#fullyRelease(Node)


这里首先会获取到当前的状态,然后把状态传入elease()方法,前面介绍ReentrantLock的时候,lock.unlock()也会调用这个release(arg)方法,只不过unlock()是固定传的1,也就是说如果有重入调用一次只会state-1,而这里是直接全部被减去。
这里就不在介绍release(arg)方法了,没有了解过的可以看我前面介绍ReentrantLock和AQS的文章。

这里如果释放锁成功之后,又会继续回到我们的await()方法:

这时候会继续去执行while循环中的isOnSyncQueue方法,这个方法的意思是判断一下当前线程所在的Node是不是在AQS同步队列,那么为什么要有这个判断?

大家注意了,这是在并发场景下,所以也可能会有其他线程已经把线程B唤醒了,唤醒之后并不是说就能直接获得锁,而是会去争抢锁,那么争抢锁失败了就会加入到AQS同步队列当中,所以这里要有这个判断,如果不在AQS同步队列,那就可以把当前线程挂起了。

AQS#isOnSyncQueue(Node)


这里有一个点需要特别指出的是,Condition队列的节点,当被其他线程调用了singal()方法唤醒的时候,就需要去争抢锁,而争抢锁失败就有可能被加入到AQS同步队列,所以这里才会有prev和next属性的判断

还有一个点如果大家不记得之前构造AQS同步队列的逻辑可能就不太好理解,为了便于大家理解,我把上文介绍AQS同步队列中的enq代码片段贴过来解释一下就很好理解了:

上面代码中如果597行成功,而598行的CAS失败,那么这时候node.prev!=null,但是他替换tail节点失败了,所以等于是没有加入到AQS同步队列,所以上面即使node.prev!=null,仍然需要从tail节点遍历一下来确定。

AQS#findNodeFromTail(Node)


这段代码应该很好理解,就不多做解释了。

回到await()主方法:

到这里,我们的线程B进来的时候肯定是不会在AQS同步队列中的,搜易进入下一行,当前线程被park()挂起。挂起之后需要等到其他线程调用singal()方法唤醒。

condition.signal()源码解读

上文的示例中,当我们调用condition.signal()时,我们进入AbstractQueuedSynchronizer类中的signal()方法。

AQS#signal()


这个方法比较简单,只是做了个简单的判断,我们进入doSignal(Node)方法看看具体是如何唤醒其他线程的。

AQS#doSignal(Node)


循环体中主要是判断当前Condition队列中第二个节点是否可用,如果可以用,就剔除掉。
而主要的逻辑在while条件当中的transferForSignal(Node),这个就是singal操作的核心代码了,主要就是将Condition队列中的Node转移到AQS同步队列当中去竞争锁。

这里经过一次do操作之后实际上已经把原先的firstWaiter节点移除了,因为线程被唤醒后需要加入到AQ同步队列当中,先把Node移出Condition,后面再调用transferForSignal方法加入AQS同步队列:

注意了,线程被sigal唤醒后并不是说就能直接获得锁,还是需要通过竞争才可以获得锁,所以需要将其转移到AQS同步队列去争抢锁。

AQS#transferForSignal(Node)


这里注释上都写明了大致意思,应该能看的懂,期中enq方法就是将Node节点加入到AQS同步队列的逻辑,而1710到1712行代码不要也是可以的,因为我们在lock.lock()和lock.unlock()的时候都有剔除无效节点的操作,这里这么做的考虑之一,是可以提升一定的性能,我们假设这个AQS同步队列当中原先只有一个节点(除了head哨兵节点),那么这时候p(即原先的tail)节点是无效节点,这时候重新唤醒当前节点去抢占锁,而这时候之前持有锁的线程恰巧释放了锁,那么他就有可能直接抢占成功了。

回到AQS#await()


上面我们的线程被挂在了上面的2062行,但是要注意,这里被唤醒有两种情况:

  • 被singal()方法唤醒
  • 被interrupt()中断
    所以唤醒之后第一件事就是要判断到底是被interrupt()唤醒的还是被singal()唤醒的。

AQS#checkInterruptWhileWaiting(Node)


transferAfterCancelledWait(Node)方法主要就是判断到底是情况2还是情况3。

AQS#checkInterruptWhileWaiting(Node)


上面我们可以知道线程恢复到底是先interrupt()还是先singal(),返回之后回到之前的方法

继续回到AQS#await()


到这里我们的真个流程分析基本上结束了,后面的acquireQueued方法就是抢占锁了,抢占锁的时候如果被中断了才会返回true,所以这里的判断针对的就是如果抢占锁被中断了,而上面的interruptMode=0的情况,我们需要改为REINTERRUPT。再往后就是清除取消的节点,以及根据interruptMode来响应中断了,reportInterruptAfterWait方法也非常简单:

总结

Condition队列和AQS同步队列中的节点共用的是Node对象,通过不同状态来区分,而一个Node同一时间只能存在于一个队列,一个Node从Condition队列移出加入到AQS同步队列的流程图如下:

后面将会继续分析JUC中的其他工具的实现原理,感兴趣的 请关注我,和孤狼一起学习进步

【并发编程系列6】Condition队列原理及await和singal(等待/唤醒)机制源码分析相关推荐

  1. 并发编程系列之AQS实现原理

    并发编程系列之AQS实现原理 1.什么是AQS? AQS(AbstractQueuedSynchronizer),抽象队列同步器,是juc中很多Lock锁和同步组件的基础,比如CountDownLat ...

  2. 消息队列 64式 : 2、oslo.messaging消息处理源码分析

    目标: 弄清楚oslo messaging中executor为threading的处理过程 1 总入口 ceilometer/collector.py class CollectorService(c ...

  3. Linux ARM平台开发系列讲解(TTY) 2.5.2 串口TTY子系统驱动源码分析

    1. TTY驱动流程 如下图,整个TTY驱动流程基本如下,后续会逐一分析,先记住这个框架. 2. TTY驱动流程分析 2.1 驱动入口和出口 2.1.1 函数接口用法分析 根据自己选择的驱动模型,调用 ...

  4. jQuery源码分析系列

    声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://git ...

  5. [转]jQuery源码分析系列

    文章转自:jQuery源码分析系列-Aaron 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://github.com/JsAaro ...

  6. 【SemiDrive源码分析】系列文章链接汇总(全)

    注意:兄弟们,因为要换工作了,本专栏暂时先不更新了,如果后续工作也涉及芯驰平台的话,那我还会继续更新下去. 有好工作机会的兄弟,也可以私信介绍给我,可以聊聊 谢谢!!! 注意:兄弟们,因为一些其他原因 ...

  7. reentrantlock非公平锁不会随机挂起线程?_【原创】Java并发编程系列16 | 公平锁与非公平锁...

    本文为何适原创并发编程系列第 16 篇,文末有本系列文章汇总. 上一篇提到重入锁 ReentrantLock 支持两种锁,公平锁与非公平锁.那么这篇文章就来介绍一下公平锁与非公平锁. 为什么需要公平锁 ...

  8. python并发编程之semaphore(信号量)_Python 并发编程系列之多线程

    Python 并发编程系列之多线程 2 创建线程 2.1 函数的方式创建线程 2.2 类的方式创建线程 3 Thread 类的常用属性和方法 3.1 守护线程: Deamon 3.2 join()方法 ...

  9. java 银行并发_java并发编程——通过ReentrantLock,Condition实现银行存取款

    Java 并发编程系列文章 java.util.concurrent.locks包为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器.该框架允许更灵活地使用锁和条件,但以更难用的语法为代价 ...

最新文章

  1. python登录程序编写-初学Python3 - 写一个登录程序
  2. 计算机学院志愿公益活动,计算机学院开展学雷锋主题公益活动
  3. Full Tank?
  4. SVN安装后创建仓库、用户、上传代码
  5. 黎曼ζ 函数中的Γ是否与欧拉B函数中的Γ一样
  6. docker修改redis配置文件
  7. 密西根州立大学计算机排名,美国密西根州立大学 Alex X. Liu 教授来我校作学术讲座...
  8. 2019牛客暑期多校训练营(第八场)E.Explorer
  9. 现代通用计算机雏形是,科技知识:什么是现代通用计算机的雏形
  10. C语言编辑bmi计算器,使用事件处理程序的BMI计算器计算
  11. java登陆拦截器_登陆拦截器LoginInterceptor
  12. linux如何添加360网站卫士ip,使用加速乐、360网站卫士PHP无法获取用户IP的解决方法...
  13. 平衡二叉树 treap
  14. SimpleFOC(三)—— AS5600角度读取
  15. 【通信原理】第三章 -- 随机过程[下]
  16. PLL锁相环原理以及Altera FPGA的IP核实现
  17. mysql8分区表_MySQL 分区表
  18. 洛谷——P1613 跑路
  19. 解决微信小程序图片延迟加载(四种方法)
  20. red5+adobe flash media live(或OBS) +酷播播放器实现简单的直播及回看(三)------简单分析直播及回放系统的设计

热门文章

  1. 计算机考研复试——数据结构篇
  2. 企业微信推送应用消息-图片(news)/图文(npmnews)/卡片/文字
  3. 计算机网络水晶头博客,两台电脑组成局域网(交叉线)的水晶头制作详解
  4. 容器CICD实践:基于Helm实现应用交付自动回滚
  5. 基于JAVA校园快递管理系统计算机毕业设计源码+系统+mysql数据库+lw文档+部署
  6. RO段、RW段和ZI段
  7. Linux基础入门-2
  8. python加权最小二乘_如何计算加权最小二乘法的样本权重?
  9. 浏览器地址栏输入url到显示主页的过程
  10. php screw 密钥,php加密 php_screw Web程序 - 贪吃蛇学院-专业IT技术平台