概述

在多线程程序中,往往存在对共享资源的竞争访问,为了对共享资源进行保护,需要使用一些同步工具,比如 synchronized、ReentrantLock、Semaphore、ReentrantReadWriteLock等,后三种都是在同步框架 AQS(即 AbstractQueuedSynchronizer)的基础上构建的。

下面这段话来自 AQS 的代码文档,描述了其设计意图:提供一个框架用于实现依赖先进先出 FIFO 等待队列的阻塞锁和相关同步器。Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

下面我们重点分析一下同步框架 AQS 的实现原理和用法。

实现

一个同步器有两个基本功能:获取同步器、释放同步器。AQS 提供了多种锁模式:独占、共享

可中断、不可中断

带超时时间的

设计模式

AQS 采用了标准的模版方法设计模式,对外提供的是以下的方法:// 独占模式

public final void acquire(int arg);    public final boolean release(int arg);    // 独占可中断

public final void acquireInterruptibly(int arg)

throws InterruptedException;    // 独占带超时时间的

public final boolean tryAcquireNanos(int arg, long nanosTimeout);    // 共享模式

public final void acquireShared(int arg);    public final boolean releaseShared(int arg);    // 共享可中断

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException;    // 共享带超时时间

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;

这些方法上都带了 final 关键字,也就是说不允许重写,那么哪些方法可以重写呢?//独占模式protected boolean tryAcquire(int arg);protected boolean tryRelease(int arg);//共享模式protected int tryAcquireShared(int arg);protected boolean tryReleaseShared(int arg);//是否是独占模式protected boolean isHeldExclusively();

这些模版方法在代码中起什么作用呢?请看他们的调用方,在 acquire 方法中,当 tryAcquire 返回 true 则表示已经获得了锁,否则先 addWaiter 进入等待队列,再 acquireQueued 等候获取锁。acquireInterruptibly 也是类似的,区别只是对中断的处理不同。public final void acquire(int arg) {        if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}    public final void acquireInterruptibly(int arg)

throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (!tryAcquire(arg))

doAcquireInterruptibly(arg);

}

以 acquire 为例,真正的排队阻塞等待锁是在 addWaiter 和 acquireQueued 中,那么 tryAcquire 方法需要做什么事儿呢?我们先看一下 ReentrantLock 中公平锁 FairSync 和非公平锁 NonfairSync 的实现:// 公平锁

static final class FairSync extends Sync {        // omit a lot

protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);                    return true;

}

}            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc

setState(nextc);                return true;

}            return false;

}

}    // 非公平锁

static final class NonfairSync extends Sync {        // omit a lot

protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);

}

}    abstract static class Sync extends AbstractQueuedSynchronizer {        // omit a lot

final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);                    return true;

}

}            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc

throw new Error("Maximum lock count exceeded");

setState(nextc);                return true;

}            return false;

}

}

公平锁的 tryAcquire 方法所做事情大致如下:检查了锁的状态,如果锁不被任何线程占用,且没有等待中的线程,则尝试用 CAS 设置状态,成功则 setExclusiveOwnerThread 设置占用锁的线程为当前线程。

如果当前线程已经占有了该锁,则 setState 更新更新重入次数。

上述两条都失败,则表示无法立即获得锁,返回 false。

非公平锁的 tryAcquire 也大致一样,只是缺少判断是否有等待中的线程,而是尽最大努力去获取锁。他们的共同点是更新锁状态 state,设置当前占用锁的线程 setExclusiveOwnerThread,其中最关键的是更新锁状态 state。

值得注意的是,为何 tryAcquire 这些方法不是抽象方法,而是提供了一个默认的抛异常的方法呢?因为 AQS 中包含多种模式,而实际使用者一般只需要一种,如果不提供默认拒绝的实现,那就需要使用方去手动覆盖,反而显得啰嗦了。// in AQS

protected boolean tryAcquire(int arg) {        throw new UnsupportedOperationException();

}    protected int tryAcquireShared(int arg) {        throw new UnsupportedOperationException();

}

CHL 队列

在 AQS 中,提供了一个基于 CHL 队列的先进先出的阻塞锁。CHL 队列的入队出队操作是不需要加锁的,只是在入队成功后等待资源释放时会阻塞。在Java锁的种类以及辨析(二):自旋锁的其他种类里也提到了一种基于自旋的 CHL 锁。

CHL 队列本质上就是一个基于 CAS 实现的双向链表,其实也算“无锁队列”了,其节点 Node 如下所示:static final class Node {        volatile int waitStatus; // 节点状态

volatile Node prev;      // 前置节点

volatile Node next;      // 后置节点

volatile Thread thread;  // 所属线程

Node nextWaiter;         // 可以用于标记独占 OR 共享模式,也可以用来记录等待条件变量的下一个节点

}

当 nextWaiter 用于标记独占或共享时,其值可以为:/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;

waitStatus 的值可以有以下几种:static final int CANCELLED =  1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL    = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/**

* waitStatus value to indicate the next acquireShared should

* unconditionally propagate

*/static final int PROPAGATE = -3;

入队

CHL 队列的入队操作间 addWaiter 方法,入队采用 CAS 自旋重试,总是添加到尾节点:// 尾节点

private transient volatile Node head;    // 头节点

private transient volatile Node tail;    private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);        // Try the fast path of enq; backup to full enq on failure

Node pred = tail;        if (pred != null) {

node.prev = pred;            if (compareAndSetTail(pred, node)) {

pred.next = node;                return node;

}

}

enq(node);        return node;

}    private Node enq(final Node node) {        for (;;) {

Node t = tail;            if (t == null) { // Must initialize

if (compareAndSetHead(new Node()))

tail = head;

} else {

node.prev = t;                if (compareAndSetTail(t, node)) {

t.next = node;                    return t;

}

}

}

}

出队

如果当前节点的前任节点是头节点,则尝试获得锁,成功则出队,并且返回。下面代码的第 8-11 行为出队操作,所做事情为:把本节点 node 置为头节点,并且把新旧头节点直接的关联字段置空(新节点 prev 和旧节点 next)。

注:旧头节点置空是为了什么呢?已经无用的 Node 能够被 GC 回收掉。final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;                    return interrupted;

}                if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {            if (failed)

cancelAcquire(node);

}

}    private void setHead(Node node) {

head = node;

node.thread = null;

node.prev = null;

}

AQS 之 ConditionObject

在 AQS 中,还有一个 ConditionObject 类,顾名思义是一个条件变量,提供了类似于 Object wait/notify 的机制。

ConditionObject 实现了 Condition 接口,提供了多种条件变量等待接口:可中断、不可中断、超时机制,以及两种唤醒机制:单个、全部。public interface Condition {

// 在条件变量上等待

void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

// 唤醒

void signal();

void signalAll();

}

await

await 方法类似于 Object#wait 方法,调用之前需要先获得锁。public final void await() throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            // 将当前线程添加到条件队列里

Node node = addConditionWaiter();            // 是否锁资源,也就是说调用 await 之前要先获得锁

int savedState = fullyRelease(node);            int interruptMode = 0;            // 循环阻塞等待,直到被中断,或进入 AQS 锁的同步队列

while (!isOnSyncQueue(node)) {

LockSupport.park(this);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;

}            // signal 之后,进入同步队列,再通过 acquireQueued 竞争锁

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;            if (node.nextWaiter != null) // clean up if cancelled

unlinkCancelledWaiters();            // 中断处理

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

signal

signal 方法类似于 Object#notify 方法,主要功能是唤醒下一个等待中的线程。public final void signal() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();

Node first = firstWaiter;            if (first != null)

doSignal(first);

}        // 唤醒一个节点

private void doSignal(Node first) {            do {                if ( (firstWaiter = first.nextWaiter) == null)

lastWaiter = null;

first.nextWaiter = null;

} while (!transferForSignal(first) &&

(first = firstWaiter) != null);

}    final boolean transferForSignal(Node node) {        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))            return false;

Node p = enq(node);        int ws = p.waitStatus;        // 唤醒该节点

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);        return true;

}

用法

ConditionObject 常用在各种队列的实现中,比如 ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue等,这里我们以 ArrayBlockingQueue 为例学习一下其用法。

请看下面 ArrayBlockingQueue 的代码里锁和条件变量的用法,条件变量是跟锁绑定的,这里一个锁对应多个条件变量:队列非空、队列非满。public class ArrayBlockingQueue extends AbstractQueue        implements BlockingQueue, java.io.Serializable {    /** Main lock guarding all access */

final ReentrantLock lock;    /** Condition for waiting takes */

private final Condition notEmpty;    /** Condition for waiting puts */

private final Condition notFull;    public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];

lock = new ReentrantLock(fair);

notEmpty = lock.newCondition();

notFull =  lock.newCondition();

}

}

在向队列中存放元素时:获得锁

如果队列满,则调用 notFull.await() 等待

队列不满时,则插入数据,并且向 notEmpty 条件变量发 signal 信号

解锁public void put(E e) throws InterruptedException {

checkNotNull(e);        final ReentrantLock lock = this.lock;

lock.lockInterruptibly();        try {            while (count == items.length)

notFull.await();

insert(e);

} finally {

lock.unlock();

}

}    private void insert(E x) {

items[putIndex] = x;

putIndex = inc(putIndex);

++count;

notEmpty.signal();

}

在从队列里取元素时:加锁

如果队列空,则调用 notEmpty.await() 等待

队列非空,则取数据,并且向 notFull 条件变量发送 signal 信号

解锁public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;

lock.lockInterruptibly();        try {            while (count == 0)

notEmpty.await();            return extract();

} finally {

lock.unlock();

}

}    private E extract() {        final Object[] items = this.items;

E x = this.cast(items[takeIndex]);

items[takeIndex] = null;

takeIndex = inc(takeIndex);

--count;

notFull.signal();        return x;

}

总结

AQS 框架提供了先进先出的阻塞锁实现,在此基础上,提供了独占和共享等多种模式供使用方实现。除此之外,还提供了一个条件变量的实现。

锁是一种线程同步机制,用于保护对临界资源的访问。条件变量提供了一个“等待 - 唤醒”的机制,在阻塞队列里起到了生产者和消费者之间的通信的作用。

作者:albon

链接:https://www.jianshu.com/p/2df967e50873

java中chl列表_Java 同步框架 AQS 深入分析相关推荐

  1. java对列表数据排序_如何在Java中对列表进行排序

    java对列表数据排序 Sometimes we have to sort a list in Java before processing its elements. In this tutoria ...

  2. java中怎样克隆,如何在Java中克隆列表?

    要克隆Java中的列表,最简单的方法是使用ArrayList.clone()方法- 示例import java.util.ArrayList; public class Demo { public s ...

  3. 遍历Java中的列表的方法

    本文翻译自:Ways to iterate over a list in Java Being somewhat new to the Java language I'm trying to fami ...

  4. 遍历Java中的列表

    遍历Java中的列表 这篇文章将讨论在 Java 中迭代列表的各种方法. 1.使用 List.toString() 方法 如果我们只是想显示列表的内容,我们可以通过使用 toString() 方法,然 ...

  5. Java同步框架AQS原文分析

    一.概述 类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch ...

  6. java中fork函数_java中的forkjoin框架的使用

    fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力. fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一 ...

  7. java 可变参数列表_java中可变参数列表的实现方法

    我们在对可变参数有一定的认识后,可以引申一下它的使用范围.在数组中也会需要参数的传入,那么结合参数的数量不固定,我们在参数类型上也得到了增加,这就是本篇所要讲的可变参数列表.下面我们就java可变参数 ...

  8. java中包的_Java中的包

    包:定义包用package关键字. 1:对类文件进行分类管理. 2:给类文件提供多层名称空间. 如果生成的包不在当前目录下,需要最好执行classpath,将包所在父目录定义到classpath变量中 ...

  9. 5 java中的集合类_java基础(5)-集合类1

    集合的由来 数组是很常用的一种数据结构,但假如我们遇到以下这样的的问题: 容器长度不确定 能自动排序 存储以键值对方式的数据 如果遇到这样的情况,数组就比较难满足了,所以也就有了一种与数组类似的数据结 ...

  10. java中使用配置文件_Java中使用Properties配置文件的简单方法

    Java中使用Properties配置文件的简单方法 properties Properties文件是java中的一种配置文件,文件后缀为".properties",文件的内容格式 ...

最新文章

  1. Python之pandas数据加载、存储
  2. js中怎么为同级元素添加点击事件
  3. 给网页添加二维码功能
  4. 挪车+php,还在苦苦寻找占你车位的人?关注这个微信号实现“一键挪车”
  5. Python--DBUtil
  6. 2021“MINIEYE杯”中国大学生算法设计超级联赛(2)I love counting(Trie树)
  7. 装饰器模式(讲解+应用)
  8. 使用 OpenCL.Net 进行 C# GPU 并行编程
  9. 微软企业库Unity学习笔记
  10. Java中递归复制多级文件夹(IO流)
  11. 前端技术规划与战略:2022
  12. Python_Pandas_分组汇总数据和创建数据透视表
  13. 计算机表格两行互换步骤,excel怎么让两行互换位置,EXCEL里两个格的内容怎么互换?...
  14. 8. 数仓开发之 DIM 层
  15. 【Go Web学习笔记】第三章 Go与表单的操作
  16. iText如何设置行距
  17. 第十九周学习周报(20180709-20180715)
  18. MySQL、SqlServer、Oracle 三种数据库的优缺点总结
  19. 全波形反演的深度学习方法: 第 4 章 基于正演的 FWI
  20. 非常全面的NFS文档(FOR LINUX)

热门文章

  1. wcf部署到IIS上,安装iis和wcf组件方法,并提供wcf文件
  2. linux 判断某进程 前台还是后台,Linux进程管理——进程前后台(优先级)以及作业控制等...
  3. iOS几款实用的工具类demo
  4. iphone新旧手机数据传输已取消_iPhone 手机支付宝自动扣费?取消服务提示“无法解约”?...
  5. ES6学习笔记七(Set和Map)
  6. solr导入mysql时间类型_docker 安装solr 导入mysql时报错
  7. 修改XAMPP端口(2)
  8. apache-storm例子:统计句子中的单词数量
  9. 大哥你遇到问题,博文在此,连找都懒得找吗?
  10. 编译WINDOWS版FFmpeg:编译SDL