万字超强图文讲解 AQS 以及 ReentrantLock 应用
Java SDK 为什么要设计 Lock
曾几何时幻想过,如果 Java 并发控制只有 synchronized 多好,只有下面三种使用方式,简单方便
public class ThreeSync {private static final Object object = new Object();public synchronized void normalSyncMethod(){//临界区}public static synchronized void staticSyncMethod(){//临界区}public void syncBlockMethod(){synchronized (object){//临界区}}
}
如果在 Java 1.5之前,确实是这样,自从 1.5 版本 Doug Lea 大师就重新造了一个轮子 Lock
我们常说:“避免重复造轮子”,如果有了轮子还是要坚持再造个轮子,那么肯定传统的轮子在某些应用场景中不能很好的解决问题
不知你是否还记得 Coffman 总结的四个可以发生死锁的情形 ,其中【不可剥夺条件】是指:
线程已经获得资源,在未使用完之前,不能被剥夺,只能在使用完时自己释放
要想破坏这个条件,就需要具有申请不到进一步资源就释放已有资源的能力
很显然,这个能力是 synchronized 不具备的,使用 synchronized ,如果线程申请不到资源就会进入阻塞状态,我们做什么也改变不了它的状态,这是 synchronized 轮子的致命弱点,这就强有力的给了重造轮子 Lock 的理由
显式锁 Lock
旧轮子有弱点,新轮子就要解决这些问题,所以要具备不会阻塞的功能,下面的三个方案都是解决这个问题的好办法(看下面表格描述你就明白三个方案的含义了)
特性 | 描述 | API |
---|---|---|
能响应中断 | 如果不能自己释放,那可以响应中断也是很好的。Java多线程中断机制 专门描述了中断过程,目的是通过中断信号来跳出某种状态,比如阻塞 | lockInterruptbly() |
非阻塞式的获取锁 | 尝试获取,获取不到不会阻塞,直接返回 | tryLock() |
支持超时 | 给定一个时间限制,如果一段时间内没获取到,不是进入阻塞状态,同样直接返回 | tryLock(long time, timeUnit) |
好的方案有了,但鱼和熊掌不可兼得,Lock 多了 synchronized 不具备的特性,自然不会像 synchronized 那样一个关键字三个玩法走遍全天下,在使用上也相对复杂了一丢丢
Lock 使用范式
synchronized 有标准用法,这样的优良传统咱 Lock 也得有,相信很多人都知道使用 Lock 的一个范式
Lock lock = new ReentrantLock();
lock.lock();
try{...
}finally{lock.unlock();
}
既然是范式(没事不要挑战更改写法的那种),肯定有其理由,我们来看一下
标准1—finally 中释放锁
这个大家应该都会明白,在 finally 中释放锁,目的是保证在获取到锁之后,最终能被释放
标准2—在 try{} 外面获取锁
不知道你有没有想过,为什么会有标准 2 的存在,我们通常是“喜欢” try 住所有内容,生怕发生异常不能捕获的
在 try{}
外获取锁主要考虑两个方面:
如果没有获取到锁就抛出异常,最终释放锁肯定是有问题的,因为还未曾拥有锁谈何释放锁呢
如果在获取锁时抛出了异常,也就是当前线程并未获取到锁,但执行到 finally 代码时,如果恰巧别的线程获取到了锁,则会被释放掉(无故释放)
不同锁的实现方式略有不同,范式的存在就是要避免一切问题的出现,所以大家尽量遵守范式
Lock 是怎样起到锁的作用呢?
如果你熟悉 synchronized,你知道程序编译成 CPU 指令后,在临界区会有 moniterenter
和 moniterexit
指令的出现,可以理解成进出临界区的标识
从范式上来看:
lock.lock()
获取锁,“等同于” synchronized 的 moniterenter指令lock.unlock()
释放锁,“等同于” synchronized 的 moniterexit 指令
那 Lock 是怎么做到的呢?
这里先简单说明一下,这样一会到源码分析时,你可以远观设计轮廓,近观实现细节,会变得越发轻松
其实很简单,比如在 ReentrantLock 内部维护了一个 volatile 修饰的变量 state,通过 CAS 来进行读写(最底层还是交给硬件来保证原子性和可见性),如果CAS更改成功,即获取到锁,线程进入到 try 代码块继续执行;如果没有更改成功,线程会被【挂起】,不会向下执行
但 Lock 是一个接口,里面根本没有 state 这个变量的存在:
它怎么处理这个 state 呢?很显然需要一点设计的加成了,接口定义行为,具体都是需要实现类的
Lock 接口的实现类基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的
那什么是队列同步器呢?(这应该是你见过的最强标题党,聊了半个世纪才入正题,评论区留言骂我)
队列同步器 AQS
队列同步器 (AbstractQueuedSynchronizer),简称同步器或AQS,就是我们今天的主人公
**问:**为什么你分析 JUC 源码,要从 AQS 说起呢?
**答:**看下图
相信看到这个截图你就明白一二了,你听过的,面试常被问起的,工作中常用的
ReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
公平锁
非公平锁
ThreadPoolExecutor
(关于线程池的理解,可以查看 为什么要使用线程池? )
都和 AQS 有直接关系,所以了解 AQS 的抽象实现,在此基础上再稍稍查看上述各类的实现细节,很快就可以全部搞定,不至于查看源码时一头雾水,丢失主线
上面提到,在锁的实现类中会聚合同步器,然后利同步器实现锁的语义,那么问题来了:
为什么要用聚合模式,怎么进一步理解锁和同步器的关系呢?
我们绝大多数都是在使用锁,实现锁之后,其核心就是要使用方便
从 AQS 的类名称和修饰上来看,这是一个抽象类,所以从设计模式的角度来看同步器一定是基于【模版模式】来设计的,使用者需要继承同步器,实现自定义同步器,并重写指定方法,随后将同步器组合在自定义的同步组件中,并调用同步器的模版方法,而这些模版方法又回调用使用者重写的方法
我不想将上面的解释说的这么抽象,其实想理解上面这句话,我们只需要知道下面两个问题就好了
哪些是自定义同步器可重写的方法?
哪些是抽象同步器提供的模版方法?
同步器可重写的方法
同步器提供的可重写方法只有5个,这大大方便了锁的使用者:
按理说,需要重写的方法也应该有 abstract 来修饰的,为什么这里没有?原因其实很简单,上面的方法我已经用颜色区分成了两类:
独占式
共享式
自定义的同步组件或者锁不可能既是独占式又是共享式,为了避免强制重写不相干方法,所以就没有 abstract 来修饰了,但要抛出异常告知不能直接使用该方法:
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
暖暖的很贴心(如果你有类似的需求也可以仿照这样的设计)
表格方法描述中所说的同步状态
就是上文提到的有 volatile 修饰的 state,所以我们在重写
上面几个方法时,还要通过同步器提供的下面三个方法(AQS 提供的)来获取或修改同步状态:
而独占式和共享式操作 state 变量的区别也就很简单了
所以你看到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
这几个类其实仅仅是在实现以上几个方法上略有差别,其他的实现都是通过同步器的模版方法来实现的,到这里是不是心情放松了许多呢?我们来看一看模版方法:
同步器提供的模版方法
上面我们将同步器的实现方法分为独占式和共享式两类,模版方法其实除了提供以上两类模版方法之外,只是多了响应中断
和超时限制
的模版方法供 Lock 使用,来看一下
先不用记上述方法的功能,目前你只需要了解个大概功能就好。另外,相信你也注意到了:
上面的方法都有 final 关键字修饰,说明子类不能重写这个方法
看到这你也许有点乱了,我们稍微归纳一下:
程序员还是看代码心里踏实一点,我们再来用代码说明一下上面的关系(注意代码中的注释,以下的代码并不是很严谨,只是为了简单说明上图的代码实现):
package top.dayarch.myjuc;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 自定义互斥锁** @author tanrgyb* @date 2020/5/23 9:33 PM*/
public class MyMutex implements Lock {// 静态内部类-自定义同步器private static class MySync extends AbstractQueuedSynchronizer{@Overrideprotected boolean tryAcquire(int arg) {// 调用AQS提供的方法,通过CAS保证原子性if (compareAndSetState(0, arg)){// 我们实现的是互斥锁,所以标记获取到同步状态(更新state成功)的线程,// 主要为了判断是否可重入(一会儿会说明)setExclusiveOwnerThread(Thread.currentThread());//获取同步状态成功,返回 truereturn true;}// 获取同步状态失败,返回 falsereturn false;}@Overrideprotected boolean tryRelease(int arg) {// 未拥有锁却让释放,会抛出IMSEif (getState() == 0){throw new IllegalMonitorStateException();}// 可以释放,清空排它线程标记setExclusiveOwnerThread(null);// 设置同步状态为0,表示释放锁setState(0);return true;}// 是否独占式持有@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}// 后续会用到,主要用于等待/通知机制,每个condition都有一个与之对应的条件等待队列,在锁模型中说明过Condition newCondition() {return new ConditionObject();}}// 聚合自定义同步器private final MySync sync = new MySync();@Overridepublic void lock() {// 阻塞式的获取锁,调用同步器模版方法独占式,获取同步状态sync.acquire(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {// 调用同步器模版方法可中断式获取同步状态sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {// 调用自己重写的方法,非阻塞式的获取同步状态return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {// 调用同步器模版方法,可响应中断和超时时间限制return sync.tryAcquireNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {// 释放锁sync.release(1);}@Overridepublic Condition newCondition() {// 使用自定义的条件return sync.newCondition();}
}
如果你现在打开 IDE, 你会发现上文提到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
都是按照这个结构实现,所以我们就来看一看 AQS 的模版方法到底是怎么实现锁
AQS实现分析
从上面的代码中,你应该理解了lock.tryLock()
非阻塞式获取锁就是调用自定义同步器重写的 tryAcquire()
方法,通过 CAS 设置state 状态,不管成功与否都会马上返回;那么 lock.lock() 这种阻塞式的锁是如何实现的呢?
有阻塞就需要排队,实现排队必然需要队列
CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO)——概念了解就好,不要记
队列中每个排队的个体就是一个 Node,所以我们来看一下 Node 的结构
Node 节点
AQS 内部维护了一个同步队列,用于管理同步状态。
当线程获取同步状态失败时,就会将当前线程以及等待状态等信息构造成一个 Node 节点,将其加入到同步队列中尾部,阻塞该线程
当同步状态被释放时,会唤醒同步队列中“首节点”的线程获取同步状态
为了将上述步骤弄清楚,我们需要来看一看 Node 结构 (如果你能打开 IDE 一起看那是极好的)
乍一看有点杂乱,我们还是将其归类说明一下:
上面这几个状态说明有个印象就好,有了Node 的结构说明铺垫,你也就能想象同步队列的接本结构了:
前置知识基本铺垫完毕,我们来看一看独占式获取同步状态的整个过程
独占式获取同步状态
故事要从范式lock.lock() 开始
public void lock() {// 阻塞式的获取锁,调用同步器模版方法,获取同步状态sync.acquire(1);
}
进入AQS的模版方法 acquire()
public final void acquire(int arg) {// 调用自定义同步器重写的 tryAcquire 方法if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
首先,也会尝试非阻塞的获取同步状态,如果获取失败(tryAcquire返回false),则会调用 addWaiter
方法构造 Node 节点(Node.EXCLUSIVE 独占式)并安全的(CAS)加入到同步队列【尾部】
private Node addWaiter(Node mode) {// 构造Node节点,包含当前线程信息以及节点模式【独占/共享】Node node = new Node(Thread.currentThread(), mode);// 新建变量 pred 将指针指向tail指向的节点Node pred = tail;// 如果尾节点不为空if (pred != null) {// 新加入的节点前驱节点指向尾节点node.prev = pred;// 因为如果多个线程同时获取同步状态失败都会执行这段代码// 所以,通过 CAS 方式确保安全的设置当前节点为最新的尾节点if (compareAndSetTail(pred, node)) {// 曾经的尾节点的后继节点指向当前节点pred.next = node;// 返回新构建的节点return node;}}// 尾节点为空,说明当前节点是第一个被加入到同步队列中的节点// 需要一个入队操作enq(node);return node;}private Node enq(final Node node) {// 通过“死循环”确保节点被正确添加,最终将其设置为尾节点之后才会返回,这里使用 CAS 的理由和上面一样for (;;) {Node t = tail;// 第一次循环,如果尾节点为 nullif (t == null) { // Must initialize// 构建一个哨兵节点,并将头部指针指向它if (compareAndSetHead(new Node()))// 尾部指针同样指向哨兵节点tail = head;} else {// 第二次循环,将新节点的前驱节点指向tnode.prev = t;// 将新节点加入到队列尾节点if (compareAndSetTail(t, node)) {// 前驱节点的后继节点指向当前新节点,完成双向队列t.next = node;return t;}}}}
你可能比较迷惑 enq() 的处理方式,进入该方法就是一个“死循环”,我们就用图来描述它是怎样跳出循环的
有些同学可能会有疑问,为什么会有哨兵节点?
哨兵,顾名思义,是用来解决国家之间边界问题的,不直接参与生产活动。同样,计算机科学中提到的哨兵,也用来解决边界问题,如果没有边界,指定环节,按照同样算法可能会在边界处发生异常,比如要继续向下分析的
acquireQueued()
方法
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// "死循环",尝试获取锁,或者挂起for (;;) {// 获取当前节点的前驱节点final Node p = node.predecessor();// 只有当前节点的前驱节点是头节点,才会尝试获取锁// 看到这你应该理解添加哨兵节点的含义了吧if (p == head && tryAcquire(arg)) {// 获取同步状态成功,将自己设置为头setHead(node);// 将哨兵节点的后继节点置为空,方便GCp.next = null; // help GCfailed = false;// 返回中断标识return interrupted;}// 当前节点的前驱节点不是头节点//【或者】当前节点的前驱节点是头节点但获取同步状态失败if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
获取同步状态成功会返回可以理解了,但是如果失败就会一直陷入到“死循环”中浪费资源吗?很显然不是,shouldParkAfterFailedAcquire(p, node)
和 parkAndCheckInterrupt()
就会将线程获取同步状态失败的线程挂起,我们继续向下看
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取前驱节点的状态int ws = pred.waitStatus;// 如果是 SIGNAL 状态,即等待被占用的资源释放,直接返回 true// 准备继续调用 parkAndCheckInterrupt 方法if (ws == Node.SIGNAL)return true;// ws 大于0说明是CANCELLED状态,if (ws > 0) {// 循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作// 程序第一次执行到这返回为false,还会进行外层第二次循环,最终从代码第7行返回compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
到这里你也许有个问题:
这个地方设置前驱节点为 SIGNAL 状态到底有什么作用?
保留这个问题,我们陆续揭晓
如果前驱节点的 waitStatus 是 SIGNAL状态,即 shouldParkAfterFailedAcquire 方法会返回 true ,程序会继续向下执行 parkAndCheckInterrupt
方法,用于将当前线程挂起
private final boolean parkAndCheckInterrupt() {// 线程挂起,程序不会继续向下执行LockSupport.park(this);// 根据 park 方法 API描述,程序在下述三种情况会继续向下执行// 1. 被 unpark // 2. 被中断(interrupt)// 3. 其他不合逻辑的返回才会继续向下执行// 因上述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态// 如果由于被中断,该方法会返回 truereturn Thread.interrupted();}
被唤醒的程序会继续执行 acquireQueued
方法里的循环,如果获取同步状态成功,则会返回 interrupted = true
的结果
程序继续向调用栈上层返回,最终回到 AQS 的模版方法 acquire
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
你也许会有疑惑:
程序已经成功获取到同步状态并返回了,怎么会有个自我中断呢?
static void selfInterrupt() {Thread.currentThread().interrupt();
}
如果你不能理解中断,强烈建议你回看 Java多线程中断机制
到这里关于获取同步状态我们还遗漏了一条线,acquireQueued 的 finally 代码块如果你仔细看你也许马上就会有疑惑:
到底什么情况才会执行 if(failed) 里面的代码 ?
if (failed)cancelAcquire(node);
这段代码被执行的条件是 failed 为 true,正常情况下,如果跳出循环,failed 的值为false,如果不能跳出循环貌似怎么也不能执行到这里,所以只有不正常的情况才会执行到这里,也就是会发生异常,才会执行到此处
查看 try 代码块,只有两个方法会抛出异常:
node.processor()
方法自己重写的
tryAcquire()
方法
先看前者:
很显然,这里抛出的异常不是重点,那就以 ReentrantLock 重写的 tryAcquire() 方法为例
另外,上面分析 shouldParkAfterFailedAcquire
方法还对 CANCELLED 的状态进行了判断,那么
什么时候会生成取消状态的节点呢?
答案就在 cancelAcquire
方法中, 我们来看看 cancelAcquire到底怎么设置/处理 CANNELLED 的
private void cancelAcquire(Node node) {// 忽略无效节点if (node == null)return;// 将关联的线程信息清空node.thread = null;// 跳过同样是取消状态的前驱节点Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// 跳出上面循环后找到前驱有效节点,并获取该有效节点的后继节点Node predNext = pred.next;// 将当前节点的状态置为 CANCELLEDnode.waitStatus = Node.CANCELLED;// 如果当前节点处在尾节点,直接从队列中删除自己就好if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {int ws;// 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点if (pred != head &&// 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL((ws = pred.waitStatus) == Node.SIGNAL ||// 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&// 判断当前节点有效前驱节点的线程信息是否为空pred.thread != null) {// 上述条件满足Node next = node.next;// 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {// 如果当前节点的前驱节点是头节点,或者上述其他条件不满足,就唤醒当前节点的后继节点unparkSuccessor(node);}node.next = node; // help GC}
看到这个注释你可能有些乱了,其核心目的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列,总结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析一下:
至此,获取同步状态的过程就结束了,我们简单的用流程图说明一下整个过程
获取锁的过程就这样的结束了,先暂停几分钟整理一下自己的思路。我们上面还没有说明 SIGNAL 的作用, SIGNAL 状态信号到底是干什么用的?这就涉及到锁的释放了,我们来继续了解,整体思路和锁的获取是一样的, 但是释放过程就相对简单很多了
独占式释放同步状态
故事要从 unlock() 方法说起
public void unlock() {// 释放锁sync.release(1);}
调用 AQS 模版方法 release,进入该方法
public final boolean release(int arg) {// 调用自定义同步器重写的 tryRelease 方法尝试释放同步状态if (tryRelease(arg)) {// 释放成功,获取头节点Node h = head;// 存在头节点,并且waitStatus不是初始状态// 通过获取的过程我们已经分析了,在获取的过程中会将 waitStatus的值从初始状态更新成 SIGNAL 状态if (h != null && h.waitStatus != 0)// 解除线程挂起状态unparkSuccessor(h);return true;}return false;}
查看 unparkSuccessor 方法,实际是要唤醒头节点的后继节点
private void unparkSuccessor(Node node) { // 获取头节点的waitStatusint ws = node.waitStatus;if (ws < 0)// 清空头节点的waitStatus值,即置为0compareAndSetWaitStatus(node, ws, 0);// 获取头节点的后继节点Node s = node.next;// 判断当前节点的后继节点是否是取消状态,如果是,需要移除,重新连接队列if (s == null || s.waitStatus > 0) {s = null;// 从尾节点向前查找,找到队列第一个waitStatus状态小于0的节点for (Node t = tail; t != null && t != node; t = t.prev)// 如果是独占式,这里小于0,其实就是 SIGNALif (t.waitStatus <= 0)s = t;}if (s != null)// 解除线程挂起状态LockSupport.unpark(s.thread);}
有同学可能有疑问:
为什么这个地方是从队列尾部向前查找不是 CANCELLED 的节点?
原因有两个:
第一,先回看节点加入队列的情景:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
节点入队并不是原子操作,代码第6、7行
node.prev = pred;
compareAndSetTail(pred, node)
这两个地方可以看作是尾节点入队的原子操作,如果此时代码还没执行到 pred.next = node; 这时又恰巧执行了unparkSuccessor方法,就没办法从前往后找了,因为后继指针还没有连接起来,所以需要从后往前找
第二点原因,在上面图解产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev指针并未断开,因此这也是必须要从后往前遍历才能够遍历完全部的Node
同步状态至此就已经成功释放了,之前获取同步状态被挂起的线程就会被唤醒,继续从下面代码第 3 行返回执行:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
继续返回上层调用栈, 从下面代码15行开始执行,重新执行循环,再次尝试获取同步状态
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
到这里,关于独占式获取/释放锁的流程已经闭环了,但是关于 AQS 的另外两个模版方法还没有介绍
响应中断
超时限制
独占式响应中断获取同步状态
故事要从lock.lockInterruptibly() 方法说起
public void lockInterruptibly() throws InterruptedException {// 调用同步器模版方法可中断式获取同步状态sync.acquireInterruptibly(1);}
有了前面的理解,理解独占式可响应中断的获取同步状态方式,真是一眼就能明白了:
public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 尝试非阻塞式获取同步状态失败,如果没有获取到同步状态,执行代码7行if (!tryAcquire(arg))doAcquireInterruptibly(arg);}
继续查看 doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 获取中断信号后,不再返回 interrupted = true 的值,而是直接抛出 InterruptedException throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
没想到 JDK 内部也有如此相近的代码,可响应中断获取锁没什么深奥的,就是被中断抛出 InterruptedException 异常(代码第17行),这样就逐层返回上层调用栈捕获该异常进行下一步操作了
趁热打铁,来看看另外一个模版方法:
独占式超时限制获取同步状态
这个很好理解,就是给定一个时限,在该时间段内获取到同步状态,就返回 true, 否则,返回 false。好比线程给自己定了一个闹钟,闹铃一响,线程就自己返回了,这就不会使自己是阻塞状态了
既然涉及到超时限制,其核心逻辑肯定是计算时间间隔,因为在超时时间内,肯定是多次尝试获取锁的,每次获取锁肯定有时间消耗,所以计算时间间隔的逻辑就像我们在程序打印程序耗时 log 那么简单
nanosTimeout = deadline - System.nanoTime()
故事要从 lock.tryLock(time, unit)
方法说起
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {// 调用同步器模版方法,可响应中断和超时时间限制return sync.tryAcquireNanos(1, unit.toNanos(time));}
来看 tryAcquireNanos 方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}
是不是和上面 acquireInterruptibly
方法长相很详细了,继续查看来 doAcquireNanos 方法,看程序, 该方法也是 throws InterruptedException,我们在中断文章中说过,方法标记上有 throws InterruptedException
说明该方法也是可以响应中断的,所以你可以理解超时限制是 acquireInterruptibly
方法的加强版,具有超时和非阻塞控制的双保险
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {// 超时时间内,为获取到同步状态,直接返回falseif (nanosTimeout <= 0L)return false;// 计算超时截止时间final long deadline = System.nanoTime() + nanosTimeout;// 以独占方式加入到同步队列中final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}// 计算新的超时时间nanosTimeout = deadline - System.nanoTime();// 如果超时,直接返回 falseif (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&// 判断是最新超时时间是否大于阈值 1000 nanosTimeout > spinForTimeoutThreshold)// 挂起线程 nanosTimeout 长时间,时间到,自动返回LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
上面的方法应该不是很难懂,但是又同学可能在第 27 行上有所困惑
为什么 nanosTimeout 和 自旋超时阈值1000进行比较?
/*** The number of nanoseconds for which it is faster to spin* rather than to use timed park. A rough estimate suffices* to improve responsiveness with very short timeouts.*/static final long spinForTimeoutThreshold = 1000L;
其实 doc 说的很清楚,说白了,1000 nanoseconds 时间已经非常非常短暂了,没必要再执行挂起和唤醒操作了,不如直接当前线程直接进入下一次循环
到这里,我们自定义的 MyMutex 只差 Condition 没有说明了,不知道你累了吗?我还在坚持
Condition
如果你看过之前写的 并发编程之等待通知机制 ,你应该对下面这个图是有印象的:
如果当时你理解了这个模型,再看 Condition 的实现,根本就不是问题了,首先 Condition 还是一个接口,肯定也是需要有实现类的
那故事就从 lock.newnewCondition
说起吧
public Condition newCondition() {// 使用自定义的条件return sync.newCondition();}
自定义同步器重封装了该方法:
Condition newCondition() {return new ConditionObject();}
ConditionObject 就是 Condition 的实现类,该类就定义在了 AQS 中,只有两个成员变量:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
所以,我们只需要来看一下 ConditionObject 实现的 await / signal 方法来使用这两个成员变量就可以了
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 同样构建 Node 节点,并加入到等待队列中Node node = addConditionWaiter();// 释放同步状态int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 挂起当前线程LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
这里注意用词,在介绍获取同步状态时,addWaiter 是加入到【同步队列】,就是上图说的入口等待队列,这里说的是【等待队列】,所以 addConditionWaiter 肯定是构建了一个自己的队列:
private Node addConditionWaiter() {Node t = lastWaiter;if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 新构建的节点的 waitStatus 是 CONDITION,注意不是 0 或 SIGNAL 了Node node = new Node(Thread.currentThread(), Node.CONDITION);// 构建单向同步队列if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
这里有朋友可能会有疑问:
为什么这里是单向队列,也没有使用CAS 来保证加入队列的安全性呢?
因为 await 是 Lock 范式 try 中使用的,说明已经获取到锁了,所以就没必要使用 CAS 了,至于是单向,因为这里还不涉及到竞争锁,只是做一个条件等待队列
在 Lock 中可以定义多个条件,每个条件都会对应一个 条件等待队列,所以将上图丰富说明一下就变成了这个样子:
线程已经按相应的条件加入到了条件等待队列中,那如何再尝试获取锁呢?signal / signalAll 方法就已经排上用场了
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}
Signal 方法通过调用 doSignal 方法,只唤醒条件等待队列中的第一个节点
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// 调用该方法,将条件等待队列的线程节点移动到同步队列中} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
继续看 transferForSignal
方法
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 重新进行入队操作Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// 唤醒同步队列中该线程LockSupport.unpark(node.thread);return true;}
所以我们再用图解一下唤醒的整个过程
到这里,理解 signalAll 就非常简单了,只不过循环判断是否还有 nextWaiter,如果有就像 signal 操作一样,将其从条件等待队列中移到同步队列中
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}
不知你还是否记得,我在并发编程之等待通知机制 中还说过一句话
没有特殊原因尽量用 signalAll 方法
什么时候可以用 signal 方法也在其中做了说明,请大家自行查看吧
这里我还要多说一个细节,从条件等待队列移到同步队列是有时间差的,所以使用 await() 方法也是范式的, 同样在该文章中做了解释
有时间差,就会有公平和不公平的问题,想要全面了解这个问题,我们就要走近 ReentrantLock 中来看了,除了了解公平/不公平问题,查看 ReentrantLock 的应用还是要反过来验证它使用的AQS的,我们继续吧
ReentrantLock 是如何应用的AQS
独占式的典型应用就是 ReentrantLock 了,我们来看看它是如何重写这个方法的
乍一看挺奇怪的,怎么里面自定义了三个同步器:其实 NonfairSync,FairSync 只是对 Sync 做了进一步划分:
从名称上你应该也知道了,这就是你听到过的 公平锁/非公平锁
了
何为公平锁/非公平锁?
生活中,排队讲求先来后到视为公平。程序中的公平性也是符合请求锁的绝对时间的,其实就是 FIFO,否则视为不公平
我们来对比一下 ReentrantLock 是如何实现公平锁和非公平锁的
其实没什么大不了,公平锁就是判断同步队列是否还有先驱节点的存在,只有没有先驱节点才能获取锁;而非公平锁是不管这个事的,能获取到同步状态就可以,就这么简单,那问题来了:
为什么会有公平锁/非公平锁的设计?
考虑这个问题,我们需重新回忆上面的锁获取实现图了,其实上面我已经透露了一点
主要有两点原因:
原因一:
恢复挂起的线程到真正锁的获取还是有时间差的,从人类的角度来看这个时间微乎其微,但是从CPU的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用 CPU 的时间片,尽量减少 CPU 空闲状态时间
原因二:
不知你是否还记得我在 面试问,创建多少个线程合适?文章中反复提到过,使用多线程很重要的考量点是线程切换的开销,想象一下,如果采用非公平锁,当一个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的几率就变得非常大,所以就减少了线程的开销
相信到这里,你也就明白了,为什么 ReentrantLock 默认构造器用的是非公平锁同步器
public ReentrantLock() {sync = new NonfairSync();}
看到这里,感觉非公平锁 perfect,非也,有得必有失
使用公平锁会有什么问题?
公平锁保证了排队的公平性,非公平锁霸气的忽视这个规则,所以就有可能导致排队的长时间在排队,也没有机会获取到锁,这就是传说中的 “饥饿”
如何选择公平锁/非公平锁?
相信到这里,答案已经在你心中了,如果为了更高的吞吐量,很显然非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了,否则那就用公平锁还大家一个公平
我们还差最后一个环节,真的要挺住
可重入锁
到这里,我们还没分析 ReentrantLock 的名字,JDK 起名这么有讲究,肯定有其含义,直译过来【可重入锁】
为什么要支持锁的重入?
试想,如果是一个有 synchronized 修饰的递归调用方法,程序第二次进入被自己阻塞了岂不是很大的笑话,所以 synchronized 是支持锁的重入的
Lock 是新轮子,自然也要支持这个功能,其实现也很简单,请查看公平锁和非公平锁对比图,其中有一段代码:
// 判断当前线程是否和已占用锁的线程是同一个
else if (current == getExclusiveOwnerThread())
仔细看代码, 你也许发现,我前面的一个说明是错误的,我要重新解释一下
重入的线程会一直将 state + 1, 释放锁会 state - 1直至等于0,上面这样写也是想帮助大家快速的区分
总结
本文是一个长文,说明了为什么要造 Lock 新轮子,如何标准的使用 Lock,AQS 是什么,是如何实现锁的,结合 ReentrantLock 反推 AQS 中的一些应用以及其独有的一些特性
独占式获取锁就这样介绍完了,我们还差 AQS 共享式 xxxShared
没有分析,结合共享式,接下来我们来阅读一下 Semaphore,ReentrantReadWriteLock 和 CountLatch 等
另外,查看超清大图,也可以点击文末【阅读原文】,也欢迎大家的留言,如有错误之处还请指出。我的手酸了,眼睛干了,我去准备撸下一篇.....
灵魂追问
为什么更改 state 有 setState() , compareAndSetState() 两种方式,感觉后者更安全,但是锁的视线中有好多地方都使用了 setState(),安全吗?
下面代码是一个转账程序,是否存在死锁或者锁的其他问题呢?
class Account {private int balance;private final Lock lock= new ReentrantLock();// 转账void transfer(Account tar, int amt){while (true) {if(this.lock.tryLock()) {try {if (tar.lock.tryLock()) {try {this.balance -= amt;tar.balance += amt;} finally {tar.lock.unlock();}}//if} finally {this.lock.unlock();}}//if}//while}//transfer }
参考
感谢前辈们的知识结晶
Java 并发实战
Java 并发编程的艺术
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
推荐关注本文作者
万字超强图文讲解 AQS 以及 ReentrantLock 应用相关推荐
- 万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)
写在前面 祝大家儿童节快乐????,保持童心,这篇文章作为儿童节礼物????送给大家.进入源码阶段了,写了十几篇的 并发系列 知识铺垫终于要派上用场了.相信很多人已经忘了其中的一些理论知识,别担心,我 ...
- 新概念一册电子书课本_新概念英语第二册完整版:音频+动画视频+课本图文讲解...
新概念英语 <新概念英语>是一经典得已经无需多做解释的英语学习好教材.它采用和人们生活.学习密切相关的语言材料,遵循语言习得的科学性及渐进性,可全面提升学生的听.说.读.写能力,使语言能力 ...
- pppoe 服务器 无线,无线路由器怎么设置PPPOE拨号【图文讲解】
1.如果是电话线到家,首先把路由器的WAN口和Modem的LAN口连接起来,电脑网卡连接路由器任意一个LAN口;如果是网线到家,就直接把网线接到路由器WAN口. 然后为电脑设置网络参数,指定IP地址, ...
- linux内存管理详解,Linux内存管理图文讲解.pdf
Linux内存管理图文讲解 逻辑地址.线性地址.物理地址和虚拟地址 一.概念 物理地址(physical address) 用于内存芯片级的单元寻址,和处理器和 CPU 连接的地址总线相对应. 这个概 ...
- 【转】Qt编写串口通信程序全程图文讲解
Qt编写串口通信程序全程图文讲解 本文章原创于www.yafeilinux.com 转载请注明出处. (说明:我们的编程环境是windows xp下,在Qt Creator中进行,如果在Linux下或 ...
- C++动态内存管理好难怎么办?零基础图文讲解,小白轻松理解原理
首先我们先了解一下内存: C语言使用malloc/free动态管理内存空间,C++引入了new/delete,new[]/delete[]来动态管理内存. 如果大家在自学C++中遇到困难,想找一个学习 ...
- po 价格条件表_海纳易拓图文讲解SAP MM模块采购价格条件
MM物料管理是SAP R/3系统的一个模块,支持日常发生的业务处理功能和过程.MM系统(物料管理)的目的是满足下列各种处理,即物料需求计划.物料采购.库存管理.发票确认和物料估价.主要包括:物料需求计 ...
- 天猫精灵智能家居对接,及天猫iot官网配置图文讲解(二)
天猫精灵智能家居对接,及天猫iot官网配置图文讲解(二) 2.天猫精灵设备对接 2-1.介绍 上一章里,我已经讲了天猫精灵的技能配置,设备创建,登录验证这三个部分做了,此次篇文章就讲之后的设备查询 ...
- flutter - 图文讲解表单组件基本使用 注册实战
图文讲解表单组件,创建表单组件.校验表单.复杂表单.复杂校验规则.动态控制表单 实现一个注册界面 创建表单组件 创建form组件 Form组件函数(准确说叫widget),然后写一个key,因为我们等 ...
最新文章
- python多项式回归_Python 多项式回归 - 树懒学堂
- 5g无线网络对电子竞技市场发展影响
- [css] 举例说明跟字体相关的属性有哪些
- oracle10g检测未通过,win64bit安装oracle 10g版本检查未通过解决 提示要求的结果: 5.0,5.1,5.2,6.0 之一 实际结果: 6.1...
- 复杂背景下计算机视觉模型害虫识别的比较研究(像素语义分割网络SegNet)
- FFmpeg4.3.2之ffplay log输出级别(三十)
- Java学习手册:Java基础知识点(不断扩充更新中)
- 2022智慧工地劳务实名制系统——工地人员高效管理黑科技
- SpringCloud OpenFeign调用第三方服务
- 发布源码及依赖到网络maven仓库
- 如何看损失函数图loss
- 外星人大战---------------------游戏开发
- 声韵启蒙(1)-(清)车万育
- swd只能下载一次第二次出现错误
- Android 删除图片后刷新媒体库
- 自做Google Chrome免安装绿色版
- WIN32开发之LRESULT CALLBACK WndProc(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam)
- 数据库恢复挂起解决办法
- JAVA毕业设计健身房管理系统设计计算机源码+lw文档+系统+调试部署+数据库
- 每日微信晨报早报新闻获取,哪里来的?
热门文章
- eclipse安装ADT插件重启后不显示Android SDK Manager和Android Virtual Device Manager图标的一种解决办法
- vector机器人 HOW TO RESET, ERASE AND RESTORE VECTOR 如何重置,删除和恢复向量
- linux suse 软件管理工具 zypper 简介
- golang post get put delete 请求实例代码
- docker centos7容器 安装ssh服务
- linux c 获取网络接口信息 ioct l函数 ifreq ifconf 结构体 简介
- golang 映射 map 简介
- vm虚拟机中 Kali更新后 不能自动适应窗口
- C语言单链表实现19个功能完全详解
- windows与linux下的\r\n