前几篇分析了一下AQS的原理和实现,这篇拿Semaphore信号量做例子看看AQS实际是如何使用的。

Semaphore表示了一种可以同时有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据,只有拿到了票据的线程尽可以进入临界区,否则就等待,直到获得释放出的票据。Semaphore常用在资源池中来管理资源。当状态只有1个0两个值时,它退化成了一个互斥的同步器,类似锁。

下面来看看Semaphore的代码。

它维护了一个内部类Sync来继承AQS,定制tryXXX方法来使用AQS。我们之前提到过AQS支持独占和共享两种模式,Semaphore明显就是共享模式,它支持多个线程可以同时进入临界区。所以Sync扩展了Shared相关的方法。

可以看到Sync的主要操作都是对状态的无锁修改,它不需要处理AQS队列相关的操作。在聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四) 我们说了AQS提供了tryXXX接口给子类扩展,相当于给子类一个机会,可以自己处理状态,决定是否入同步队列。

1. nonfailTryAcquireShared()非公平的tryAcquire,它立刻修改了票据状态,而不需要管是否有先来的线程正在等待,而一旦有可用的票据,就直接获得了锁,不需要进入AQS的队列等待同步。

2. tryReleaseShared()方法负责释放共享状态的资源,它只修改了票据状态,由AQS的releaseShared()方法来负责唤醒在AQS队列等待的线程

3. reducePermits()和drainPermits()方法都是直接修改了状态,从而限制可用的资源

 
  1. abstract static class Sync extends AbstractQueuedSynchronizer {

  2. private static final long serialVersionUID = 1192457210091910933L;

  3. Sync(int permits) {

  4. setState(permits);

  5. }

  6. final int getPermits() {

  7. return getState();

  8. }

  9. final int nonfairTryAcquireShared(int acquires) {

  10. for (;;) {

  11. int available = getState();

  12. int remaining = available - acquires;

  13. if (remaining < 0 ||

  14. compareAndSetState(available, remaining))

  15. return remaining;

  16. }

  17. }

  18. protected final boolean tryReleaseShared(int releases) {

  19. for (;;) {

  20. int current = getState();

  21. int next = current + releases;

  22. if (next < current) // overflow

  23. throw new Error("Maximum permit count exceeded");

  24. if (compareAndSetState(current, next))

  25. return true;

  26. }

  27. }

  28. final void reducePermits(int reductions) {

  29. for (;;) {

  30. int current = getState();

  31. int next = current - reductions;

  32. if (next > current) // underflow

  33. throw new Error("Permit count underflow");

  34. if (compareAndSetState(current, next))

  35. return;

  36. }

  37. }

  38. final int drainPermits() {

  39. for (;;) {

  40. int current = getState();

  41. if (current == 0 || compareAndSetState(current, 0))

  42. return current;

  43. }

  44. }

  45. }

Sync也是一个抽象类,具体的实现是NonfailSync和FairSync,代表了非公平实现和公平实现。在上一篇已经提到,所谓的非公平只是说在获取资源时开了一个口子,可以让后来的线程不需要管在AQS队列中的先来的线程来获取资源,而一旦获取失败,就得进入AQS队列等待,而AQS队列是先来先服务的FIFO队列。

可以看到,NonfailSync和FairSync只是在tryAcquireShared方法的实现上不同,其他都是一样的。

 
  1. /**

  2. * NonFair version

  3. */

  4. static final class NonfairSync extends Sync {

  5. private static final long serialVersionUID = -2694183684443567898L;

  6. NonfairSync(int permits) {

  7. super(permits);

  8. }

  9. protected int tryAcquireShared(int acquires) {

  10. return nonfairTryAcquireShared(acquires);

  11. }

  12. }

  13. /**

  14. * Fair version

  15. */

  16. static final class FairSync extends Sync {

  17. private static final long serialVersionUID = 2014338818796000944L;

  18. FairSync(int permits) {

  19. super(permits);

  20. }

  21. protected int tryAcquireShared(int acquires) {

  22. for (;;) {

  23. if (hasQueuedPredecessors())

  24. return -1;

  25. int available = getState();

  26. int remaining = available - acquires;

  27. if (remaining < 0 ||

  28. compareAndSetState(available, remaining))

  29. return remaining;

  30. }

  31. }

  32. }

再来看看Semaphore自己提供的方法,

1.支持可中断和不可中断的获取/释放

2.支持限时获取

3.支持tryXX获取/释放

4. 支持同时获取/释放多个资源

可以看到Semaphore的实现都是基于AQS的方法来作的,单个资源的获取/释放操作都是请求1个资源,所以参数传递的是1,多个资源获取传递了一个int个数。

 
  1. public void acquire() throws InterruptedException {

  2. sync.acquireSharedInterruptibly(1);

  3. }

  4. public void acquireUninterruptibly() {

  5.         sync.acquireShared(1);

  6.     }

  7. public boolean tryAcquire() {

  8.         return sync.nonfairTryAcquireShared(1) >= 0;

  9.     }

  10. public boolean tryAcquire(long timeout, TimeUnit unit)

  11.         throws InterruptedException {

  12.         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

  13.     }

  14. public void release() {

  15.         sync.releaseShared(1);

  16.     }

  17. public void acquire(int permits) throws InterruptedException {

  18.         if (permits < 0) throw new IllegalArgumentException();

  19.         sync.acquireSharedInterruptibly(permits);

  20.     }

  21. public void acquireUninterruptibly(int permits) {

  22.         if (permits < 0) throw new IllegalArgumentException();

  23.         sync.acquireShared(permits);

  24.     }

  25. public boolean tryAcquire(int permits) {

  26.         if (permits < 0) throw new IllegalArgumentException();

  27.         return sync.nonfairTryAcquireShared(permits) >= 0;

  28.     }

  29. public boolean tryAcquire(int permits, long timeout, TimeUnit unit)

  30.         throws InterruptedException {

  31.         if (permits < 0) throw new IllegalArgumentException();

  32.         return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

  33.     }

  34. public void release(int permits) {

  35.         if (permits < 0) throw new IllegalArgumentException();

  36.         sync.releaseShared(permits);

  37.     }

下面用一个实例来测试一下Semaphore的功能。

1. 创建一个有两个票据的Semaphore

2. 创建20个线程来竞争执行race()方法

3. 在race()方法里先打印一句等待获取资源的话,再获取资源,获得资源后打印一句话,最后释放资源,释放资源前打印一句话

 
  1. package com.lock.test;

  2. import java.util.concurrent.Semaphore;

  3. public class SemaphoreUsecase {

  4. private Semaphore semaphore = new Semaphore(2);

  5. public void race(){

  6. System.out.println("Thread " + Thread.currentThread().getName() + " is waiting the resource");

  7. semaphore.acquireUninterruptibly();

  8. try{

  9. System.out.println("Thread " + Thread.currentThread().getName() + " got the resource");

  10. try {

  11. Thread.sleep(3000);

  12. } catch (InterruptedException e) {

  13. e.printStackTrace();

  14. }

  15. }finally{

  16. System.out.println("Thread " + Thread.currentThread().getName() + " is releasing the resource");

  17. semaphore.release();

  18. }

  19. }

  20. public static void main(String[] args){

  21. final SemaphoreUsecase usecase = new SemaphoreUsecase();

  22. for(int i = 0; i < 10; i++){

  23. Thread t = new Thread(new Runnable(){

  24. @Override

  25. public void run() {

  26. usecase.race();

  27. }

  28. }, String.valueOf(i));

  29. t.start();

  30. }

  31. }

  32. }

测试结果:

可以看到先来的两个线程先获得了资源,后来的线程都在等待,当有线程释放资源之后,等待的线程才会去获得资源,直到都获得/释放资源

 
  1. Thread 0 is waiting the resource

  2. Thread 0 got the resource

  3. Thread 2 is waiting the resource

  4. Thread 2 got the resource

  5. Thread 1 is waiting the resource

  6. Thread 4 is waiting the resource

  7. Thread 3 is waiting the resource

  8. Thread 5 is waiting the resource

  9. Thread 6 is waiting the resource

  10. Thread 7 is waiting the resource

  11. Thread 8 is waiting the resource

  12. Thread 9 is waiting the resource

  13. Thread 2 is releasing the resource

  14. Thread 0 is releasing the resource

  15. Thread 1 got the resource

  16. Thread 4 got the resource

  17. Thread 1 is releasing the resource

  18. Thread 4 is releasing the resource

  19. Thread 3 got the resource

  20. Thread 5 got the resource

  21. Thread 3 is releasing the resource

  22. Thread 5 is releasing the resource

  23. Thread 6 got the resource

  24. Thread 7 got the resource

  25. Thread 7 is releasing the resource

  26. Thread 6 is releasing the resource

  27. Thread 8 got the resource

  28. Thread 9 got the resource

  29. Thread 8 is releasing the resource

  30. Thread 9 is releasing the resource

聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore相关推荐

  1. 聊聊高并发(十七)解析java.util.concurrent各个组件(一) 了解sun.misc.Unsafe类

    了解了并发编程中锁的基本原理之后,接下来看看Java是如何利用这些原理来实现各种锁,原子变量,同步组件的.在开始分析java.util.concurrent的源代码直接,首先要了解的就是sun.mis ...

  2. 聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

    这篇说说java.util.concurrent.atomic包里的类,总共12个,网上有很多文章解析这几个类,这里挑些重点说说. 这12个类可以分为三组: 1. 普通类型的原子变量 2. 数组类型的 ...

  3. 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)

    AQS是AbstractQueuedSynchronizer的缩写,AQS是Java并包里大部分同步器的基础构件,利用AQS可以很方便的创建锁和同步器.它封装了一个状态,提供了一系列的获取和释放操作, ...

  4. 聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

    这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互 ...

  5. 聊聊高并发(三十)解析java.util.concurrent各个组件(十二) 理解CyclicBarrier栅栏

    这篇讲讲CyclicBarrier栅栏,从它的名字可以看出,它是可循环使用的.它的功能和CountDownLatch类似,也是让一组线程等待,然后一起开始往下执行.但是两者还是有几个区别 1. 等待的 ...

  6. 聊聊高并发(二十三)解析java.util.concurrent各个组件(五) 深入理解AQS(三)

    这篇对AQS做一个总结. 上一篇帖了很多AQS的代码,可以看出AQS的实现思路很简单,就是提供了获取acquire和释放操作release,提供了 1. 可中断和不可中断的版本 2. 可定时和不可定时 ...

  7. 聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析

    ThreadPoolExecutor是Executor执行框架最重要的一个实现类,提供了线程池管理和任务管理是两个最基本的能力.这篇通过分析ThreadPoolExecutor的源码来看看如何设计和实 ...

  8. 聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)

    上一篇介绍了AQS的基本设计思路以及两个内部类Node和ConditionObject的实现 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一) 这篇 ...

  9. 聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

    上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何 ...

最新文章

  1. 互联网+办公”中的网红企业网盘——坚果云
  2. python里面temp是啥-python temp file:如何打开多次临时文件?
  3. 【FI】SAP ODN简介
  4. HDU1045 Fire Net 递归回溯
  5. mysql 创建表单
  6. .Net Core中使用ref和SpanT提高程序性能
  7. Linux驱动总结3- unlocked_ioctl和堵塞(waitqueue)读写函数的实现 【转】
  8. JavaScript动态网页制作宝库
  9. 2-set 1823: [JSOI2010]满汉全席
  10. 量子计算机中的物理知识,量子计算机和物理学上的量子力学关系大吗?
  11. Linux中Sort命令详解
  12. Win 10 下 android studio显示 Intel haxm无法安装,以及VT-X和hyper-x的冲突问题
  13. quora 查看自己关注了谁
  14. sns.heatmap用法
  15. HTML基础之 小白入门
  16. matlab学霸表白公式,一个理科学霸的表白:数学公式的超酷表
  17. 37岁被裁员,大公司不愿要,无奈去小公司面试,HR的话扎心了
  18. ElasticSearch索引生命周期管理(ILM)
  19. 载谭 Binomial Sum:多项式复合、插值与泰勒展开
  20. 获取Class的三种方法

热门文章

  1. python炫酷特效代码_推荐几个炫酷的 Python 开源项目
  2. android 字体渲染机制,Android:字体渲染问题.ttf 3MB
  3. all any 或 此运算符后面必须跟_用 ANY、SOME 或 ALL 修改的比较运算符
  4. python中area是什么意思_python – 与openCV 3中的contourArea的兼容性问题
  5. python legb_理解 Python 的 LEGB.
  6. 纸的大小图解_图解常见纸张开数尺寸印前小常识
  7. 是什么包_包粽子教程,喜欢的收藏,以后想吃什么样的都可以自己包
  8. vue怎么给html元素加类选择器,Vue.js——获取Dom对象的类选择器名(className)
  9. linux终端帮助,Linux下的帮助命令
  10. php 如何实现全选,如何用thinkphp框架实现全选,反选,全不选功能?