目录

关于AbstractQueuedSynchronizer

基本数据结构

节点结构

同步队列结构

实现

子类需要实现的方法

独占模式实现

独占模式同步队列示意

共享模式

共享模式同步队列示意:


关于AbstractQueuedSynchronizer

JDK1.5之后引入了并发包java.util.concurrent,里面包含了很多并发控制锁类,其核心是:AbstractQueuedSynchronizer,其数据结构为链表方式的双向队列。

基本数据结构

节点结构

链表节点的字段含义如下:

Class:Node
字段 类型 初始值 意义
SHARED final Node 任意一个Node对象 一个指示器,用于标识Node处于共享模式。
EXCLUSIVE final Node null 一个指示器,用于标识Node处于独占模式。
CANCELLED final int 1 waitStatus值,表示Node处于取消状态。一般当Node处于超时或者中断,设置此值。取消节点关联的线程不会重新阻塞。
SIGNAL final int -1 waitStatus值,表示Node的后续Node 已经或者即将通过park 阻塞。当此节点取消或者release时,后续节点需要unpark。为了避免竞争。acquire方式必须指示需要SIGNAL,重试acquire,在失败的情况下阻塞。
CONDITION final int -2 waitStatus值,表示节点处于等待队列。当在某个时间点set to 0 ,用于同步队列
PROPAGATE final int -3 waitStatus值,用于共享模式,表示下一次acquire无条件的传播。
waitStatus volatile int 0 节点状态
prev volatile Node null 前置节点
next volatile Node null 后续节点
thread volatile Thread null 关联线程
nextWaiter Node null 指向下一个Node is waiting on condition。或者指向SHARED,EXCLUSIVE表示模式。

同步队列结构

同步队列
字段 类型 初始值 意义
head volatile Node null 同步队列头节点,延迟初始化,必须通过setHead方法设置值。
tail volatile Node null 同步队列尾节点,延迟初始化,通过enq方法设置。
state volatile int 0 状态。用于追踪同步状态,具体由各子类处理。
  1. 每次加入节点到同步队列,都加在尾部,释放节点从头节点的后续节点释放。
  2. 一个节点位于队列头,并不保证acquire成功,只是尽量尝试acquire。

实现

AbstractQueuedSynchronizer仅实现抽象方法控制并发,由子类实现具体的资源控制。

子类需要实现的方法

方法 意义
tryAcquire 尝试独占模式下acquire,失败则进入同步队列。
tryRelease 尝试独占模式下release。
tryAcquireShared 尝试共享模式下acquire,失败则进入同步队列。
tryReleaseShared 尝试共享模式下release。
isHeldExclusively 指示同步器是否在独占模式下被当前线程占用

独占模式实现


/**/public final void acquire(int arg) {if (!tryAcquire(arg) &&   --acquire失败,/* 第一尝试acquire失败后,加入到同步队列,会继续尝试*/acquireQueued(/*  acquire失败,则加入一个节点到同步队列,节点为独占模式。*/addWaiter(Node.EXCLUSIVE), arg))/*acquireQueued 返回的是中断标识(中断标识以清除),如果为true ,重新设置中断标识*/selfInterrupt();}
/*
节点加入同步队列之后,会尝试acquire。如果失败会通过park阻塞。
*/   final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {//中断标志,标识此线程是否设置过中断标识。boolean interrupted = false;for (;;) {final Node p = node.predecessor();   //如果前一个节点是head节点,(head节点不关联线程),即本节点是第一个acquire失败的节点,则尝试acquire。if (p == head && tryAcquire(arg)) {//acquire成功,则把此节点设置为head节点(thread关联解除),返回中断标识。//【10】setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//不是第一个accquire失败的节点,则判断是否通过park阻塞。if (//判断是否需要park。shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//此线程 曾经被中断过。(中断标识被清除了)。interrupted = true;}} finally {//仅当出现异常时,才会进入此代码块,一般timeout或者中断,此时需取消节点。if (failed)cancelAcquire(node);}}
/*
判断是否需要park
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//前一个节点的status为SIGNAL,表示后续节点需要parkif (ws == Node.SIGNAL)      //【4】return true;if (ws > 0) {//ws > 0 ,仅当status=CANCELLED。则把取消节点剔除同步队列。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/*状态为0或者PROPAGATE,独占模式为0,共享模式为PROPAGATE。则需要设置为SIGNAL,表示即将park(是否park由下一次acquire决定)。*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);   //【3】}return false;}
    private final boolean parkAndCheckInterrupt() {LockSupport.park(this);    //【5】/**********此处时重点,以上代码执行后,线程会立即挂起。当线程unpark后,后续代码接着运行。*///返回线程的中断标识。同时清除中断标识。return Thread.interrupted();}
   /*取消acquire*/private void cancelAcquire(Node node) {// 节点不存在,返回。if (node == null)return;//取消线程关联。node.thread = null;// 跳过前置节点中的CANCELLED节点Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;//设置状态为CANCELLEDnode.waitStatus = Node.CANCELLED;// 如果是尾节点,仅释放自身。if (node == tail && compareAndSetTail(node, pred)) {//设置前置节点的next为null。compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head &&       //前置节点不为head。((ws = pred.waitStatus) == Node.SIGNAL  //前置节点为CANCELLED||(ws <= 0  && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))  //前置节点为0,并且设置为CANCELLED成功) &&pred.thread != null   //关联线程) {//以上判断条件,表示前置节点不为head节点,并且为SIGNALNode next = node.next;if (next != null && next.waitStatus <= 0)//如果本节点之后仍有后续节点,则剔除本节点,修改指针。//此处未处理node.next.prev,node节点通过next是剔除的,通过prev是可以访问得到的。compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node; // help GC}}
    private Node addWaiter(Node mode) {/* 构造一个节点,关联到当前线程*/Node node = new Node(Thread.currentThread(), mode);// 为了提高性能,先尝试一次加入同步队列。失败再尝试enq方式。enq方式时自旋方式(即死循环)Node pred = tail;if (pred != null) {         node.prev = pred;//CASif (compareAndSetTail(pred, node)) {   //【6】pred.next = node;return node;}}enq(node);return node;}
/*
节点加入同步队列,并返回此节点。
采用自旋方式加入,
*/private Node enq(final Node node) {//自旋for (;;) {Node t = tail;//tail为null,则head必定为null,生成一个node作为head。if (t == null) { // Must initializeif (compareAndSetHead(new Node()))       //【1】tail = head;} else {//加入节点成尾节点,并返回。node.prev = t;if (compareAndSetTail(t, node)) {      //【2】t.next = node;return t;}}}}
    public final boolean release(int arg) {if (tryRelease(arg)) {    //【7】//尝试成功Node h = head;if (h != null && h.waitStatus != 0) //head节点不为null,并且head状态不为0(在独占模式下,不为0,也不可能为CANCELLED,则为SIGNAL),则unpark后续节点。unparkSuccessor(h);   //【8】return true;}return false;}
    private void unparkSuccessor(Node node) {/*status < 0 ,则把status设置为0*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {//后续节点为null,或者状态为CANCELLED。s = null;//从tail往前查找到第一个status<0的节点,选中作为要unpark的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//unpark节点对应的线程。LockSupport.unpark(s.thread);   //【9】}

独占模式同步队列示意

public class AbstractQueuedSynchronizerTest {@Testpublic void testAbstractQueuedSynchronizer() {Lock lock = new ReentrantLock();Runnable runnable0 = new ReentrantLockThread(lock);Thread thread0 = new Thread(runnable0);thread0.setName("t-0");Runnable runnable1 = new ReentrantLockThread(lock);Thread thread1 = new Thread(runnable1);thread1.setName("t-1");Runnable runnable2 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable2);thread2.setName("t-2");Runnable runnable3 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable3);thread3.setName("t-3");Runnable runnable4 = new ReentrantLockThread(lock);Thread thread4 = new Thread(runnable4);thread4.setName("t-4");thread0.start();thread1.start();thread2.start();thread3.start();thread4.start();thread2.interrupt();for (;;);}private class ReentrantLockThread implements Runnable {private Lock lock;public ReentrantLockThread(Lock lock) {this.lock = lock;}@Overridepublic void run() {try {lock.lock();for (int i=0;i<1000000;i++);} finally {lock.unlock();}}}}

以前假设各线程按顺序启动

队列变化如下:

1、同步队列初始化。thread:t-0 acquire成功。

2、thread:t-1 启动,执行代码【1】,【2】,【3】,【4】处后状态如下图,然后执行【5】阻塞。

3、thread:t-2 启动,执行代码【6】,【3】,【4】处后状态如下图,然后执行【5】阻塞。

3、thread:t-3 启动,执行代码【6】,【3】,【4】处后状态如下图,然后执行【5】阻塞。

4、thread:t-0,释放,执行代码【7】,【8】,经过【9】后,t-1 线程在代码【5】继续执行。经过【10】后状态如下图

5、thread,t-2,t-3,t-4类似

共享模式

    public final void acquireShared(int arg) {//独占模式:tryAcquire,返回boolean表示是否成功。//共享模式:tryAcquireShared,返回int,小于0,表示acquire失败。if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);// nextWaiter为SHARED,表示共享模式。【11】boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);   //【14】if (r >= 0) {//如果r>=0表示,表示许可有剩余。设置head,并继续传播setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)       selfInterrupt();   //与acquire最后2行代码一样,如果中断过,仍设置中断标识。failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
   private void setHeadAndPropagate(Node node, int propagate) {Node h = head; //指向原来的head,后面使用。setHead(node);//设置head,【15】if (propagate > 0   //  许可有剩余    【16】|| h == null    //  原有head为null,表示节点已释放|| h.waitStatus < 0    // 状态为PROPAGATE或SIGNAL||(h = head) == null    //  新的head为null|| h.waitStatus < 0   //或者新的head的状态<0) {Node s = node.next;  //当前节点的后续节点if (s == null || s.isShared()) //当前节点为共享模式或者后续节点为nulldoReleaseShared();  //【17】}}
    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {
//头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  //【12】continue;            // loop to recheck casesunparkSuccessor(h);//CAS成功,则...【13】}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}}

共享模式同步队列示意:

package main.java.study;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {public class MapOper implements Runnable {CountDownLatch latch ;public MapOper(CountDownLatch latch) {this.latch = latch;}public void run() {try {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(Thread.currentThread().getName() + "start:" + df.format(new Date()));latch.await();System.out.println(Thread.currentThread().getName() + "work:" + df.format(new Date()));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println(Thread.currentThread().getName()+" Sync Started!");}}public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubCountDownLatchTest test = new CountDownLatchTest();CountDownLatch latch = new CountDownLatch(1);Thread t1 = new Thread(test.new MapOper(latch));Thread t2 = new Thread(test.new MapOper(latch));Thread t3 = new Thread(test.new MapOper(latch));Thread t4 = new Thread(test.new MapOper(latch));t1.setName("Thread1");t2.setName("Thread2");t3.setName("Thread3");t4.setName("Thread4");t1.start();Thread.sleep(1500);t2.start();Thread.sleep(1500);t3.start();Thread.sleep(1500);t4.start();System.out.println("thread already start, sleep for a while...");Thread.sleep(1000);latch.countDown();}}

队列变化:

1、同步队列初始化:前3个工作线程调用await()方法,经过【11】,……,【5】线程挂起,状态如下:

2、主线程调用countDown()方法,经过【12】,【13】,【9】唤醒线程t-1,t-1继续执行,经过【14】,【15】状态如下:

3、t-1继续执行【16】,【17】唤醒下一个线程(node-2)

4、t-2,t-3依次,都唤醒下一个。(每个节点都由前置节点对应的线程唤醒,唤醒立即返回)

5、结束

AbstractQueuedSynchronizer源码解析相关推荐

  1. 面试官系统精讲Java源码及大厂真题 - 31 AbstractQueuedSynchronizer 源码解析(下)

    31 AbstractQueuedSynchronizer 源码解析(下) 低头要有勇气,抬头要有底气. 引导语 AQS 的内容太多,所以我们分成了两个章节,没有看过 AQS 上半章节的同学可以回首看 ...

  2. 面试官系统精讲Java源码及大厂真题 - 30 AbstractQueuedSynchronizer 源码解析(上)

    30 AbstractQueuedSynchronizer 源码解析(上) 不想当将军的士兵,不是好士兵. 引导语 AbstractQueuedSynchronizer 中文翻译叫做同步器,简称 AQ ...

  3. ReentrantReadWriteLock源码解析

    概述 ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteL ...

  4. AbstractQueuedSynchronizer 源码分析(共享锁)

    为什么80%的码农都做不了架构师?>>>    源码看之前的问题 race condition如何避免? 工作流程是怎么样的? 使用什么方式实现的? 使用到的其他类说明和资料 Loc ...

  5. 死磕 java同步系列之ReentrantReadWriteLock源码解析

    问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的 ...

  6. 多线程(三)之ReentrantLock源码解析

    2019独角兽企业重金招聘Python工程师标准>>> 今天分析ReentrantLock类的源码,在看源码之前,先学习AQS(AbstractQueuedSynchronizer) ...

  7. latch.await java有什么作用_java相关:CountDownLatch源码解析之await()

    java相关:CountDownLatch源码解析之await() 发布于 2020-6-18| 复制链接 摘记: CountDownLatch 源码解析-- await(),具体内容如下上一篇文章说 ...

  8. synchronized 和 reentrantlock 区别是什么_JUC源码系列之ReentrantLock源码解析

    目录 ReentrantLock 简介 ReentrantLock 使用示例 ReentrantLock 与 synchronized 的区别 ReentrantLock 实现原理 Reentrant ...

  9. 面试官系统精讲Java源码及大厂真题 - 32 ReentrantLock 源码解析

    32 ReentrantLock 源码解析 才能一旦让懒惰支配,它就一无可为. 引导语 上两小节我们学习了 AQS,本章我们就要来学习一下第一个 AQS 的实现类:ReentrantLock,看看其底 ...

最新文章

  1. python 寻找数组的中心索引_Leetcode724:寻找数组的中心索引(java、python3)
  2. python【蓝桥杯vip练习题库】BASIC-28Huffuman树(贪心 Huffuman)
  3. android 上拉隐藏布局,Recycleview上拉隐藏与下拉显示
  4. 炼丹笔记 | 讲讲我们的故事
  5. fun(n) Java_java程序员的kotlin课(N+2):suspending函数执行编排
  6. php eurl.axd,Http异常eurl.axd出错信息解决方法
  7. py导入包异常跳出_python~异常处理及包
  8. cent os mysql 内存_Cent OS – MySQL – 主从配置
  9. Python反射和内置方法(双下方法)
  10. 4019 设备树 Linux device tree 概述
  11. 饿了么接入之饿了么首次下单测试
  12. Shiro原理流程,代码示例
  13. Apollo Planning决策规划算法代码解析 (17):SPEED_HEURISTIC_OPTIMIZER 速度动态规划下
  14. 第十二天-函数名 迭代器
  15. 如何把excel中的多行数据按行数拆分成多个
  16. vue 手写签名_与众不同的手写签批
  17. 世界海洋日|TcaplusDB与你一同保护海洋生物多样性
  18. im即时通讯开发如何理解定位技术
  19. java实现随机点名器
  20. 一个基于Python的体重BMI计算程序

热门文章

  1. VS2010 IDE新特性随笔
  2. JavaWeb 安全问题及解决方案
  3. 使用内置函数操作数据库
  4. Git最最常用的命令
  5. CCNA认证(1)--CCNA简介
  6. jQuery 9 相对选择器
  7. Asp.net在IIS6.0权限设置的问题,大牛进
  8. FreeMarker快速上手
  9. 「干货」编程语言十大经典算法,你知道几个?
  10. java 找出list中相同数据_Java获取List中相同的数据