欢迎关注:王有志
期待你加入Java人的提桶跑路群:共同富裕的Java人

今天来和大家聊聊ConditionCondition为AQS“家族”提供了等待与唤醒的能力,使AQS"家族"具备了像synchronized一样暂停与唤醒线程的能力。我们先来看两道关于Condition的面试题目:

  • ConditionObject的等待与唤醒有什么区别?
  • 什么是Condition队列?

接下来,我们就按照“是什么”,“怎么用”和“如何实现”的顺序来揭开Condition的面纱吧。

Condition是什么?

Condition是Java中的接口,提供了与Object#waitObject#notify相同的功能。Doug Lea在Condition接口的描述中提到了这点:

Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to “wait”) until notified by another thread that some state condition may now be true.

来看Condition接口中提供了哪些方法:

public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll();
}

Condition只提供了两个功能:等待(await)和唤醒(signal),与Object提供的等待与唤醒时相似的:

public final void wait() throws InterruptedException;public final void wait(long timeoutMillis, int nanos) throws InterruptedException;public final native void wait(long timeoutMillis) throws InterruptedException;@HotSpotIntrinsicCandidate
public final native void notify();@HotSpotIntrinsicCandidate
public final native void notifyAll();

唤醒功能上,ConditionObject的差异并不大:

  • Condition#signal ≈ \approx ≈ Object#notify
  • Condition#signalAll = = = Object#notifyAll

多个线程处于等待状态时,Object#notify()是“随机”唤醒线程,而Condition#signal则由具体实现决定如何唤醒线程,如:ConditionObject唤醒的是最早进入等待的线程但两个方法均只唤醒一个线程。

等待功能上,ConditionObject的共同点是:都会释放持有的资源Condition释放锁Object释放Monitor,即进入等待状态后允许其他线程获取锁/监视器。主要的差异体现在Condition支持了更加丰富的场景,通过一张表格来对比下:

Condition方法 Object方法 解释
Condition#await() Object#wait() 暂停线程,抛出线程中断异常
Condition#awaitUninterruptibly() / 暂停线程,不抛出线程中断异常
Condition#await(time, unit) Object#wait(timeoutMillis, nanos) 暂停线程,直到被唤醒或等待指定时间后,超时后自动唤醒返回false,否则返回true
Condition#awaitUntil(deadline) / 暂停线程,直到被唤醒或到达指定时间点,超时后自动唤醒返回false,否则返回true
Condition#awaitNanos(nanosTimeout) / 暂停线程,直到被唤醒或等待指定时间后,返回值表示被唤醒时的剩余时间(nanosTimeout-耗时),结果为负数表示超时

除了以上差异外,Condition还支持创建多个等待队列,即同一把锁拥有多个等待队列,线程在不同队列中等待,而Object只有一个等待队列。《Java并发编程的艺术》中也有一张类似的表格,放在这里供大家参考:

Tips

  • 实际上signal翻译为唤醒并不恰当~~
  • 涉及到Condition的实现部分,下文通过AQS中的ConditionObject详细解释。

Condition怎么用?

既然ConditionObject提供的等待与唤醒功能相同,那么它们的用法是不是也很相似呢?

与调用Object#waitObject#notifyAll必须处于synchronized修饰的代码中一样(获取Monitor),调用Condition#awaitCondition#signalAll的前提是要先获取锁。但不同的是,使用Condition前,需要先通过锁去创建Condition

ReentrantLock中提供的Condition为例,首先是创建Condition对象:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

然后是获取锁并调用await方法:

new Thread(() -> {lock.lock();try {condition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}lock.unlock();
}

最后,通过调用singalAll唤醒全部阻塞中的线程:

new Thread(() -> {lock.lock();condition.signalAll();lock.unlock();
}

ConditionObject的源码分析

作为接口Condition非常惨,因为在Java中只有AQS中的内部类ConditionObject实现了Condition接口:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {public class ConditionObject implements Condition, java.io.Serializable {private transient Node firstWaiter;private transient Node lastWaiter;}static final class Node {// 省略}
}

ConditionObject只有两个Node类型的字段,分别是链式结构中的头尾节点,ConditionObject就是通过它们实现的等待队列。那么ConditionObject的等待队列起到了怎样的作用呢?是类似于AQS中的排队机制吗?带着这两个问题,我们正是开始源码的分析。

await方法的实现

Condition接口中定义了4个线程等待的方法:

  • void await() throws InterruptedException
  • void awaitUninterruptibly();
  • long awaitNanos(long nanosTimeout) throws InterruptedException;
  • boolean await(long time, TimeUnit unit) throws InterruptedException;
  • boolean awaitUntil(Date deadline) throws InterruptedException;

方法虽然很多,但它们之间的差异较小,只体现在时间的处理上,我们看其中最常用的方法:

public final void await() throws InterruptedException {// 线程中断,抛出异常if (Thread.interrupted()) {throw new InterruptedException();}// 注释1:加入到Condition的等待队列中Node node = addConditionWaiter();// 注释2:释放持有锁(调用AQS的release)int savedState = fullyRelease(node);int interruptMode = 0;// 注释3:判断是否在AQS的等待队列中while (!isOnSyncQueue(node)) {LockSupport.park(this);// 中断时退出方法if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {break;}}// 加入到AQS的等待队列中,调用AQS的acquireQueued方法if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {interruptMode = REINTERRUPT;}// 断开与Condition队列的联系if (node.nextWaiter != null) {unlinkCancelledWaiters();}if (interruptMode != 0) {reportInterruptAfterWait(interruptMode);}
}

注释1的部分,调用addConditionWaiter方法添加到Condition队列中:

private Node addConditionWaiter() {// 判断当前线程是否为持有锁的线程if (!isHeldExclusively()) {throw new IllegalMonitorStateException();}// 获取Condition队列的尾节点Node t = lastWaiter;// 断开不再位于Condition队列的节点if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 创建Node.CONDITION模式的Node节点Node node = new Node(Node.CONDITION);if (t == null) {// 队列为空的场景,将node设置为头节点firstWaiter = node;} else {// 队列不为空的场景,将node添加到尾节点的后继节点上t.nextWaiter = node;}// 更新尾节点lastWaiter = node;return node;
}

可以看到,Condition的队列是一个朴实无华的双向链表,每次调用addConditionWaiter方法,都会加入到Condition队列的尾部。

注释2的部分,释放线程持有的锁,同时移出AQS的队列,内部调用了AQS的release方法:

=final int fullyRelease(Node node) {try {int savedState = getState();if (release(savedState)) {return savedState;}throw new IllegalMonitorStateException();} catch (Throwable t) {node.waitStatus = Node.CANCELLED;throw t;}
}

因为已经分析过AQS的release方法和ReentrantLock实现的tryRelease方法,这里我们就不过多赘述了。

注释3的部分,isOnSyncQueue判断当前线程是否在AQS的等待队列中,我们来看此时存在的情况:

  • 如果isOnSyncQueue返回false,即线程不在AQS的队列中,进入自旋,调用LockSupport#park暂停线程;
  • 如果isOnSyncQueue返回true,即线程在AQS的队列中,不进入自旋,执行后续逻辑。

结合注释1和注释2的部分,Condition#await的实现原理了就很清晰了:

  • Condition与AQS分别维护了一个等待队列,而且是互斥的,即同一个节点只会出现在一个队列中
  • 当调用Condition#await时,将线程添加到Condition的队列中(注释1),同时从AQS队列中移出(注释2);
  • 接着判断线程位于的队列:
    • 位于Condition队列中,该线程需要被暂停,调用LockSupport#park
    • 位于AQS队列中,该线程正在等待获取锁。

基于以上的结论,我们已经能够猜到唤醒方法Condition#signalAll的原理了:

  • 将线程从Condition队列中移出,并添加到AQS的队列中;
  • 调用LockSupport.unpark唤醒线程。

至于这个猜想是否正确,我们接着来看唤醒方法的实现。

Tips:如果忘记了AQS中相关方法是如何实现的,可以回顾下《AQS的今生,构建出JUC的基础》。

signal和signalAll方法的实现

来看signalsignalAll的源码:

// 唤醒一个处于等待中的线程
public final void signal() {if (!isHeldExclusively()) {throw new IllegalMonitorStateException();}// 获取Condition队列中的第一个节点Node first = firstWaiter;if (first != null) {// 唤醒第一个节点doSignal(first);}
}// 唤醒全部处于等待中的线程
public final void signalAll() {if (!isHeldExclusively()){throw new IllegalMonitorStateException();}Node first = firstWaiter;if (first != null) {// 唤醒所有节点doSignalAll(first);}
}

两个方法唯一的差别在于头节点不为空的场景下,是调用doSignal唤醒一个线程还是调用doSignalAll唤醒所有线程:

private void doSignal(Node first) {do {// 更新头节点if ( (firstWaiter = first.nextWaiter) == null) {// 无后继节点的场景lastWaiter = null;}// 断开节点的连接first.nextWaiter = null;// 唤醒头节点} while (!transferForSignal(first) && (first = firstWaiter) != null);
}private void doSignalAll(Node first) {// 将Condition的队列置为空lastWaiter = firstWaiter = null;do {// 断开链接Node next = first.nextWaiter;first.nextWaiter = null;// 唤醒当前头节点transferForSignal(first);// 更新头节点first = next;} while (first != null);
}

可以看到,无论是doSignal还是doSignalAll都只是将节点移出Condition队列,而真正起到唤醒作用的是transferForSignal方法,从方法名可以看到该方法是通过“转移”进行唤醒的,我们来看源码:

final boolean transferForSignal(Node node) {// 通过CAS替换node的状态// 如果替换失败,说明node不处于Node.CONDITION状态,不需要唤醒if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {return false;}// 将节点添加到AQS的队列的队尾// 并返回老队尾节点,即node的前驱节点Node p = enq(node);int ws = p.waitStatus;// 对前驱节点状态的判断if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {LockSupport.unpark(node.thread);}return true;
}

transferForSignal方法中,调用enq方法将node重新添加到AQS的队列中,并返回node的前驱节点,随后对前驱节点的状态进行判断:

  • 当 w s > 0 ws > 0 ws>0时,前驱节点处于Node.CANCELLED状态,前驱节点退出锁的争抢,node可以直接被唤醒;
  • 当 w s ≤ 0 ws \leq 0 ws≤0时,通过CAS修改前驱节点的状态为Node.SIGNAL,设置失败时,直接唤醒node

《AQS的今生,构建出JUC的基础》中介绍了waitStatus的5种状态,其中Node.SIGNAL状态表示需要唤醒后继节点。另外,在分析shouldParkAfterFailedAcquire方法的源码时,我们知道在进入AQS的等待队列时,需要将前驱节点的状态更新为Node.SIGNAL

最后来看enq的实现:

private Node enq(Node node) {for (;;) {// 获取尾节点Node oldTail = tail;if (oldTail != null) {// 更新当前节点的前驱节点node.setPrevRelaxed(oldTail);// 更新尾节点if (compareAndSetTail(oldTail, node)) {oldTail.next = node;// 返回当前节点的前驱节点(即老尾节点)return oldTail;}} else {initializeSyncQueue();}}
}

enq的实现就非常简单了,通过CAS更新AQS的队列尾节点,相当于添加到AQS的队列中,并返回尾节点的前驱节点。好了,唤醒方法的源码到这里就结束了,是不是和我们当初的猜想一模一样呢?

图解ConditionObject原理

功能上,Condition实现了AQS版Object#waitObject#notify,用法上也与之相似,需要先获取锁,即需要在lockunlock之间调用。原理上,简单来说就是线程在AQS的队列和Condition的队列之间的转移

线程t持有锁

假设有线程t已经获取了ReentrantLock,线程t1,t2和t3正在AQS的队列中等待,我们可以得到这样的结构:

线程t执行Condition#await

如果线程t中调用了Condition#await方法,线程t进入Condition的等待队列中,线程t1获取ReentrantLock,并从AQS的队列中移出,结构如下:

线程t1执行Condition#await

如果线程t1中也执行了Condition#await方法,同样线程t1进入Condition队列中,线程t2获取到ReentrantLock,结构如下:

线程t2执行Condition#signal

如果线程t2执行了Condition#signal,唤醒Condition队列中的第一个线程,此时结构如下:

通过上面的流程,我们就可以得到线程是如何在Condition队列与AQS队列中转移的:

结语

关于Condition的内容到这里就结束了,无论是理解,使用还是剖析原理,Condition的难度并不高,只不过大家可能平时用得比较少,因此多少有些陌生。

最后,截止到文章发布,我应该是把开头两道题目的题解写完了吧~~


好了,今天就到这里了,Bye~~

AQS中的Condition是什么?相关推荐

  1. 详解AQS中的condition源码原理

    摘要:condition用于显式的等待通知,等待过程可以挂起并释放锁,唤醒后重新拿到锁. 本文分享自华为云社区<AQS中的condition源码原理详细分析>,作者:breakDawn. ...

  2. AQS理解之七——AQS中的条件队列

    AQS中的条件队列 在AQS中还实现了一个类,ConditionObject,它实现了Condition接口,实现一个绑定在锁上的条件队列. 先看看他的uml图. 主要方法 它实现了Condition ...

  3. 【java并发】AQS中acquire方法解析

    AQS,全名AbstractQueuedSynchronizer(抽象队列同步器),它是CLH(不明白的可以先了解一下CLH)的变种.它与CLH不同之处在于:        CLH是一种公平锁,它是通 ...

  4. java中acquire()_Java高并发系列之AQS中acquire源码解析

    我们知道,AQS中最重要的两个方法就是acquire和release方法.我们本文来走读走读acquire的源码. 首先,tryAcquire是需要子类具体去实现,其作用就是设置state的值,如果设 ...

  5. 同步器AQS中的同步队列与等待队列

    在单纯地使用锁,比如ReentrantLock的时候,这个锁组件内部有一个继承同步器AQS的类,实现了其抽象方法,加锁.释放锁也只是涉及到AQS中的同步队列而已,那么等待队列又是什么呢? 当使用Con ...

  6. Java中的Condition详解

    一.Condition简介 任意一个Java对象,都拥有一组监视器方法(定义在Object类中),主要包括wait,notify,notifyAll方法,这些方法与synchornized关键字相配合 ...

  7. AQS中公平锁和非公平锁区别,你知道么

    点击关注公众号,实用技术文章及时了解 来源:blog.csdn.net/weixin_43823391/ article/details/114259418 一.概念 注意:因为ReentrantLo ...

  8. AQS中那些不得不说的理论知识

    点击关注公众号,利用碎片时间学习 来源:blog.csdn.net/weixin_43823391/ article/details/114259377 一.概念 AQS全称为AbstractQueu ...

  9. java 线程 condition_Java编程中实现Condition控制线程通信

    java中控制线程通信的方法 1.传统的方式:利用synchronized关键字来保证同步,结合wait(),notify(),notifyall()控制线程通信.不灵活. 2.利用condition ...

最新文章

  1. 【科普】为什么ip地址通常以192.168开头?
  2. python中x 1什么意思_Python:A [1:]中x的含义是什么?
  3. php 设置统一处理错误,统一的PHP错误处理理论
  4. leetcode 131. 分割回文串 思考分析
  5. printf是如何实现变长参数的
  6. 黑马程序员_Java基础_枚举 和 单例模式实例
  7. 电路设计软件系列教程(四),Protel DXP电路设计软件之创建PCB文件
  8. Android 激活设备管理器后就无法再次打开设备管理器界面
  9. managed DLL 和 normal DLL
  10. cad2012打开后闪退_2012cad闪退怎么解决win10_cad2012闪退win10系统如何修复
  11. 【数据分析】互联网金融客户画像
  12. Python进阶【第一篇】socket
  13. 游戏开发人员需要看的书籍
  14. linux 卸载nexus,Linux下安装maven和nexus
  15. WebRTC RTCP XR
  16. 微信小程序自定义tabBar(实操)
  17. java正则验证大陆以及港澳台手机号码
  18. 从API地址获取数据并展示
  19. 图片上传之fileupload
  20. Tableau——用条形图或环形图来呈现进度百分比

热门文章

  1. MATLAB 绘制三维图 | 附多个实例
  2. R语言的原子类型和数据结构
  3. 建立在POP3协议下的Java Email
  4. tombstone问题追踪与分析
  5. 用服务提高附加值:无人机服务公司如何打造定制一站式服务?
  6. C语言实现链表的基本操作(超详细注释)
  7. 利用python实现猜数字小游戏
  8. Ubuntu19.10如何消除登录微信后弹出的Wine system tray窗口
  9. ora-00439 未启用 bit-mapped indexes
  10. 用Python解决海量数据的分类汇总~一键化办公的神器!