4 显示锁和AQS

4.1 Lock接口

核心方法

Java在java.util.concurrent.locks包中提供了一系列的显示锁类,其中最基础的就是Lock接口,该接口提供了几个常见的锁相关的操作。

public interface Lock {

void lock();

void lockInterruptibly() throws InterruptedException;

boolean tryLock();

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

void unlock();

Condition newCondition();

}

复制代码

下面分别进行介绍:

void lock();

获取锁。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。

void lockInterruptibly();

如果当前线程未被中断,则获取锁。如果锁可用,则获取锁,并立即返回。与lock()接口唯一的区别是可以被中断。

boolean tryLock();

试图获取锁,若锁可用,则获取锁,并立即返回值true。若锁不可用,则此方法将立即返回值false。

boolean tryLock(long time, TimeUnit unit) throws

与上个方法不同的就是给定了超时时间,若锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。

Condition newCondition();

返回绑定到此 Lock 实例的新 Condition 实例。

使用模板

通常使用显示锁Lock时,会采用下面的操作流程:

lock.lock();

try {

//...需要保证线程安全的代码。

} finally {

lock.unlock();

}

复制代码

Lock的lock()方法保证了只有一个线程能够执有此锁。对于任何一个lock()方法,都需要一个unlock()方法与之对应,通常情况下为了保证unlock()方法总是能够执行,unlock()方法被置于finally中。

Lock VS synchronized

Synchronized是Java的关键字,当它用来修饰一个方法或一个代码块时,能够保证在同一时刻最多只有一个线程执行该代码。因为当调用Synchronized修饰的代码时,并不需要显示的加锁和解锁的过程,代码简洁,一般称之为隐式锁。

Lock是一个接口,提供了无条件的、可轮询的、定时的、可中断的锁获取操作,所有的加锁和解锁操作方法都是显示的,因而称为显示锁。

4.2 ReentrantLock

可重入锁ReentrantLock是对Lock接口的一种实现,支持当一个线程获取锁以后,可以再次得到该对象锁。

ReentrantLock在初始化时,需要设定该锁的公平性:

如果在时间上,先对锁进行获取的请求,一定先被满足,这个锁就是公平的,不满足,就是非公平的

非公平的效率一般来讲更高

ReentrantLock的特性如下:

1. 可重入

synchronized和ReentrantLock均有可重入性,即一个线程请求得到一个对象锁后再次请求此对象锁,可以再次得到该对象锁。

在使用synchronized时,当一个线程已经进入到synchronized方法/块中时,可以进入到本类的其他synchronized方法/块中。

2. 可中断

在lockInterruptibly()锁定的同时,还可以响应中断通知。一旦接收到中断通知,就会抛出InterruptedException异常。

这点与synchronized不同,在synchronized加锁的代码中,无法获取中断通知。

3. 可设置超时

ReentrantLock.tryLock()方法用于尝试锁定。参数为等待时间。该方法返回boolean值。若锁定成功,则返回true。锁定失败,则返回false。tryLock方法在超时不能获得锁时,就返回false,不会永久等待构成死锁。

4. 公平锁

ReentrantLock内部利用AQS的线程队列,可以实现公平锁,但是性能相比非公平锁会差一点。

在构造方法中,ReentrantLock(boolean fair),fair默认为false,当设置为true时,及表示当前构造的锁是公平锁。

当需要可定时的、可轮询的与可中断的锁获取操作,公平队列,或者非块结构的锁,建议使用ReentrantLock。否则,请使用synchronized。在Java 1.6之后,ReentrantLock和synchronized性能相差不大,所以一般情况下,使用synchronized就足够了,只有当有特定需求时,可以使用可重入锁。

4.3 Lock与Condition实现消息传递

利用Lock和Condition可以实现消息的等待和通知,这里我们利用ReentrantLock来进行举例。

注意在使用condition时,需要首先lock.newCondition来获取Condition对象,如果有多个条件,需要针对不同的条件来获取condition。

发送信号,调用condition.signal()方法;等待,调用condition.await()方法。

注意与notify与wait的区别,后者Object的方法,一般用在一个对象上进行等待,等待的线程和某个特定的对象绑定。当需要notify所有线程时,为了保证我们的消息被所有线程接收到,通常使用notifyAll发送消息。但是使用condition对象,await和signal操作都是在condition对象是进行的,所以使用signal通知时,不会存在等待其他消息的线程阻止消息传递,所以通常使用signal而不是signalAll。

public class ExpressCond {

public final static String CITY = "ShangHai";

private int km;/*快递运输里程数*/

private String site;/*快递到达地点*/

private Lock lock = new ReentrantLock();

private Condition keCond = lock.newCondition();

private Condition siteCond = lock.newCondition();

public ExpressCond() {

}

public ExpressCond(int km, String site) {

this.km = km;

this.site = site;

}

/* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/

public void changeKm(){

lock.lock();

try {

this.km = 101;

keCond.signal();

}finally {

lock.unlock();

}

}

/* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/

public void changeSite(){

lock.lock();

try {

this.site = "BeiJing";

siteCond.signal();

}finally {

lock.unlock();

}

}

/*当快递的里程数大于100时更新数据库*/

public void waitKm(){

lock.lock();

try {

while(this.km<=100) {

try {

keCond.await();

System.out.println("check km thread["+Thread.currentThread().getId()

+"] is be notifed.");

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}finally {

lock.unlock();

}

System.out.println("the Km is "+this.km+",I will change db");

}

/*当快递到达目的地时通知用户*/

public void waitSite(){

lock.lock();

try {

while(CITY.equals(this.site)) {

try {

siteCond.await();

System.out.println("check site thread["+Thread.currentThread().getId()

+"] is be notifed.");

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}finally {

lock.unlock();

}

System.out.println("the site is "+this.site+",I will call user");

}

}

复制代码

下面是测试函数,将会唤醒一个等待km变化的线程。

public class TestCond {

private static ExpressCond express = new ExpressCond(0,ExpressCond.CITY);

/*检查里程数变化的线程,不满足条件,线程一直等待*/

private static class CheckKm extends Thread{

@Override

public void run() {

express.waitKm();

}

}

/*检查地点变化的线程,不满足条件,线程一直等待*/

private static class CheckSite extends Thread{

@Override

public void run() {

express.waitSite();

}

}

public static void main(String[] args) throws InterruptedException {

for(int i=0;i<3;i++){

new CheckSite().start();

}

for(int i=0;i<3;i++){

new CheckKm().start();

}

Thread.sleep(1000);

express.changeKm();//快递里程变化

}

}

复制代码

4.4 ReadWriteLock 和 ReentrantReadWriteLock

ReadWriteLock接口提供了单独的读锁和写锁,

public interface ReadWriteLock {

Lock readLock();

Lock writeLock();

}

复制代码

ReentrantReadWriteLock类是ReadWriteLock接口的一个实现,它与ReentrantLock类一样提供了公平竞争与不公平竞争两种机制,默认也是使用非公平竞争机制。

ReentrantReadWriteLock的可以被多个读者访问和一个写者访问,提供了读写分离功能:

读-读不互斥:读读之间不阻塞。

读-写互斥:读阻塞写,写也会阻塞读。

写-写互斥:写写阻塞。

ReentrantReadWriteLock在读多写少的场景下,具有很强的性能优势。

WriteLock VS ReadLock

1.重入方面其内部的WriteLock可以获取ReadLock,但是反过来ReadLock无法获得WriteLock。

2.WriteLock可以降级为ReadLock,顺序是:先获得WriteLock再获得ReadLock,然后释放WriteLock,这时候线程将保持Readlock的持有。反过来ReadLock想要升级为WriteLock则不可能。

4.不管是ReadLock还是WriteLock都支持Interrupt,语义与ReentrantLock一致。

5.WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常。

ReentrantLock VS ReentrantReadWriteLock

ReentrantLock是排他锁,使用非公平竞争机制时,抢占的机会相对还是比较少的,只有当新请求恰逢锁释放时才有机会抢占,所以发生线程饥饿的现象几乎很少。

ReentrantReadWriteLock是共享锁,或者说读读共享,并且经常使用于读多写少的场景,即请求读操作的线程多而频繁而请求写操作的线程极少且间隔长,在这种场景下,使用非公平竞争机制极有可能造成写线程饥饿。比如,R1线程此时持有读锁且在进行读取操作,W1线程请求写锁所以需要排队等候,在R1释放锁之前,如果R2,R3,...,Rn 不断的到来请求读锁,因为读读共享,所以他们不用等待马上可以获得锁,如此下去W1永远无法获得写锁,一直处于饥饿状态。

参考链接:

4.5 LockSupport

LockSupport是一个方便的线程阻塞工具,它可以在线程的任何位置让线程阻塞。与Thread.suspend()方法相比,它弥补了由于resume()方法导致线程无法继续执行的情况。和Object.wait()方法相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。

LockSupport主要有两个方法,

LockSupport.park()

park()方法会阻塞当前线程(线程进入Waiting状态),除非它获取了"许可证"。

LockSupport.unpark(Thread t)

unpark(Thread t)方法会给线程t颁发一个"许可证"。

LockSupport使用了类似信号量的机制,它为每一个线程准备了一个许可,如果许可可用,park()方法会立刻返回,并且消费这个许可(也就是将许可变为不可用);如果许可不可用,就会阻塞,而unpack()方法则使得一个许可变为可用(但是和信号量不同的是,许可不可累加,永远只能拥有不超过一个许可)。

4.6 AQS

AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),是JUC并发包中的核心基础组件。JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

AQS解决了实现同步器时涉及到的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。

AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

AQS使用了模板方法设计模式,子类通过继承同步器并实现它的抽象方法来管理同步状态。

AQS模板方法

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了如下三个方法来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

getState():返回同步状态的当前值;

setState(int newState):设置当前同步状态;

compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;

独占式获取:

tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态

tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;

tryRelease(int arg):独占式释放同步状态;

acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;

acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;

isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;

共享式获取:

tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;

tryReleaseShared(int arg):共享式释放同步状态;

acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;

acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;

tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

独占式释放锁:

release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;

共享式释放锁:

releaseShared(int arg):共享式释放同步状态;

当在实现自己的lock类时,需要子类覆盖如下方法,

独占式获取 tryAcquire

独占式释放 tryRelease

共享式获取 tryAcquireShared

共享式释放 tryReleaseShared

这个同步器是否处于独占模式 isHeldExclusively

CLH同步队列

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程以及等待状态等信息打包成一个节点(Node),并将其加入到CLH同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),

CANCELLED,值为1 。场景:当该线程等待超时或者被中断,需要从同步队列中取消等待,则该线程被置1,即被取消(这里该线程在取消之前是等待状态)。节点进入了取消状态则不再变化;

SIGNAL,值为-1。场景:后继的节点处于等待状态,当前节点的线程如果释放了同步状态或者被取消(当前节点状态置为-1),将会通知后继节点,使后继节点的线程得以运行;

CONDITION,值为-2。场景:节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中;

PROPAGATE,值为-3。场景:表示下一次的共享状态会被无条件的传播下去;

INITIAL,值为0,初始状态。

其定义如下:

static final class Node {

/** 共享 */

static final Node SHARED = new Node();

/** 独占 */

static final Node EXCLUSIVE = null;

/**

* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;

*/

static final int CANCELLED = 1;

/**

* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行

*/

static final int SIGNAL = -1;

/**

* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中

*/

static final int CONDITION = -2;

/**

* 表示下一次共享式同步状态获取将会无条件地传播下去

*/

static final int PROPAGATE = -3;

/** 等待状态 */

volatile int waitStatus;

/** 前驱节点 */

volatile Node prev;

/** 后继节点 */

volatile Node next;

/** 获取同步状态的线程 */

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {

return nextWaiter == SHARED;

}

final Node predecessor() throws NullPointerException {

Node p = prev;

if (p == null)

throw new NullPointerException();

else

return p;

}

Node() {

}

Node(Thread thread, Node mode) {

this.nextWaiter = mode;

this.thread = thread;

}

Node(Thread thread, int waitStatus) {

this.waitStatus = waitStatus;

this.thread = thread;

}

}

复制代码

对于CLH同步队列,一般有如下几种操作:

1. 节点加入到同步队列

队列的主要变化是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。

整个流程图如下:

具体实现可以查看addWaiter(Node node)方法:

private Node addWaiter(Node mode) {

//新建Node

Node node = new Node(Thread.currentThread(), mode);

//快速尝试添加尾节点

Node pred = tail;

if (pred != null) {

node.prev = pred;

//CAS设置尾节点

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

//多次尝试

enq(node);

return node;

}

复制代码

addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点

private Node enq(final Node node) {

//多次尝试,直到成功为止

for (;;) {

Node t = tail;

//tail不存在,设置为首节点

if (t == null) {

if (compareAndSetHead(new Node()))

tail = head;

} else {

//设置为尾节点

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

复制代码

两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

2. 首节点移出同步队列

首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。

流程图如下:

独占式同步状态获取与释放

AQS提供了acquire(int arg)方法来进行独占式同步状态获取,实现如下:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

复制代码

其中相关函数的定义为:

tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。

addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部。

acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过。

selfInterrupt:产生一个中断。

acquireQueued方法为一个自旋的过程,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

//中断标志

boolean interrupted = false;

/*

* 自旋过程,其实就是一个死循环而已

*/

for (;;) {

//当前线程的前驱节点

final Node p = node.predecessor();

//当前线程的前驱节点是头结点,且同步状态成功

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

//获取失败,线程等待

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

复制代码

当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,主要是为了保持FIFO同步队列原则。头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。

在上面的流程中,当获取失败时,需要判断是否阻塞当前线程,

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

interrupted = true;

复制代码

在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

//前驱节点

int ws = pred.waitStatus;

//状态为signal,表示当前线程处于等待状态,直接放回true

if (ws == Node.SIGNAL)

return true;

//前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消

if (ws > 0) {

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

}

//前驱节点状态为Condition、propagate

else {

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

复制代码

这段代码主要检查当前线程是否需要被阻塞,具体规则如下:

如果当前线程的前驱节点状态为SIGNAL,则表明当前线程需要被阻塞,直接返回true,当前线程阻塞

如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false

如果前驱节点非SIGNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SIGNAL,返回false

如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:

private final boolean parkAndCheckInterrupt() {

LockSupport.park(this);

return Thread.interrupted();

}

复制代码

parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。

当线程释放同步状态后,则需要唤醒该线程的后继节点:

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

//唤醒后继节点

unparkSuccessor(h);

return true;

}

return false;

}

复制代码

调用unparkSuccessor(Node node)唤醒后继节点:

private void unparkSuccessor(Node node) {

//当前节点状态

int ws = node.waitStatus;

//当前状态 < 0 则设置为 0

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

//当前节点的后继节点

Node s = node.next;

//后继节点为null或者其状态 > 0 (超时或者被中断了)

if (s == null || s.waitStatus > 0) {

s = null;

//从tail节点来找可用节点

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

//唤醒后继节点

if (s != null)

LockSupport.unpark(s.thread);

}

复制代码

可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。

以上就是整个独占式获取和释放的过程,流程图如下:

独占式获取响应中断

AQS提供了acquire(int arg)方法以供独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。

public final void acquireInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (!tryAcquire(arg))

doAcquireInterruptibly(arg);

}

复制代码

首先校验该线程是否已经中断了,如果是则抛出InterruptedException,否则执行tryAcquire(int arg)方法获取同步状态,如果获取成功,则直接返回,否则执行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定义如下:

private void doAcquireInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.EXCLUSIVE);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

复制代码

doAcquireInterruptibly(int arg)方法与acquire(int arg)方法仅有两个差别。

1.方法声明抛出InterruptedException异常。

2.在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。

独占式超时获取

AQS除了提供上面两个方法外,还提供了一个增强版的方法:tryAcquireNanos(int arg,long nanos)。该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。

共享式同步状态获取与释放

AQS提供acquireShared(int arg)方法共享式获取同步状态:

public final void acquireShared(int arg) {

if (tryAcquireShared(arg) < 0)

//获取失败,自旋获取同步状态

doAcquireShared(arg);

}

复制代码

从上面程序可以看出,方法首先是调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败则调用doAcquireShared(int arg)自旋方式获取同步状态,共享式获取同步状态的标志是返回 >= 0 的值表示获取成功。

private void doAcquireShared(int arg) {

/共享式节点

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

//前驱节点

final Node p = node.predecessor();

//如果其前驱节点,获取同步状态

if (p == head) {

//尝试获取同步

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

if (interrupted)

selfInterrupt();

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

复制代码

tryAcquireShared(int arg)方法尝试获取同步状态,返回值为int,当其 >= 0 时,表示能够获取到同步状态,这个时候就可以从自旋过程中退出。

默认AQS没有提供tryAcquireShared的实现,需要子类自己实现该方法,

protected int tryAcquireShared(int arg) {

throw new UnsupportedOperationException();

}

复制代码

注意到独占式获取锁不同的是,如果tryAcquireShared的返回值大于0,会进行setHeadAndPropagate的操作,下面是该方法的实现,可以看到当某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。doReleaseShared后面会仔细分析。

private void setHeadAndPropagate(Node node, int propagate) {

Node h = head; // Record old head for check below

setHead(node);

/*

* Try to signal next queued node if:

* Propagation was indicated by caller,

* or was recorded (as h.waitStatus either before

* or after setHead) by a previous operation

* (note: this uses sign-check of waitStatus because

* PROPAGATE status may transition to SIGNAL.)

* and

* The next node is waiting in shared mode,

* or we don't know, because it appears null

*

* The conservatism in both of these checks may cause

* unnecessary wake-ups, but only when there are multiple

* racing acquires/releases, so most need signals now or soon

* anyway.

*/

if (propagate > 0 || h == null || h.waitStatus < 0 ||

(h = head) == null || h.waitStatus < 0) {

Node s = node.next;

if (s == null || s.isShared())

doReleaseShared();

}

}复制代码

获取同步状态后,完成相应的任务之后,需要调用release(int arg)方法释放同步状态,方法如下:

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

复制代码

在doReleaseShared中,如果头节点的状态为SIGNAL,则通过CAS将头节点的状态设置为0,并且唤醒后续阻塞的线程;接着再通过CAS设置节点的状态为Node.PROPAGATE。

private void doReleaseShared() {

/*

* Ensure that a release propagates, even if there are other

* in-progress acquires/releases. This proceeds in the usual

* way of trying to unparkSuccessor of head if it needs

* signal. But if it does not, status is set to PROPAGATE to

* ensure that upon release, propagation continues.

* Additionally, we must loop in case a new node is added

* while we are doing this. Also, unlike other uses of

* unparkSuccessor, we need to know if CAS to reset status

* fails, if so rechecking.

*/

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!h.compareAndSetWaitStatus(0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

复制代码关于doReleaseShared的几点分析:

调用该方法的线程可能有很多:在共享锁中,持有共享锁的线程可以有多个,这些线程都可以调用releaseShared方法释放锁;而这些线程想要获得共享锁,则它们必然曾经成为过头节点,或者就是现在的头节点。因此,如果是在releaseShared方法中调用的doReleaseShared,可能此时调用方法的线程已经不是头节点所代表的线程了,头节点可能已经被易主好几次了。

调用该方法的目的:无论是在acquireShared中调用,还是在releaseShared方法中调用,该方法的目的都是在当前共享锁是可获取的状态时,唤醒head节点的下一个节点。这一点看上去和独占锁似乎一样,但是它们的一个重要的差别是——在共享锁中,当头节点发生变化时,是会回到循环中再立即唤醒head节点的下一个节点的。也就是说,在当前节点完成唤醒后继节点的任务之后将要退出时,如果发现被唤醒后继节点已经成为了新的头节点,则会立即触发唤醒head节点的下一个节点的操作,如此周而复始。

只有在当前head没有易主时,才会退出,否则继续循环。因为当前可能有多个线程在队列中,比如A -> B -> C -> D, 如果A唤醒B,则B成为新的头节点,接着B会调用doReleaseShared去唤醒C,此时A线程中的head变成了C,因此也加入到了唤醒D的队伍中,此时可能出现A、B、C同时唤醒D的情况,提高了系统效率。当队列中的所有线程都唤醒之后,此时程序退出。

参考:

Condition实现

在之前的例子中,使用Condition和Lock实现了消息的等待和通知,这节介绍Condiition在AQS中的实现。

JDK的Object对象提供了wait/notify的机制,也能实现消息的等待与通知,Condition与之的差别主要体现在以下几点:

调用wait方法的线程首先必须是已经进入了同步代码块,即已经获取了监视器锁;与之类似,调用await方法的线程首先必须获得lock锁。

调用wait方法的线程会释放已经获得的监视器锁,进入当前监视器锁的等待队列(wait set)中;与之类似,调用await方法的线程会释放已经获得的lock锁,进入到当前Condtion对应的条件队列中。

调用监视器锁的notify方法会唤醒等待在该监视器锁上的线程,这些线程将开始参与锁竞争,并在获得锁后,从wait方法处恢复执行;与之类似,调用Condtion的signal方法会唤醒对应的条件队列中的线程,这些线程将开始参与锁竞争,并在获得锁后,从await方法处开始恢复执行。

在AQS的Condition实现中,和独占锁的争夺类似的是,每创建一个Condtion对象就会对应一个Condtion队列,每一个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中,就像这样:

同样的,在Condition中也会用到之前介绍的同步队列,当等待队列中的节点获得信号通知时,会将等待队列的节点移到同步队列。

以下是await时节点的变化,

以下是signal信号发出时节点的变化,

Condition的整个await/signal流程如下:

1、Condition提供了await()方法将当前线程阻塞,并提供signal()方法支持另外一个线程将已经阻塞的线程唤醒。

2、Condition需要结合Lock使用

3、线程调用await()方法前必须获取锁,调用await()方法时,将线程构造成节点加入等待队列,同时释放锁,并挂起当前线程

4、其他线程调用signal()方法前也必须获取锁,当执行signal()方法时将等待队列的节点移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点

下面结合源代码进行分析:

await实现

调用await阻塞当前线程

public final void await() throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

//将当前线程封装成Node加入到等待队列尾部

Node node = addConditionWaiter();

//释放锁

int savedState = fullyRelease(node);

int interruptMode = 0;

//判断当前节点是否已经在同步队列中,如果是则退出循环,如果不是就阻塞当前线程

//其他线程如果发出了signal信号之后,会把等待队列的线程移入同步队列,此时就会退出循环,进入下面的重新获取锁的acquireQueued

while (!isOnSyncQueue(node)) {

LockSupport.park(this);

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

//其他发出signal信号的线程释放锁之后,该线程被唤醒并重新竞争锁

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled

unlinkCancelledWaiters();

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

//线程加入等待队列尾部

private Node addConditionWaiter() {

Node t = lastWaiter;

// If lastWaiter is cancelled, clean out.

if (t != null && t.waitStatus != Node.CONDITION) {//清除cancell态的节点

unlinkCancelledWaiters();

t = lastWaiter;//t指向最后一个状态正确的节点

}

Node node = new Node(Thread.currentThread(), Node.CONDITION);

if (t == null)//列表为空,初始化为第一个节点

firstWaiter = node;

else

t.nextWaiter = node;

lastWaiter = node;

return node;

}

复制代码

signal/signalAll实现

将等待队列的节点移入同步队列(signalAll只是循环执行signal而已)

private void doSignal(Node first) {

do {

if ( (firstWaiter = first.nextWaiter) == null)

lastWaiter = null;

first.nextWaiter = null;//得到firstWaiter

} while (!transferForSignal(first) &&

(first = firstWaiter) != null);

}

//将节点从等待队列移入同步队列

final boolean transferForSignal(Node node) {

/*

* If cannot change waitStatus, the node has been cancelled.

*/

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;//cas节点状态错误,说明已经cancell了,直接返回false

/*

* Splice onto queue and try to set waitStatus of predecessor to

* indicate that thread is (probably) waiting. If cancelled or

* attempt to set waitStatus fails, wake up to resync (in which

* case the waitStatus can be transiently and harmlessly wrong).

*/

Node p = enq(node);//加入同步队列

int ws = p.waitStatus;

//设置前置节点状态为signal,可重入锁那篇文章分析过,为了唤醒线程而设置

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);//特殊情况下唤醒线程并重新同步,一般情况下这里不会执行

return true;

}

复制代码

参考:

ReentrantReadWriteLock实现

ReentrantReadWriteLock在内部也是利用了AQS进行锁的竞争与释放,同时也实现了ReadWriteLock接口。

为了同时保存读锁和写锁的状态,在内部用一个int保存读和写的状态。读状态从高16位读出,写状态从低16位读出,在保证读写锁互斥的前提下,直接利用了AQS现有的数据结构。

static final int SHARED_SHIFT = 16;

//实际是65536

static final int SHARED_UNIT = (1 << SHARED_SHIFT);

//最大值 65535

static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

// 同样是65535

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 获取读的状态 */

static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

/** 获取写锁的获取状态 */

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

复制代码

写锁为独占式的,因此读锁的获取和释放和AQS原生的实现一致。

读锁是共享式的,获取读锁的状态,并且加1.

final boolean tryReadLock() {

Thread current = Thread.currentThread();

for (;;) {

int c = getState();

if (exclusiveCount(c) != 0 &&

getExclusiveOwnerThread() != current)

return false; //写锁被其他线程获取了,直接返回false

int r = sharedCount(c); //获取读锁的状态

if (r == MAX_COUNT)

throw new Error("Maximum lock count exceeded");

if (compareAndSetState(c, c + SHARED_UNIT)) { //尝试获取读锁

if (r == 0) { //说明第一个获取到了读锁

firstReader = current; //标记下当前线程是第一个获取的

firstReaderHoldCount = 1; //重入次数

} else if (firstReader == current) {

firstReaderHoldCount++; //次数+1

} else {

//cachedHoldCounter 为缓存最后一个获取锁的线程

HoldCounter rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

cachedHoldCounter = rh = readHolds.get(); //缓存最后一个获取锁的线程

else if (rh.count == 0)// 当前线程获取到了锁,但是重入次数为0,那么把当前线程存入进去

readHolds.set(rh);

rh.count++;

}

return true;

}

}

}

复制代码

读锁的释放:

protected final boolean tryReleaseShared(int unused) {

Thread current = Thread.currentThread();

if (firstReader == current) {

// assert firstReaderHoldCount > 0;

if (firstReaderHoldCount == 1)//如果是首次获取读锁,那么第一次获取读锁释放后就为空了

firstReader = null;

else

firstReaderHoldCount--;

} else {

HoldCounter rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

rh = readHolds.get();

int count = rh.count;

if (count <= 1) { //表示全部释放完毕

readHolds.remove(); //释放完毕,那么久把保存的记录次数remove掉

if (count <= 0)

throw unmatchedUnlockException();

}

--rh.count;

}

for (;;) {

int c = getState();

// nextc 是 state 高 16 位减 1 后的值

int nextc = c - SHARED_UNIT;

if (compareAndSetState(c, nextc)) //CAS设置状态

return nextc == 0; //这个判断如果高 16 位减 1 后的值==0,那么就是读状态和写状态都释放了

}

}

复制代码

锁降级

锁降级算是获取读锁的特例,如在A线程已经获取写锁的情况下,再调取读锁加锁函数则可以直接获取读锁,但此时其他线程仍然无法获取读锁或写锁,在A线程释放写锁后,如果有节点等待则会唤醒后续节点,后续节点可见的状态为目前有A线程获取了读锁。

AQS实战-实现三元共享锁

下面的例子里,利用AQS实现了三元共享锁,也就是当前锁只能被三个线程获取。

public class TripleLock implements Lock {

//为3表示允许两个线程同时获得锁

private final Sync sync = new Sync(3);

private static final class Sync extends AbstractQueuedSynchronizer {

Sync(int count) {

if (count <= 0) {

throw new IllegalArgumentException("count must large than zero.");

}

setState(count);

}

public int tryAcquireShared(int reduceCount) {

for (;;) {

int current = getState();

int newCount = current - reduceCount;

if (newCount < 0 || compareAndSetState(current, newCount)) {

return newCount;

}

}

}

public boolean tryReleaseShared(int returnCount) {

for (;;) {

int current = getState();

int newCount = current + returnCount;

if (compareAndSetState(current, newCount)) {

return true;

}

}

}

final ConditionObject newCondition() {

return new ConditionObject();

}

}

@Override

public void lock() {

sync.acquireShared(1);

}

@Override

public void unlock() {

sync.releaseShared(1);

}

@Override

public void lockInterruptibly() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

@Override

public boolean tryLock() {

return sync.tryAcquireShared(1) >= 0;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(time));

}

@Override

public Condition newCondition() {

return sync.newCondition();

}

}

复制代码

测试程序中,主线程每隔一秒钟打印换行,工作线程直接打印当前的线程名,从结果可以看到,每一个时刻只有三个工作线程在同时运行。

public class testTripleLock {

public void test() {

final Lock lock = new TripleLock();

class Worker extends Thread {

public void run() {

lock.lock();

try {

System.out.println(Thread.currentThread().getName());

SleepTools.second(2);

} finally {

lock.unlock();

}

SleepTools.second(2);

}

}

// 启动10个子线程

for (int i = 0; i < 10; i++) {

Worker w = new Worker();

w.setDaemon(true);

w.start();

}

// 主线程每隔1秒换行

for (int i = 0; i < 10; i++) {

SleepTools.second(1);

System.out.println();

}

}

public static void main(String[] args) {

testTripleLock testMyLock = new testTripleLock();

testMyLock.test();

}

}

复制代码

本文由『后端精进之路』原创,首发于博客 teckee.github.io/ , 转载请注明出处

搜索『后端精进之路』关注公众号,立刻获取最新文章和价值2000元的BATJ精品面试课程。

java 可重入锁 clh_Java并发编程系列-(4) 显式锁与AQS相关推荐

  1. java 中的锁 aqs_Java并发编程系列-(4) 显式锁与AQS

    4 显示锁和AQS 4.1 Lock接口 核心方法 Java在java.util.concurrent.locks包中提供了一系列的显示锁类,其中最基础的就是Lock接口,该接口提供了几个常见的锁相关 ...

  2. java 显示锁_Java 实现一个自己的显式锁Lock(有超时功能)

    Lock接口 package concurency.chapter9; import java.util.Collection; public interface Lock { static clas ...

  3. java 显式锁_Java 实现一个自己的显式锁Lock(有超时功能)

    Lock接口 package concurency.chapter9; import java.util.Collection; public interface Lock { static clas ...

  4. 高并发编程系列:NIO、BIO、AIO的区别,及NIO的应用和框架选型

    谈到并发编程就不得不提到NIO,以及相关的Java NIO框架Netty等,并且在很多面试中也经常提到NIO和AIO.同步和异步.阻塞和非阻塞等的区别.我先简短介绍下几个NIO相关的概念,然后再谈NI ...

  5. java 内置锁_深入理解java内置锁(synchronized)和显式锁(ReentrantLock)

    synchronized 和 Reentrantlock 多线程编程中,当代码需要同步时我们会用到锁.Java为我们提供了内置锁(synchronized)和显式锁(ReentrantLock)两种同 ...

  6. java 变量锁_并发编程高频面试题:可重入锁+线程池+内存模型等(含答案)

    对于一个Java程序员而言,能否熟练掌握并发编程是判断他优秀与否的重要标准之一.因为并发编程是Java语言中最为晦涩的知识点,它涉及操作系统.内存.CPU.编程语言等多方面的基础能力,更为考验一个程序 ...

  7. 并发编程系列之五多线程synchronized是可重复加锁,重入锁

    并发编程系列之五多线程synchronized是可重复加锁,重入锁.对于重入锁的概念就是可以重复的加锁.. 示例1,在同一个类里面进行加锁,不同的方法调用,都一层一层的嵌套进行加锁,示例1演示重入锁的 ...

  8. reentrantlock非公平锁不会随机挂起线程?_【原创】Java并发编程系列16 | 公平锁与非公平锁...

    本文为何适原创并发编程系列第 16 篇,文末有本系列文章汇总. 上一篇提到重入锁 ReentrantLock 支持两种锁,公平锁与非公平锁.那么这篇文章就来介绍一下公平锁与非公平锁. 为什么需要公平锁 ...

  9. java投票锁_Java并发编程锁之独占公平锁与非公平锁比较

    Java并发编程锁之独占公平锁与非公平锁比较 公平锁和非公平锁理解: 在上一篇文章中,我们知道了非公平锁.其实Java中还存在着公平锁呢.公平二字怎么理解呢?和我们现实理解是一样的.大家去排队本着先来 ...

最新文章

  1. 尸鬼封尽の覆盖源码 管你什么鬼一招解决
  2. Git 2.20的重大更新:侧重可用性和性能
  3. 一文带你读懂Python的5大特点与8大应用方向!
  4. 【转】奴性哲学十句话,洗脑常用词!!!
  5. 数据结构--单向链表
  6. linux——系统日志的信息、采集、查看、保存
  7. dede站怎么在首页调用单页的内容?
  8. history模式 nginx配置_Vue history模式Nginx配置
  9. 导出excel 数据取一次合理还是分页取合理_一张报表模板替代数百张Excel表格,用它让报表工作更轻松...
  10. Netscreen ×××配置(一)---基于策略的点到点×××设置
  11. android荣耀v20圆角适配,AR新玩法 让荣耀V20成为你随身的尺子
  12. MySQL的自定义函数
  13. 第一个动态网页——留言板
  14. 技术系统进化法则包括_八大技术系统进化法则主要包括哪些
  15. csgo显示连接任何官方服务器失败,csgo连接任意官方服务器失败怎么办_csgo连接官方服务器失败解决方法...
  16. 【noi.ac #1759】ZYB的测验计划
  17. G6-Editor 编辑器入门使用教程
  18. 社招,文章很细节,大家看下我有多细
  19. 数组的应用和面向对象的开始6
  20. 微信“15。。。。。”背后的故事

热门文章

  1. liunx 双网卡同网段配置
  2. JavaScript进阶【二】JavaScript 严格模式(use strict)的使用
  3. 洛谷 P1137 旅行计划
  4. ASP.NET Core DI 手动获取注入对象
  5. C# 使用摄像头拍照 支持Win7 64位
  6. 开源,自由,免费, 商业,收费,共享. 这些都不矛盾. 细数网络发展中的免费与收费....
  7. 火狐中javascript
  8. HDU 1576 A/B(数论简单题,求逆元)
  9. 关于邮件服务器应用系统安全SSL ×××(强身份认证)方案
  10. gnome-mplayer 挂载 srt字幕 乱码