AbstractQueuedSynchronizer概述

AbstractQueuedSynchronizer是java中非常重要的一个框架类,它实现了最核心的多线程同步的语义,我们只要继承AbstractQueuedSynchronizer就可以非常方便的实现我们自己的线程同步器,java中的锁Lock就是基于AbstractQueuedSynchronizer来实现的。下面首先展示了AbstractQueuedSynchronizer类提供的一些方法:

AbstractQueuedSynchronizer类方法

在类结构上,AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,AbstractOwnableSynchronizer仅有的两个方法是提供当前独占模式的线程设置:

/**

* The current owner of exclusive mode synchronization.

*/

private transient Thread exclusiveOwnerThread;

/**

* Sets the thread that currently owns exclusive access.

* A {@code null} argument indicates that no thread owns access.

* This method does not otherwise impose any synchronization or

* {@code volatile} field accesses.

* @param thread the owner thread

*/

protected final void setExclusiveOwnerThread(Thread thread) {

exclusiveOwnerThread = thread;

}

/**

* Returns the thread last set by {@code setExclusiveOwnerThread},

* or {@code null} if never set. This method does not otherwise

* impose any synchronization or {@code volatile} field accesses.

* @return the owner thread

*/

protected final Thread getExclusiveOwnerThread() {

return exclusiveOwnerThread;

}

exclusiveOwnerThread代表的是当前获得同步的线程,因为是独占模式,在exclusiveOwnerThread持有同步的过程中其他的线程的任何同步获取请求将不能得到满足。

至此,需要说明的是,AbstractQueuedSynchronizer不仅支持独占模式下的同步实现,还支持共享模式下的同步实现。在java的锁的实现上就有共享锁和独占锁的区别,而这些实现都是基于AbstractQueuedSynchronizer对于共享同步和独占同步的支持。从上面展示的AbstractQueuedSynchronizer提供的方法中,我们可以发现AbstractQueuedSynchronizer的API大概分为三类:

类似acquire(int)的一类是最基本的一类,不可中断

类似acquireInterruptibly(int)的一类可以被中断

类似tryAcquireNanos(int, long)的一类不仅可以被中断,而且可以设置阻塞时间

上面的三种类型的API分为独占和共享两套,我们可以根据我们的需求来使用合适的API来做多线程同步。

下面是一个继承AbstractQueuedSynchronizer来实现自己的同步器的一个示例:

class Mutex implements Lock, java.io.Serializable {

// Our internal helper class

private static class Sync extends AbstractQueuedSynchronizer {

// Reports whether in locked state

protected boolean isHeldExclusively() {

return getState() == 1;

}

// Acquires the lock if state is zero

public boolean tryAcquire(int acquires) {

assert acquires == 1; // Otherwise unused

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

// Releases the lock by setting state to zero

protected boolean tryRelease(int releases) {

assert releases == 1; // Otherwise unused

if (getState() == 0) throw new IllegalMonitorStateException();

setExclusiveOwnerThread(null);

setState(0);

return true;

}

// Provides a Condition

Condition newCondition() { return new ConditionObject(); }

// Deserializes properly

private void readObject(ObjectInputStream s)

throws IOException, ClassNotFoundException {

s.defaultReadObject();

setState(0); // reset to unlocked state

}

}

// The sync object does all the hard work. We just forward to it.

private final Sync sync = new Sync();

public void lock() { sync.acquire(1); }

public boolean tryLock() { return sync.tryAcquire(1); }

public void unlock() { sync.release(1); }

public Condition newCondition() { return sync.newCondition(); }

public boolean isLocked() { return sync.isHeldExclusively(); }

public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }

public void lockInterruptibly() throws InterruptedException {

sync.acquireInterruptibly(1);

}

public boolean tryLock(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireNanos(1, unit.toNanos(timeout));

}

}}

Mutex实现的功能是:使用0来代表可以获得同步变量,使用1来代表需要等待同步变量被释放再获取,这是一个简单的独占锁实现,任何时刻只会有一个线程获得锁,其他请求获取锁的线程都会阻塞等待直到锁被释放,等待的线程将再次竞争来获得锁。Mutex给了我们很好的范例,我们要实现自己的线程同步器,那么就继承AbstractQueuedSynchronizer实现其三个抽象方法,然后使用该实现类来做lock和unlock的操作,可以发现,AbstractQueuedSynchronizer框架为我们铺平了道路,我们只需要做一点点改变就可以实现高效安全的线程同步去,下文中将分析AbstractQueuedSynchronizer是如何为我么提供如此强大得同步能力的。

AbstractQueuedSynchronizer实现细节

独占模式

AbstractQueuedSynchronizer使用一个volatile类型的int来作为同步变量,任何想要获得锁的线程都需要来竞争该变量,获得锁的线程可以继续业务流程的执行,而没有获得锁的线程会被放到一个FIFO的队列中去,等待再次竞争同步变量来获得锁。AbstractQueuedSynchronizer为每个没有获得锁的线程封装成一个Node再放到队列中去,下面先来分析一下Node这个数据结构:

/** waitStatus value to indicate thread has cancelled */

static final int CANCELLED = 1;

/** waitStatus value to indicate successor's thread needs unparking */

static final int SIGNAL = -1;

/** waitStatus value to indicate thread is waiting on condition */

static final int CONDITION = -2;

/**

* waitStatus value to indicate the next acquireShared should

* unconditionally propagate

*/

static final int PROPAGATE = -3;

上面展示的是Node定义的四个状态,需要注意的是只有一个状态是大于0的,也就是CANCELLED,也就是被取消了,不需要为此线程协调同步变量的竞争了。其他几个的意义见注释。上一小节说到,AbstractQueuedSynchronizer提供独占式和共享式两种模式,AbstractQueuedSynchronizer使用下面的两个变量来标志是共享还是独占模式:

/** Marker to indicate a node is waiting in shared mode */

static final Node SHARED = new Node();

/** Marker to indicate a node is waiting in exclusive mode */

static final Node EXCLUSIVE = null;

有趣的是,Node使用了一个变量nextWaiter来代表两种含义,当在独占模式下,nextWaiter表示下一个等在ConditionObject上的Node,在共享模式下就是SHARED,因为对于任何一个同步器来说,都不可能同时实现共享和独占两种模式的,更为专业的解释为:

/**

* Link to next node waiting on condition, or the special

* value SHARED. Because condition queues are accessed only

* when holding in exclusive mode, we just need a simple

* linked queue to hold nodes while they are waiting on

* conditions. They are then transferred to the queue to

* re-acquire. And because conditions can only be exclusive,

* we save a field by using special value to indicate shared

* mode.

*/

Node nextWaiter;

AbstractQueuedSynchronizer使用双向链表来管理请求同步的Node,保存了链表的head和tail,新的Node将会被插到链表的尾端,而链表的head总是代表着获得锁的线程,链表头的线程释放了锁之后会通知后面的线程来竞争共享变量。下面分析一下AbstractQueuedSynchronizer是如何实现独占模式下的acquire和release的。

首先,使用方法acquire(int)可以竞争同步变量,下面是调用链路:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

首先会调用方法tryAcquire来尝试获的锁,而tryAcquire这个方法是需要子类来实现的,子类的实现无非就是通过compareAndSetState、getState、setState三个方法来操作同步变量state,子类的方法实现需要根据各自的需求场景来实现。继续分析上面的acquire流程,如果tryAcquire返回true了,也就是成功改变了state的值了,也就是获得了同步锁了,那么就可以退出了。如果返回false,说明有其他的线程获得锁了,这个时候AbstractQueuedSynchronizer会使用addWaiter将当前线程添加到等待队列的尾部等待再次竞争。需要注意的是将当前线程标记为了独占模式。然后重头戏来了,方法acquireQueued使得新添加的Node在一个for死循环中不断的轮询,也就是自旋,acquireQueued方法退出的条件是:

该节点的前驱节点是头结点,头结点代表的是获得锁的节点,只有它释放了state其他线程才能获得这个变量的所有权

在条件1的前提下,方法tryAcquire返回true,也就是可以获得同步资源state

满足上面两个条件之后,这个Node就会获得锁,根据AbstractQueuedSynchronizer的规定,获得锁的Node必须是链表的头结点,所以,需要将当前节点设定为头结点。那如果不符合上面两个条件的Node会怎么样呢?看for循环里面的第二个分支,首先是shouldParkAfterFailedAcquire方法,看名字应该是说判断是否应该park当前该线程,然后是方法parkAndCheckInterrupt,这个方法是在shouldParkAfterFailedAcquire返回true的前提之下才会之下,意思就是首先判断一下是否需要park该Node,如果需要,那么就park它。关于线程的park和unpark,AbstractQueuedSynchronizer使用了偏向底层的技术来实现,在此先不做分析。现在来分析一下再什么情况下Node会被park(block):

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

if (ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park.

*/

return true;

if (ws > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE. Indicate that we

* need a signal, but don't park yet. Caller will need to

* retry to make sure it cannot acquire before parking.

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

可以发现,只有当Node的前驱节点的状态为Node.SIGNAL的时候才会返回true,也就是说,只有当前驱节点的状态变为了Node.SIGNAL,才会去通知当前节点,所以如果前驱节点是Node.SIGNAL的,那么当前节点就可以放心的park就好了,前驱节点在完成工作之后在释放资源的时候会unpark它的后继节点。下面看一下release的过程:

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node. But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

首先通过tryRelease方法来保证资源安全完整的释放了之后,如果发现节点的状态小于0,会变为0。0代表的是初始化的状态,当前的线程已经完成了工作,释放了锁,就要恢复原来的样子。然后会获取该节点的后继节点,如果没有后续节点了,或者后继节点已经被取消了,那么从尾部开始向前找第一个符合要求的节点,然后unpark它。

上面介绍了一对acquire-release,如果希望线程可以在竞争的时候被中断,可以使用acquireInterruptibly。如果希望加上获取锁的时间限制,可以使用tryAcquireNanos(int, long)方法来获取。

共享模式

和独占模式一样,分析一下acquireShared的过程:

public final void acquireShared(int arg) {

if (tryAcquireShared(arg) < 0)

doAcquireShared(arg);

}

private void doAcquireShared(int arg) {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

if (interrupted)

selfInterrupt();

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

获取锁的流程如下:

尝试使用tryAcquireShared方法,如果返回值大于等于0则表示成功,否则运行doAcquireShared方法

将当前竞争同步的线程添加到链表尾部,然后自旋

获取前驱节点,如果前驱节点是头节点,也就是说前驱节点现在持有锁,那么继续运行4,否则park该节点等待被unpark

使用tryAcquireShared方法来竞争,如果返回值大于等于0,那么就算是获取成功了,否则继续自旋尝试

共享模式下的release流程:

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

首先尝试使用tryReleaseShared方法来释放资源,如果释放失败,则返回false,如果释放成功了,那么继续执行doReleaseShared方法唤醒后续节点来竞争资源。需要注意的是,共享模式和独占模式的区别在于,独占模式只允许一个线程获得资源,而共享模式允许多个线程获得资源。所以在独占模式下只有当tryAcquire返回true的时候我们才能确定获得资源了,而在共享模式下,只要tryAcquireShared返回值大于等于0就可以说明获得资源了,所以你要确保你需要实现的需求和AbstractQueuedSynchronizer希望的是一致的。

桶独占模式一样,共享模式也有其他的两种API:

acquireSharedInterruptibly:支持相应中断的资源竞争

tryAcquireSharedNanos:可以设定时间的资源竞争

本文大概描述了AbstractQueuedSynchronizer框架的一些基本情况,具体的细节没有深究,但是AbstractQueuedSynchronizer作为Java中锁实现的底层支撑,需要好好研究一下,后续会基于AbstractQueuedSynchronizer来分析java中各种锁的实现。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

java synchronizer_Java同步框架AbstractQueuedSynchronizer详解相关推荐

  1. Java多线程同步和异步详解

    1. 多线程并发时,多个线程同时请求同一资源,必然导致此资源的数据不安全. 2. 线程池 在WEB服务中,对于web服务器的响应速度必须尽可能的快,这就容不得在用户提交请求按钮后,再创建线程提供服务. ...

  2. Java 多线程同步和异步详解

    多线程并发时,多个线程同时请求同一个资源,必然导致此资源的数据不安全,A线程修改了B线 程的处理的数据,而B线程又修改了A线程处理的数理.显然这是由于全局资源造成的,有时为了解 决此问题,优先考虑使用 ...

  3. java定时任务框架elasticjob详解

    这篇文章主要介绍了java定时任务框架elasticjob详解,Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架.该项目基于成熟的开源产品Quartz和Zo ...

  4. java ajax同步请求,成都汇智动力-java ajax实现异步同步请求全面详解

    原标题:成都汇智动力-java ajax实现异步同步请求全面详解 对象 var request=new () //兼容IE5 IE6 if (window.) {// code for IE7+, F ...

  5. Java Spring框架入门详解教程【多测师_何sir】

    Spring框架入门详解教程 spring概述 spring结构 spring IOC spring DI spring概述 Spring是一个非常活跃的开源框架, 它是一个基于IOC和AOP来构架多 ...

  6. Java自动日志监控框架auto-log详解

    Java自动日志监控框架auto-log详解 1. 需求概述 2. auto-log简介 2.1 auto-log定义 2.2 auto-log目的 2.3 auto-log特性 2.4 注解说明 2 ...

  7. java框架魔乐_16 魔乐科技 SpringBoot框架开发详解

    资源内容: 16 魔乐科技 SpringBoot框架开发详解|____springboot开发代码.rar|____第一章:SpringBoot入门          |____2. SpringBo ...

  8. Java开发常见面试题详解(LockSupport,AQS,Spring循环依赖,Redis)_3

    Java开发常见面试题详解(LockSupport,AQS,Spring循环依赖,Redis)_3 总览 问题 详解 String.intern()的作用 link LeetCode的Two Sum题 ...

  9. Android UI 测试框架Espresso详解

    Android UI 测试框架Espresso详解 1. Espresso测试框架 2.提供Intents Espresso 2.1.安装 2.2.为Espresso配置Gradle构建文件 2.3. ...

最新文章

  1. ubuntu14.04安装intel openCL
  2. 数组模拟队列(代码实现)
  3. Python系统调用——运行其他程序
  4. mxGraph实现按住ctrl键盘拖动图形实现复制图形功能
  5. 面试题 01.06. 字符串压缩
  6. python 发红包import random用redenv_python 常用模块之random,os,sys 模块
  7. mysql5.6.20安装详解_MySql 5.6.20 安装 使用
  8. websphere配置oracle数据源,Websphere - 配置Oracle数据源
  9. 高岭土吸附阳离子_高岭石对金属阳离子的吸附特性研究
  10. 使用Hystrix守护应用(1)
  11. php utc时间_datetime - 以PHP格式获取UTC时间
  12. 盘点开发者喜欢用的浏览器,最后这一款值得拥有
  13. ManualResetEvent实现线程的暂停与恢复
  14. (88)信号发生器实现方法?三角波、方波、锯齿波,正弦波
  15. 小牛电动车能跑多快、多远?一起来了解一下
  16. Rush Hour Puzzle
  17. 万兆网络传输速度测试_万兆网络有多快?实测一把先!
  18. reactjs simple text editor
  19. 3种不同脸型的瘦脸方法
  20. python爬b站评论_一个简单的爬取b站up下所有视频的所有评论信息的爬虫

热门文章

  1. 关于X-UA-Compatible
  2. ReadOnlyDictionary之应用场景
  3. pandas数据导出Execl
  4. 201521123050 《Java程序设计》第8周学习总结
  5. POJ1050-To the Max
  6. 【转】ASP.NET 表单验证实现浅析
  7. 基础 - 字符读取函数scanf、getchar、gets、cin(清空缓存区解决单字符回车问题)
  8. adb devices 找不到夜神模拟器解决方法
  9. 【报告分享】2022中国人工智能人才培养报告.pdf(附下载链接)
  10. 【报告分享】产业互联网发展趋势及机会分析报告.pptx(附下载链接)