AbstractQueuedSynchronizer是实现Java并发类库的一个基础框架,Java中的各种锁(RenentrantLock, ReentrantReadWriteLock)以及同步工具类(Semaphore, CountDownLatch)等很多都是基于AbstractQueuedSynchronizer实现的。AbstractQueuedSynchronizer 一般简称AQS,Abstract表示他是一个抽象类,Queued表示他是基于先进先出 FIFO 等待队列实现的,Synchronizer表示他是一个同步器。

基于队列的意思是,我们用锁来说明,比如多个线程想要获得同一个对象上的锁,那么这些线程会按照申请锁的先后顺序在该锁对象中的一个FIFO队列上排队等待(也就是将这些线程对象的引用插入到该锁的队列中)。AQS是Java并发的基础框架,同时AOS的实现的基础却是 sun.misc.Unsafe 和 volatile,当然还有LockSupport工具类,LockSupport也是借助于Unsafe,主要实现线程的“阻塞”(park)和线程的“唤醒阻塞”(unpark)。基本原理是 sun.misc.Unsafe 保证了内存操作的“原子性”,而volatile保证了内存“可见性”。Unsafe的源码可以参见:http://www.docjar.com/html/api/sun/misc/Unsafe.java.html ,它提供了各种原子性的内存CAS操作。

本文从ReentrantLock的实现来初步探索AbstractQueuedSynchronizer。为了好把握方向,我们将ReentrantLock的源码(Java1.8.0_40)简化如下:

public class ReentrantLock implements Lock, java.io.Serializable {private static final long serialVersionUID = 7373984872572414699L;/** Synchronizer providing all implementation mechanics */private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;abstract void lock();final boolean nonfairTryAcquire(int acquires) {// ... ...
        }protected final boolean tryRelease(int releases) {// ... ...
        }// ... ...
    }/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}protected final boolean tryAcquire(int acquires) {// ... ...
        }}public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}public void lock() {sync.lock();}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.nonfairTryAcquire(1);}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}public void unlock() {sync.release(1);}    public Condition newCondition() {        return sync.newCondition();    }// ... ...
}

可以明显的看到,ReentrantLock 实现的所以接口都是借助于他的实例属性——同步器sync来实现的,从构造函数可以看出,ReentrantLock默认是非公平锁——使用非公平同步器NonfairSync,传入true时得到的是公平锁——使用公平同步器FairSync。而这两者都是继承于抽象类Sync,而抽象类Sync又继承于我们的AbstractQueuedSynchronizer。我们先整体看下AQS的实现代码:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {static final class Node {   volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}/*** Head of the wait queue, lazily initialized. */private transient volatile Node head;/*** Tail of the wait queue, lazily initialized.  */private transient volatile Node tail;/*** The synchronization state.*/private volatile int state;/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {// ... ...
    }/*** Creates and enqueues node for current thread and given mode.* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {// ... ...
    }/*** Sets head of queue to be node, thus dequeuing. Called only by* acquire methods.  Also nulls out unused fields for sake of GC* and to suppress unnecessary signals and traversals.* @param node the node*/private void setHead(Node node) {// ... ...
    }public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;// ... ...
    }
}

AbstractQueuedSynchronizer的实现包含了两个内部类,Node 类和 ConditionObject类,而后者只有在使用 ReentrantLock.newCondition()时才会用到,暂时不去管它。Node类主要作为FIFO队列上的节点,存储在锁上等待的所有线程对象的信息。提供了enq(final Node node)方法用于插入队列尾部,addWaiter(Node mode)方法用于加入FIFO队列,setHead(Node node)用于初始化FIFO队列的头部。所以AbstractQueuedSynchronizer没有我们想象的那么复杂,它主要是用于实现一个FIFO的等待队列(我们暂时放下ConditionObject不管),以及管理同步器的状态status。

我们在看一下他继承的父类:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {protected AbstractOwnableSynchronizer() { }/*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
}

很简单,就是实现了互斥同步器的所有者的功能,比如互斥锁正被哪个线程占有者。

我们大体了解了AbstractQueuedSynchronizer之后,我们再从细节上仔细分析ReentrantLock的实现。

1)ReentrantLock.lock实现分析

ReentrantLock分为公平和非公平的锁,NonfairSync 和 FairSync的lock实现分别如下:

    static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

NonfairSync.lock 和 FairSync.lock实现差别只有两行代码:

if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());

就是这两行代码使得了 NonfairSync.lock 的锁的实现是非公平的,这两行代码的意思是:如果sync同步器的状态为0,也就是锁没有被占,那么就设置为1,也就是立刻获得锁,并且设置锁的拥有者。也就是说非公平锁,可以 不进入等待队列而直接获取锁,并且不管是否在他的前面已经有其它线程在等待着获取该锁,这就是“不公平”锁的原因之一。原因之二是它们的调用 acquire(1); 都是在 AQS 中,都分别调用了子类中的tryAcquire,而NonfairSync.tryAcquire 和 FairSync.tryAcquire实现又不同:

    public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

这里含义是:tryAcquire(arg)尝试去获得锁,并且调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),将该申请锁的线程插入FIFO等待队列。而NonfairSync.tryAcquire的实现如下:

        protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}/*** Performs non-fair tryLock.  tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

而FairSync.tryAcquire的实现如下:

        /*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

可以明显看到公平锁的实现:

if (c == 0) {
                if (!hasQueuedPredecessors()

即使 c==0 ,也就是锁没有被占有,它也要调用hasQueuedPredecessors()去判断是否在自己前面已经有线程在等待队列上了,所以这里就是实现了FIFO的公平,先到的先获得锁。所以公平锁和非公平锁的实现在上面的两个对方是有区别的。

分析完了锁的公平和非公平的原因,我们再接着上面看如何实现加入FIFO队列,以及如何实现等待:

    public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

tryAcquire(arg)刚才分析完了,我们再看addWaiter(Node.EXCLUSIVE):

    /*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}    /**     * Inserts node into queue, initializing if necessary. See picture above.     * @param node the node to insert     * @return node's predecessor     */    private Node enq(final Node node) {        for (;;) {            Node t = tail;            if (t == null) { // Must initialize                if (compareAndSetHead(new Node()))                    tail = head;            } else {                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }

很简单,就是构造一个Node节点,然后插入到等待队列的尾部。

再看acquireQueued(addWaiter(Node.EXCLUSIVE), arg):

    /*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

这里就实现了在锁上的“阻塞”的功能。在一个死循环中,先判断Node是否是等待队列的头节点,如果是的话,然后调用tryAcquire(arg)去获得锁,然后就可以返回了,也就是获得锁成功了。如果Node不是头节点的话,线程就要被阻塞了:

    /*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops.  Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

该函数的功能是将Node的前驱节点的等待状态pred.waitStatus设置为SIGNAL。这样设置的原因是方便实现Node节点的“唤醒阻塞”(unpark)。设置成功之后调用:parkAndCheckInterrupt(); 开始被“阻塞”:

    /*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

阻塞的实现利用了LockSupport类,而LockSupport类又使用了Unsafe:

    public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);UNSAFE.park(false, 0L);setBlocker(t, null);}

setBlocker(t, blocker) 设置了当前线程被谁阻塞了。UNSAFE.park(false, 0L);实现阻塞:

 /*** Block current thread, returning when a balancing* <tt>unpark</tt> occurs, or a balancing <tt>unpark</tt> has* already occurred, or the thread is interrupted, or, if not* absolute and time is not zero, the given time nanoseconds have* elapsed, or if absolute, the given deadline in milliseconds* since Epoch has passed, or spuriously (i.e., returning for no* "reason"). Note: This operation is in the Unsafe class only* because <tt>unpark</tt> is, so it would be strange to place it* elsewhere.*/public native void park(boolean isAbsolute, long time);

park方法可以被 unpark 唤醒,超时也会被唤醒,中断也会被唤醒。

park方法被唤醒了之后,就会在上面那个死循环中,再次检查自己是否是 头结点:

            for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}

如果是头结点的话, 那么重新调用tryAcquire(arg)去获得锁,然后返回,表示获得锁成功了。到这里 ReentrantLock.lock()方法的实现算是分析完了。

2)ReentrantLock.unlock实现分析

    /*** Attempts to release this lock.** <p>If the current thread is the holder of this lock then the hold* count is decremented.  If the hold count is now zero then the lock* is released.  If the current thread is not the holder of this* lock then {@link IllegalMonitorStateException} is thrown.** @throws IllegalMonitorStateException if the current thread does not*         hold this lock*/public void unlock() {sync.release(1);}

    /*** Releases in exclusive mode.  Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument.  This value is conveyed to*        {@link #tryRelease} but is otherwise uninterpreted and*        can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

再看 tryRelease:

        protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}

很简单,就是修改 sync 的属性status。如果stauts等于0了,就表示锁已经被释放了。于是就可以唤醒FIFO队列的头节点了,unparkSuccessor(head):

    /*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

这里 t.waitStatus <= 0 小于0的包括了 我们在调用shouldParkAfterFailedAcquire时 设置waitStatus 为SIGNAL,因为SIGNAL==-1,所以这里的LockSupport.unpark(s.thread)刚好唤醒了前面的 park().

所以lock() 和 unlock()方法也对应起来了。到这里ReentrantLock的lock和unlock方法分析完成。ReentrantLock的实现借助于AQS,而AQS有借助于LockSupport和Unsafe,以及volatile。ReentrantLock使用state表示锁被同一个线程获取了多少次,并且记录了锁的拥有者(线程)。可重入锁的可重入的原因就是因为记录了锁的拥有者和记录锁被获取的次数来实现的。另外锁的公平性的实现就是是否允许锁申请的插队。

Semaphore, CountDownLatch的实现相比ReentrantLock而言更加简单,实现方式也是大体相似的。

其实查看一些JDK关于并发的库,就可以知道:Java并发库的构建的基础基本就两个——Unsafe和volatile,前者保证“原子性”,后者保证“可见性”

转载于:https://www.cnblogs.com/digdeep/p/4445128.html

Java并发基础框架AbstractQueuedSynchronizer初探(ReentrantLock的实现分析)相关推荐

  1. 构建Java并发模型框架

    2002 年 2 月 22 日 Java的多线程特性为构建高性能的应用提供了极大的方便,但是也带来了不少的麻烦.线程间同步.数据一致性等烦琐的问题需要细心的考虑,一不小心就会出现一些微妙的,难以调试的 ...

  2. Java并发基础(六) - 线程池

    Java并发基础(六) - 线程池 1. 概述 这里讲一下Java并发编程的线程池的原理及其实现 2. 线程池的基本用法 2.1 线程池的处理流程图 该图来自<Java并发编程的艺术>: ...

  3. java线程抢占式执行,Java并发基础(一)-线程基础

    原标题:Java并发基础(一)-线程基础 只要涉及到线程,其运行结果就是不确定的,虽然说java很早就提供了线程以及并发的支持,但是我们需要知道,线程是完全交给调度器的.有很多同学在编写书上的代码时, ...

  4. java并发编程——九 AbstractQueuedSynchronizer AQS详解

    文章目录 AbstractQueuedSynchronizer概述 AbstractQueuedSynchronizer的使用 AQS实现分析 同步队列 独占锁的获取与释放 独占式超时获取 共享式锁的 ...

  5. Java锁(二):AbstractQueuedSynchronizer、ReentrantLock详解

    一.AbstractQueuedSynchronizer简介 AQS(AbstractQueuedSynchronizer)是并发容器JUC(java.util.concurrent)下locks包内 ...

  6. Java并发编程:AbstractQueuedSynchronizer的内部结构

    2019独角兽企业重金招聘Python工程师标准>>> 一 前言 虽然已经有很多前辈已经分析过AbstractQueuedSynchronizer(简称AQS,也叫队列同步器)类,但 ...

  7. java并发编程之AbstractQueuedSynchronizer

    引言 AbstractQueuedSynchronizer,队列同步器,简称AQS,它是java并发用来构建锁或者其他同步组件的基础框架. 一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法 ...

  8. Java并发基础学习(八)——好好聊聊死锁

    文章目录 前言 什么是死锁 发生死锁的条件 发生死锁的必要条件 如何定位修复和避免 如何定位 1.jstack 2.ThreadMXBean 如何修复 如何避免 活锁与饥饿 活锁 饥饿 一些面试题 总 ...

  9. Java并发基础构建模块简介

    在实际并发编程中,可以利用synchronized来同步线程对于共享对象的访问,用户需要显示的定义synchronized代码块或者方法.为了加快开发,可以使用Java平台一些并发基础模块来开发. 注 ...

最新文章

  1. Unity3d 网络编程(一)(Unity3d内建网络Network介绍)
  2. ios5 ARC机制介绍和使用
  3. python背景颜色词云图_python中实现词云图
  4. 如何在O(1)的时间里删除单链表的结点
  5. C++类的使用(四)—— 继承
  6. DIV中文字不换行解决办法
  7. 收藏 | 神经网络中,设计loss function有哪些技巧?
  8. Verilog HDL常用循环语句类型
  9. android 串口通信丢包,新手求教为什么串口接收数据总丢包
  10. 车牌颜色识别现在的结果
  11. Swift - 05 - 数值型字面量
  12. oracle 关键字_oracle常见报错之无法验证 (约束) - 未找到父项关键字解决办法
  13. 黑马程序员,黑马论坛---云2期,已就业45人,平均薪水6806【8月23日更新】
  14. 【JY】2021全国首届工程仿真大赛154个项目视频教程分享
  15. 认知升级三部曲(深度好文)
  16. minIO如何设置直接通过访问链接在浏览器中打开文件
  17. 机房收费系统(二)之下机退卡
  18. 相对舒适的爬虫入门系列(一):手快尝鲜【requests库】
  19. 如何选择适合你的兴趣爱好(四十九),现代舞
  20. 如何使用 COMSOL 进行电热分析?

热门文章

  1. JNDI 笔记(一) 概述
  2. Sublime Text开发Quick-Cocos2d-x环境搭建(Mac)
  3. JavaScript权威设计--CSS(简要学习笔记十六)
  4. iOS 获取键盘相关信息
  5. leetcode算法题--Restore IP Addresses
  6. leetcode算法题--替换空格
  7. lamp 独立mysql_lamp or lnmp 环境搭建之独立安装mysql数据库
  8. mac 下系统目录权限问题
  9. 【ZooKeeper Notes 9】ZooKeepr日志清理
  10. 带你了解超大规模数据中心究竟有何不同?