这篇讲讲ReentrantReadWriteLock可重入读写锁,它不仅是读写锁的实现,并且支持可重入性。 聊聊高并发(十五)实现一个简单的读-写锁(共享-排他锁) 这篇讲了如何模拟一个读写锁。

可重入的读写锁的特点是

1. 当有线程获取读锁时,不允许再有线程获得写锁

2. 当有线程获得写锁时,不允许其他线程获得读锁和写锁

这里隐含着几层含义:

 
  1. static final int SHARED_SHIFT = 16;

  2. static final int SHARED_UNIT = (1 << SHARED_SHIFT);

  3. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

  4. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

  5. /** Returns the number of shared holds represented in count */

  6. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

  7. /** Returns the number of exclusive holds represented in count */

  8. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

1. 可以同时有多个线程同时获得读锁,进入临界区。这时候的读锁的行为和Semaphore信号量是类似的

2. 由于是可重入的,所以1个线程如果获得了读锁,那么它可以重入这个读锁

3. 如果1个线程获得了读锁,那么它不能同时再获得写锁,这个就是所谓的“锁升级”,读锁升级到写锁可能会造成死锁,所以是不允许的

4. 如果1个线程获得了写锁,那么不允许其他线程再获得读锁和写锁,但是它自己可以获得读锁,就是所谓的“锁降级”,锁降级是允许的

关于读写锁的实现还要考虑的几个要点:

1. 释放锁时的优先级问题,是让写锁先获得还是先让读锁先获得

2. 是否允许读线程插队

3. 是否允许写线程插队,因为读写锁一般用在大量读,少量写的情况,如果写线程没有优先级,那么可能造成写线程的饥饿

4. 锁的升降级问题,一般是允许1个线程的写锁降级为读锁,不允许读锁升级成写锁

带着问题看看ReentrantReadWriteLock的源码。 它同样提供了Sync来继承AQS并提供扩展,但是它的Sync相比较Semaphore和CountDownLatch要更加复杂。

1. 把State状态作为一个读写锁的计数器,包括了重入的次数。state是32位的int值,所以把高位16位作为读锁的计数器,低位的16位作为写锁的计数器,并提供了响应的读写这两个计数器的位操作方法。

计算sharedCount时,采用无符号的移位操作,右移16位就是读锁计数器的值

写锁直接用EXCLUSIVE_MASK和state做与运算,EXCLUSIVE_MASK的值是00000000000000001111111111111111,相当于计算了低位16位的值

需要注意计算出来的值包含了重入的次数。所以MAX_COUNT限定了最大值是2^17 - 1

 
  1. static final int SHARED_SHIFT = 16;

  2. static final int SHARED_UNIT = (1 << SHARED_SHIFT);

  3. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

  4. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

  5. /** Returns the number of shared holds represented in count */

  6. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

  7. /** Returns the number of exclusive holds represented in count */

  8. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

HoldCount类用来计算1个线程的重入次数,并使用了1个ThreadLocal类型的HoldCounter,可以记录每个线程的锁的重入次数。 cachedHoldCounter记录了最后1个获取读锁的线程的重入次数。 firstReader指向了第一个获取读锁的线程,firstReaderHoldCounter记录了第一个获取读锁的线程的重入次数

 
  1. static final class HoldCounter {

  2. int count = 0;

  3. // Use id, not reference, to avoid garbage retention

  4. final long tid = Thread.currentThread().getId();

  5. }

  6. /**

  7. * ThreadLocal subclass. Easiest to explicitly define for sake

  8. * of deserialization mechanics.

  9. */

  10. static final class ThreadLocalHoldCounter

  11. extends ThreadLocal<HoldCounter> {

  12. public HoldCounter initialValue() {

  13. return new HoldCounter();

  14. }

  15. }

  16. /**

  17.          * The hold count of the last thread to successfully acquire

  18.          * readLock. This saves ThreadLocal lookup in the common case

  19.          * where the next thread to release is the last one to

  20.          * acquire. This is non-volatile since it is just used

  21.          * as a heuristic, and would be great for threads to cache.

  22.          *

  23.          * <p>Can outlive the Thread for which it is caching the read

  24.          * hold count, but avoids garbage retention by not retaining a

  25.          * reference to the Thread.

  26.          *

  27.          * <p>Accessed via a benign data race; relies on the memory

  28.          * model's final field and out-of-thin-air guarantees.

  29.          */

  30.         private transient HoldCounter cachedHoldCounter;

Sync提供了两个抽象方法给子类扩展,用来表示读锁和写锁是否应该阻塞等待

 
  1. /**

  2. * Returns true if the current thread, when trying to acquire

  3. * the read lock, and otherwise eligible to do so, should block

  4. * because of policy for overtaking other waiting threads.

  5. */

  6. abstract boolean readerShouldBlock();

  7. /**

  8. * Returns true if the current thread, when trying to acquire

  9. * the write lock, and otherwise eligible to do so, should block

  10. * because of policy for overtaking other waiting threads.

  11. */

  12. abstract boolean writerShouldBlock();

写锁的tryXXX获取和释放

1. 写锁释放时,由于没有其他线程获得临界区,它的tryRelease()方法只需要设置状态的值,通过exclusiveCount计算写锁的计数器,如果为0表示释放了写锁,就把exclusiveOwnerThread设置为null.

2. 写锁的tryAcquire获取时,

先判断状态是否为0,为0表示没有线程获得锁,就可以直接设置状态,然后把exclusiveOwnerThread设置为当前线程

如果状态不为0,那表示有几种可能:写锁为0,读锁不为0;写锁不为0,读锁为0,写锁不为0,读锁也不为0。

所以它先判断写锁是否为0,写锁为0,那么表示读锁肯定不会为0,就失败,

或者写锁不为0,但是exclusiveOwnerThread不是自己,那么表示已经有其他线程获得了写锁,就失败

写锁不为0,并且exclusiveOwnerThread是自己,那么肯定表示是写锁的重入的情况,所以设置state状态,返回成功。

 
  1. protected final boolean tryRelease(int releases) {

  2. if (!isHeldExclusively())

  3. throw new IllegalMonitorStateException();

  4. int nextc = getState() - releases;

  5. boolean free = exclusiveCount(nextc) == 0;

  6. if (free)

  7. setExclusiveOwnerThread(null);

  8. setState(nextc);

  9. return free;

  10. }

  11. protected final boolean tryAcquire(int acquires) {

  12.             /*

  13.              * Walkthrough:

  14.              * 1. If read count nonzero or write count nonzero

  15.              *    and owner is a different thread, fail.

  16.              * 2. If count would saturate, fail. (This can only

  17.              *    happen if count is already nonzero.)

  18.              * 3. Otherwise, this thread is eligible for lock if

  19.              *    it is either a reentrant acquire or

  20.              *    queue policy allows it. If so, update state

  21.              *    and set owner.

  22.              */

  23.             Thread current = Thread.currentThread();

  24.             int c = getState();

  25.             int w = exclusiveCount(c);

  26.             if (c != 0) {

  27.                 // (Note: if c != 0 and w == 0 then shared count != 0)

  28.                 if (w == 0 || current != getExclusiveOwnerThread())

  29.                     return false;

  30.                 if (w + exclusiveCount(acquires) > MAX_COUNT)

  31.                     throw new Error("Maximum lock count exceeded");

  32.                 // Reentrant acquire

  33.                 setState(c + acquires);

  34.                 return true;

  35.             }

  36.             if (writerShouldBlock() ||

  37.                 !compareAndSetState(c, c + acquires))

  38.                 return false;

  39.             setExclusiveOwnerThread(current);

  40.             return true;

  41.         }

读锁的tryXXX获取和释放

1. 读锁释放时基于共享的方式,修改线程各自的HoldCounter的值,最后采用位操作修改位于state的总体的读锁计数器。tryReleaseShared()之后具体的释放后续线程的操作由AQS根据队列状态来决定。

2. 读所获取时先看写锁的计数器,如果写锁已经被获取,并且不是当前线程所获取的,就直接失败返回

这里会进行一次快速路径获取,尝试获取一次,如果readShouldBlock()返回false,并且CAS操作成功了,意思是可以获得锁,就更新相关读锁计数器

否则就进行轮询方式的获取fullTryAcquireShared()

也就是说如果当前没有线程获取写锁,或者是自己获取写锁,就可以获取读锁

一个线程获取了写锁之后,它还可以获取读锁,也就是所谓的“锁降级”,但这时候其他线程无法获取读锁,在检查到有其他写锁存在时就退出了

 
  1. protected final boolean tryReleaseShared(int unused) {

  2. Thread current = Thread.currentThread();

  3. if (firstReader == current) {

  4. // assert firstReaderHoldCount > 0;

  5. if (firstReaderHoldCount == 1)

  6. firstReader = null;

  7. else

  8. firstReaderHoldCount--;

  9. } else {

  10. HoldCounter rh = cachedHoldCounter;

  11. if (rh == null || rh.tid != current.getId())

  12. rh = readHolds.get();

  13. int count = rh.count;

  14. if (count <= 1) {

  15. readHolds.remove();

  16. if (count <= 0)

  17. throw unmatchedUnlockException();

  18. }

  19. --rh.count;

  20. }

  21. for (;;) {

  22. int c = getState();

  23. int nextc = c - SHARED_UNIT;

  24. if (compareAndSetState(c, nextc))

  25. // Releasing the read lock has no effect on readers,

  26. // but it may allow waiting writers to proceed if

  27. // both read and write locks are now free.

  28. return nextc == 0;

  29. }

  30. }

  31. protected final int tryAcquireShared(int unused) {

  32.             /*

  33.              * Walkthrough:

  34.              * 1. If write lock held by another thread, fail.

  35.              * 2. Otherwise, this thread is eligible for

  36.              *    lock wrt state, so ask if it should block

  37.              *    because of queue policy. If not, try

  38.              *    to grant by CASing state and updating count.

  39.              *    Note that step does not check for reentrant

  40.              *    acquires, which is postponed to full version

  41.              *    to avoid having to check hold count in

  42.              *    the more typical non-reentrant case.

  43.              * 3. If step 2 fails either because thread

  44.              *    apparently not eligible or CAS fails or count

  45.              *    saturated, chain to version with full retry loop.

  46.              */

  47.             Thread current = Thread.currentThread();

  48.             int c = getState();

  49.             if (exclusiveCount(c) != 0 &&

  50.                 getExclusiveOwnerThread() != current)

  51.                 return -1;

  52.             int r = sharedCount(c);

  53.             if (!readerShouldBlock() &&

  54.                 r < MAX_COUNT &&

  55.                 compareAndSetState(c, c + SHARED_UNIT)) {

  56.                 if (r == 0) {

  57.                     firstReader = current;

  58.                     firstReaderHoldCount = 1;

  59.                 } else if (firstReader == current) {

  60.                     firstReaderHoldCount++;

  61.                 } else {

  62.                     HoldCounter rh = cachedHoldCounter;

  63.                     if (rh == null || rh.tid != current.getId())

  64.                         cachedHoldCounter = rh = readHolds.get();

  65.                     else if (rh.count == 0)

  66.                         readHolds.set(rh);

  67.                     rh.count++;

  68.                 }

  69.                 return 1;

  70.             }

  71.             return fullTryAcquireShared(current);

  72.         }

  73.  /**

  74.          * Full version of acquire for reads, that handles CAS misses

  75.          * and reentrant reads not dealt with in tryAcquireShared.

  76.          */

  77.         final int fullTryAcquireShared(Thread current) {

  78.             /*

  79.              * This code is in part redundant with that in

  80.              * tryAcquireShared but is simpler overall by not

  81.              * complicating tryAcquireShared with interactions between

  82.              * retries and lazily reading hold counts.

  83.              */

  84.             HoldCounter rh = cachedHoldCounter;

  85.             if (rh == null || rh.tid != current.getId())

  86.                 rh = readHolds.get();

  87.             for (;;) {

  88.                 int c = getState();

  89.                 int w = exclusiveCount(c);

  90.                 if ((w != 0 && getExclusiveOwnerThread() != current) ||

  91.                     ((rh.count | w) == 0 && readerShouldBlock(current)))

  92.                     return -1;

  93.                 if (sharedCount(c) == MAX_COUNT)

  94.                     throw new Error("Maximum lock count exceeded");

  95.                 if (compareAndSetState(c, c + SHARED_UNIT)) {

  96.                     cachedHoldCounter = rh; // cache for release

  97.                     rh.count++;

  98.                     return 1;

  99.                 }

  100.             }

  101.         } 

tryWriteLock和tryReadLock操作和上面的操作类似,它们是读写锁的tryLock()的实际实现,表示尝试获取一次锁

1. tryWriteLock方法尝试获得写锁,先判断状态是否为0,为0并且CAS操作成功就表示获得锁。如果状态不为0,就判断写锁计数器的值,如果写锁计数器为0就表示存在读锁,就返回失败,获取写锁不为0,但是不是当前线程所获取的,也返回失败。只有写锁不为0并且是当前线程自己获取的写锁,就是所谓的写锁重入操作。CAS成功后就表示获得写锁

 
  1. final boolean tryWriteLock() {

  2. Thread current = Thread.currentThread();

  3. int c = getState();

  4. if (c != 0) {

  5. int w = exclusiveCount(c);

  6. if (w == 0 ||current != getExclusiveOwnerThread())

  7. return false;

  8. if (w == MAX_COUNT)

  9. throw new Error("Maximum lock count exceeded");

  10. }

  11. if (!compareAndSetState(c, c + 1))

  12. return false;

  13. setExclusiveOwnerThread(current);

  14. return true;

  15. }

  16. final boolean tryReadLock() {

  17.             Thread current = Thread.currentThread();

  18.             for (;;) {

  19.                 int c = getState();

  20.                 if (exclusiveCount(c) != 0 &&

  21.                     getExclusiveOwnerThread() != current)

  22.                     return false;

  23.                 if (sharedCount(c) == MAX_COUNT)

  24.                     throw new Error("Maximum lock count exceeded");

  25.                 if (compareAndSetState(c, c + SHARED_UNIT)) {

  26.                     HoldCounter rh = cachedHoldCounter;

  27.                     if (rh == null || rh.tid != current.getId())

  28.                         cachedHoldCounter = rh = readHolds.get();

  29.                     rh.count++;

  30.                     return true;

  31.                 }

  32.             }

  33.         }

ReentrantReadWriteLock也提供了非公平和公平的两个Sync版本

非公平的版本中

1. 写锁总是优先获取,不考虑AQS队列中先来的线程

2. 读锁也不按FIFO队列排队,而是看当前获得锁是否是写锁,如果是写锁,就等待,否则就尝试获得锁

而公平版本中

1. 如果有其他锁存在,获取写锁操作就失败,应该(should)进AQS队列等待

2. 如果有其他锁存在,获取读锁操作就失败,应该(should)进AQS队列等待

 
  1. final static class NonfairSync extends Sync {

  2. private static final long serialVersionUID = -8159625535654395037L;

  3. final boolean writerShouldBlock(Thread current) {

  4. return false; // writers can always barge

  5. }

  6. final boolean readerShouldBlock(Thread current) {

  7. /* As a heuristic to avoid indefinite writer starvation,

  8. * block if the thread that momentarily appears to be head

  9. * of queue, if one exists, is a waiting writer. This is

  10. * only a probablistic effect since a new reader will not

  11. * block if there is a waiting writer behind other enabled

  12. * readers that have not yet drained from the queue.

  13. */

  14. return apparentlyFirstQueuedIsExclusive();

  15. }

  16. }

  17. /**

  18. * Fair version of Sync

  19. */

  20. final static class FairSync extends Sync {

  21. private static final long serialVersionUID = -2274990926593161451L;

  22. final boolean writerShouldBlock(Thread current) {

  23. // only proceed if queue is empty or current thread at head

  24. return !isFirst(current);

  25. }

  26. final boolean readerShouldBlock(Thread current) {

  27. // only proceed if queue is empty or current thread at head

  28. return !isFirst(current);

  29. }

  30. }

具体ReadLock和WriteLock的实现就是依赖Sync来实现的,默认是非公平版本的Sync。

读锁采用共享默认的AQS,它提供了中断/不可中断的lock操作,tryLock操作,限时的tryLock操作。值得注意的时读锁不支持newCondition操作。

 
  1. public static class ReadLock implements Lock, java.io.Serializable {

  2. private static final long serialVersionUID = -5992448646407690164L;

  3. private final Sync sync;

  4. protected ReadLock(ReentrantReadWriteLock lock) {

  5. sync = lock.sync;

  6. }

  7. public void lock() {

  8. sync.acquireShared(1);

  9. }

  10.   public void lockInterruptibly() throws InterruptedException {

  11.             sync.acquireSharedInterruptibly(1);

  12.         }

  13. public  boolean tryLock() {

  14.             return sync.tryReadLock();

  15.         }

  16. public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {

  17.             return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

  18.         }

  19. public  void unlock() {

  20.             sync.releaseShared(1);

  21.         }

  22.   public Condition newCondition() {

  23.             throw new UnsupportedOperationException();

  24.         }

WriteLock基于独占模式的AQS,它提供了中断/不可中断的lock操作,tryLock操作,限时的tryLock操作

 
  1. public static class WriteLock implements Lock, java.io.Serializable {

  2. private static final long serialVersionUID = -4992448646407690164L;

  3. private final Sync sync;

  4. protected WriteLock(ReentrantReadWriteLock lock) {

  5. sync = lock.sync;

  6. }

  7.   public void lock() {

  8.             sync.acquire(1);

  9.         }

  10.  public void lockInterruptibly() throws InterruptedException {

  11.             sync.acquireInterruptibly(1);

  12.         }

  13. public boolean tryLock( ) {

  14.             return sync.tryWriteLock();

  15.         }

  16. public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {

  17.             return sync.tryAcquireNanos(1, unit.toNanos(timeout));

  18.         }

  19. public void unlock() {

  20.             sync.release(1);

  21.         }

  22. public Condition newCondition() {

  23.             return sync.newCondition();

  24.         }

最后再说一下AQS和各种同步器实现的关系,AQS提供了同步队列和条件队列的管理,包括各种情况下的入队出队操作。而同步器子类实现了tryAcquire和tryRelease方法来操作状态,来表示什么情况下可以直接获得锁而不需要进入AQS,什么情况下获取锁失败则需要进入AQS队列等待

聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁相关推荐

  1. 聊聊高并发(十七)解析java.util.concurrent各个组件(一) 了解sun.misc.Unsafe类

    了解了并发编程中锁的基本原理之后,接下来看看Java是如何利用这些原理来实现各种锁,原子变量,同步组件的.在开始分析java.util.concurrent的源代码直接,首先要了解的就是sun.mis ...

  2. 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)

    AQS是AbstractQueuedSynchronizer的缩写,AQS是Java并包里大部分同步器的基础构件,利用AQS可以很方便的创建锁和同步器.它封装了一个状态,提供了一系列的获取和释放操作, ...

  3. 聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

    这篇说说java.util.concurrent.atomic包里的类,总共12个,网上有很多文章解析这几个类,这里挑些重点说说. 这12个类可以分为三组: 1. 普通类型的原子变量 2. 数组类型的 ...

  4. 聊聊高并发(三十)解析java.util.concurrent各个组件(十二) 理解CyclicBarrier栅栏

    这篇讲讲CyclicBarrier栅栏,从它的名字可以看出,它是可循环使用的.它的功能和CountDownLatch类似,也是让一组线程等待,然后一起开始往下执行.但是两者还是有几个区别 1. 等待的 ...

  5. 聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

    这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互 ...

  6. 聊聊高并发(二十三)解析java.util.concurrent各个组件(五) 深入理解AQS(三)

    这篇对AQS做一个总结. 上一篇帖了很多AQS的代码,可以看出AQS的实现思路很简单,就是提供了获取acquire和释放操作release,提供了 1. 可中断和不可中断的版本 2. 可定时和不可定时 ...

  7. 聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析

    ThreadPoolExecutor是Executor执行框架最重要的一个实现类,提供了线程池管理和任务管理是两个最基本的能力.这篇通过分析ThreadPoolExecutor的源码来看看如何设计和实 ...

  8. 聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

    上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何 ...

  9. 聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁...

    上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和基本的方法,显示了怎样 ...

最新文章

  1. 丁贵才130702010042第二次作业
  2. CV00-03-CV基本操作2
  3. 前端验证码后端返回一个图片_Web后端开发(6)——简易图片验证码的制作
  4. 炸裂!VSCode 摸鱼神器!!!
  5. Android-源代码分析
  6. 详解布隆过滤器的原理、使用场景和注意事项
  7. C#获取类名为Internet_Explorer_Server控件的内容
  8. 福建三明市梅列区巧用“智慧梅列” 助推重大项目建设
  9. idea搭建java openCV环境
  10. 找工作,改简历,投递装订——累就一个字啊!
  11. linux 内核恐慌,linux – CentOS 6.5 mdadm Raid 1 – raid数据检查期间的内核恐慌
  12. 年龄的计算方式计算机函数,excel使用时间函数计算年龄 使用Excel函数计算年龄的三种方法...
  13. python实现京东联盟API接口对接
  14. cloudcompare:怎么换背景颜色
  15. 完美收官!Fortinet Accelerate 2022中国站在北京落幕
  16. JustinMind原型制作工具
  17. 昨天面了一位,见识到了Spring的天花板~
  18. “XXX停止运行”问题解决
  19. 美团配送php,PHP对接美团配送接口遇到的坑
  20. 软件测试人员必备思维,软件测试人员的思维

热门文章

  1. Android 服务器推送技术
  2. 8g ubuntu 树莓派4b_树莓派4B安装Ubuntu系统,并安装桌面
  3. udp java 检测连接_简单的JAVA UDP连接测试
  4. 思科怎么隐藏端口_这些著名商标下的隐藏设计,你能发现吗?
  5. idea导入nodejs插件_sbt 项目导入问题
  6. parted如何将磁盘所有空间格式化_linux下大于2T的硬盘格式化问题
  7. 帝国cms php点击删除,帝国CMS删除内容非本站链接的方法(非插件)
  8. linux软件包管理工具,Linux 软件包管理器-----yum配置详解一
  9. python echo(msg) 字符串_[宜配屋]听图阁
  10. css 竖行进度图_css实现横向与竖向进度条效果的方法