上文我们介绍了RateLimiter文章路径针对IP来限流的方式,上文的限流方案,只针对单应用情况,分布式集群下就不能使用上文的方式,分布式下的限流方案有很多种,这边展示的是Redis的封装redission框架。
可以这么讲,jdk中的juc包提供的是单机版的并发业务。那么Redisson基本是基于juc实现的分布式的业务。

一:Redission官网

我们先去Redission官网喵喵
redission官方地址

我们可以看到wiki提供了很多功能介绍,
分布式锁等,我们这篇文章主要讲限流。进入正题

我们看到如下有个限流器,我们点进去看看

1:依赖版本

org.redisson.redisson 3.11.4

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.4</version>
</dependency>

2:限流器(RRateLimiter)

仔细看,它和谷歌guava实现的限流器(RateLimiter)名字很像,多了一个R。它支持在分布式环境下,现在调用方的请求频率,可以实现不同Redisson实例下的多线程限流,也适用于相同实例Redisson下的多线程限流。是非公平性阻塞。

3:API接口文档


当前版本提供了6个方法,其余都是重载
针对每个方法的注释讲解下作用

3.1:acquire()

根据注释过来的意思:
从此RateLimiter处获取许可,直到获得一个许可为止。
可以知道,是一个阻塞限流,直到获取到令牌。那我们可以对其分析一下源代码

首先我们看下注释

3.1.1:tryAcquire()

仅在调用时可用时才获得许可。
如果有许可证,则获取许可证,然后立即返回,其值为true,将可用许可证的数量减少一个。
如果没有可用的许可,则此方法将立即返回值false。
返回值:
true如果获得许可证,false 否则

3.1.2:tryAcquire(long permits)

permits仅在调用时全部可用时才获取给定数量。
如果所有许可证都可用,则获取许可证,然后立即返回值true,将值减少给定许可证数量的可用许可证数量。
如果没有可用的许可证,则此方法将立即返回值false。
参数:
permits -获得许可证的数量
返回值:
true如果获得许可证,false 否则

4:源码分析

分析之前我们先把依赖引入maven项目并分析,见步骤二:
根据官方文档,
添加测试类


@Autowired
private RedissonClient redissonClient;RRateLimiter myRateLimter = redissonClient.getRateLimiter("myRateLimter");
myRateLimter.tryAcquire();


源码:

我们进入实现类看看:


我们这里可以看到都给到了一个默认值1,一个许可证的数量
我们从它内部调用的方法见:public RFutrue acquireAsync(long permits);分析一下

先创建异步计算对象promise ,然后再调用方法tryAcquireAsync(permits, -1, null),然后将其结果,通过RFutrue返回。
我们再看看它的重载方法

public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit)


看内部的代码,该对象,基本上所有的方法都是最终指向了上方法,只是参数,我们都封装好了。便于直接调用。

我们现在来分析下如上的方法到底做了什么事情
我们设定的超时时间统一转换为毫秒值,如果是-1,则不转换,直接为-1.
再定义一个异步任务,传递到tryAcquireAsync(permits, promise, timeoutInMillis);
我们看看源码
首先我们注意到的是,当前方法是私有的
第一行代码,

long s = System.currentTimeMillis();
就是记录该方法的起始时间

第二行代码


RFuture<Long> future = this.tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
进入该方法,发现这个内部代码执行的是一个lua脚本.


这个就是redis获取令牌的命令,其中会判断是否超出获取许可数量。然后将其获取结果放回。返回的结果就是一个Long类型数据,那么我们再通过异步计算,判断其返回结果是否为成功?是否等于NULL?

看源码的条件是if else if,多种条件多种处理。
1:首先判断c是否等于null

如果不等于null,则代表获取到许可证,则立即返回

2:判断delay是否等于null
判断是否延迟,如果delay等于null,则将其异步标记为成功,也立即返回

3:判断超时时间是否为-1

仔细查看代码
如果为-1,则进入递归任务,再获取一次许可,直至退出递归

4: 如果超时时间也不为-1,说明是有确切的超时时间

那看代码,有判断当前消耗的时间是否大于当前任务设定的超时时间

4.1:判断当前时间

如果当前时间已经大于,则将立即返回,并标记结果失败

4.2:如果没有超时,那判断当前剩余的超时时间,是否小于延迟时间(delay)

小于:则结束,并标记失败

大于:则判断当前过去时间是否小于等于剩余超时时间,如果小于等于,则返回,标记失败,否则,则进入下个递归,并且传入剩余超时时间为最新超时时间。

getConfig()
返回此RateLimiter对象的当前配置。

acquire()
从该速率限制器获取许可证,直到有许可证可用为止。
获得许可证(如果有),并立即返回,将可用许可证的数量减少一个

acquire(long permits)
从该速率限制器获取指定的许可证,直到有一个可用的许可证为止。
获取给定数量的许可证(如果可用),并立即返回,将可用许可证的数量减少给定的数量。
参数:
许可证-获得许可证的数量

仔细看看方法是不是如上面源码所说的,都封装统一调用方法

见源码:

trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit)

初始化RateLimiter的状态并将配置存储到Redis服务器。
参数:
mode–速率模式
rate–速率
rateInterval–速率时间间隔
rateIntervalUnit–速率时间间隔单位
退货:
如果设置了速率为true,否则为false
代码中的例子:


// var1设置访问速率实例,var2为访问数,var3为单位时间,var4为时间单位
// 每10秒产生1个令牌 总体限流
// 创建令牌桶数据模型
rateLimiter.trySetRate(limit.mode(), limit.count(), limit.period(), RateIntervalUnit.SECONDS);


所有实例共享(RateType.OVERALL所有实例共享、RateType.CLIENT单实例端共享)

二:集成Redission

pom文件添加如下依赖:

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.4</version>
</dependency>

application.yml添加如下配置

redission-dev.yml

# 单节点设置
singleServerConfig:address: redis://127.0.0.1:6379database: 2password: 123456#  (连接空闲超时,单位:毫秒)如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。idleConnectionTimeout: 10000#  未知pingTimeout: 1000#连接间隔 心跳pingConnectionInterval: 1000#  (连接超时,单位:毫秒)同节点建立连接时的等待超时。时间单位是毫秒。connectTimeout: 10000#  (命令等待超时,单位:毫秒)等待节点回复命令的时间。该时间从命令发送成功时开始计时。timeout: 3000#  (命令失败重试次数)如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。retryAttempts: 3#  (命令重试发送时间间隔,单位:毫秒)在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。retryInterval: 1500#  (重新连接时间间隔 单位:毫秒)reconnectionTimeout: 3000#  (执行失败最大次数)failedAttempts: 3#  (客户端名称)clientName: null# 发布和订阅连接的最小空闲连接数 默认1 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。subscriptionConnectionMinimumIdleSize: 1# 发布和订阅连接池大小 默认50  多从节点的环境里,每个 从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。subscriptionConnectionPoolSize: 10# 单个连接最大订阅数量 默认5subscriptionsPerConnection: 5# 最小空闲连接数 默认32,现在暂时不需要那么多的线程 多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。connectionMinimumIdleSize: 5# (连接池大小) 默认64,现在暂时不需要那么多的线程 在启用该功能以后,Redisson将会监测DNS的变化情况。connectionPoolSize: 20# (线程池数量)这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。threads: 0#(Netty线程池数量) 这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。
nettyThreads: 0#(编码)Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。# FstCodec  10倍于JDK序列化性能而且100%兼容的编码
#codec:
#  class: com.efh.otms.common.serializer.redisson.FstCodec#  (传输模式)默认 NIO
transportMode: NIO

三:编写代码实现

添加AOP实现
我们定义一个限流类型的枚举

3.1 枚举

/*** @author Gaci* @className LimiterTypeEnum* @description 定义一个限流类型的枚举* @date 2021/5/18 14:51**/
public enum LimiterTypeEnum {/*** 传统类型*/CUSTOMER,/***  根据 IP地址限制*/IP
}

3.2 定义一个自定义的限流注解

import com.peng.sentinel.enums.LimiterTypeEnum;
import org.redisson.api.RateType;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** @author Gaci* @className RedisRLimiter* @description 限流自定义注解* @date 2021/5/18 14:49**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisRLimiter {/*** 资源名称,用于描述接口功能*/String name() default "";/*** 限制访问次数(单位时间内产生的令牌数)*/int count();/*** 时间间隔,单位秒*/int period();/*** 资源 key*/String key() default "";/*** 限制类型(ip/方法名)*/LimiterTypeEnum limitType() default LimiterTypeEnum.CUSTOMER;/*** RRateLimiter 速度类型* OVERALL,    // 所有客户端加总限流* PER_CLIENT; // 每个客户端单独计算流量* @return*/RateType mode() default RateType.PER_CLIENT;

3.3 切面类

import com.peng.sentinel.GaciException;
import com.peng.sentinel.annotation.RedisRLimiter;
import com.peng.sentinel.util.HttpContextUtils;
import com.peng.sentinel.util.IPHelper;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;/*** @author Gaci* @className RedisRLimiterAspect* @description 类描述* @date 2021/5/18 14:55**/
@Aspect
@Component
public class RedisRLimiterAspect {private static final String REDIS_LIMIT_KEY_HEAD = "limit";private static Logger logger = LoggerFactory.getLogger(RedisRLimiterAspect.class);@Autowiredprivate RedissonClient redisson;// 切入点@Pointcut("@annotation(com.peng.sentinel.annotation.RedisRLimiter)")public void pointcut() {}@Around("pointcut()")public Object around(ProceedingJoinPoint point) throws Throwable {HttpServletRequest request = HttpContextUtils.getHttpServletRequest();MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();RedisRLimiter limit = method.getAnnotation(RedisRLimiter.class);String ip = IPHelper.getIpAddr();String key;switch (limit.limitType()) {case IP:// ip类型key = ip;break;case CUSTOMER:// 传统类型,采用注解提供keykey = limit.key();break;default:// 默认采用方法名key = StringUtils.upperCase(method.getName());}
//        ImmutableList keys = ImmutableList.of(StringUtils.join(REDIS_LIMIT_KEY_HEAD, limit.prefix(), ":", ip, key));// 生成keyfinal String ofRateLimiter = REDIS_LIMIT_KEY_HEAD + ip + key;RRateLimiter rateLimiter = redisson.getRateLimiter(ofRateLimiter);// 根据官方文档的描述/** RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");* // 初始化* // 最大流速 = 每1秒钟产生10个令牌* rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);* // 需要1个令牌if(rateLimiter.tryAcquire(1)){// TODO:Do something}*/// 设置访问速率,var2为访问数,var4为单位时间,var6为时间单位// 每10秒产生1个令牌 总体限流// 创建令牌桶数据模型rateLimiter.trySetRate(limit.mode(), limit.count(), limit.period(), RateIntervalUnit.SECONDS);// permits 允许获得的许可数量 (如果获取失败,返回false) 1秒内不能获取到1个令牌,则返回,不阻塞// 尝试访问数据,占数据计算值var1,设置等待时间var3// acquire() 默认如下参数 如果超时时间为-1,则永不超时,则将线程阻塞,直至令牌补充// 此处采用3秒超时方式,服务降级if (!rateLimiter.tryAcquire(1, 2, TimeUnit.SECONDS)) {logger.error("IP【{}】访问接口【{}】超出频率限制,限制规则为[限流模式:{}; 限流数量:{}; 限流时间间隔:{};]",ip, method.getName(), limit.mode().toString(), limit.count(), limit.period());throw new GaciException("接口访问超出频率限制,请稍后重试");}return point.proceed();}

3.4 controller

相关代码




启动项目进行测试
访问页面测试
第一次访问

第二次访问:由于5秒生成一个令牌,导致5秒内令牌已经被获取,提示错误

更细节的功能以及要求,请自行根据需求编写代码

演示就到这里

基于Redis组件之分布式RateLimiter限流相关推荐

  1. 那些年被欺骗的感情---分布式实现限流操作(上)

    几年前的春运 抢火车票 我们不得不说12306的网站 大家每年都要通过这个网站购买火车票 这个系统在春运期间承受的访问压力那是相当大 毕竟不是很多都会去参加双11 但是大家过年总是要回老家的 话说降低 ...

  2. 好代码实践:基于Redis的轻量级分布式均衡消费队列

    简介: 好代码,给人第一个印象的感觉,就像一篇好文章一样,读起来朗朗上口.不同的文章有不同的风格体裁,不同的代码也有不同的编程风格要求.Python有严格的缩进,像诗歌一样工整对仗:C语言面向过程像散 ...

  3. redistemplate分布式锁实现_基于 Redis SETNX 实现分布式锁

    环境与配置 Redis 任意版本即可 SpringBoot 任意版本即可,但是需要依赖 spring-boot-starter-data-redis <dependency><gro ...

  4. 基于 Redis 实现的分布式锁

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:我的大学到研究生自学 Java 之路,过程艰辛,不放弃,保持热情,最终发现我是这样拿到大厂 offer 的! 作 ...

  5. 基于redis的简易分布式爬虫框架

    代码地址如下: http://www.demodashi.com/demo/13338.html 开发环境 Python 3.6 Requests Redis 3.2.100 Pycharm(非必需, ...

  6. 哨兵 双向 java_SpringCloud微服务:Sentinel哨兵组件,管理服务限流和降级

    一.基本简介 1.概念描述 Sentinel 以流量为切入点,从流量控制.熔断降级.系统负载保护等多个维度保护服务的稳定性.包括核心的独立类库,监控台,丰富的使用场景验证.(这似乎是阿里开源组件的一贯 ...

  7. 通过acquire方法看懂RateLimiter限流机制

    通过acquire方法看懂RateLimiter限流机制 关键方法 1)resync,动态计算剩余令牌数量和下次发放时间: 2)reserveEarliestAvailable,预定令牌,允许超发,超 ...

  8. Guava之RateLimiter限流

    RateLimter是什么,我们为什么需要用到它,以物流系统作为例子:比如系统有一个物流信息查询接口,提供给第三方调用,接口暴露在公网,会出现什么问题,大致讲下如下问题: 1.大量正常用户高频访问导致 ...

  9. java令牌_基于令牌桶算法的Java限流实现

    项目需要使用限流措施,查阅后主要使用令牌桶算法实现,为了更灵活的实现限流,就自己实现了一个简单的基于令牌桶算法的限流实现. 令牌桶算法描述 令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以 ...

最新文章

  1. 一个成熟的网站模板如何引起用户的注意?
  2. linux增加epel源,yum安装nignx,脚本安装mysql服务端,shell脚本监控网站页面
  3. [WCF] - 使用 [DataMember] 标记的数据契约需要声明 Set 方法
  4. 《学习之道》第十三章练习大脑,改变思维
  5. pytorch使用Ray-tune对原有训练模型的代码改写,自动调参(一)
  6. Windows Server入门系列之三 硬盘分区
  7. x为正变数,求y=x^3/(x^4+4)的最大值
  8. RuntimeError: all elements of input should be between 0 and 1
  9. [转自他人]一款好用的软件安装管理器
  10. VGGNet网络结构学习
  11. 数据库:园林软件(综合类题库)
  12. 阿里云对象存储OSS是怎么收费的?
  13. IB幼儿课程怎么理解?
  14. flash写保护原理_stm32对flash的读写保护与解除
  15. 用Python做一个游戏辅助脚本,完整编程思路分享
  16. 开源工作流引擎 Workflow Core 的研究和使用教程
  17. 借贷系统后台操作说明
  18. 基于ETH创建自己的代币
  19. 网络流(最大流和最小费用流)
  20. 非对称加密(RSA、数字签名、数字证书)

热门文章

  1. 最新的单片机_关于单片机通过蓝牙将数据传输给手机并在app上面显示出来怎么实现...
  2. [数据压缩作业6]DPCM压缩系统的实现和分析
  3. mongrel_cluster功能介绍及安装方法
  4. Kotlin的高价函数—apply、aslo、let、run的使用总结
  5. mysql函数之ltrim(),rtrim(),trim()
  6. 磁共振T1 T2 T1WI T2WI含义
  7. Android剪切板工具clipper,自动化
  8. React Native广告头条广告穿山甲广告集成
  9. 关于SQL练习2的一些写法。。。
  10. 【已开源】Qt 艾宾浩斯(Ebbinghaus)记忆 软件