AQS-sync同步队列 [自定义同步器框架]
2019独角兽企业重金招聘Python工程师标准>>>
1、AQS是一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架
2、背景介绍
互斥锁
线程在获取互斥锁的时候,如果发现锁已经被其它线程占有,那么线程就会进行休眠,
然后在适当的时机(比如唤醒)在获取锁
自旋锁
那么自旋锁顾名思义就是“自旋”。就是当一个线程在尝试获取锁失败之后,线程不会休眠或者挂起,
而是一直在循环检测锁是否被其它线程释放。
区别
互斥锁就是开始开销要大于自旋锁。
临界区持锁时间的大小并不会对互斥锁的开销造成影响,而自旋锁是死循环检测,加锁全程消耗cpu,
起始开销虽然低于互斥锁,但是随着持锁时间,加锁的开销是线性增长。
适用的情况
互斥锁用于临界区持锁时间比较长的操作,比如下面这些情况都可以考虑:临界区有IO操作;
临界区代码复杂或者循环量大;临界区竞争非常激烈;单核处理器
自旋锁就主要用在临界区持锁时间非常短且CPU资源不紧张的情况下,当递归调用时有可能造成死锁
3、AQS中的队列锁
AQS框架里面的队列锁脱胎于CLH队列锁。
CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋
AQS中的队列锁[wait queue]对CLH队列锁改动了两个地方,
1.节点结构上做出改变。
CLH队列锁的节点包含一个布尔类型locked的字段,如果要获取锁,就将这个locked设置为true,然后就不停的轮训前驱节点的locked是否释放了锁(这个过程我们就叫做自旋)
AQS的CLH队列在结构上引入了头节点,尾节点。并且拥有一个前节点与下一个节点的引用。
2.在等待获取锁的机制上由自旋改成了等待阻塞。
4、代码简述
AQS实现了线程等待队列的维护(如获取资源失败入队/唤醒出队等),自定义同步器只需要实现对资源state的获取与释放即可
state定义为volatile 类型,操作方式有getState()、setState()、compareAndSetState()
一般自定义同步器要么是共享方式要么是独占方式,当然也支持两种方式混用如:ReentrantReadWriteLock
共享式:acquireShared(int arg);releaseShared(int arg)
独占式: acquire(int arg);release(int arg)
PS:用到condition还需要实现isHeldExclusively(),condition释放后会将节点从阻塞队列摘下放入到sync同步队列,参与竞争资源-->执行
通假方法: acquire/acquireShared -->lock tryAcquire/tryAcquireShared-->tryLock
release/tryRelease -->unlock tryRelease/tryReleaseShared-->tryUnlock
A、独占式 实现acquire 忽略中断
A-1:我们先来看看acquire干了些啥,贴个源码便于分析
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//最后如果获取到资源了,但是发现期间线程发生了中断,补上
}
tryAcquire(arg)由子类实现,获取到arg个资源就返回;否则压栈入队列等待资源获取
A-2:进入压栈入队列操作acquireQueued之前,分两步,同程序流程,先进入addWaiter一看
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//构造队列节点
Node pred = tail;
if (pred != null) {//能直接快速挂靠到尾节点自然最好
node.prev = pred;
if (compareAndSetTail(pred, node)) {//CAS式挂靠 允许失败
pred.next = node;
return node;
}
}
enq(node);//挂靠失败,看来在执行acquireQueued方法前,还需要再次查看下enq操作什么
return node;
}
enq操作如下,特别注意,此处是上文挂靠尾节点失败而来,所以应该保证成功:
private Node enq(final Node node) {
for (;;) {//for循环==当然这里要牛逼一点说成是自旋
Node t = tail;
if (t == null) { //尾节点不存在,这就需要初始化了
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//CAS更新tail为node 自旋到操作成功
t.next = node;
return t;
/**YSMA-ASK1 问此处tail t已经被CAS更新为node了,
*但是为何没有赋值tail=node? 那其余线程获取的tail还是tail么? */
}
}
}
}
ok节点node已经入队列了,那我们看看acquireQueued做了些什么
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//中断标识
for (;;) {//自旋修改前驱节点waitStatus为SIGNAL 然后park阻塞,等待唤醒
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//如果前驱节点是head,就进行许可/资源获取
setHead(node);//获取到资源了,就更新自己为头
p.next = null;
failed = false;
return interrupted;//返回线程是否被中断过
}
//见名知意,获取许可失败后应该阻塞
if (shouldParkAfterFailedAcquire(p, node) //YSMA1
&& parkAndCheckInterrupt())//执行阻塞,返回线程interrupted状态
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);//tryAcquire抛异常等情况,取消节点抢夺资源的资格
}
}
如上YSMA1,我们扒一下shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//前驱节点为SIGNAL状态,告诉acquireQueued可以进行park阻塞了
return true;
if (ws > 0) {//前驱节点为取消状态
do {
node.prev = pred = pred.prev; //从队列中循环摘除无效前驱节点
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//cas更新前驱节点waitStatus为SIGNAL,失败了还会进来重试的
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;//本次更新失败,告诉acquireQueued先别park线程
}
总结如下:
首先tryAcquire获取资源成功,就不用阻塞了,线程得到执行;
其次tryAcquire失败,需要阻塞线程,过程为:建一个EXCLUSIVE的node,挂到sync队列的尾部,
自旋-更新前驱节点的waitStatus状态为SIGNAL,成功后park阻塞线程,等待唤醒。
ps:YSMA_FLAG 则tail节点的waitStatus状态为0,因为没有后继节点来负责更新
B、独占式 实现relase 忽略中断
B-1:贴个源码,先来看看release干了些啥
public final boolean release(int arg) {
/**这个比较牛,失败就直接返回false了
* 所以难怪AQS的框架都要求手动release,且放在finally里面,就是怕release失败吧
*/
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//问:这里为什么要强调waitStatus不为0? 见YSMA_FLAG
unparkSuccessor(h);//唤醒后继节点
return true;
}
return false;
}
B-2:已经如此了,那就看一下unparkSuccessor吧
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)//更新状态为0,允许[但一般不会]失败 马上就要释放node了,没啥影响
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
/**如果后继节点已经被释放或者取消了
*从tail开始找起,找到里node[也就是head]最近的有效的node进行唤醒
**/
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒后继节点S的线程,S获取资源后会设置自己为head的
}
总结如下:
tryRelease(arg)释放arg个资源后,如果需要唤醒后继节点就返回true,否则返回false
比如重入锁场景,重入的时候state+1,释放的时候state-1但是此时持锁的还是自己且state不为0,
就应该返回false
C、共享式 实现acquireShared 忽略中断
共享模式下线程获取资源的顶层入口。获取指定量的资源,
获取到直接返回:do what you want
获取不到,阻塞等待,直到获取到资源返回:do what you want
PS:整个过程忽略中断:即线程interrupted了也会照样会阻塞,不过结束后会补一个中断
贴个源码:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared方法定义是由自定义同步器去去扩展。通过操作state来标识资源
负值代表获取失败;
0代表获取成功,但没有剩余资源;
正数表示获取成功,还有剩余资源,其他线程还可以去获取.
贴个源码:
protected boolean tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
doAcquireShared方法,将当前线程加入队列等待获取资源,直到被唤醒。
贴个源码:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//标识是共享模式的节点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {//有资源,不用再阻塞等待了,更新自己为head,返回让主线程继续执行
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)//如果线程被中断了,补上.
selfInterrupt();
failed = false;
return;
}
/**如果前驱节点是head,但是拿不到资源--也不是不可能
*如countdownlatch 没有countdown 就需要一直await
*/
}
/**同上,cas自旋修改前驱节点状态为single,直到成功,然后进入park阻塞状态*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法值得一窥,不同于独占模式,这里会唤醒之后的节点来抢资源。
贴个源码:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//自己变为head节点了
/**propagate资源还有很多,大于0. [唤醒后继节点来抢资源]
* 或者旧head为null:执行完毕
* 或者旧head节点的waiStatus<0:正在执行
* 或者本节点[新head]为null:执行完毕
* 或者本节点[新head]的waiStatus<0:正在执行
* 因为是或运算,所以Doug[dʌg] Lea应该是本着有枣没枣的先打它三杆子的想法书写
* YSMA-ASK2 head已经替换,但是head并没有重新显式指向新head,那么head还是head么?
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared方法留于releaseShared处讲解。
最后,如果线程在等待期间被中断过,也就不用指望对应线程获得执行并在执行完毕后释放资源了
所以需要cancelAcquire从sync同步队列中卸下当前节点并唤醒后继节点
private void cancelAcquire(Node node) {
// node已经被释放了 已经被摘除过了
if (node == null)
return;
node.thread = null;
//循环摘除已取消状态的节点,将前驱索引指向有效节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;//前驱节点的后继节点,不一定是当前的node
node.waitStatus = Node.CANCELLED;//当前节点的状态置为取消
//当前节点是tail,也不用指向自己了,把pred置为tail并直接置后继节点null结束
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
//pred非head 且 waitStatus为SIGNAL或者可以变为SIGNAL 且thread不为null
int ws;
if (pred != head
&& ((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
//当前节点的后继节点有效,挂到pred上,卸下当前节点
} else {
unparkSuccessor(node);//唤醒后继节点
/**此时并没有将当前节点的后继节点挂到predNext上?
*因为while已经摘除了无效节点了,所以此时也只能是pred为head这个情况了
*node的后继节点被唤醒后,会重新设置head的,所以此处可以忽略predNext的问题
*/
}
node.next = node; // help gc 为什么不是node.next=null? YSMA-ASK3
}
}
D、共享式 实现releaseShared 忽略中断
共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,
如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
贴个源码:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
//不代表释放资源失败,可能仅仅是因为释放的资源太少,还达不到唤醒后继节点的条件
}
tryReleaseShared(arg)方法定义是由自定义同步器去去扩展,释放arg个资源,
这里返回的是布尔类型,可参考lock实现学习,这里贴个源码:
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
这里直接进入doReleaseShared,上源码:
private void doReleaseShared() {
for (;;) {//自旋...
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//唤醒后继节点的标识
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
//YSMA1,CAS安全的更新waitStatus为SIGNAL,直到成功
continue;// loop to recheck cases
unparkSuccessor(h);//执行唤醒操作
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//唤醒后继节点后,且已经更换head,更新waitStatus为PROPAGATE状态
continue; // loop on failed CAS
}
if (h == head)// loop if head changed
break;
}
}
总结:共享式的acquireShared的方式也是先判断前驱节点是否为head,是就竞争锁,
特别的,竞争成功后,如果资源大于0[事实上基本是必然执行的]会唤醒后继节点进行竞争锁
如此,doReleaseShared就有两个入口了,releaseShared和setHeadAndPropagate,
因为都是操作的head,所以就有了ws == Node.SIGNAL和ws == 0的判断了,标识为PROPAGATE的
也就无需再次进行唤醒后继节点的操作了
最后:关于YSMA-ASK1和YSMA-ASK2
是因为head和tail的定义为volatile
private transient volatile Node head; private transient volatile Node tail;
在结合static块代码:
private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}
以及
private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}
答案就一目了然了,compareAndSwapObject操作的是索引,等同于显示赋值head=XXX
最后的最后,请允许我上一张自己画的Visio图
转载于:https://my.oschina.net/ysma1987/blog/3040540
AQS-sync同步队列 [自定义同步器框架]相关推荐
- java同步队列_Java 中队列同步器 AQS(AbstractQueuedSynchronizer)实现原理
前言 在 Java 中通过锁来控制多个线程对共享资源的访问,使用 Java 编程语言开发的朋友都知道,可以通过 synchronized 关键字来实现锁的功能,它可以隐式的获取锁,也就是说我们使用该关 ...
- 同步器AQS中的同步队列与等待队列
在单纯地使用锁,比如ReentrantLock的时候,这个锁组件内部有一个继承同步器AQS的类,实现了其抽象方法,加锁.释放锁也只是涉及到AQS中的同步队列而已,那么等待队列又是什么呢? 当使用Con ...
- AbstractQueuedSynchronizer同步队列与Condition等待队列协同机制
之前对AbstractQueuedSynchronizer(AQS)同步队列与Condition等待队列的功能一直不是很清晰,没太清楚地区分开二者的区别和联系,最近研究了一下分享出来. 1.同步队列和 ...
- java8 同步队列_秋招之路8:JAVA锁体系和AQS抽象队列同步器
整个的体系图 悲观锁,乐观锁 是一个广义概念:体现的是看待线程同步的不同角度. 悲观锁 认为在自己使用数据的时候一定有别的线程来修改数据,在获取数据的时候会先加锁,确保数据不被别的线程修改. 实现:关 ...
- Java Review - 并发编程_抽象同步队列AQS
文章目录 概述 AQS--锁的底层支持 state 的作用 ConditionObject 独占 VS 共享 独占方式下,获取与释放资源的流程 共享方式下,获取与释放资源的流程 Interruptib ...
- java队列加锁_java并发-----浅析ReentrantLock加锁,解锁过程,公平锁非公平锁,AQS入门,CLH同步队列...
前言 为什么需要去了解AQS,AQS,AbstractQueuedSynchronizer,即队列同步器.它是构建锁或者其他同步组件的基础框架(如ReentrantLock.ReentrantRead ...
- 【java】java JUC 同步器框架 AQS AbstractQueuedSynchronizer源码图文分析
1.概述 转载:JUC锁: 锁核心类AQS详解 AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore ...
- AQS同步队列结构分析
同步队列结构 AQS使用的同步队列是基于一种CLH锁算法来实现. CLH锁也是一种基于链表的可扩展.高性能.公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自 ...
- 死磕Java并发:J.U.C之AQS:CLH同步队列
本文转载自公号:Java技术驿站 在上篇文章"死磕Java并发:J.U.C之AQS简介"中提到了AQS内部维护着一个FIFO队列,该队列就是CLH同步队列. CLH同步队列是一个F ...
- AQS独占式同步队列入队与出队
入队 Node AQS同步队列和等待队列共用同一种节点结构Node,与同步队列相关的属性如下. prev 前驱结点 next 后继节点 thread 入队的线程 入队节点的状态 INITIAl 0 初 ...
最新文章
- 计算机科学中抽象的好处与问题—伪共享等实例分析
- shell脚本修复MySQL主从同步
- iis服务器配置php项目,Windows7下IIS+php配置教程
- 使用ADO.NET查询和操作数据
- mysql 5.6 没死_MySQL 5.6不删空用户的影响
- RNN、GRU、LSTM
- 将本地项目上传到gitLab操作
- spool文件命名引用两个变量_Python 中命名空间与作用域介绍
- 批量word删除页眉页脚——VBS脚本,在office宏中运行即可
- win7系统如何卸载漏洞补丁--win10专业版
- 解决:RuntimeError: Tensor for ‘out‘ is on CPU, Tensor for argument #1 ‘self‘ is on CPU, but expected t
- 大屏互动-大屏交互-大屏投影技术解决方案
- 安卓无线蓝牙耳机哪款好?实惠好用的蓝牙耳机品牌
- download sources报错: Cannot connect to the Maven process. Try again later. If the problem persists
- python 循环写入excel_用PYTHON将“for”循环的输出写到excel中
- 深入电子元器件行业产业场景,在线采购商城系统加速电子元器件交易数字化
- 佛经小故事--《盲龟浮木》
- SASE究竟还能火多久?
- 未来的计算机作文六百字,未来想象作文六百字
- impala 基础知识及使用
热门文章
- 统计学习中常用的损失函数
- SunnyOS准备4
- win7_fedora 双系统安装方法
- redis 实战面试
- 2018.09.23 bzoj1076: [SCOI2008]奖励关(期望+状压dp)
- (C/C++学习笔记)附页: C/C++变量的存储类型
- 项目回顾-RecyclerView和CheckBox错乱问题
- 源码分析--SDWebImage
- 【转】gem install libv8 错误
- 批量删除Cookie(实用)