2019独角兽企业重金招聘Python工程师标准>>>

1、AQS是一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架

2、背景介绍
    互斥锁 
    线程在获取互斥锁的时候,如果发现锁已经被其它线程占有,那么线程就会进行休眠,
         然后在适当的时机(比如唤醒)在获取锁
    自旋锁 
    那么自旋锁顾名思义就是“自旋”。就是当一个线程在尝试获取锁失败之后,线程不会休眠或者挂起,
         而是一直在循环检测锁是否被其它线程释放。

区别 
    互斥锁就是开始开销要大于自旋锁。
    临界区持锁时间的大小并不会对互斥锁的开销造成影响,而自旋锁是死循环检测,加锁全程消耗cpu,
         起始开销虽然低于互斥锁,但是随着持锁时间,加锁的开销是线性增长。 
    适用的情况 
    互斥锁用于临界区持锁时间比较长的操作,比如下面这些情况都可以考虑:临界区有IO操作;
         临界区代码复杂或者循环量大;临界区竞争非常激烈;单核处理器 
    自旋锁就主要用在临界区持锁时间非常短且CPU资源不紧张的情况下,当递归调用时有可能造成死锁

3、AQS中的队列锁
    AQS框架里面的队列锁脱胎于CLH队列锁。
        CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋
        AQS中的队列锁[wait queue]对CLH队列锁改动了两个地方,
        1.节点结构上做出改变。
            CLH队列锁的节点包含一个布尔类型locked的字段,如果要获取锁,就将这个locked设置为true,然后就不停的轮训前驱节点的locked是否释放了锁(这个过程我们就叫做自旋)
            AQS的CLH队列在结构上引入了头节点,尾节点。并且拥有一个前节点与下一个节点的引用。 
        2.在等待获取锁的机制上由自旋改成了等待阻塞。

4、代码简述
      AQS实现了线程等待队列的维护(如获取资源失败入队/唤醒出队等),自定义同步器只需要实现对资源state的获取与释放即可
      state定义为volatile 类型,操作方式有getState()、setState()、compareAndSetState()
      一般自定义同步器要么是共享方式要么是独占方式,当然也支持两种方式混用如:ReentrantReadWriteLock
           共享式:acquireShared(int arg);releaseShared(int arg)

独占式: acquire(int arg);release(int arg)

PS:用到condition还需要实现isHeldExclusively(),condition释放后会将节点从阻塞队列摘下放入到sync同步队列,参与竞争资源-->执行

通假方法: acquire/acquireShared -->lock        tryAcquire/tryAcquireShared-->tryLock
                    release/tryRelease -->unlock            tryRelease/tryReleaseShared-->tryUnlock
                    
           A、独占式 实现acquire 忽略中断
       A-1:我们先来看看acquire干了些啥,贴个源码便于分析
            public final void acquire(int arg) {
                if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();//最后如果获取到资源了,但是发现期间线程发生了中断,补上
            }
        tryAcquire(arg)由子类实现,获取到arg个资源就返回;否则压栈入队列等待资源获取
        A-2:进入压栈入队列操作acquireQueued之前,分两步,同程序流程,先进入addWaiter一看
            private Node addWaiter(Node mode) {
                Node node = new Node(Thread.currentThread(), mode);//构造队列节点
                Node pred = tail;
                   if (pred != null) {//能直接快速挂靠到尾节点自然最好
                    node.prev = pred;
                    if (compareAndSetTail(pred, node)) {//CAS式挂靠 允许失败
                        pred.next = node;
                        return node;
                    }
                }
                enq(node);//挂靠失败,看来在执行acquireQueued方法前,还需要再次查看下enq操作什么
                return node;
            }
            enq操作如下,特别注意,此处是上文挂靠尾节点失败而来,所以应该保证成功:
            private Node enq(final Node node) {
                for (;;) {//for循环==当然这里要牛逼一点说成是自旋
                    Node t = tail;
                    if (t == null) { //尾节点不存在,这就需要初始化了
                        if (compareAndSetHead(new Node()))
                        tail = head;
                    } else {
                        node.prev = t;
                        if (compareAndSetTail(t, node)) {//CAS更新tail为node 自旋到操作成功
                        t.next = node;
                        return t;
                            /**YSMA-ASK1 问此处tail t已经被CAS更新为node了,
                             *但是为何没有赋值tail=node? 那其余线程获取的tail还是tail么? */
                        }
                    }
                }
            }
            ok节点node已经入队列了,那我们看看acquireQueued做了些什么
            final boolean acquireQueued(final Node node, int arg) {
                boolean failed = true;
                try {
                    boolean interrupted = false;//中断标识
                    for (;;) {//自旋修改前驱节点waitStatus为SIGNAL 然后park阻塞,等待唤醒
                        final Node p = node.predecessor();
                        if (p == head && tryAcquire(arg)) {//如果前驱节点是head,就进行许可/资源获取
                        setHead(node);//获取到资源了,就更新自己为头
                        p.next = null; 
                        failed = false;
                        return interrupted;//返回线程是否被中断过
                        }
                        //见名知意,获取许可失败后应该阻塞
                        if (shouldParkAfterFailedAcquire(p, node) //YSMA1
                        && parkAndCheckInterrupt())//执行阻塞,返回线程interrupted状态
                        interrupted = true;
                    }
                } finally {
                    if (failed)
                    cancelAcquire(node);//tryAcquire抛异常等情况,取消节点抢夺资源的资格
                }
            }
            如上YSMA1,我们扒一下shouldParkAfterFailedAcquire
            private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                    int ws = pred.waitStatus;
                    if (ws == Node.SIGNAL)//前驱节点为SIGNAL状态,告诉acquireQueued可以进行park阻塞了
                        return true;
                       if (ws > 0) {//前驱节点为取消状态
                          do {
                            node.prev = pred = pred.prev; //从队列中循环摘除无效前驱节点
                        } while (pred.waitStatus > 0);
                        pred.next = node;
                    } else {
                    //cas更新前驱节点waitStatus为SIGNAL,失败了还会进来重试的
                        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
                    }
                    return false;//本次更新失败,告诉acquireQueued先别park线程
                }
            
            总结如下:
                首先tryAcquire获取资源成功,就不用阻塞了,线程得到执行;
                其次tryAcquire失败,需要阻塞线程,过程为:建一个EXCLUSIVE的node,挂到sync队列的尾部,
                     自旋-更新前驱节点的waitStatus状态为SIGNAL,成功后park阻塞线程,等待唤醒。
                ps:YSMA_FLAG 则tail节点的waitStatus状态为0,因为没有后继节点来负责更新

B、独占式 实现relase 忽略中断
              B-1:贴个源码,先来看看release干了些啥
                  public final boolean release(int arg) {
                /**这个比较牛,失败就直接返回false了
                 * 所以难怪AQS的框架都要求手动release,且放在finally里面,就是怕release失败吧
                 */
                if (tryRelease(arg)) {
                    Node h = head;
                    if (h != null && h.waitStatus != 0)
                        //问:这里为什么要强调waitStatus不为0? 见YSMA_FLAG
                        unparkSuccessor(h);//唤醒后继节点
                    return true;
                }
                return false;
              }
                B-2:已经如此了,那就看一下unparkSuccessor吧
            private void unparkSuccessor(Node node) {
                    int ws = node.waitStatus;
                    if (ws < 0)//更新状态为0,允许[但一般不会]失败 马上就要释放node了,没啥影响
                        compareAndSetWaitStatus(node, ws, 0);
                    Node s = node.next;
                    if (s == null || s.waitStatus > 0) {
                        s = null;
                        for (Node t = tail; t != null && t != node; t = t.prev)
                            if (t.waitStatus <= 0)
                            s = t;
                    /**如果后继节点已经被释放或者取消了
                     *从tail开始找起,找到里node[也就是head]最近的有效的node进行唤醒
                     **/
                    }
                    if (s != null)
                    LockSupport.unpark(s.thread);//唤醒后继节点S的线程,S获取资源后会设置自己为head的
                }
            总结如下:
                tryRelease(arg)释放arg个资源后,如果需要唤醒后继节点就返回true,否则返回false
                比如重入锁场景,重入的时候state+1,释放的时候state-1但是此时持锁的还是自己且state不为0,
                就应该返回false

C、共享式 实现acquireShared 忽略中断
                共享模式下线程获取资源的顶层入口。获取指定量的资源,
                获取到直接返回:do what you want
                获取不到,阻塞等待,直到获取到资源返回:do what you want
                PS:整个过程忽略中断:即线程interrupted了也会照样会阻塞,不过结束后会补一个中断
                贴个源码:
                public final void acquireShared(int arg) {
                    if (tryAcquireShared(arg) < 0)
                        doAcquireShared(arg);
                }
                tryAcquireShared方法定义是由自定义同步器去去扩展。通过操作state来标识资源
                负值代表获取失败;
                0代表获取成功,但没有剩余资源;
                正数表示获取成功,还有剩余资源,其他线程还可以去获取.
                贴个源码:
                 protected boolean tryAcquireShared(int arg) {
                    throw new UnsupportedOperationException();
                }
                doAcquireShared方法,将当前线程加入队列等待获取资源,直到被唤醒。
                贴个源码:
                private void doAcquireShared(int arg) {
                    final Node node = addWaiter(Node.SHARED);//标识是共享模式的节点
                    boolean failed = true;
                    try {
                        boolean interrupted = false;
                        for (;;) {
                            final Node p = node.predecessor();
                            if (p == head) {
                            int r = tryAcquireShared(arg);
                            if (r >= 0) {//有资源,不用再阻塞等待了,更新自己为head,返回让主线程继续执行
                                setHeadAndPropagate(node, r);
                                p.next = null; // help GC
                                if (interrupted)//如果线程被中断了,补上.
                                    selfInterrupt();
                                failed = false;
                                return;
                            }
                                /**如果前驱节点是head,但是拿不到资源--也不是不可能
                                 *如countdownlatch 没有countdown 就需要一直await
                                 */
                            }
                            /**同上,cas自旋修改前驱节点状态为single,直到成功,然后进入park阻塞状态*/
                            if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                            interrupted = true;
                        }
                    } finally {
                        if (failed)
                            cancelAcquire(node);
                    }
                }
                setHeadAndPropagate方法值得一窥,不同于独占模式,这里会唤醒之后的节点来抢资源。
                贴个源码:
                private void setHeadAndPropagate(Node node, int propagate) {
                    Node h = head;
                    setHead(node);//自己变为head节点了
                   /**propagate资源还有很多,大于0. [唤醒后继节点来抢资源]
                     * 或者旧head为null:执行完毕 
                     * 或者旧head节点的waiStatus<0:正在执行
                     * 或者本节点[新head]为null:执行完毕
                     * 或者本节点[新head]的waiStatus<0:正在执行
                     * 因为是或运算,所以Doug[dʌg] Lea应该是本着有枣没枣的先打它三杆子的想法书写
                     * YSMA-ASK2 head已经替换,但是head并没有重新显式指向新head,那么head还是head么?
                     */
                    if (propagate > 0 || h == null || h.waitStatus < 0 ||
                        (h = head) == null || h.waitStatus < 0) {
                        Node s = node.next;
                        if (s == null || s.isShared())
                            doReleaseShared();
                    }
                }
                doReleaseShared方法留于releaseShared处讲解。
                 最后,如果线程在等待期间被中断过,也就不用指望对应线程获得执行并在执行完毕后释放资源了
                所以需要cancelAcquire从sync同步队列中卸下当前节点并唤醒后继节点
                private void cancelAcquire(Node node) {
                    // node已经被释放了 已经被摘除过了
                    if (node == null)
                        return;

node.thread = null;

//循环摘除已取消状态的节点,将前驱索引指向有效节点
                    Node pred = node.prev;
                    while (pred.waitStatus > 0)
                        node.prev = pred = pred.prev;

Node predNext = pred.next;//前驱节点的后继节点,不一定是当前的node

node.waitStatus = Node.CANCELLED;//当前节点的状态置为取消

//当前节点是tail,也不用指向自己了,把pred置为tail并直接置后继节点null结束
                    if (node == tail && compareAndSetTail(node, pred)) {
                        compareAndSetNext(pred, predNext, null);
                    } else {
                        //pred非head 且 waitStatus为SIGNAL或者可以变为SIGNAL 且thread不为null
                        int ws;
                        if (pred != head 
                                && ((ws = pred.waitStatus) == Node.SIGNAL ||
                                  (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) 
                                && pred.thread != null) {
                                Node next = node.next;
                                if (next != null && next.waitStatus <= 0)
                                compareAndSetNext(pred, predNext, next);
                                //当前节点的后继节点有效,挂到pred上,卸下当前节点
                        } else {
                            unparkSuccessor(node);//唤醒后继节点
                            /**此时并没有将当前节点的后继节点挂到predNext上?
                             *因为while已经摘除了无效节点了,所以此时也只能是pred为head这个情况了
                             *node的后继节点被唤醒后,会重新设置head的,所以此处可以忽略predNext的问题
                                */
                        }
                        node.next = node; // help gc 为什么不是node.next=null?  YSMA-ASK3
                    }
                }

D、共享式 实现releaseShared 忽略中断
                   共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,
                    如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
                   贴个源码:
                public final boolean releaseShared(int arg) {
                    if (tryReleaseShared(arg)) {
                        doReleaseShared();
                        return true;
                    }
                    return false;
                    //不代表释放资源失败,可能仅仅是因为释放的资源太少,还达不到唤醒后继节点的条件
                }

tryReleaseShared(arg)方法定义是由自定义同步器去去扩展,释放arg个资源,
                这里返回的是布尔类型,可参考lock实现学习,这里贴个源码:
                protected boolean tryReleaseShared(int arg) {
                    throw new UnsupportedOperationException();
                }
                这里直接进入doReleaseShared,上源码:
            private void doReleaseShared() {
                    for (;;) {//自旋...
                        Node h = head;
                        if (h != null && h != tail) {
                            int ws = h.waitStatus;
                            if (ws == Node.SIGNAL) {//唤醒后继节点的标识
                            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                                //YSMA1,CAS安全的更新waitStatus为SIGNAL,直到成功
                                continue;// loop to recheck cases
                            unparkSuccessor(h);//执行唤醒操作
                            }
                            else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                            //唤醒后继节点后,且已经更换head,更新waitStatus为PROPAGATE状态
                            continue; // loop on failed CAS
                         }
                         if (h == head)// loop if head changed
                         break;
                    }
                }
                总结:共享式的acquireShared的方式也是先判断前驱节点是否为head,是就竞争锁,
                         特别的,竞争成功后,如果资源大于0[事实上基本是必然执行的]会唤醒后继节点进行竞争锁
                        如此,doReleaseShared就有两个入口了,releaseShared和setHeadAndPropagate,
                        因为都是操作的head,所以就有了ws == Node.SIGNAL和ws == 0的判断了,标识为PROPAGATE的
                        也就无需再次进行唤醒后继节点的操作了

最后:关于YSMA-ASK1和YSMA-ASK2

是因为head和tail的定义为volatile

private transient volatile Node head;                    private transient volatile Node tail;

在结合static块代码:

            private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}

以及

            private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}

答案就一目了然了,compareAndSwapObject操作的是索引,等同于显示赋值head=XXX

最后的最后,请允许我上一张自己画的Visio图

转载于:https://my.oschina.net/ysma1987/blog/3040540

AQS-sync同步队列 [自定义同步器框架]相关推荐

  1. java同步队列_Java 中队列同步器 AQS(AbstractQueuedSynchronizer)实现原理

    前言 在 Java 中通过锁来控制多个线程对共享资源的访问,使用 Java 编程语言开发的朋友都知道,可以通过 synchronized 关键字来实现锁的功能,它可以隐式的获取锁,也就是说我们使用该关 ...

  2. 同步器AQS中的同步队列与等待队列

    在单纯地使用锁,比如ReentrantLock的时候,这个锁组件内部有一个继承同步器AQS的类,实现了其抽象方法,加锁.释放锁也只是涉及到AQS中的同步队列而已,那么等待队列又是什么呢? 当使用Con ...

  3. AbstractQueuedSynchronizer同步队列与Condition等待队列协同机制

    之前对AbstractQueuedSynchronizer(AQS)同步队列与Condition等待队列的功能一直不是很清晰,没太清楚地区分开二者的区别和联系,最近研究了一下分享出来. 1.同步队列和 ...

  4. java8 同步队列_秋招之路8:JAVA锁体系和AQS抽象队列同步器

    整个的体系图 悲观锁,乐观锁 是一个广义概念:体现的是看待线程同步的不同角度. 悲观锁 认为在自己使用数据的时候一定有别的线程来修改数据,在获取数据的时候会先加锁,确保数据不被别的线程修改. 实现:关 ...

  5. Java Review - 并发编程_抽象同步队列AQS

    文章目录 概述 AQS--锁的底层支持 state 的作用 ConditionObject 独占 VS 共享 独占方式下,获取与释放资源的流程 共享方式下,获取与释放资源的流程 Interruptib ...

  6. java队列加锁_java并发-----浅析ReentrantLock加锁,解锁过程,公平锁非公平锁,AQS入门,CLH同步队列...

    前言 为什么需要去了解AQS,AQS,AbstractQueuedSynchronizer,即队列同步器.它是构建锁或者其他同步组件的基础框架(如ReentrantLock.ReentrantRead ...

  7. 【java】java JUC 同步器框架 AQS AbstractQueuedSynchronizer源码图文分析

    1.概述 转载:JUC锁: 锁核心类AQS详解 AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore ...

  8. AQS同步队列结构分析

    同步队列结构 AQS使用的同步队列是基于一种CLH锁算法来实现. CLH锁也是一种基于链表的可扩展.高性能.公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自 ...

  9. 死磕Java并发:J.U.C之AQS:CLH同步队列

    本文转载自公号:Java技术驿站 在上篇文章"死磕Java并发:J.U.C之AQS简介"中提到了AQS内部维护着一个FIFO队列,该队列就是CLH同步队列. CLH同步队列是一个F ...

  10. AQS独占式同步队列入队与出队

    入队 Node AQS同步队列和等待队列共用同一种节点结构Node,与同步队列相关的属性如下. prev 前驱结点 next 后继节点 thread 入队的线程 入队节点的状态 INITIAl 0 初 ...

最新文章

  1. 计算机科学中抽象的好处与问题—伪共享等实例分析
  2. shell脚本修复MySQL主从同步
  3. iis服务器配置php项目,Windows7下IIS+php配置教程
  4. 使用ADO.NET查询和操作数据
  5. mysql 5.6 没死_MySQL 5.6不删空用户的影响
  6. RNN、GRU、LSTM
  7. 将本地项目上传到gitLab操作
  8. spool文件命名引用两个变量_Python 中命名空间与作用域介绍
  9. 批量word删除页眉页脚——VBS脚本,在office宏中运行即可
  10. win7系统如何卸载漏洞补丁--win10专业版
  11. 解决:RuntimeError: Tensor for ‘out‘ is on CPU, Tensor for argument #1 ‘self‘ is on CPU, but expected t
  12. 大屏互动-大屏交互-大屏投影技术解决方案
  13. 安卓无线蓝牙耳机哪款好?实惠好用的蓝牙耳机品牌
  14. download sources报错: Cannot connect to the Maven process. Try again later. If the problem persists
  15. python 循环写入excel_用PYTHON将“for”循环的输出写到excel中
  16. 深入电子元器件行业产业场景,在线采购商城系统加速电子元器件交易数字化
  17. 佛经小故事--《盲龟浮木》
  18. SASE究竟还能火多久?
  19. 未来的计算机作文六百字,未来想象作文六百字
  20. impala 基础知识及使用

热门文章

  1. 统计学习中常用的损失函数
  2. SunnyOS准备4
  3. win7_fedora 双系统安装方法
  4. redis 实战面试
  5. 2018.09.23 bzoj1076: [SCOI2008]奖励关(期望+状压dp)
  6. (C/C++学习笔记)附页: C/C++变量的存储类型
  7. 项目回顾-RecyclerView和CheckBox错乱问题
  8. 源码分析--SDWebImage
  9. 【转】gem install libv8 错误
  10. 批量删除Cookie(实用)