微服务架构发展到今天,各个服务之前的调用、接口请求越来越频繁,服务器承受的压力自然也越来越大。如果放任所有请求请求到服务器,不管是服务器也好还是数据库也好,都可能被因为无法承受大批量的请求而阻塞、宕机甚至是GG,尤其是一些查询接口请求特别频繁的情况下,是需要对请求进行一定限制的。

譬如:钉钉打卡数据,动辄几十上百万数据,这种数据一秒钟来个几百次请求,钉钉估计也够呛,调用频率肯定有限制。一些支付流水,对账单也是这个原理,在请求方面都有一些限制。

还有,服务接口限制,频繁点击提示亦或是Tomcat对超出限制的请求进行丢弃等,都是为了保护服务而实行的一些保护措施。

之前已经说了漏桶算法、令牌桶算法、滑动时间窗格,这些都是限流实现的一种方式,今天,我们来看一看JUC给我们提供的一个限流工具类——Semaphore!

先写一个简单的Semaphore使用,大家先对Semaphore有个印象,当然比较熟悉的直接略过哈~

  public static void main(String[] args) {//创建Semaphore,需要指定permits,也就是许可证,或者说信号量Semaphore semaphore = new Semaphore(3);// 线程一获取两个许可证(信号量)new Thread(()->{System.out.println(Thread.currentThread().getName()+"开始执行了");boolean flag = semaphore.tryAcquire(2);if (flag){System.out.println(Thread.currentThread().getName() + "成功获取到两个信号量,休息10s");try {TimeUnit.SECONDS.sleep(20);semaphore.release(2);} catch (InterruptedException e) {e.printStackTrace();}}}).start();// 线程二获取两个许可证(信号量)new Thread(()->{try {System.out.println(Thread.currentThread().getName() + "开始执行了");int i = semaphore.availablePermits();System.out.println("剩余信号量 = " + i);// 许可证不够,会进行等待尝试,直到25S之后。刚开始有3个许可证,上面线程获取到了2个,除非// sleep之后释放,否则这里无法获取到足够的许可证。有些类似令牌桶boolean flag = semaphore.tryAcquire(2,25, TimeUnit.SECONDS);if (flag) {System.out.println(Thread.currentThread().getName() + "成功获取到两个信号量,休息10s");TimeUnit.SECONDS.sleep(10);}} catch (Exception e){e.printStackTrace();}}).start();}

Semaphore的核心就是permits!也就是许可证,或者说信号量!创建Semaphore的时候,必须指定许可证的数量,有点类似令牌桶,就是你先放好令牌,任务来了,先去取令牌,拿到令牌才能执行任务,没令牌就等着或者直接尥蹶子,返回失败~

下面就来看一看Semaphore的源码,看一看大佬Doug Lea是怎么实现限流滴。

 //构造方法必须传许可证数量,默认非公平public Semaphore(int permits) {sync = new NonfairSync(permits);}static final class NonfairSync extends Sync {// 默认非公平实现NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}Sync(int permits) {setState(permits);}protected final void setState(int newState) {state = newState;}

我们可以看到,新建Semaphore中传递的permits最终也就是设置state的数值!也就是说这个permits就是AbstractQueuedSynchronizer(AQS)中共享锁的个数!

接着就是尝试获取信号量了。

 // 获取信号量public boolean tryAcquire(int permits) {// 获取小于0的信号量,完全没意义,异常if (permits < 0) throw new IllegalArgumentException();// 是否获取成功return sync.nonfairTryAcquireShared(permits) >= 0;}// 非公平尝试获取共享锁final int nonfairTryAcquireShared(int acquires) {// 无限循环for (;;) {// 获取当前剩余的信号量int available = getState();int remaining = available - acquires;// 如果剩余信号量小于零,说明不够,直接返回,||后面部分不执行,上文会// 根据>=0为成功,小于零自然是失败if (remaining < 0 ||// CAS 设置剩余信号量,设置成功返回否则重新循环compareAndSetState(available, remaining))return remaining;}}

信号量其实也就是共享锁个数或者说state的数值,尝试去获取就先看下state是否大于需要的permits,小于的话自然是失败。如果大于或者等于,需要使用CAS进行替换,因为这里的remaining缓存到本线程了,会有并发问题,接着,CAS成功就返回,失败就重新执行这个过程。

当然,这种只是最简单的获取方式,一般是会在某个时间段内获取,超时之后才算失败。也就是semaphore.tryAcquire(2,25, TimeUnit.SECONDS),一起来瞅瞅~

// 尝试获取信号量,过期时间以及单位
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {//信号量小于0,无意义,异常if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}
// 固定时间内尝试获取信号量
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}
//上文已讲过,不多说protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}
// 固定时间内获取arg个信号量
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {// 时间过期,结束返回if (nanosTimeout <= 0L)return false;//当前系统时间加过期时间,获取结束时间点final long deadline = System.nanoTime() + nanosTimeout;//新建共享节点,关联当前线程,并添加到同步队列尾部,//如果队列为空,执行(enq),初始化同步队列,并将节点添加到尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 循环for (;;) {// 当前节点的前一个节点final Node p = node.predecessor();//如果节点是头结点,获取信号量并返回if (p == head) {int r = tryAcquireShared(arg);// 获取成功if (r >= 0) {// 设置头结点setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}// 剩余时间nanosTimeout = deadline - System.nanoTime();// 过期,直接返回失败if (nanosTimeout <= 0L)return false;// 尝试获取信号量失败,清理同步队列中无用或者已经线程结束的节点// 并且剩余时间要大于1000ns,否则不用中断,继续循环即可,1000ns太短// 程序执行需要时间,可以直接进行循环处理if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);//中断异常if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

这个获取只要分两步,如果前置节点就是头节点,也就是说就只有当前线程获取信号量,直接去获取即可。第二步,如果不是头结点,自然要排队等待,节点状态设置为Node.SIGNAL,同时清理同步队列中已经状态waitStatus > 0的节点,然后线程中断,等待唤醒。

其中还有一个setHeadAndPropagate方法,主要的意思就是,上一个线程释放的信号量,可能当前线程获取之后还有盈余,于是,考虑唤醒下一个等待的线程继续获取信号量。这个方法个人感觉理解也是有点绕,推荐一个博客,想深究的可以去瞅下

//设置队列头,并检查后续队列是否正在等待
private void setHeadAndPropagate(Node node, int propagate) {// 当前节点获取到共享锁,成为头节点Node h = head;setHead(node);//propagate > 0 说明有剩余的信号量,后续节点可以尝试获取信号量,故要doReleaseShared//propagate == 0 且h.waitStatus < 0当时tryAcquireShared后没有共享锁剩余,但之后的时刻很可能又有共享锁释放出来了。if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}

接着就是信号量释放了,也就是相当于令牌桶,走完通道,令牌肯定要放回桶里呗

    //信号量释放public void release(int permits) {// 释放小于0,完全没意义,异常if (permits < 0) throw new IllegalArgumentException();// 释放信号量sync.releaseShared(permits);}// 释放信号量(共享锁)public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// Semaphore实现protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;// 要么,release是负数,要么就是超出位数限制,导致比当前还小,// 异常:超过最大允许计数if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//CAS替换if (compareAndSetState(current, next))return true;}}

释放信号量的过程也比较简单,就是state重新加上要释放的信号量,然后进行CAS替换即可,当然,还要走一个必须的过程!doReleaseShared!

//释放共享锁
private void doReleaseShared() {//循环for (;;) {Node h = head;//至少有两个node,就一个节点,也不用释放了if (h != null && h != tail) {//获取头结点状态int ws = h.waitStatus;// 头结点是SIGNAL,CAS唤醒if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}//如果状态为0,说明h的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于acquire thread获取锁失败再次设置head为SIGNAL并再次阻塞,要么由于acquire thread获取锁成功而将自己(head后继)设置为新head并且只要head后继不是队尾,那么新head肯定为SIGNAL。所以设置这种中间状态的head的status为PROPAGATE,让其status又变成负数,这样可能被被唤醒线程检测到else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// head不变退出循环,不然会执行多次循环。if (h == head)                   // loop if head changedbreak;}}

Semaphore的主要逻辑就是这样,如果只是简单的尝试获取信号量的话,就是直接修改state!不过一般这种肯定是会有一个尝试的过期时间(timeout)的,也就是semaphore.tryAcquire(2,25, TimeUnit.SECONDS);

这个时候,就需要用到CLH队列了,也就是阻塞等待,由于每次尝试获取的信号量不一致,可能一个线程释放6个信号量,然后后面三个线程,每个都获取两个,线程就会在获取的时候一次被唤醒,这里的逻辑看起来就比较绕,最好是本地造一些数据,debug看下,这样应该能更好的 理解Semaphore中信号量的获取与释放。

好了,到这里就game over了,感觉还行的话,点个赞呗~

no sacrifice,no victory~

瞅一瞅JUC提供的限流工具Semaphore相关推荐

  1. java每秒限流_java限流工具类

    代码 import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.ConcurrentHashM ...

  2. linux限流工具,Linux限流工具之pv

    pv是一款Liunx下的限流工具,可以使用该工具查看任务进度,传输速率,使用 pv示例 # 我的MySQL wing@3306>select count(*) from t; +-------- ...

  3. java计算限流工具

    一.引言 许多需求计算量都在扩大,比如合同下的门店会有三四千个,计算这些门店的数据在进行聚合,对于服务的内存和接口执行时间有着很大的影响. 针对越来越大容量.并发高的接口或者其他计算方法,同一时间在运 ...

  4. 阿里限流工具 Sentinel

    2019独角兽企业重金招聘Python工程师标准>>> https://github.com/alibaba/Sentinel public static void main(Str ...

  5. 高可用架构之限流降级

    一.服务等级协议 我们常说的N个9,就是对SLA的一个描述. SLA全称是ServiceLevel Agreement,翻译为服务水平协议,也称服务等级协议,它表明了公有云提供服务的等级以及质量. 例 ...

  6. 从构建分布式秒杀系统聊聊限流特技

    前言 俗话说的好,冰冻三尺非一日之寒,滴水穿石非一日之功,罗马也不是一天就建成的.两周前秒杀案例初步成型,分享到了中国最大的同 性友网站-码云.同时也收到了不少小伙伴的建议和投诉.我从不认为分布式.集 ...

  7. 阿里云二面:你对限流了解多少?

    今天来说说限流的相关内容,包括常见的限流算法.单机限流场景.分布式限流场景以及一些常见限流组件. 当然在介绍限流算法和具体场景之前我们先得明确什么是限流,为什么要限流?. 任何技术都要搞清它的来源,技 ...

  8. Guava RateLimiter限流源码解析和实例应用

    2019独角兽企业重金招聘Python工程师标准>>> 在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流 缓存 缓存的目的是提升系统访问速度和增大系统处理容量 降级 降级是 ...

  9. SpringBoot 2.0 + 阿里巴巴 Sentinel 动态限流实战

    转载 https://www.cnblogs.com/smallSevens/p/11531534.html 前言 在从0到1构建分布式秒杀系统和打造十万博文系统中,限流是不可缺少的一个环节,在系统能 ...

最新文章

  1. fileupload控件在ajax中无法使用
  2. Linux设备驱动之I/O端口与I/O内存
  3. 基于Apache OLTU的OAuth2.0授权解决方案
  4. 《Spring Boot官方指南》28.安全
  5. springboot整合PageHelper实现分页效果
  6. c语言设计题库及详解答案,c语言程序设计题库及其答案
  7. 深入浅出SpringSecurity
  8. java maven 读写pdf_Java向PDF模板写入数据
  9. 各种 Python 库/模块/工具
  10. QBXT 2018春季DP图论班 2018.5.4 --- 树形DP
  11. 中国石油大学《催化原理》第三阶段在线作业
  12. MacBook更新系统空间不足/无限重启/无法退出更新程序
  13. 微服务之RPC(远程过程调用)的四种方式
  14. ultraiso制作u盘启动盘教程图文详解纯净-U盘启动教程
  15. 怀院计科院1901班NICE组限时答题程序
  16. 斯须改变如苍狗——一张图的随想
  17. Eclipse中使用Ant
  18. 小程序如何把一个页面当作组件来引用,就是既可以当作一个组件也能当一个页面使用
  19. “数据+算法”双轮驱动,自助分析平台助银行实现“智慧转型”
  20. JS 中文数转数字练习

热门文章

  1. CodePen 和JSFiddle
  2. Java连接SMB服务器异常
  3. apache karaf quick start
  4. FastText的简单介绍
  5. 【iMessage苹果推】日历推OS署名第三方签名插件ME环境利用第二个封装形式(Saveforadhocdepleopment)
  6. 四大组件 — Activity启动模式
  7. c语言 case语句用法,switch case语句的用法
  8. VS2019/MFC编程入门之对话框:向导对话框的创建及显示
  9. 微服务架构之:Redisson分布式可重入锁原理
  10. silk v3 decoder php,解码转换QQ微信的SILK v3编码音频为MP3或其他格式