文章目录

  • 1. Semaphore
  • 2. CountDownLatch
  • 3. CyclicBarrier

1. Semaphore

Semaphore字面意思是信号量,作用是控制访问特定资源的线程数目,底层依赖AQS的state状态量,常用于限流等场景。Semaphore是一种线程通信工具,类似的还有BlockingQueue、CountDownLatch、CyclicBarrier等!

Semaphore的节点类型是属于共享模式,而BlockingQueue、ReentrantLock都是独占模式!

 final Node node = addWaiter(Node.SHARED); //Semaphore

1.1 Semaphore的使用

假如有10个线程从上游服务器打过来,我的下游服务最大能承受6个线程,代码如下:

public class SemaphoreTest {public static void main(String[] args) {//最大允许6个线程Semaphore semaphore = new Semaphore(6);//一共10个线程for (int i = 1; i <= 10 ; i++) {new Thread(() -> {try {//获取下游服务器门票semaphore.acquire();System.out.println(Thread.currentThread().getName()+"抢到了");Thread.sleep(200);//释放下游服务器门票semaphore.release();System.out.println(Thread.currentThread().getName()+"释放了===");} catch (InterruptedException e) {e.printStackTrace();}finally {}},"线程"+i).start();}}
}

acquire()方法还可以制定每个线程需要的门票个数

  //state的初始值 = 6,门票池中有6个门票Semaphore semaphore = new Semaphore(6);semaphore.acquire();  // state-1 每个线程想过去,拿一张门票就好 semaphore.acquire(2); // state-2 每个线程想过去,需要拿两张门票semaphore.release(2); // state+2 把拿到的两张门票还回去//如果在100毫秒内,没有拿到门票,就不在等待,执行别的降级方法semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);

1.2 Semaphore的acquire、release分析

    public Semaphore(int permits) {sync = new NonfairSync(permits);  //创建Semaphore时 默认非公平的}

semaphore.acquire();源码如下

    public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();//以共享模式获取,中断即中止  permits:acquire(2)中的2sync.acquireSharedInterruptibly(permits);}=====================================================public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//对应方法名,被中断即抛异常if (Thread.interrupted())throw new InterruptedException();//尝试操作状态量,入队阻塞方法,与ReentrantLock类似但不同!if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

tryAcquireShared(arg)方法源码如下:

       protected int tryAcquireShared(int acquires) {//因为Semaphore时 默认非公平,所以调用非公平方法return nonfairTryAcquireShared(acquires);}=====================================================final int nonfairTryAcquireShared(int acquires) {for (;;) {//获取状态量state,也就是new Semaphore(6) state=6int available = getState();//状态量-线程通过需要的门票数 = 6-2 = 4 = remaining int remaining = available - acquires;/** 如果remaining > 0 ,使用CAS算法修改状态量,并返回remaining 的值,通过tryAcquireShared(arg) < 0比较,结果为false说明这个线程拿到了门票,可以通过限制*//** 如果remaining < 0 , 不修改状态量,直接返回remaining 的值,通过tryAcquireShared(arg) < 0比较,结果为true,说明这个线程过来时门票已经不足,需要入队阻塞*/if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}

如果 state - acquires < 0 ,则tryAcquireShared(arg) < 0 为true,进入doAcquireSharedInterruptibly(arg)方法!

这个方法的目的是入队和阻塞,与ReentrantLock的操作差不多,这里不过是把入队、阻塞整合在一个方法中了!

    private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//创建一共享节点,与ReentrantLock的入队操作一致,代码公用,//不过是把独享节点改为共享节点final Node node = addWaiter(Node.SHARED);boolean failed = true;try {//阻塞,这里的阻塞和ReentrantLock的阻塞操作基本一样,再注释一边吧for (;;) {//获取当前节点的前一个节点final Node p = node.predecessor();if (p == head) {//如果前一个节点是head节点,会再次尝试获取一次门票(状态量)int r = tryAcquireShared(arg);// r>=0代表获取到了门票,会执行出队if (r >= 0) {//注意:这里与ReetrantLock的独占模式不一样,//当 state - 当前线程要拿的门票acquires > 0,说明还有剩余的门票//又因为是共享模式,所以这里添加了广播,通知其他阻塞的线程去抢门票setHeadAndPropagate(node, r);//GC掉无用的节点p.next = null; // help GCfailed = false;return;}}//如果前一个节点不是head节点,先修改前一个节点的waitStatus = -1 //表示可以被唤醒,此时shouldParkAfterFailedAcquire(p, node)为false,不会进行阻塞操作,//然后循环下来,此时shouldParkAfterFailedAcquire(p, node)为true,阻塞线程!if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}========================================
通知怎么做的呢?setHeadAndPropagate(node, r);代码如下private void setHeadAndPropagate(Node node, int propagate) {//把老的head节点提取出来Node h = head; // Record old head for check below//设置当前节点为新的head节点,因为当前节点已经拿到锁,唤醒了setHead(node);//propagate 是state剩下的值,一定是>0的if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {//当前节点的下一个节点Node s = node.next;if (s == null || s.isShared())//通知方法,详情如下doReleaseShared();}}=============================================private void doReleaseShared() {for (;;) {//把上一步提取出来的老的head节点的引用h 指向 当前新的head(当前线程)Node h = head;if (h != null && h != tail) {//此时,当前线程的head中的waitStatus必为0int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}//通过CAS算法把当前新的head节点的WaitStatus设置为-3(PROPAGATE)//代表可以传播else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}//跳出循环if (h == head)                   // loop if head changedbreak;}}

传播这一块抽象流程如下:

至此,acquire() 方法执行完!下面看release方法,看一下使用完门票,如何归还回去的!

semaphore.release();源码如下

     semaphore.release(2);//permits == 2public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}

releaseShared(permits)如下

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}=============================================
上边的doReleaseShared();方法,我们发现和acquire()中通知的方法一模一样!private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}}

2. CountDownLatch

2.1 CountDownLatch的实现原理?

  • CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count给线程同时获取。
  • 当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。
  • 而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;
  • 通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行!以上,就是CountDownLatch的实现原理。

2.2 CountDownLatch代码示例

/***  * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。*  * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),*  * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException{CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <=6; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName()+"\t"+"离开教室");countDownLatch.countDown();},String.valueOf(i)).start();}countDownLatch.await();//必须要等到countDownLatch从6变为零后才能执行后续流程。System.out.println(Thread.currentThread().getName()+"\t 关门走人");}
}

运行结果:

3. CyclicBarrier

3.1 CyclicBarrier是什么?
        栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
        CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

API:

cyclicBarrier.await()

3.1 CyclicBarrier代码示例

/***  * CyclicBarrier*  * 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,*  * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有*  * 被屏障拦截的线程才会继续干活。线程进入屏障通过CyclicBarrier的await()方法。*  **  * 集齐7颗龙珠就可以召唤神龙*/
public class CyclicBarrierTest {public static void main(String[] args){CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("****召唤神龙"); });for (int i = 1; i <=7; i++) {final int tmpInt = i;new Thread(() -> {try{System.out.println(Thread.currentThread().getName()+"\t收集到第:"+tmpInt+" 颗龙珠");//进来了就等着,直到7个全部进来cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}},String.valueOf(i)).start();}}
}

运行结果:

3.2 CyclicBarrier使用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水:

  • 先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水
  • 再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
public class BankWaterService implements Runnable {/*** 创建4个屏障,处理完之后执行当前类的run方法*/private CyclicBarrier c = new CyclicBarrier(4, this);/*** 假设只有4个sheet,所以只启动4个线程*/private Executor executor = Executors.newFixedThreadPool(4);/*** 保存每个sheet计算出的银流结果*/private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();private void count() {for (int i = 0; i < 4; i++) {executor.execute(new Runnable() {@Overridepublic void run() {// 计算当前sheet的银流数据,计算代码省略sheetBankWaterCount.put(Thread.currentThread().getName(), 1);// 银流计算完成,插入一个屏障try {c.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});}}@Overridepublic void run() {int result = 0;// 汇总每个sheet计算出的结果for (Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {result += sheet.getValue();}// 将结果输出sheetBankWaterCount.put("result", result);System.out.println(result);}public static void main(String[] args) {BankWaterService bankWaterCount = new BankWaterService();bankWaterCount.count();}
}

抽象同步器AQS应用之-- Semaphore、CountDownLatch、CyclicBarrier的介绍相关推荐

  1. 抽象同步器AQS、CAS应用之--ReentrantLock,lock和unlock的流程、源码分析

    文章目录 1. AQS和CAS 1.1 CAS存在的bug:ABA问题 2. ReentrantLock和synchronized的区别 3. ReentrantLock的内部结构 3.1 lock. ...

  2. 《Java 7 并发编程指南》学习概要 (3)Semaphore, CountDownLatch, CyclicBarrier , Phaser, Exchanger...

    1.Semaphore  信号量 Semaphore(信号量)是一个控制访问多个共享资源的计数器. 当一个线程想要访问某个共享资源,首先,它必须获得semaphore.如果semaphore的内部计数 ...

  3. 抽象同步器AQS应用之--阻塞队列BlockingQueue,如何保证任务一定被消费?

    文章目录 1.阻塞队列简介 2. BlockingQueue源码分析 3. 生产者消费者模型如何保证信息不会丢失? 1.阻塞队列简介 1.1 什么是阻塞队列? 阻塞队列是一个队列 ①:当队列是空的,从 ...

  4. 并发编程-15并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍

    文章目录 J.U.C脑图 J.U.C核心AQS简介 AQS底层数据结构 AQS特点 J.U.C脑图 为了体现出AQS和线程池的重要性,上图单独将AQS和线程池拿出来了. J.U.C的构成如下: J.U ...

  5. 05.抽象队列同步器AQS应用之Lock详解

    AQS应用之Lock Java并发编程核心在于java.concurrent.util包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列.条件队列.独占获取.共享获取等,而这个行为 ...

  6. 并发——抽象队列同步器AQS的实现原理

    一.前言 这段时间在研究Java并发相关的内容,一段时间下来算是小有收获了.ReentrantLock是Java并发中的重要部分,所以也是我的首要研究对象,在学习它的过程中,我发现它是基于抽象队列同步 ...

  7. Java并发同步器AQS

    AQS是AbstractQueuedSynchronizer的简写,中文名应该叫抽象队列同步器(我给的名字,哈哈),出生于Java 1.5. 一.什么是同步器 多线程并发的执行,之间通过某种 共享 状 ...

  8. java同步队列_Java 中队列同步器 AQS(AbstractQueuedSynchronizer)实现原理

    前言 在 Java 中通过锁来控制多个线程对共享资源的访问,使用 Java 编程语言开发的朋友都知道,可以通过 synchronized 关键字来实现锁的功能,它可以隐式的获取锁,也就是说我们使用该关 ...

  9. 同步器AQS中的同步队列与等待队列

    在单纯地使用锁,比如ReentrantLock的时候,这个锁组件内部有一个继承同步器AQS的类,实现了其抽象方法,加锁.释放锁也只是涉及到AQS中的同步队列而已,那么等待队列又是什么呢? 当使用Con ...

最新文章

  1. 【模板】Dijkstra
  2. UVA-10047 The Monocycle (图的BFS遍历)
  3. no exceptions support的测试程序
  4. 广东--阳江--闸波一天游归来,上PP~~
  5. Android 应用内实现导航页面,接入百度SDK内置导航,高德SDK内置导航
  6. 菜鸟学asp.net遇到的问题和解决方案
  7. python获取指定区域的像素_如何获得某个区域的像素值?
  8. NISP第一讲信息安全和网络空间安全
  9. 备份VMWare ESXi虚拟机
  10. 查询字符串中不含重复的最长子串
  11. ThinkPHP5支付宝支付(当面付)付款码ISV服务商模式
  12. 【建议收藏】这个工具专门用于寻找路由器中的安全漏洞.md
  13. 使用Windows驱动的虚拟打印机,打印Excel表格无表格线问题解决(1)
  14. Cocos Creator 下载图片动态替换纹理
  15. 淋巴瘤可以学计算机专业吗,淋巴瘤常用检查有哪些?各有什么作用?
  16. SwiftUI 界面大全之IOT物联网复杂管理界面(教程含源码)
  17. 能量守恒(能量是怎么来的)
  18. kubeadm部署kubernetes-1.12.0 HA集群-ipvs
  19. 已知均值、标准差和样本个数如何模拟一组数据?
  20. 从零开始搭建实时用户画像

热门文章

  1. python爬取b站弹幕分析_python爬取B站视频弹幕分析并制作词云
  2. 为什么说Pravega是流处理统一批处理的最后一块拼图?
  3. 误删除VMware虚拟机vmdk文件的恢复方法
  4. Win XP文件夹拒绝访问的解决方法 - 年轻无极限 - 51CTO技术博客
  5. Windows7下硬盘安装RHEL 6.1
  6. C#窗体间的数据传值(转)
  7. indexOf 方法
  8. su 、 sudo 命令及限制 root 远程登录
  9. 应“云”而生--云时代的运维新理念
  10. 怎样为企业挑选正确的EDR解决方案