介绍

Condition是在JDK1.5中才出现的,它可以替代传统的Object中的wait()、notify()和notifyAll()方法来实现线程间的通信,使线程间协作更加安全和高效。

Condition是一个接口,它的定义如下:

public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll();
}

AQS的ConditionObject类实现了Condition接口

实现原理

ConditionObject类内部由条件队列存储每个需要等待的线程。条件队列必须与一个独占模式的锁绑定,在执行await,signal之前必须先acquire到锁。

条件队列是一个FIFO队列,是一个单向链表,包含firstWaiter,lastWaiter 2个指示节点,firstWaiter指向第一等待的节点,该节点是绑定线程的,与AQS的等待队列的head节点不同。一个节点当被signal后,会由条件队列转移到等待队列中

源码分析

源码

await

//await方法是能够响应中断的。
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 添加节点到Condition队列中Node node = addConditionWaiter();                 //【1】// 释放当前线程的lock(要把控制权释放出去,由其他线程获取到锁),从AQS的队列中移出//此处savedState 返回的是线程获取到的资源数据(比如ReentranceLock支持多次lock)int savedState = fullyRelease(node);              //【2】int interruptMode = 0;// 循环判断当前线程的Node是否在Sync队列中(被singal之后,会把节点移动到sync队列),如果不在(不在说明未被signal),则parkwhile (!isOnSyncQueue(node)) {LockSupport.park(this);    //如果发生中断,会理解从park返回。   //【3】// checkInterruptWhileWaiting方法根据中断发生的时机返回后续需要处理这次中断的方式,  如果发生中断,退出循环if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// acquireQueued需要再次获取到savedState  对应的资源(即释放多少再获取多少)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 从头到尾遍历Condition队列,移除被cancel的节点if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 如果线程已经被中断,则根据之前获取的interruptMode的值来判断是继续中断还是抛出异常if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

await方法首先(调用addConditionWaiter)根据当前线程创建了一个Node(waitStauts为CONDITION),然后释放当前线程的独占锁。这里的savedState表示当前线程已经加锁的次数(ReentrantLock为重入锁)。while循环其实就是一直判断,当前的线程是否又被添加到了Sync队列中,如果已经在Sync队列中,则退出循环。调用signal方法的时候,因为这里需要唤醒之前调用await方法的线程,所以会把当前线程又加入到Sync队列中。

final int fullyRelease(Node node) {boolean failed = true;try {//占用了多少资源(这次释放多少,下次acquire时仍要获取多少)int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}
/*
该方法判断当前线程的node是否在Sync队列中。
*/
final boolean isOnSyncQueue(Node node) {//如果当前线程node的状态是CONDITION或者node.prev为null时说明已经在Condition队列中了,所以返回false;if (node.waitStatus == Node.CONDITION || node.prev == null)return false;//如果node.next不为null,说明在Sync队列中,返回true;if (node.next != null) // If has successor, it must be on queuereturn true;/**如果两个if都未返回时,可以断定node的prev一定不为null,next一定为null(因为node为lastWaiter),*这个时候可以认为node正处于放入Sync队列的执行CAS操作执行过程中(enq 函数调用中)。*而这个CAS操作有可能失败,所以通过findNodeFromTail再尝试一次判断。*/return findNodeFromTail(node);
}private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}
}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// 执行findNodeFromTail方法时可能一直在此自旋if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

中断处理:

/*
判断等待过程中是否发生中断
*/
private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;
}/*
判断中断的时候,是否有signal方法的调用
*/
final boolean transferAfterCancelledWait(Node node) {/*CAS成功,说明signal还未执行,因为signal之后,会把waitStatus修改为SIGNAL。*/if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//未signal,则把节点加入到sync队列中去。enq(node);//返回true。表示未发生中断。return true;}/**CAS失败了,则不能判断当前线程是先进行了中断还是先进行了signal方法的调用,可能是先执行了*signal然后中断,也可能是先执行了中断,后执行了signal,当然,这两个操作肯定是发生在CAS之*前。这时需要做的就是等待当前线程的node被添加到Sync队列后,也就是enq方法返回后,返回false*告诉checkInterruptWhileWaiting方法返回REINTERRUPT,后续进行重新中断。*/while (!isOnSyncQueue(node))Thread.yield();return false;
}

signal

        public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}

不管是signal,还是signalAll,都需要先取得lock。

/*
唤醒第一个可以被唤醒的节点。
*/
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);    //【4】
}//释放条件队列,把每个节点都转化为sync节点
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);
}
/*
把节点移动到sync队列,并且尝试把节点状态改为0,把前置节点状态改为SIGNAL
*/
final boolean transferForSignal(Node node) {/** CAS失败,说明状态为CANCELLED,*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))   //【5】return false;/** CAS成功,把节点加入到sync队列。*/Node p = enq(node);//返回的是node前置节点。     //【6】int ws = p.waitStatus;/*ws>0表明,节点CANCLEED。如果为0,需要把前置节点设置为SIGNAL。设置失败则park如果p节点的线程在这时执行了unlock方法,就会调用unparkSuccessor方法,unparkSuccessor方法可   能就将p的状态改为了0,那么执行就会失败。*/if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  //【7】LockSupport.unpark(node.thread);return true;
}

示例

public class ConTest2 {  private int queueSize = 5;  private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);  private Lock lock = new ReentrantLock();  private Condition notFull = lock.newCondition();  private Condition notEmpty = lock.newCondition();  public static void main(String[] args) throws InterruptedException  {  ConTest2 test = new ConTest2();  Producer producer1 = test.new Producer();  Consumer consumer1 = test.new Consumer();    Consumer consumer2 = test.new Consumer();              producer1.start();  Thread.sleep(1000);consumer1.start();  consumer2.start();}  class Consumer extends Thread{              @Override  public void run() {  consume();  }  volatile boolean flag=true;    private void consume() {  while(flag){  lock.lock();  try {  while(queue.size() == 0){  try {  System.out.println("队列空,等待数据");  notEmpty.await();  } catch (InterruptedException e) {                              flag =false;  }  }  queue.poll();                //每次移走队首元素  notFull.signal();  System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");  } finally{  lock.unlock();  }  }  }  }  class Producer extends Thread{              @Override  public void run() {  produce();  }  volatile boolean flag=true;    private void produce() {  while(flag){  lock.lock();  try {  while(queue.size() == queueSize){  try {  System.out.println("队列满,等待有空余空间");  notFull.await();  } catch (InterruptedException e) {  flag =false;  }  }  queue.offer(1);        //每次插入一个元素  notEmpty.signal();  System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));  } finally{  lock.unlock();  }  }  }  }  }  

以上示例为典型生产者-消费者模式,p1,最先获取到lock。

AQS图例

1、P1,C1,C2线程开始,假设P1先获取到lock,并且把队列写满,C1,C2才启动。此时C1,C2线程获取lock失败,加入同步队列中。

2、P1执行,发现队列满,执行notFull.await()方法,经过【1】,【2】在【3】处park,队列状态如下:

此时,C1线程被unpark,P1线程在条件队列notFull中。

2、C1执行 lock方法成功,然后移出一个元素,调用notFull.signal(),经过【4】,【5】,【6】之后,P1关联节点由条件队列转移到同步队列

3、C1释放锁,假设C1一直可以获取到锁,再释放锁,这样P1,C2一直会在sync队列中。一直到C1把队列消费完,调用notEmpty.await(),C2获取到锁,但是由于队列为空,C2也调用notEmpty.await(),此时P1获取到锁。状态如下:

4、当P1调用notEmpty.signalAll()时,把C1,C2对应节点再加入到sync队列

队列节点的状态

调用条件队列的等待操作,会设置节点的waitingStatus为Condition,标识当前节点正处于条件队列中。条件队列的节点状态转换图如下:

Node的各个状态的主要作用:Cancelled主要是解决线程在持有锁时被外部中断的逻辑,AQS的可中断锁获取方法lockInterrutible()是基于该状态实现的。显式锁必须手动释放锁,尤其是有中断的环境中,一个线程被中断可能仍然持有锁,所以必须注意在finally中unlock。Condition则是支持条件队列的等待操作,是Lock与条件队列关联的基础。Signal是阻塞后继线程的标识,一个等待线程只有在其前驱节点的状态是SIGNAL时才会被阻塞,否则一直执行自旋尝试操作,以减少线程调度的开销。

条件队列上的等待和唤醒操作,本质上是节点在AQS线程等待队列和条件队列之间相互转移的过程,当需要等待某个条件时,线程会将当前节点添加到条件队列中,并释放持有锁;当某个线程执行条件队列的唤醒操作,则会将条件队列的节点转移到AQS等待队列。每个Condition就是一个条件队列,可以通过Lock的newCondition创建多个等待条件。AQS的条件队列,它的等待和唤起操作流程如下:

await与awaitUninterruptibly()比较

await()方法是可中断方法,如果有中断抛出中断异常。

awaitUninterruptibly(),如果有中断,仅重新设置中断状态。

await方法的几种实现

与Object类中wait,notify比较

Condition与Object类中的方法对应如下:

Object Condition
wait() await()
notify() signal()
notifyAll() signalAll()

Condition.await()与Object.wait(),都必须先获取到锁,才可以执行,执行后释放锁

不同的是Condition与Lock结合,wait与synchronized结合。

多线程环境的下,线程直接的互斥[执行]依靠的应该是锁Lock,线程的之间的[通信]依靠的应该是条件Condition/信号,一般情况下lock确实可以同时满足做这两个事情,所以在Object的方式满足了这个一般情况,但是肯定会有复杂的场景比如刚才例子中,需要让满足一定条件的线程执行,仅仅依靠锁是不能完美解决的。所以condition实际上分离了执行和通信

ConditionObject源码相关推荐

  1. 线程条件队列ConditionObject源码解读

    小记 好久没更博,窗外光芒万丈,冬日的晚晨,多么美好,就不浪费了,循着键盘上的点点星辰,开工! 啥子是条件队列? 我们都知道,在万类之祖Object里面定义了几个监视器方法:wait(),notify ...

  2. ReentrantLock源码

    Sync abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serial ...

  3. Java并发包源码学习之AQS框架(三)LockSupport和interrupt

    接着上一篇文章今天我们来介绍下LockSupport和Java中线程的中断(interrupt). 其实除了LockSupport,Java之初就有Object对象的wait和notify方法可以实现 ...

  4. java condition_死磕 java同步系列之ReentrantLock源码解析(二)

    (手机横屏看源码更方便) 问题 (1)条件锁是什么? (2)条件锁适用于什么场景? (3)条件锁的await()是在其它线程signal()的时候唤醒的吗? 简介 条件锁,是指在获取锁之后发现当前业务 ...

  5. java.util.concurrent.locks.Condition 源码

    2019独角兽企业重金招聘Python工程师标准>>> 相关类图: 使用Condition和 object作为条件队列的区别: object: 只能绑定一个内部队列,使用notify ...

  6. 拍卖源码java_Java并发的AQS原理详解

    原文:https://juejin.im/post/5c11d6376fb9a049e82b6253 每一个 Java 的高级程序员在体验过多线程程序开发之后,都需要问自己一个问题,Java 内置的锁 ...

  7. [源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?...

    简析SynchronousQueue.LinkedBlockingQueue(两个locker,更快),ArrayBlockingQueue(一个locker,读写都竞争) 三者都是blockingQ ...

  8. Java并发——结合CountDownLatch源码、Semaphore源码及ReentrantLock源码来看AQS原理

    前言: 如果说J.U.C包下的核心是什么?那我想答案只有一个就是AQS.那么AQS是什么呢?接下来让我们一起揭开AQS的神秘面纱 AQS是什么? AQS是AbstractQueuedSynchroni ...

  9. java Lock 源码分析

    java 中的lock 先关的类路径: Lock 是个接口,源码: // // Source code recreated from a .class file by IntelliJ IDEA // ...

最新文章

  1. 去了新公司,物理通过
  2. zk reconnect
  3. java中的事件派发机制_事件派发器模式
  4. wlan 网速测试软件,WiFi大师网速测试
  5. glibc升级失败及处理过程
  6. html菜鸟教程选项卡,jQuery EasyUI 布局插件 – Tabs 标签页/选项卡 | 菜鸟教程
  7. 大数据压缩处理:数据分卷压缩和分卷压缩解压
  8. LINUX 常见问题1000个详细解答
  9. 前端学习笔记,加油!
  10. 7-4 sdust-Java-字符串集合求并集 (20分)
  11. VS下同一个solution下不同project之间头文件的相互调用
  12. 本土英雄的退场和归来:Micromax教给了在印中国手机厂商哪些事?
  13. ESP8266-Arduino编程实例-2.8寸TFT LCD驱动(ILI9341控制器)
  14. 灵魂深处的眼泪 秋枫
  15. 如何理解MPC模型预测控制理论
  16. 详细介绍js函数中的arguments
  17. 明源笔试题目--将一个正整数分解质因数
  18. python数据分析的一般步骤_50. Python 数据处理(1)
  19. hexo 博客next主题集成gitment或者gitalk评论系统
  20. 将数组中的对象按照浏览器的x/y轴的显示方式进行排序

热门文章

  1. 《JavaScript 高级程序设计》 7.5 常用模式
  2. commons dbutils 的介绍与使用
  3. 【CentOS 7笔记46】,crondtab任务计划和chkconfig系统服务管理#
  4. 基于canvas的图片压缩函数实现
  5. React Native Weex 区别
  6. 分布式文件系统之MooseFS----管理优化
  7. Log4Net 最简配置
  8. no nlsxbe in java.library.path
  9. NSubstitute完全手册(八)替换返回值
  10. Sublime Text2,跨平台神级编辑器乱码问题解决