redis实现令牌桶的正确姿势

  • 场景描述
  • 解决方案

最近需要自己做一个限流功能,其他业务代码都好说。唯一的终点就是限流实现,想到redis可以实现令牌桶。一拍脑门,就用它了!

场景描述

真实开发中才发现想的太简单,如果是基于redis提供的命令在代码中调用的话,效率是小事。原子性根本无法保证!如下:

  1. 线程1 获取到令牌桶获取总数为10
  2. 线程1 消耗1个令牌,剩余9个令牌
  3. 线程2 获取到令牌桶获取总数为10
  4. 线程1 刷新令牌为9个
  5. 线程2 消耗1个令牌,剩余9个令牌
  6. 线程2 刷新令牌为9个

原子性完全无法保证。加锁? 那多节点岂不是需要在引入分布式锁?看来服务器中实现不可取,保证原子性的话,势必要写LUA代码执行了。网上翻阅了一些demo,没找到想要的。想到自己搭建的spring cloud gateway是有内置令牌桶实现的。开始翻阅源码,终于找到最好的方案。

gateway中redis令牌桶实现类是:org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter

public Mono<Response> isAllowed(String routeId, String id) {...// 这行是通过lua判断是否被限流Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);...}

顺着这个类找到lua代码是从org.springframework.cloud.gateway.config.GatewayRedisAutoConfiguration注入进来的

public RedisScript redisRequestRateLimiterScript() {DefaultRedisScript redisScript = new DefaultRedisScript<>();redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));redisScript.setResultType(List.class);return redisScript;}

核心在request_rate_limiter.lua这个文件中

-- 获取到限流资源令牌数的key和响应时间戳的key
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
-- 分别获取填充速率、令牌桶容量、当前时间戳、消耗令牌数
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- 计算出失效时间,大概是令牌桶填满时间的两倍
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
-- 获取到最近一次的剩余令牌数,如果不存在说明令牌桶是满的
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil thenlast_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
-- 上次消耗令牌的时间戳,不存在视为0
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil thenlast_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
-- 计算出间隔时间
local delta = math.max(0, now-last_refreshed)
-- 剩余令牌数量 =  “令牌桶容量” 和 “最后令牌数+(填充速率*时间间隔)”之间的最小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 如果剩余令牌数量大于等于消耗令牌的数量则流量通过,否则不通过
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed thennew_tokens = filled_tokens - requestedallowed_num = 1
end--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
-- 最后保存数据现场
if ttl > 0 thenredis.call("setex", tokens_key, ttl, new_tokens)redis.call("setex", timestamp_key, ttl, now)
end-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }

解决方案

好了,可以开始写自己的限流工具类了。

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;import java.time.Instant;
import java.util.Arrays;
import java.util.List;/*** redis限流器**/
public class MyRedisLimiter{private RedisTemplate redisTemplate;private static final Long SUCCESS_FLAG = 1L;/*** 判断是否允许访问*@param id 这次获取令牌桶的id*@param rate 每秒填充速率*@param capacity 令牌桶最大容量*@param tokens 每次访问消耗几个令牌*@return true 允许访问 false 不允许访问*/public boolean isAllowed(String id,int rate,int capacity,int tokens){RedisScript<Long> redisScript = new DefaultRedisScript<>(SCRIPT,Long.class);Object result = redisTemplate.execute(redisScript,getKey(id),rate, capacity,Instant.now().getEpochSecond(), tokens);return SUCCESS_FLAG.equals(result);}private List<String> getKey(String id){String prefix = "limiter:"+id;String tokenKey = prefix + ":tokens";String timestampKey = prefix + ":timestamp";return Arrays.asList(tokenKey, timestampKey);}private static final String SCRIPT = "local tokens_key = KEYS[1]\n" +"local timestamp_key = KEYS[2]\n" +"local rate = tonumber(ARGV[1])\n" +"local capacity = tonumber(ARGV[2])\n" +"local now = tonumber(ARGV[3])\n" +"local requested = tonumber(ARGV[4])\n" +"local fill_time = capacity/rate\n" +"local ttl = math.floor(fill_time*2)\n" +"local last_tokens = tonumber(redis.call('get', tokens_key))\n" +"if last_tokens == nil then\n" +"  last_tokens = capacity\n" +"end\n" +"local last_refreshed = tonumber(redis.call('get', timestamp_key))\n" +"if last_refreshed == nil then\n" +"  last_refreshed = 0\n" +"end\n" +"local diff_time = math.max(0, now-last_refreshed)\n" +"local filled_tokens = math.min(capacity, last_tokens+(diff_time*rate))\n" +"local allowed = filled_tokens >= requested\n" +"local new_tokens = filled_tokens\n" +"local allowed_num = 0\n" +"if allowed then\n" +"  new_tokens = filled_tokens - requested\n" +"  allowed_num = 1\n" +"end\n" +"if ttl > 0 then\n" +"  redis.call('setex', tokens_key, ttl, new_tokens)\n" +"  redis.call('setex', timestamp_key, ttl, now)\n" +"end\n" +"return allowed_num\n";
}

完事儿,这个令牌桶实现完全是参考的spring cloud gateway,食用起来也比较放心。
简单描述一下这个工具类使用的效果吧。

 isAllowed("testId1",1,60,1);

上面这个描述代表:令牌桶testId1。每分钟可通过访问60次。

当然这是理想情况。极限情况的话,应该是可以访问120次的。极限场景如下

  • testId1令牌桶未被使用的时间>=60秒
  • testId1令牌桶在某一刻开始被使用
  • 令牌被消耗的同时也在被填充
  • 在最初被使用的60秒内,令牌桶初始有60个令牌,使用期限有填充了60个

我个人理解,令牌桶主要是为了保证使用速率。对于上面这个场景,到底算不算bug。看每个人的使用情况。不过我已经对上述情况做了解决,只需要如下的几点小改动。

  1. 传入周期时间内最大流量数(周期时间:桶容量60,填充速率1/s,那么周期时间=60s)
  2. 获取key的方法增加一个记录周期时间内数量的key
  3. 修改lua脚本

改动如下:

/*** 判断是否允许访问*@param id 这次获取令牌桶的id*@param rate 每秒填充速率*@param capacity 令牌桶最大容量*@param tokens 每次访问消耗几个令牌*@param maxCount 周期时间内最大访问量*@return true 允许访问 false 不允许访问*/public boolean isAllowed(String id,int rate,int capacity,int tokens,int maxCount){RedisScript<Long> redisScript = new DefaultRedisScript<>(SCRIPT,Long.class);Object result = redisTemplate.execute(redisScript,getKey(id),rate, capacity,Instant.now().getEpochSecond(), tokens,maxCount);return SUCCESS_FLAG.equals(result);}private List<String> getKey(String id){String prefix = "limiter:"+id;String tokenKey = prefix + ":tokens";String timestampKey = prefix + ":timestamp";String countKey = prefix + ":count";return Arrays.asList(tokenKey, timestampKey,countKey);}private static final String SCRIPT = "local tokens_key = KEYS[1]\n" +"local timestamp_key = KEYS[2]\n" +"local count_key = KEYS[3]\n" +"local rate = tonumber(ARGV[1])\n" +"local capacity = tonumber(ARGV[2])\n" +"local now = tonumber(ARGV[3])\n" +"local requested = tonumber(ARGV[4])\n" +"local min_max = tonumber(ARGV[5])\n" +"local fill_time = capacity/rate\n" +"local ttl = math.floor(fill_time*2)\n" +"local has_count = tonumber(redis.call('get', count_key))\n" +"if has_count == nil then\n" +"  has_count = 0\n" +"end\n" +"if has_count >= min_max then\n" +"return 0\n" +"end\n" +"local last_tokens = tonumber(redis.call('get', tokens_key))\n" +"if last_tokens == nil then\n" +"  last_tokens = capacity\n" +"end\n" +"local last_refreshed = tonumber(redis.call('get', timestamp_key))\n" +"if last_refreshed == nil then\n" +"  last_refreshed = 0\n" +"end\n" +"local diff_time = math.max(0, now-last_refreshed)\n" +"local filled_tokens = math.min(capacity, last_tokens+(diff_time*rate))\n" +"local allowed = filled_tokens >= requested\n" +"local new_tokens = filled_tokens\n" +"local allowed_num = 0\n" +"if allowed then\n" +"  new_tokens = filled_tokens - requested\n" +"  allowed_num = 1\n" +"end\n" +"if ttl > 0 then\n" +"  redis.call('setex', tokens_key, ttl, new_tokens)\n" +"  redis.call('setex', timestamp_key, ttl, now)\n" +"end\n" +"local count_ttl = tonumber(redis.call('ttl',count_key))\n" +"if count_ttl < 0 then\n" +"  count_ttl = fill_time\n" +"end\n" +"redis.call('setex', count_key,count_ttl , has_count+1)\n" +"return allowed_num\n";
}

这样的改动做到了保持访问速率和吞吐量的可控,但是有没有必要这样就看自己的需求了。

最后想说一句,优秀的开源项目真的是宝藏。学会使用它,然后站在巨人的肩膀上前进

redis实现令牌桶的正确姿势相关推荐

  1. 【分布式缓存系列】Redis实现分布式锁的正确姿势

    一.前言 在我们日常工作中,除了Spring和Mybatis外,用到最多无外乎分布式缓存框架--Redis.但是很多工作很多年的朋友对Redis还处于一个最基础的使用和认识.所以我就像把自己对分布式缓 ...

  2. redis实现令牌桶算法思路

    1.常用的限流思路令牌桶算法和漏桶算法 直接令牌桶算法代码 <?phpclass TokenBucket{private $_config; //redis设定private $_redis; ...

  3. 令牌桶算法PHP简单实现,php 基于redis使用令牌桶算法 计数器 漏桶算法 实现流量控制...

    通常在高并发和大流量的情况下,一般限流是必须的.为了保证服务器正常的压力.那我们就聊一下几种限流的算法. 计数器 计数器是一种最常用的一种方法,在一段时间间隔内,处理请求的数量固定的,超的就不做处理. ...

  4. 分布式限流实战--redis实现令牌桶限流

    这篇文章我们主要是分析一下分布式限流的玩法. 因为限流也是一个经典用法了. 1.微服务限流 随着微服务的流行,服务和服务之间的稳定性变得越来越重要.缓存.降级和限流是保护微服务系统运行稳定性的三大利器 ...

  5. Redis实现分布式锁的正确姿势 | Spring Cloud 36

    一.分布式锁 1.1 什么是分布式锁 分布式锁,即分布式系统中的锁.在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题.与单体应用不同的是, ...

  6. PHP与Redis结合令牌桶算法进行实现限流

    https://www.php.cn/php-ask-448027.html https://www.zongscan.com/demo333/355.html

  7. 可能要用心学高并发核心编程,限流原理与实战,分布式令牌桶限流

    实战:分布式令牌桶限流 本节介绍的分布式令牌桶限流通过Lua+Java结合完成,首先在Lua脚本中完成限流的计算,然后在Java代码中进行组织和调用. 分布式令牌桶限流Lua脚本 分布式令牌桶限流Lu ...

  8. 使用令牌桶算法解决调用第三方接口限流问题

    我们在调用第三方接口时常常会碰到接口限流问题,为了解决这一问题,大家想出了许多方法.我这里介绍一下我的方法,第三方接口限流一般是基于令牌桶算法的,那么我们可以以彼之道还治彼身,使用令牌桶算法实现我方调 ...

  9. java同名过滤器_Gateway Redis令牌桶请求限流过滤器

    spring cloud gateway默认基于redis令牌桶算法进行微服务的限流保护,采用RateLimter限流算法来实现. 1.引入依赖包 org.springframework.cloud ...

最新文章

  1. 实战讲解Python函数参数
  2. Shell编程中Shift的用法
  3. 40个亿非负整数中找到未出现的数
  4. 基于Java语言构建区块链(四)—— 交易(UTXO)
  5. OpenPano:如何编写一个全景拼接器
  6. js+php在线截图 jquery fileupload.js,另一种图片上传 jquery.fileupload.js
  7. 大厂面试必问!50w字+的Java技术类校招面试题汇总
  8. 对比MS Test与NUnit Test框架
  9. JAVA进阶教学之(StrngBuffer进行字符串拼接)
  10. 利用反射,批量启动WCF服务
  11. c语言字符串每个字母加4,C语言基础:各字符型数据
  12. AlphaGo真的赢了么?
  13. PHP和zookeeper结合实践
  14. 14-一级指针和多级指针
  15. 《银联提交服务单》-业务流程
  16. Go Playground exercise
  17. minio error occured
  18. Linux下编写C语言
  19. 响铃:抖音的敌人不是快手
  20. xmlHttp.send(null)与xmlHttp.send…

热门文章

  1. 网站限制某些ip访问,仅允许某些ip…
  2. PyTorch:生态简介
  3. asterisk安装步骤
  4. Windows自带的造字功能使用
  5. mysql 数据库的编辑工具有哪些_常用的MySQL数据库管理工具有哪些
  6. javascript教程完整版,JavaScript视频教程
  7. 手写字体的fisher算法识别
  8. Ivar Jacobson 先生简介
  9. 什么是信令?什么是信令网?(转)
  10. Ubuntu磁盘扩充