AbstractQueuedSynchronizer源码解析
目录
关于AbstractQueuedSynchronizer
基本数据结构
节点结构
同步队列结构
实现
子类需要实现的方法
独占模式实现
独占模式同步队列示意
共享模式
共享模式同步队列示意:
关于AbstractQueuedSynchronizer
JDK1.5之后引入了并发包java.util.concurrent,里面包含了很多并发控制锁类,其核心是:AbstractQueuedSynchronizer,其数据结构为链表方式的双向队列。
基本数据结构
节点结构
链表节点的字段含义如下:
字段 | 类型 | 初始值 | 意义 |
SHARED | final Node | 任意一个Node对象 | 一个指示器,用于标识Node处于共享模式。 |
EXCLUSIVE | final Node | null | 一个指示器,用于标识Node处于独占模式。 |
CANCELLED | final int | 1 | waitStatus值,表示Node处于取消状态。一般当Node处于超时或者中断,设置此值。取消节点关联的线程不会重新阻塞。 |
SIGNAL | final int | -1 | waitStatus值,表示Node的后续Node 已经或者即将通过park 阻塞。当此节点取消或者release时,后续节点需要unpark。为了避免竞争。acquire方式必须指示需要SIGNAL,重试acquire,在失败的情况下阻塞。 |
CONDITION | final int | -2 | waitStatus值,表示节点处于等待队列。当在某个时间点set to 0 ,用于同步队列。 |
PROPAGATE | final int | -3 | waitStatus值,用于共享模式,表示下一次acquire无条件的传播。 |
waitStatus | volatile int | 0 | 节点状态 |
prev | volatile Node | null | 前置节点 |
next | volatile Node | null | 后续节点 |
thread | volatile Thread | null | 关联线程 |
nextWaiter | Node | null | 指向下一个Node is waiting on condition。或者指向SHARED,EXCLUSIVE表示模式。 |
同步队列结构
字段 | 类型 | 初始值 | 意义 |
head | volatile Node | null | 同步队列头节点,延迟初始化,必须通过setHead方法设置值。 |
tail | volatile Node | null | 同步队列尾节点,延迟初始化,通过enq方法设置。 |
state | volatile int | 0 | 状态。用于追踪同步状态,具体由各子类处理。 |
- 每次加入节点到同步队列,都加在尾部,释放节点从头节点的后续节点释放。
- 一个节点位于队列头,并不保证acquire成功,只是尽量尝试acquire。
实现
AbstractQueuedSynchronizer仅实现抽象方法控制并发,由子类实现具体的资源控制。
子类需要实现的方法
方法 | 意义 |
tryAcquire | 尝试独占模式下acquire,失败则进入同步队列。 |
tryRelease | 尝试独占模式下release。 |
tryAcquireShared | 尝试共享模式下acquire,失败则进入同步队列。 |
tryReleaseShared | 尝试共享模式下release。 |
isHeldExclusively | 指示同步器是否在独占模式下被当前线程占用 |
独占模式实现
/**/public final void acquire(int arg) {if (!tryAcquire(arg) && --acquire失败,/* 第一尝试acquire失败后,加入到同步队列,会继续尝试*/acquireQueued(/* acquire失败,则加入一个节点到同步队列,节点为独占模式。*/addWaiter(Node.EXCLUSIVE), arg))/*acquireQueued 返回的是中断标识(中断标识以清除),如果为true ,重新设置中断标识*/selfInterrupt();}
/*
节点加入同步队列之后,会尝试acquire。如果失败会通过park阻塞。
*/ final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {//中断标志,标识此线程是否设置过中断标识。boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //如果前一个节点是head节点,(head节点不关联线程),即本节点是第一个acquire失败的节点,则尝试acquire。if (p == head && tryAcquire(arg)) {//acquire成功,则把此节点设置为head节点(thread关联解除),返回中断标识。//【10】setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//不是第一个accquire失败的节点,则判断是否通过park阻塞。if (//判断是否需要park。shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//此线程 曾经被中断过。(中断标识被清除了)。interrupted = true;}} finally {//仅当出现异常时,才会进入此代码块,一般timeout或者中断,此时需取消节点。if (failed)cancelAcquire(node);}}
/*
判断是否需要park
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//前一个节点的status为SIGNAL,表示后续节点需要parkif (ws == Node.SIGNAL) //【4】return true;if (ws > 0) {//ws > 0 ,仅当status=CANCELLED。则把取消节点剔除同步队列。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/*状态为0或者PROPAGATE,独占模式为0,共享模式为PROPAGATE。则需要设置为SIGNAL,表示即将park(是否park由下一次acquire决定)。*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //【3】}return false;}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //【5】/**********此处时重点,以上代码执行后,线程会立即挂起。当线程unpark后,后续代码接着运行。*///返回线程的中断标识。同时清除中断标识。return Thread.interrupted();}
/*取消acquire*/private void cancelAcquire(Node node) {// 节点不存在,返回。if (node == null)return;//取消线程关联。node.thread = null;// 跳过前置节点中的CANCELLED节点Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;//设置状态为CANCELLEDnode.waitStatus = Node.CANCELLED;// 如果是尾节点,仅释放自身。if (node == tail && compareAndSetTail(node, pred)) {//设置前置节点的next为null。compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head && //前置节点不为head。((ws = pred.waitStatus) == Node.SIGNAL //前置节点为CANCELLED||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) //前置节点为0,并且设置为CANCELLED成功) &&pred.thread != null //关联线程) {//以上判断条件,表示前置节点不为head节点,并且为SIGNALNode next = node.next;if (next != null && next.waitStatus <= 0)//如果本节点之后仍有后续节点,则剔除本节点,修改指针。//此处未处理node.next.prev,node节点通过next是剔除的,通过prev是可以访问得到的。compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node; // help GC}}
private Node addWaiter(Node mode) {/* 构造一个节点,关联到当前线程*/Node node = new Node(Thread.currentThread(), mode);// 为了提高性能,先尝试一次加入同步队列。失败再尝试enq方式。enq方式时自旋方式(即死循环)Node pred = tail;if (pred != null) { node.prev = pred;//CASif (compareAndSetTail(pred, node)) { //【6】pred.next = node;return node;}}enq(node);return node;}
/*
节点加入同步队列,并返回此节点。
采用自旋方式加入,
*/private Node enq(final Node node) {//自旋for (;;) {Node t = tail;//tail为null,则head必定为null,生成一个node作为head。if (t == null) { // Must initializeif (compareAndSetHead(new Node())) //【1】tail = head;} else {//加入节点成尾节点,并返回。node.prev = t;if (compareAndSetTail(t, node)) { //【2】t.next = node;return t;}}}}
public final boolean release(int arg) {if (tryRelease(arg)) { //【7】//尝试成功Node h = head;if (h != null && h.waitStatus != 0) //head节点不为null,并且head状态不为0(在独占模式下,不为0,也不可能为CANCELLED,则为SIGNAL),则unpark后续节点。unparkSuccessor(h); //【8】return true;}return false;}
private void unparkSuccessor(Node node) {/*status < 0 ,则把status设置为0*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {//后续节点为null,或者状态为CANCELLED。s = null;//从tail往前查找到第一个status<0的节点,选中作为要unpark的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//unpark节点对应的线程。LockSupport.unpark(s.thread); //【9】}
独占模式同步队列示意
public class AbstractQueuedSynchronizerTest {@Testpublic void testAbstractQueuedSynchronizer() {Lock lock = new ReentrantLock();Runnable runnable0 = new ReentrantLockThread(lock);Thread thread0 = new Thread(runnable0);thread0.setName("t-0");Runnable runnable1 = new ReentrantLockThread(lock);Thread thread1 = new Thread(runnable1);thread1.setName("t-1");Runnable runnable2 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable2);thread2.setName("t-2");Runnable runnable3 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable3);thread3.setName("t-3");Runnable runnable4 = new ReentrantLockThread(lock);Thread thread4 = new Thread(runnable4);thread4.setName("t-4");thread0.start();thread1.start();thread2.start();thread3.start();thread4.start();thread2.interrupt();for (;;);}private class ReentrantLockThread implements Runnable {private Lock lock;public ReentrantLockThread(Lock lock) {this.lock = lock;}@Overridepublic void run() {try {lock.lock();for (int i=0;i<1000000;i++);} finally {lock.unlock();}}}}
以前假设各线程按顺序启动
队列变化如下:
1、同步队列初始化。thread:t-0 acquire成功。
2、thread:t-1 启动,执行代码【1】,【2】,【3】,【4】处后状态如下图,然后执行【5】阻塞。
3、thread:t-2 启动,执行代码【6】,【3】,【4】处后状态如下图,然后执行【5】阻塞。
3、thread:t-3 启动,执行代码【6】,【3】,【4】处后状态如下图,然后执行【5】阻塞。
4、thread:t-0,释放,执行代码【7】,【8】,经过【9】后,t-1 线程在代码【5】继续执行。经过【10】后状态如下图
5、thread,t-2,t-3,t-4类似
共享模式
public final void acquireShared(int arg) {//独占模式:tryAcquire,返回boolean表示是否成功。//共享模式:tryAcquireShared,返回int,小于0,表示acquire失败。if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);// nextWaiter为SHARED,表示共享模式。【11】boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg); //【14】if (r >= 0) {//如果r>=0表示,表示许可有剩余。设置head,并继续传播setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted) selfInterrupt(); //与acquire最后2行代码一样,如果中断过,仍设置中断标识。failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; //指向原来的head,后面使用。setHead(node);//设置head,【15】if (propagate > 0 // 许可有剩余 【16】|| h == null // 原有head为null,表示节点已释放|| h.waitStatus < 0 // 状态为PROPAGATE或SIGNAL||(h = head) == null // 新的head为null|| h.waitStatus < 0 //或者新的head的状态<0) {Node s = node.next; //当前节点的后续节点if (s == null || s.isShared()) //当前节点为共享模式或者后续节点为nulldoReleaseShared(); //【17】}}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {
//头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //【12】continue; // loop to recheck casesunparkSuccessor(h);//CAS成功,则...【13】}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
共享模式同步队列示意:
package main.java.study;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {public class MapOper implements Runnable {CountDownLatch latch ;public MapOper(CountDownLatch latch) {this.latch = latch;}public void run() {try {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(Thread.currentThread().getName() + "start:" + df.format(new Date()));latch.await();System.out.println(Thread.currentThread().getName() + "work:" + df.format(new Date()));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println(Thread.currentThread().getName()+" Sync Started!");}}public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubCountDownLatchTest test = new CountDownLatchTest();CountDownLatch latch = new CountDownLatch(1);Thread t1 = new Thread(test.new MapOper(latch));Thread t2 = new Thread(test.new MapOper(latch));Thread t3 = new Thread(test.new MapOper(latch));Thread t4 = new Thread(test.new MapOper(latch));t1.setName("Thread1");t2.setName("Thread2");t3.setName("Thread3");t4.setName("Thread4");t1.start();Thread.sleep(1500);t2.start();Thread.sleep(1500);t3.start();Thread.sleep(1500);t4.start();System.out.println("thread already start, sleep for a while...");Thread.sleep(1000);latch.countDown();}}
队列变化:
1、同步队列初始化:前3个工作线程调用await()方法,经过【11】,……,【5】线程挂起,状态如下:
2、主线程调用countDown()方法,经过【12】,【13】,【9】唤醒线程t-1,t-1继续执行,经过【14】,【15】状态如下:
3、t-1继续执行【16】,【17】唤醒下一个线程(node-2)
4、t-2,t-3依次,都唤醒下一个。(每个节点都由前置节点对应的线程唤醒,唤醒立即返回)
5、结束
AbstractQueuedSynchronizer源码解析相关推荐
- 面试官系统精讲Java源码及大厂真题 - 31 AbstractQueuedSynchronizer 源码解析(下)
31 AbstractQueuedSynchronizer 源码解析(下) 低头要有勇气,抬头要有底气. 引导语 AQS 的内容太多,所以我们分成了两个章节,没有看过 AQS 上半章节的同学可以回首看 ...
- 面试官系统精讲Java源码及大厂真题 - 30 AbstractQueuedSynchronizer 源码解析(上)
30 AbstractQueuedSynchronizer 源码解析(上) 不想当将军的士兵,不是好士兵. 引导语 AbstractQueuedSynchronizer 中文翻译叫做同步器,简称 AQ ...
- ReentrantReadWriteLock源码解析
概述 ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteL ...
- AbstractQueuedSynchronizer 源码分析(共享锁)
为什么80%的码农都做不了架构师?>>> 源码看之前的问题 race condition如何避免? 工作流程是怎么样的? 使用什么方式实现的? 使用到的其他类说明和资料 Loc ...
- 死磕 java同步系列之ReentrantReadWriteLock源码解析
问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的 ...
- 多线程(三)之ReentrantLock源码解析
2019独角兽企业重金招聘Python工程师标准>>> 今天分析ReentrantLock类的源码,在看源码之前,先学习AQS(AbstractQueuedSynchronizer) ...
- latch.await java有什么作用_java相关:CountDownLatch源码解析之await()
java相关:CountDownLatch源码解析之await() 发布于 2020-6-18| 复制链接 摘记: CountDownLatch 源码解析-- await(),具体内容如下上一篇文章说 ...
- synchronized 和 reentrantlock 区别是什么_JUC源码系列之ReentrantLock源码解析
目录 ReentrantLock 简介 ReentrantLock 使用示例 ReentrantLock 与 synchronized 的区别 ReentrantLock 实现原理 Reentrant ...
- 面试官系统精讲Java源码及大厂真题 - 32 ReentrantLock 源码解析
32 ReentrantLock 源码解析 才能一旦让懒惰支配,它就一无可为. 引导语 上两小节我们学习了 AQS,本章我们就要来学习一下第一个 AQS 的实现类:ReentrantLock,看看其底 ...
最新文章
- python 寻找数组的中心索引_Leetcode724:寻找数组的中心索引(java、python3)
- python【蓝桥杯vip练习题库】BASIC-28Huffuman树(贪心 Huffuman)
- android 上拉隐藏布局,Recycleview上拉隐藏与下拉显示
- 炼丹笔记 | 讲讲我们的故事
- fun(n) Java_java程序员的kotlin课(N+2):suspending函数执行编排
- php eurl.axd,Http异常eurl.axd出错信息解决方法
- py导入包异常跳出_python~异常处理及包
- cent os mysql 内存_Cent OS – MySQL – 主从配置
- Python反射和内置方法(双下方法)
- 4019 设备树 Linux device tree 概述
- 饿了么接入之饿了么首次下单测试
- Shiro原理流程,代码示例
- Apollo Planning决策规划算法代码解析 (17):SPEED_HEURISTIC_OPTIMIZER 速度动态规划下
- 第十二天-函数名 迭代器
- 如何把excel中的多行数据按行数拆分成多个
- vue 手写签名_与众不同的手写签批
- 世界海洋日|TcaplusDB与你一同保护海洋生物多样性
- im即时通讯开发如何理解定位技术
- java实现随机点名器
- 一个基于Python的体重BMI计算程序