在java.util.concurrent包中,有两个很特殊的工具类,Condition和ReentrantLock,使用过的人都知道,ReentrantLock(重入锁)是jdk的concurrent包提供的一种独占锁的实现。它继承自Dong Lea的 AbstractQueuedSynchronizer(同步器),确切的说是ReentrantLock的一个内部类继承了AbstractQueuedSynchronizer,ReentrantLock只不过是代理了该类的一些方法,可能有人会问为什么要使用内部类在包装一层? 我想是安全的关系,因为AbstractQueuedSynchronizer中有很多方法,还实现了共享锁,Condition(稍候再细说)等功能,如果直接使ReentrantLock继承它,则很容易出现AbstractQueuedSynchronizer中的API被无用的情况。

言归正传,今天,我们讨论下Condition工具类的实现。

ReentrantLock和Condition的使用方式通常是这样的:

运行后,结果如下:

可以看到,

Condition的执行方式,是当在线程1中调用await方法后,线程1将释放锁,并且将自己沉睡,等待唤醒,

线程2获取到锁后,开始做事,完毕后,调用Condition的signal方法,唤醒线程1,线程1恢复执行。

以上说明Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。

那,它是怎么实现的呢?

首先还是要明白,reentrantLock.newCondition() 返回的是Condition的一个实现,该类在AbstractQueuedSynchronizer中被实现,叫做newCondition()

它可以访问AbstractQueuedSynchronizer中的方法和其余内部类( AbstractQueuedSynchronizer是个抽象类,至于他怎么能访问,这里有个很奇妙的点,后面我专门用demo说明 )

现在,我们一起来看下Condition类的实现,还是从上面的demo入手,

为了方便书写,我将AbstractQueuedSynchronizer缩写为AQS

当await被调用时,代码如下:

public final void await() throws InterruptedException {
if (Thread.interrupted())
 throw new InterruptedException();
 Node node = addConditionWaiter(); //将当前线程包装下后,
                                   //添加到Condition自己维护的一个链表中。
int savedState = fullyRelease(node);//释放当前线程占有的锁,从demo中看到,
                                       //调用await前,当前线程是占有锁的
int interruptMode = 0;
 while (!isOnSyncQueue(node)) {//释放完毕后,遍历AQS的队列,看当前节点是否在队列中,
                           //不在 说明它还没有竞争锁的资格,所以继续将自己沉睡。
                             //直到它被加入到队列中,聪明的你可能猜到了,
                            //没有错,在singal的时候加入不就可以了?
 LockSupport.park(this);
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 break;
 }
//被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 interruptMode = REINTERRUPT;
 if (node.nextWaiter != null)
 unlinkCancelledWaiters();
 if (interruptMode != 0)
 reportInterruptAfterWait(interruptMode);
 }

回到上面的demo,锁被释放后,线程1开始沉睡,这个时候线程因为线程1沉睡时,会唤醒AQS队列中的头结点,所所以线程2会开始竞争锁,并获取到,等待3秒后,线程2会调用signal方法,“发出”signal信号,signal方法如下:

public final void signal() {
 if (!isHeldExclusively())
 throw new IllegalMonitorStateException();
 Node first = firstWaiter; //firstWaiter为condition自己维护的一个链表的头结点,
                          //取出第一个节点后开始唤醒操作
 if (first != null)
 doSignal(first);
 }

说明下,其实Condition内部维护了等待队列的头结点和尾节点,该队列的作用是存放等待signal信号的线程,该线程被封装为Node节点后存放于此。

关键的就在于此,我们知道AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行。直到队列为空。

而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

1. 线程1调用reentrantLock.lock时,线程被加入到AQS的等待队列中。

2. 线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。

3. 接着马上被加入到Condition的等待队列中,以为着该线程需要signal信号。

4. 线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的等待队列中。

5.  线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。  注意,这个时候,线程并没有被唤醒。

6. signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。

7. 直到释放所整个过程执行完毕。

可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

看到这里,signal方法的代码应该不难理解了。

取出头结点,然后doSignal

private void doSignal(Node first) {
 do {
 if ( (firstWaiter = first.nextWaiter) == null) //修改头结点,完成旧头结点的移出工作
 lastWaiter = null;
 first.nextWaiter = null;
 } while (!transferForSignal(first) &&//将老的头结点,加入到AQS的等待队列中
 (first = firstWaiter) != null);
 }
final boolean transferForSignal(Node node) {
 /*
 * If cannot change waitStatus, the node has been cancelled.
 */
 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 return false;
/*
 * Splice onto queue and try to set waitStatus of predecessor to
 * indicate that thread is (probably) waiting. If cancelled or
 * attempt to set waitStatus fails, wake up to resync (in which
 * case the waitStatus can be transiently and harmlessly wrong).
 */
 Node p = enq(node);
 int ws = p.waitStatus;
//如果该结点的状态为cancel 或者修改waitStatus失败,则直接唤醒。
 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 LockSupport.unpark(node.thread);
 return true;
 }

可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。

只有到发送signal信号的线程调用reentrantLock.unlock()后因为它已经被加到AQS的等待队列中,所以才会被唤醒。

总结:

本文从代码的角度说明了Condition的实现方式,其中,涉及到了AQS的很多操作,比如AQS的等待队列实现独占锁功能,不过,这不是本文讨论的重点,等有机会再将AQS的实现单独分享出来。

2

更多

  • 关于Condition中访问AQS方法的问题
  • 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(上)
  • 从LongAdder 看更高效的无锁实现
  • FutureTask 源码解析
  • 一种超时控制的方式
  • ThreadLocal内存泄露分析
Zemanta

    
0.00 avg. rating (0% score) - 0 votes

program language AbstractQueuedSynchronizer, Condition, java, 多线程permalink

Post navigation

 使用WORDPRESS搭建自己的博客
关于CONDITION中访问AQS方法的问题 

7 thoughts on “怎么理解Condition”

  1. 信言说道:
    2014年2月15日 下午11:00

    向楼主请教一个问题,如最后的流程所述:
    “1. 线程1调用reentrantLock.lock时,线程被加入到AQS的等待队列中。”
    此时无线程争用锁,线程1会先tryAcquire一次,成功则无需入队。因此我认为此处线程1并未加入到AQS的等待队列中。

    “2. 线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。”
    我理解lock之后unlock之前都是在临界区内,此时调await直接释放锁(离开临界区)OK,但无需从AQS移除,因为移除是即将进入临界区那一刻的事情。

    “4. 线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的等待队列中。”
    同理,我认为最后一句加入到AQS队列有误。

    另外,楼主的代码中变量名最好改成thread1和thread2方便对号入座,谢谢!

    回复
    1. liuinsect说道:
      2014年2月17日 下午3:12

      你好,根据你的描述,依次回复下你的问题:
      1. AQS中维护者唯一的一个队列,该队列支持两种模式:独占模式和共享模式,本文中提到的reentrantlock使用的是其独占模式,该队列描述了多线程环境下对锁资源的占用情况,其中,头结点即是表明占有该资源的线程。
      所以,如果线程1成功获取锁,则线程1会被包装成一个Node(AQS中的内部数据结构)加入到AQS的队列中,你所说的并未加入,是不准确的。
      2.调用await方法后,是会从AQS的该队列中移除该Node的,从我本文贴出的源码中可以看到,在await方法中有fullyRelease操作,这个操作会引起结点的移除。

      最后,再说明下,AQS只是维护了一个在多线程环境下对某个资源的占用情况,对外,可以理解成“临界区” 但在AQS内部来说,不过是检查在当前条件下是否可以获取资源这种操作的一种封装。所以,AQS的队列上挂了所有对该资源请求的线程,而AQS定义了头结点是表示占有该资源的线程(独占模式)。在共享模式下,则队列上的一系列结点都可以同时占有资源,对应于,唤醒的时候,这一些列线程都会被唤醒。

      回复
      1. 信言说道:
        2014年2月20日 下午4:34

        感谢楼主的回复。
        我查了源码,ReentrantLock.lock()调了内部类Sync的抽象方法lock,后者有一个公平和另一个不公平的实现。以不公平的实现NonfairSync(默认)为例,lock方法源码为:

        final void lock() {
        if (compareAndSetState(0, 1))//Try immediate barge
        setExclusiveOwnerThread(Thread.currentThread());
        else//backing up to normal acquire on failure.
        acquire(1);
        }
        如果cas操作成功,直接进入临界区(执行lock后续的代码)否则走常规流程调acquire(),
        acquire源码为:

        public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
        }

        tryAcquire如果成功,返回true,if表达式短路就直接结束了。
        似乎没有源码能对应到包装成一个Node加入AQS队列?

        回复
        1. liuinsect说道:
          2014年2月22日 上午10:48

          如果tryAcquire 成功了,没有必要增加到AQS的等待队列中了, 反之,如果增加不成功,进入到acquireQueued方法中去,则会将当先现线程增加到AQS的等待队列中去的。

          回复
      2. 信言说道:
        2014年2月20日 下午5:03

        再看fullyRelease的源码(似乎没有出现结点从队列移除的代码):

        final long fullyRelease(Node node) {
        boolean failed = true;
        try {
        long savedState = getState();
        if (release(savedState)) {//调用release
        failed = false;
        return savedState;
        } else {
        throw new IllegalMonitorStateException();
        }
        } finally {
        if (failed)
        node.waitStatus = Node.CANCELLED;
        }
        }

        它调用了release:
        public final boolean release(long arg) {
        if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
        return true;
        }
        return false;
        }

        再调用了tryRelease,如果成功,唤醒AQS队列的头结点让它尝试进入临界区(因此我理解的AQS队列上的每个结点都代表了一个正等待进入临界区而被block的线程)

        而tryRelease纯粹是状态值的操作,也不涉及出队列:
        protected final boolean tryRelease(int releases) {
        int c = getState() – releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
        }

        回复
        1. liuinsect说道:
          2014年2月20日 下午8:26

          unparkSuccessor 方法中有移除节点的方法:
          private void unparkSuccessor(Node node) {
          /*
          * If status is negative (i.e., possibly needing signal) try
          * to clear in anticipation of signalling. It is OK if this
          * fails or if status is changed by waiting thread.
          */
          int ws = node.waitStatus;
          if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
          if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

          回复
  2. Pingback: 怎么理解Condition | 并发编程网 - ifeve.com

多线程锁--怎么理解Condition相关推荐

  1. php和python的多线程,Python多线程以及线程锁简单理解(代码)

    本篇文章给大家带来的内容是关于Python多线程以及线程锁简单理解(代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 多线程threading 模块创建线程创建自己的线程类线程通 ...

  2. 关于多线程中锁的理解

    2019独角兽企业重金招聘Python工程师标准>>> 在多线程中,锁是非常重要的一个东西. 在Java语言中,有对象和类之分,因此多线程的锁也可分为对象锁和类锁. 对象锁,顾名思义 ...

  3. java线程钥匙_Java多线程并发编程/锁的理解

    一.前言 最近项目遇到多线程并发的情景(并发抢单&恢复库存并行),代码在正常情况下运行没有什么问题,在高并发压测下会出现:库存超发/总库存与sku库存对不上等各种问题. 在运用了 限流/加锁等 ...

  4. 深入剖析基于并发AQS的(独占锁)重入锁(ReetrantLock)及其Condition实现原理

    [版权申明]未经博主同意,谢绝转载!(请尊重原创,博主保留追究权) http://blog.csdn.net/javazejian/article/details/75043422 出自[zejian ...

  5. 【JUC并发编程06】多线程锁 (公平锁和非公平锁,死锁,可重锁)

    文章目录 6 多线程锁 (公平锁和非公平锁,死锁,可重锁) 6.1 synchronized 锁的八种情况 6.2 对上述例子的总结 6.3 公平锁和非公平锁 6.4 可重入锁 6.5 死锁 6 多线 ...

  6. 存储过程没有执行完后没有释放锁_【大厂面试07期】说一说你对synchronized锁的理解?...

    PS:本文已收录到1.3 K+ Star 数的开源项目-<大厂面试指北>,如果想要了解更多,可以看一看,项目地址如下: https://github.com/NotFound9/inter ...

  7. 你对锁的理解?如何手动模拟一个死锁?

    在并发编程中有两个重要的概念:线程和锁,多线程是一把双刃剑,它在提高程序性能的同时,也带来了编码的复杂性,对开发者的要求也提高了一个档次.而锁的出现就是为了保障多线程在同时操作一组资源时的数据一致性, ...

  8. 多线程锁详解之【临界区】

    更多的锁介绍可以先看看这篇文章:多线程锁详解之[序章] 正文: 一般锁的类型可分为两种:用户态锁和内核态锁.用户态锁是指这个锁的不能够跨进程使用.而内核态锁就是指能够跨进程使用的锁.一般书中会说,wi ...

  9. Java多线程 - 锁

    Java多线程 - 锁 三性 可见性 指的是线程之间的可见性,一个线程对状态的修改,对其他线程是可见的.在 Java中 volatile.synchronized 和 final 实现可见性. 原子性 ...

最新文章

  1. 深度学习入门课程推荐
  2. pptp client
  3. 深度学习笔记二:PAC,PAC白化,ZCA白化
  4. 实现一个多线程循环的类
  5. 事务的控制(保存点)
  6. array_uniquee php_【性能为王】从PHP源码剖析array_keys和array_unique
  7. 现代软件工程讲义 5 项目经理 Program Manager
  8. c++primer 3.4练习题
  9. 盘点分库分表中,你一定要避开的那些坑!
  10. 在生产中使用Istio,我们学到了什么?
  11. JQuery Ajax调用asp.net后台方法
  12. java 解压ygb文件_文件系统-目录项缓存与散列表
  13. 《数据挖掘概念与技术》第二版 中文版 第二章答案
  14. matlab 倒位序fft程序,FFT算法设计与实现
  15. jre7或jre8或其他版本共存问题
  16. 仿苹果手机_安卓变苹果,苹果控制中心也能用上了
  17. 计算机专业自主招生有哪些学校,2019自主招生学校有哪些 自主招生考试院校名单...
  18. pycharm导入.pyt后缀文件
  19. 信息服务器v6,服务器ipv6设置
  20. u盘一直提示格式化是什么原因?怎么找回数据?

热门文章

  1. 画像分析(3-3)标签建模-模型管理-新建关系
  2. 1-5Tomcat 目录结构 和 web项目目录结构
  3. 30个WordPress Retina(iPad)自适应主题
  4. (转)结婚那天,妈问我:坐在角落里象两个要饭模样的人是谁?
  5. html循环加载多个图片,两行代码实现图片碎片化加载
  6. STM32震动感应控制继电器(使用循环VS使用外部中断EXTI和中断控制器NVIC)
  7. 全国计算机等级考试题库二级C操作题100套(第59套)
  8. mysql js 命令行登录_mysqlsh 命令行模式与密码保存-爱可生
  9. 关键词联想关联 php,ECSHOP商品关键词模糊分词搜索插件,商品列表关键字加红功能...
  10. 执行git命令时提示秘钥权限太开放‘Permissions 0644 for ‘/Users/liuml/.ssh/id_rsa_tz‘ are too open.’