Java并发之Condition接口
目录
- `Condition` 接口简介
- `Condition` 的应用
- `Condition` 接口源码
- `Condition` 实现原理
- 等待队列
- 等待(调用 `await()`)
- 通知或唤醒
- 总结
Condition
接口简介
任意一个 Java
对象都拥有一组监视器方法(定义在 java.lang.Object
上),主要包括 wait()、wait(long timeout)、notify()
以及 notifyAll()
方法,这些方法与 synchronized
同步关键字配合,可以实现线程之间的等待,唤醒
在 jdk 1.5
之后提供了 Lock
接口,Lock
又提供了条件 Condition
接口,使得对线程的等待、唤醒操作更加详细和灵活。Condition
接口与 Lock
配合也可以实现线程之间的等待,唤醒;但是这两者在使用方式以及功能特性上还是有差别的
对比项 | Object 监视器方法 |
Condition
|
---|---|---|
前置条件 | 获取对象的锁 |
1.调用Lock.lock()获取 2.调用Lock.newCondition()获取Condition对象
|
调用方式 |
直接调用,如object.wait()
|
直接调用,如condition.await()
|
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应终端 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
- 一个
Lock
对象里可以创建多个condition
实例,线程可以注册在指定的condition
中从而选择性的进行线程通知,在调度线程上更加灵活 - 在使用
notify(),notifuAll()
方法进行唤醒时,被调度的线程是由cpu
随机选择的。但使用ReentrantLock
结合condition
是可以实现选择性通知
,这个功能是非常重要的,而且在condition
类中默认提供的 - 而
synchronized
就相当于整个Lock
对象中只有一个单一的condition
对象,所有的线程都注册在它一个对象上。线程开始notifyAll()
时,需要通知所有的WAITING
线程,没有选择权 Conditon
中的await()
对应Object
的wait()
Condition
中的signal()
对应Object
的notify()
Condition
中的signalAll()
对应Object
的notifyAll()
Condition
的应用
@Slf4j
public class ProducerConsumerTest {private final Lock lock = new ReentrantLock();// 通过Lock获取两个Condition实例private final Condition addCondition = lock.newCondition();private final Condition removeCondition = lock.newCondition();private final LinkedList<Integer> resources = new LinkedList<>();private final int maxSize;public ProducerConsumerTest(int maxSize) {this.maxSize = maxSize;}public class Producer implements Runnable {private final int proSize;private Producer(int proSize) {this.proSize = proSize;}@Overridepublic void run() {lock.lock();try {for (int i = 1; i < proSize; i++) {log.info("已经生产产品数: " + i + "\t现仓储量总量:" + resources.size());resources.add(i);while (resources.size() >= maxSize) {log.info("当前仓库已满,等待消费...");try {// 进入阻塞状态addCondition.await();} catch (InterruptedException e) {e.printStackTrace();}}// 唤醒消费者removeCondition.signal();}} finally {lock.unlock();}}}public class Consumer implements Runnable {@Overridepublic void run() {String threadName = Thread.currentThread().getName();while (true) {lock.lock();try {while (resources.size() <= 0) {log.info(threadName + " 当前仓库没有产品,请稍等...");try {// 进入阻塞状态removeCondition.await();} catch (InterruptedException e) {e.printStackTrace();}}// 消费数据int size = resources.size();for (int i = 0; i < size; i++) {Integer remove = resources.remove();log.info(threadName + " 当前消费产品编号为:" + remove);}// 唤醒生产者addCondition.signal();} finally {lock.unlock();}}}}public static void main(String[] args) throws InterruptedException {ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest(10);Producer producer = producerConsumerTest.new Producer(20);Consumer consumer = producerConsumerTest.new Consumer();final Thread producerThread = new Thread(producer, "producer");final Thread consumerThread = new Thread(consumer, "consumer");producerThread.start();TimeUnit.SECONDS.sleep(2);consumerThread.start();}
}
运行结果
17:53:55.123 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 1 现仓储量总量:0
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 2 现仓储量总量:1
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 3 现仓储量总量:2
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 4 现仓储量总量:3
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 5 现仓储量总量:4
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 6 现仓储量总量:5
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 7 现仓储量总量:6
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 8 现仓储量总量:7
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 9 现仓储量总量:8
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 10 现仓储量总量:9
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 当前仓库已满,等待消费...
17:53:57.133 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:1
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:2
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:3
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:4
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:5
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:6
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:7
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:8
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:9
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:10
17:53:57.138 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 11 现仓储量总量:0
17:53:57.139 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 12 现仓储量总量:1
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 13 现仓储量总量:2
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 14 现仓储量总量:3
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 15 现仓储量总量:4
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 16 现仓储量总量:5
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 17 现仓储量总量:6
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 18 现仓储量总量:7
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 19 现仓储量总量:8
17:53:57.140 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:11
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:12
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:13
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:14
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:15
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:16
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:17
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:18
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:19
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前仓库没有产品,请稍等...
Condition
接口源码
package java.util.concurrent.locks;public interface Condition { void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll();
}
await()
:当前线程进入等待状态直到被通知或中断
。该方法返回时,该线程肯定又一次获取了锁awaitUninterruptibly()
:当前线程进入等待状态直到被通知
,等待过程中不响应中断
。该方法返回时,该线程肯定又一次获取了锁awaitNanos()
:当前线程进入等待状态直到被通知,中断,或者超时
。返回值(超时时间减
实际返回所用时间)如果是0
或者负数,那么可以认定已经超时了。该方法返回时,该线程肯定又一次获取了锁await(long time, TimeUnit unit)
:当前线程进入等待状态直到被通知,中断,或者超时
。如果在从此方法返回前检测到等待时间超时,则返回false
,否则返回true
。该方法返回时,该线程肯定又一次获取了锁awaitUntil(Date deadline)
:当前线程进入等待状态直到被通知,中断或者超过指定时间点
。如果没有到指定时间就被通知,则返回true
,否则返回false
signal()
:唤醒一个等待在Condition
上的线程,该线程从等待方法返回前必须获得与Condition
相关联的锁signalAll()
: 唤醒所有等待在Condition
上的线程,能够从等待方法返回的线程必须获得与Condition
相关联的锁
Condition
是一种广义上的条件队列。它为线程提供了一种更为灵活的等待/
唤醒模式,线程在调用 await()
后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition
必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个 Condition 的实例必须与一个 Lock 绑定
,因此 Condition
一般都是作为 Lock
的内部实现
Condition
实现原理
获取一个 Condition
必须要通过 Lock
的 newCondition()
方法。该方法定义在接口 Lock
中,返回的结果是绑定到此 Lock
实例的新 Condition
实例
package java.util.concurrent.locks;public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition();
}
Condition
是一个接口,其下仅有一个实现类 ConditionObject
,由于 Condition
的操作需要获取相关的锁,而 AQS
则是同步锁的实现基础,所以 ConditionObject
则定义为 AQS
的内部类
package java.util.concurrent.locks;public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {.........public class ConditionObject implements Condition, java.io.Serializable {}
}
等待队列
每个 Condition
对象都包含着一个 FIFO
队列,该队列是 Condition
对象通知/
等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该 Condition
对象上等待的线程
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;// 头节点private transient Node firstWaiter;// 尾节点private transient Node lastWaiter;public ConditionObject() {}/**省略方法*/
}
从上面代码可以看出 Condition
拥有首节点 firstWaiter
,尾节点 lastWaiter
。当前线程调用 await()
方法后
- 将会以当前线程构造节点
Node
,并将节点加入到该等待队列的尾部 Condition
拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter
指向它,并且更新尾节点即可- 上述节点引用更新的过程并没有使用
CAS
保证,原因在于调用await()
方法的线程必定是获取了锁的线程,也就是说该过程是由锁
来保证线程安全的
事实上,节点 Node
的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node
在 Object
的监视器模型上,一个对象拥有一个同步队列和等待队列
,而并发包中的 Lock
(更确切地说是同步器)拥有一个同步队列和多个等待队列
(可以创建多个 condition
实例)
等待(调用 await()
)
调用 Condition
的 await()
(或者以 await
开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为 WAITING
状态。当从 await()
方法返回时,当前线程一定获取了 Condition
相关联的锁
如果从队列(同步队列和等待队列)的角度看 await()
方法,当调用 await()
方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition
的等待队列中
AQS
中的内部类 ConditionObject
所实现的 await()
public class ConditionObject implements Condition, java.io.Serializable {public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中Node node = addConditionWaiter();// 释放同步状态,也就是释放锁int savedState = fullyRelease(node);int interruptMode = 0;// 检测节点是否在同步队列中,若节点没有在同步队列中则一直阻塞while (!isOnSyncQueue(node)) {// 阻塞挂起线程LockSupport.park(this);// 检查阻塞过程中是否发生中断,若有中断则退出while循环if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 调用signal()方法将当前线程从等待队列放到同步队列中// 线程被唤醒,退出while循环// acquireQueued()方法尝试获取同步状态if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 清理条件队列中的不是在等待条件的节点if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
}
- 首先将当前线程新建一个节点同时加入到条件等待队列中
- 然后释放当前线程持有的同步状态
- 然后不断检测该节点代表的线程,是否出现在同步队列中(收到
signal()
信号之后就会在AQS
队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态
如果从队列的角度去看,当前线程加入 Condition
的等待队列,该过程如图所示
通知或唤醒
调用 Condition
的 signal()
方法,将会唤醒在等待队列中 等待时间最长的首节点
,在唤醒节点之前,会将节点移到同步队列中
AQS
中的内部类 ConditionObject
所实现的 signal()
public class ConditionObject implements Condition, java.io.Serializable {public final void signal() {// 如果当前线程不是获取锁的线程,则抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();// 头节点,唤醒条件队列中的第一个节点 Node first = firstWaiter;if (first != null)// 唤醒doSignal(first);}
}
- 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件
- 如果线程已经获取了锁,则将唤醒条件等待队列的首节点
- 唤醒首节点是先将条件等待队列中的头节点移出,然后调用
AQS
的enq(Node node)
方法将其安全地移到同步队列中 - 最后判断如果该节点的同步状态是否为
Cancel
,或者修改状态为Signal
失败时,则直接调用LockSupport
唤醒该节点的线程
被唤醒后的线程,将从 await()
中的 while
循环中退出(isOnSyncQueue(Node node)
方法返回 true
,节点已经在同步队列中),进而调用同步器的 acquireQueued()
加入到获取同步状态的竞争中
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的 await()
方法返回,此时该线程已经成功地获取了锁
Condition
的 signalAll()
方法,相当于对等待队列中的每个节点均执行一次 signal()
方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程
总结
- 一个线程获取锁后,通过调用
Condition
的await()
方法,会将当前线程先加入到条件等待队列中 - 然后释放锁,最后通过
isOnSyncQueue(Node node)
不断自检看节点是否已经在同步队列了,如果是则尝试获取锁,否则一直挂起 - 当线程调用
signal()
方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)
唤醒等待队列的首节点。被唤醒的线程,将从await()
中的while
循环中退出来,然后调用acquireQueued()
竞争同步状态
Condition
源码详解:https://blog.csdn.net/yakax/article/details/112982654
Java并发之Condition接口相关推荐
- Java并发之Condition的实现分析
2019独角兽企业重金招聘Python工程师标准>>> 一.Condition的概念 介绍 回忆 synchronized 关键字,它配合 Object 的 wait().notif ...
- java并发之Condition图解与原理剖析
1.Condition定义 Condition是一个接口,定义在juc中(java.util.concurrent.locks.Condition),它的主要功能类似于wait()/notify(), ...
- java并发condition_Java并发之Condition的实现分析
一.Condition的概念 介绍 回忆 synchronized 关键字,它配合 Object 的 wait().notify() 系列方法可以实现等待/通知模式. 对于 Lock,通过 Condi ...
- Java 并发之 FutureTask 的基本使用
为什么80%的码农都做不了架构师?>>> 上次我们说到了 JUC 中的 Future 接口,在最后提到了 FutureTask.CompletionService 等.我们这次 ...
- Java并发编程——详解AQS对Condition接口的具体实现
目录 一.等待/通知机制与Condition接口 1.1 等待/通知机制 1.2 Condition接口 二.AQS的具体实现 2.1 ConditionObject 2.2 等待机制 2.3 通知机 ...
- java condition原理_java中Condition接口原理及实现
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait().notify()实现线程间的协作,相比Object的wait().notify(),使用Condition ...
- 面试:你说你精通Java并发,给我讲讲Java并发之J.U.C
转载自 面试:你说你精通Java并发,给我讲讲Java并发之J.U.C J.U.C J.U.C即java.util.concurrent包,为我们提供了很多高性能的并发类,可以说是java并发的核心. ...
- Java 并发之 AQS 详解(上)
Java 并发之 AQS 详解 前言 Java SDK 为什么要设计 Lock 死锁问题 synchronized 的局限性 显式锁 Lock Lock 使用范式 Lock 是怎样起到锁的作用呢? 队 ...
- Java并发之AQS详解(文章里包含了两片文章结合着看后边文章不清楚,请看原文)
AQS全称抽象队列同步器(AbstractQuenedSynchronizer),它是一个可以用来实现线程同步的基础框架.当然,它不是我们理解的Spring这种框架,它是一个类,类名就是A ...
- Java并发之AQS详解
一.概述 谈到并发,不得不谈ReentrantLock:而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)! 类如其名,抽象的队列式的同步器,AQ ...
最新文章
- Unity3D 材料
- c语言funcode空格消失的函数,01北科大暑期计算机实践FunCode游戏设计+C++课程设计 - 海底世界 - 图文...
- c语言编码表白,C语言告白代码,一闪一闪亮晶晶~
- 带你遍历用户生命价值与流失挽救(上) : 流量下的价值套路
- 图解 CSS (9): 列表
- 洛谷 P1757 通天之分组背包
- C/C++信息隐写术(一)之认识文件结构
- element筛选 ajax,vue使用element Transfer 穿梭框实现ajax请求数据和自定义查询
- Cinemachine教程 | Unity中如何快速制作镜头晃动?
- android开发实践之1:安装部署环境设置
- poj3233(Matrix Power Series)快速幂
- 图像空域增强:灰度映射法
- 自拟计算机作文100字,三年级自拟作文100字
- 【python初级】os.listdir返回目录中包含的文件以及文件夹的列表
- java中bean是什么_java中bean是什么意思?
- Node.js基础(二)-- 模块化、npm与包
- JAVA pinyin4j 中文多音字转拼音转字母大写
- 全球首个 AI 说唱歌手 TikTok 发新歌,东西方审美差异巨大
- Office2019 卸载重新安装-2022 Win10【不花钱+官方途径】
- 2022CoCa: Contrastive Captioners are Image-Text Fountion Models
热门文章
- java复杂的代码做程序_摆脱复杂烧脑的程序代码,利用快速开发平台轻轻松松做软件...
- 微软改进的DSSM结构:
- 273.整数转换英文表示
- php5中使用xslt扩展,.NET_解析在.net中使用XSLT转换xml文档的示例详解,XSL即可扩展的样式表文件。 可 - phpStudy...
- vs 服务容器中已存在服务_无服务器vs容器,企业如何正确选择?
- leetcode 868. Binary Gap
- 2015-11-30 20:59:08之自力更生
- 【文献阅读笔记】(2):使用IMPUTES2和minimac软件完成群体特异性的基因型填充(Imputation)
- 常用 Jacobi 行列式 | 重积分变量替换
- 应急响应的整体思路一