问题

(1)读写锁是什么?

(2)读写锁具有哪些特性?

(3)ReentrantReadWriteLock是怎么实现读写锁的?

(4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap?

简介

读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。

特性

读写锁具有以下特性:

是否互斥

可以看到,读写锁除了读读不互斥,读写、写读、写写都是互斥的。

那么,ReentrantReadWriteLock是怎么实现读写锁的呢?

类结构

在看源码之前,我们还是先来看一下ReentrantReadWriteLock这个类的主要结构。

ReentrantReadWriteLock中的类分成三个部分:

(1)ReentrantReadWriteLock本身实现了ReadWriteLock接口,这个接口只提供了两个方法readLock()writeLock()

(2)同步器,包含一个继承了AQS的Sync内部类,以及其两个子类FairSync和NonfairSync;

(3)ReadLock和WriteLock两个内部类实现了Lock接口,它们具有锁的一些特性。

源码分析

主要属性

// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;
复制代码

维护了读锁、写锁和同步器。

主要构造方法

// 默认构造方法
public ReentrantReadWriteLock() {this(false);
}
// 是否使用公平锁的构造方法
public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}
复制代码

它提供了两个构造方法,默认构造方法使用的是非公平锁模式,在构造方法中初始化了读锁和写锁。

获取读锁和写锁的方法

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
复制代码

属性中的读锁和写锁是私有属性,通过这两个方法暴露出去。

下面我们主要分析读锁和写锁的加锁、解锁方法,且都是基于非公平模式的。

ReadLock.lock()

// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
public final void acquireShared(int arg) {// 尝试获取共享锁(返回1表示成功,返回-1表示失败)if (tryAcquireShared(arg) < 0)// 失败了就可能要排队doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 状态变量的值// 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数int c = getState();// 互斥锁的次数// 如果其它线程获得了写锁,直接返回-1if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 读锁被获取的次数int r = sharedCount(c);// 下面说明此时还没有写锁,尝试去更新state的值获取读锁// 读者是否需要排队(是否是公平模式)if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 获取读锁成功if (r == 0) {// 如果之前还没有线程获取读锁// 记录第一个读者为当前线程firstReader = current;// 第一个读者重入的次数为1firstReaderHoldCount = 1;} else if (firstReader == current) {// 如果有线程获取了读锁且是当前线程是第一个读者// 则把其重入次数加1firstReaderHoldCount++;} else {// 如果有线程获取了读锁且当前线程不是第一个读者// 则从缓存中获取重入次数保存器HoldCounter rh = cachedHoldCounter;// 如果缓存不属性当前线程// 再从ThreadLocal中获取// readHolds本身是一个ThreadLocal,里面存储的是HoldCounterif (rh == null || rh.tid != getThreadId(current))// get()的时候会初始化rhcachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)// 如果rh的次数为0,把它放到ThreadLocal中去readHolds.set(rh);// 重入的次数加1(初始次数为0)rh.count++;}// 获取读锁成功,返回1return 1;}// 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败)return fullTryAcquireShared(current);
}
// AbstractQueuedSynchronizer.doAcquireShared()
private void doAcquireShared(int arg) {// 进入AQS的队列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {// 当前节点的前一个节点final Node p = node.predecessor();// 如果前一个节点是头节点(说明是第一个排队的节点)if (p == head) {// 再次尝试获取读锁int r = tryAcquireShared(arg);// 如果成功了if (r >= 0) {// 头节点后移并传播// 传播即唤醒后面连续的读节点setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 没获取到读锁,阻塞并等待被唤醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {// h为旧的头节点Node h = head;// 设置当前节点为新头节点setHead(node);// 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE)if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 需要传播// 取下一个节点Node s = node.next;// 如果下一个节点为空,或者是需要获取读锁的节点if (s == null || s.isShared())// 唤醒下一个节点doReleaseShared();}
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 这个方法只会唤醒一个节点
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果头节点状态为SIGNAL,说明要唤醒下一个节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒下一个节点unparkSuccessor(h);}else if (ws == 0 &&// 把头节点的状态改为PROPAGATE成功才会跳到下面的if!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 如果唤醒后head没变,则跳出循环if (h == head)                   // loop if head changedbreak;}
}
复制代码

看完【死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁】的分析再看这章的内容应该会比较简单,中间一样的方法我们这里直接跳过了。

我们来看看大致的逻辑:

(1)先尝试获取读锁;

(2)如果成功了直接结束;

(3)如果失败了,进入doAcquireShared()方法;

(4)doAcquireShared()方法中首先会生成一个新节点并进入AQS队列中;

(5)如果头节点正好是当前节点的上一个节点,再次尝试获取锁;

(6)如果成功了,则设置头节点为新节点,并传播;

(7)传播即唤醒下一个读节点(如果下一个节点是读节点的话);

(8)如果头节点不是当前节点的上一个节点或者(5)失败,则阻塞当前线程等待被唤醒;

(9)唤醒之后继续走(5)的逻辑;

在整个逻辑中是在哪里连续唤醒读节点的呢?

答案是在doAcquireShared()方法中,在这里一个节点A获取了读锁后,会唤醒下一个读节点B,这时候B也会获取读锁,然后B继续唤醒C,依次往复,也就是说这里的节点是一个唤醒一个这样的形式,而不是一个节点获取了读锁后一次性唤醒后面所有的读节点。

ReadLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {// 如果尝试释放成功了,就唤醒下一个节点if (tryReleaseShared(arg)) {// 这个方法实际是唤醒下一个节点doReleaseShared();return true;}return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// 如果第一个读者(读线程)是当前线程// 就把它重入的次数减1// 如果减到0了就把第一个读者置为空if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {// 如果第一个读者不是当前线程// 一样地,把它重入的次数减1HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {// 共享锁获取的次数减1// 如果减为0了说明完全释放了,才返回trueint c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行为跟方法名有点不符,实际是唤醒下一个节点
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果头节点状态为SIGNAL,说明要唤醒下一个节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒下一个节点unparkSuccessor(h);}else if (ws == 0 &&// 把头节点的状态改为PROPAGATE成功才会跳到下面的if!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 如果唤醒后head没变,则跳出循环if (h == head)                   // loop if head changedbreak;}
}
复制代码

解锁的大致流程如下:

(1)将当前线程重入的次数减1;

(2)将共享锁总共被获取的次数减1;

(3)如果共享锁获取的次数减为0了,说明共享锁完全释放了,那就唤醒下一个节点;

如下图,ABC三个节点各获取了一次共享锁,三者释放的顺序分别为ACB,那么最后B释放共享锁的时候tryReleaseShared()才会返回true,进而才会唤醒下一个节点D。

WriteLock.lock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {// 先尝试获取锁// 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一模一样了if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();// 状态变量state的值int c = getState();// 互斥锁被获取的次数int w = exclusiveCount(c);if (c != 0) {// 如果c!=0且w==0,说明共享锁被获取的次数不为0// 这句话整个的意思就是// 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁)// 那么就返回false,获取写锁失败if (w == 0 || current != getExclusiveOwnerThread())return false;// 溢出检测if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可setState(c + acquires);// 获取写锁成功return true;}// 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false)// 如果失败了,说明获取写锁失败,返回false// 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回trueif (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;
}
// 获取写锁失败了后面的逻辑跟ReentrantLock是一致的,进入队列排队,这里就不列源码了
复制代码

写锁获取的过程大致如下:

(1)尝试获取锁;

(2)如果有读者占有着读锁,尝试获取写锁失败;

(3)如果有其它线程占有着写锁,尝试获取写锁失败;

(4)如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;

(5)如果没有线程占有着锁(state==0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;

(6)尝试获取锁失败以后,进入队列排队,等待被唤醒;

(7)后续逻辑跟ReentrantLock是一致;

WriteLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {// 如果尝试释放锁成功(完全释放锁)// 就尝试唤醒下一个节点if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {// 如果写锁不是当前线程占有着,抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();// 状态变量的值减1int nextc = getState() - releases;// 是否完全释放锁boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);// 设置状态变量的值setState(nextc);// 如果完全释放了写锁,返回truereturn free;
}
复制代码

写锁释放的过程大致为:

(1)先尝试释放锁,即状态变量state的值减1;

(2)如果减为0了,说明完全释放了锁;

(3)完全释放了锁才唤醒下一个等待的节点;

总结

(1)ReentrantReadWriteLock采用读写锁的思想,能提高并发的吞吐量;

(2)读锁使用的是共享锁,多个读锁可以一起获取锁,互相不会影响,即读读不互斥;

(3)读写、写读和写写是会互斥的,前者占有着锁,后者需要进入AQS队列中排队;

(4)多个连续的读线程是一个接着一个被唤醒的,而不是一次性唤醒所有读线程;

(5)只有多个读锁都完全释放了才会唤醒下一个写线程;

(6)只有写锁完全释放了才会唤醒下一个等待者,这个等待者有可能是读线程,也可能是写线程;

彩蛋

(1)如果同一个线程先获取读锁,再获取写锁会怎样?

分析上图中的代码,在tryAcquire()方法中,如果读锁被获取的次数不为0(c != 0 && w == 0),返回false,返回之后外层方法会让当前线程阻塞。

可以通过下面的方法验证:

readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();
复制代码

运行程序后会发现代码停止在writeLock.lock();,当然,你也可以打个断点跟踪进去看看。

(2)如果同一个线程先获取写锁,再获取读锁会怎样?

分析上面的代码,在tryAcquireShared()方法中,第一个红框处并不会返回,因为不满足getExclusiveOwnerThread() != current;第二个红框处如果原子更新成功就说明获取了读锁,然后就会执行第三个红框处的代码把其重入次数更改为1。

可以通过下面的方法验证:

writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();
复制代码

你可以打个断点跟踪一下看看。

(3)死锁了么?

通过上面的两个例子,我们可以感受到同一个线程先读后写和先写后读是完全不一样的,为什么不一样呢?

先读后写,一个线程占有读锁后,其它线程还是可以占有读锁的,这时候如果在其它线程占有读锁之前让自己占有了写锁,其它线程又不能占有读锁了,这段程序会非常难实现,逻辑也很奇怪,所以,设计成只要一个线程占有了读锁,其它线程包括它自己都不能再获取写锁。

先写后读,一个线程占有写锁后,其它线程是不能占有任何锁的,这时候,即使自己占有一个读锁,对程序的逻辑也不会有任何影响,所以,一个线程占有写锁后是可以再占有读锁的,只是这个时候其它线程依然无法获取读锁。

如果你仔细思考上面的逻辑,你会发现一个线程先占有读锁后占有写锁,会有一个很大的问题——锁无法被释放也无法被获取了。这个线程先占有了读锁,然后自己再占有写锁的时候会阻塞,然后它就自己把自己搞死了,进而把其它线程也搞死了,它无法释放锁,其它线程也无法获得锁了。

这是死锁吗?似乎不是,死锁的定义是线程A占有着线程B需要的资源,线程B占有着线程A需要的资源,两个线程相互等待对方释放资源,经典的死锁例子如下:

Object a = new Object();
Object b = new Object();new Thread(()->{synchronized (a) {LockSupport.parkNanos(1000000);synchronized (b) {}}
}).start();new Thread(()->{synchronized (b) {synchronized (a) {}}
}).start();
复制代码

简单的死锁用jstack是可以看到的:

"Thread-1":at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40)- waiting to lock <0x000000076baa9068> (a java.lang.Object)- locked <0x000000076baa9078> (a java.lang.Object)at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)
"Thread-0":at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32)- waiting to lock <0x000000076baa9078> (a java.lang.Object)- locked <0x000000076baa9068> (a java.lang.Object)at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.
复制代码

(4)如何使用ReentrantReadWriteLock实现一个高效安全的TreeMap?

class SafeTreeMap {private final Map<String, Object> m = new TreeMap<String, Object>();private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private final Lock readLock = lock.readLock();private final Lock writeLock = lock.writeLock();public Object get(String key) {readLock.lock();try {return m.get(key);} finally {readLock.unlock();}}public Object put(String key, Object value) {writeLock.lock();try {return m.put(key, value);} finally {writeLock.unlock();}}
}
复制代码

推荐阅读

  1. 死磕 java同步系列之ReentrantLock VS synchronized

  2. 死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

  3. 死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

  4. 死磕 java同步系列之AQS起篇

  5. 死磕 java同步系列之自己动手写一个锁Lock

  6. 死磕 java魔法类之Unsafe解析

  7. 死磕 java同步系列之JMM(Java Memory Model)

  8. 死磕 java同步系列之volatile解析

  9. 死磕 java同步系列之synchronized解析

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

转载于:https://juejin.im/post/5d02525f51882520724c92b9

死磕 java同步系列之ReentrantReadWriteLock源码解析相关推荐

  1. java condition_死磕 java同步系列之ReentrantLock源码解析(二)

    (手机横屏看源码更方便) 问题 (1)条件锁是什么? (2)条件锁适用于什么场景? (3)条件锁的await()是在其它线程signal()的时候唤醒的吗? 简介 条件锁,是指在获取锁之后发现当前业务 ...

  2. phaser java_死磕 java同步系列之Phaser源码解析

    问题 (1)Phaser是什么? (2)Phaser具有哪些特性? (3)Phaser相对于CyclicBarrier和CountDownLatch的优势? 简介 Phaser,翻译为阶段,它适用于这 ...

  3. 深读源码-java同步系列之StampedLock源码解析

    问题 (1)StampedLock是什么? (2)StampedLock具有什么特性? (3)StampedLock是否支持可重入? (4)StampedLock与ReentrantReadWrite ...

  4. 死磕java_死磕 java同步系列之AQS终篇(面试)

    问题 (1)AQS的定位? (2)AQS的重要组成部分? (3)AQS运用的设计模式? (4)AQS的总体流程? 简介 AQS的全称是AbstractQueuedSynchronizer,它的定位是为 ...

  5. 死磕 java同步系列之redis分布式锁进化史

    问题 (1)redis如何实现分布式锁? (2)redis分布式锁有哪些优点? (3)redis分布式锁有哪些缺点? (4)redis实现分布式锁有没有现成的轮子可以使用? 简介 Redis(全称:R ...

  6. 死磕 java同步系列之终结篇

    简介 同步系列到此就结束了,本篇文章对同步系列做一个总结. 脑图 下面是关于同步系列的一份脑图,列举了主要的知识点和问题点,看过本系列文章的同学可以根据脑图自行回顾所学的内容,也可以作为面试前的准备. ...

  7. java 同步锁_死磕 java同步系列之自己动手写一个锁Lock

    问题 (1)自己动手写一个锁需要哪些知识? (2)自己动手写一个锁到底有多简单? (3)自己能不能写出来一个完美的锁? 简介 本篇文章的目标一是自己动手写一个锁,这个锁的功能很简单,能进行正常的加锁. ...

  8. 死磕 java同步系列之AQS起篇

    问题 (1)AQS是什么? (2)AQS的定位? (3)AQS的实现原理? (4)基于AQS实现自己的锁? 简介 AQS的全称是AbstractQueuedSynchronizer,它的定位是为Jav ...

  9. 死磕java_死磕 java同步系列之AQS起篇

    问题 (1)AQS是什么? (2)AQS的定位? (3)AQS的实现原理? (4)基于AQS实现自己的锁? 简介 AQS的全称是AbstractQueuedSynchronizer,它的定位是为Jav ...

最新文章

  1. 刚安装的python如何使用-python中RabbitMQ的使用(安装和简单教程)
  2. ORMLite框架 的使用方法---给你的数据库操作插上翅膀
  3. 【数据结构】用java实现不同的七种排序算法和性能比较
  4. CSS——京东首页实战总结
  5. python自由职业可以做什么_我想成为自由职业者,但不知道做什么?
  6. 中国移动锁定数据业务新战略
  7. ****阿里云使用+快速运维总结(不断更新)
  8. 应用程序不被系统杀死
  9. Python编程实现对拉格朗日和KKT条件求极值
  10. 已经通过prince2考试,是否有必要考pmp?
  11. 数据库设计员工管理系统
  12. handler机制原理
  13. 抖音怎么加微信好友?抖音跳转微信的方法
  14. 企业信息与网络通信安全 团队成员简历-叶俊
  15. 超阅读:数码时代的文本变革
  16. 【工业互联网】郭朝晖:工业互联网平台背景下的工业大数据与智能制造
  17. JavaScript:集合与数组,重要但你不知道的ES6数据类型
  18. 计算机烧毁,终极:计算机主板烧毁了吗?为什么会燃烧?
  19. springboot启动 lombok 找不到符号
  20. 如何用matlab对信号进行傅里叶变换

热门文章

  1. 导入特征怎么实体化_幼儿教师开展集体活动开场导入方法 看过的都说很实用!...
  2. 战队基地_走a怪凌晨四点被赶出训练基地,粉丝礼物还被战队瓜分
  3. mysql修改主键属性_mysql如何改变主键属性
  4. docker 启动petalinux镜像脚本
  5. zabbix linux网卡流量,如何使用zabbix3.2监控Windows网卡流量
  6. c语言链表容易犯的错误,急求大牛啊这个容易的链表到底在哪出错了
  7. FPGA设计心得(2)边沿检测的问题进一步说明(仿真中一定能得到上升沿的设计)
  8. 【 MATLAB 】Fourier Analysis and Filtering frame(傅立叶分析和滤波 框架)
  9. 浅析开源数据库MySQL架构
  10. 趋势科技:黑客勒索瞄准智能电视