转载:https://www.cnblogs.com/exceptioneye/p/4783904.html

https://blog.csdn.net/zrg523/article/details/82185088?utm_source=blogkpcl2(实施方案)

一、场景描述

很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。

也就是面对大流量时,如何进行流量控制?

服务接口的流量控制策略:分流、降级、限流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。

实际场景中常用的限流策略:

  • Nginx前端限流

按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

  • 业务应用系统限流

1、客户端限流

2、服务端限流

  • 数据库限流

红线区,力保数据库

二、常用的限流算法

常用的限流算法由:楼桶算法和令牌桶算法。本文不具体的详细说明两种算法的原理,原理会在接下来的文章中做说明。

1、漏桶算法

漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

   

可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.

2、令牌桶算法

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

  令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

三、基于Redis功能的实现

简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。

1)使用Redis的incr命令,将计数器作为Lua脚本

1 local current
2 current = redis.call("incr",KEYS[1])
3 if tonumber(current) == 1 then
4     redis.call("expire",KEYS[1],1)
5 end

Lua脚本在Redis中运行,保证了incr和expire两个操作的原子性。

2)使用Reids的列表结构代替incr命令

 1 FUNCTION LIMIT_API_CALL(ip)2 current = LLEN(ip)3 IF current > 10 THEN4     ERROR "too many requests per second"5 ELSE6     IF EXISTS(ip) == FALSE7         MULTI8             RPUSH(ip,ip)9             EXPIRE(ip,1)
10         EXEC
11     ELSE
12         RPUSHX(ip,ip)
13     END
14     PERFORM_API_CALL()
15 END

Rate Limit使用Redis的列表作为容器,LLEN用于对访问次数的检查,一个事物中包含了RPUSH和EXPIRE两个命令,用于在第一次执行计数是创建列表并设置过期时间,

RPUSHX在后续的计数操作中进行增加操作。

四、基于令牌桶算法的实现

令牌桶算法可以很好的支撑突然额流量的变化即满令牌桶数的峰值。

  1 import java.io.BufferedWriter;2 import java.io.FileOutputStream;3 import java.io.IOException;4 import java.io.OutputStreamWriter;5 import java.util.Random;6 import java.util.concurrent.ArrayBlockingQueue;7 import java.util.concurrent.Executors;8 import java.util.concurrent.ScheduledExecutorService;9 import java.util.concurrent.TimeUnit;10 import java.util.concurrent.locks.ReentrantLock;11  12 import com.google.common.base.Preconditions;13 import com.netease.datastream.util.framework.LifeCycle;14  1520 public class TokenBucket implements LifeCycle {21  22 // 默认桶大小个数 即最大瞬间流量是64M23  private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;24  25 // 一个桶的单位是1字节26  private int everyTokenSize = 1;27  28 // 瞬间最大流量29  private int maxFlowRate;30  31 // 平均流量32  private int avgFlowRate;33  34 // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 6435  private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);36  37 private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();38  39 private volatile boolean isStart = false;40  41 private ReentrantLock lock = new ReentrantLock(true);42  43 private static final byte A_CHAR = 'a';44  45 public TokenBucket() {46  }47  48 public TokenBucket(int maxFlowRate, int avgFlowRate) {49  this.maxFlowRate = maxFlowRate;50  this.avgFlowRate = avgFlowRate;51  }52  53 public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {54  this.everyTokenSize = everyTokenSize;55  this.maxFlowRate = maxFlowRate;56  this.avgFlowRate = avgFlowRate;57  }58  59 public void addTokens(Integer tokenNum) {60  61 // 若是桶已经满了,就不再家如新的令牌62  for (int i = 0; i < tokenNum; i++) {63  tokenQueue.offer(Byte.valueOf(A_CHAR));64  }65  }66  67 public TokenBucket build() {68  69 start();70  return this;71  }72  73 /**74  * 获取足够的令牌个数75  *76  * @return77  */78  public boolean getTokens(byte[] dataSize) {79  80 Preconditions.checkNotNull(dataSize);81  Preconditions.checkArgument(isStart, "please invoke start method first !");82  83 int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数84  85 final ReentrantLock lock = this.lock;86  lock.lock();87  try {88  boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量89  if (!result) {90  return false;91  }92  93 int tokenCount = 0;94  for (int i = 0; i < needTokenNum; i++) {95  Byte poll = tokenQueue.poll();96  if (poll != null) {97  tokenCount++;98  }99  }
100
101 return tokenCount == needTokenNum;
102  } finally {
103  lock.unlock();
104  }
105  }
106
107 @Override
108  public void start() {
109
110 // 初始化桶队列大小
111  if (maxFlowRate != 0) {
112  tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
113  }
114
115 // 初始化令牌生产者
116  TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
117  scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
118  isStart = true;
119
120 }
121
122 @Override
123  public void stop() {
124  isStart = false;
125  scheduledExecutorService.shutdown();
126  }
127
128 @Override
129  public boolean isStarted() {
130  return isStart;
131  }
132
133 class TokenProducer implements Runnable {
134
135 private int avgFlowRate;
136  private TokenBucket tokenBucket;
137
138 public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
139  this.avgFlowRate = avgFlowRate;
140  this.tokenBucket = tokenBucket;
141  }
142
143 @Override
144  public void run() {
145  tokenBucket.addTokens(avgFlowRate);
146  }
147  }
148
149 public static TokenBucket newBuilder() {
150  return new TokenBucket();
151  }
152
153 public TokenBucket everyTokenSize(int everyTokenSize) {
154  this.everyTokenSize = everyTokenSize;
155  return this;
156  }
157
158 public TokenBucket maxFlowRate(int maxFlowRate) {
159  this.maxFlowRate = maxFlowRate;
160  return this;
161  }
162
163 public TokenBucket avgFlowRate(int avgFlowRate) {
164  this.avgFlowRate = avgFlowRate;
165  return this;
166  }
167
168 private String stringCopy(String data, int copyNum) {
169
170 StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
171
172 for (int i = 0; i < copyNum; i++) {
173  sbuilder.append(data);
174  }
175
176 return sbuilder.toString();
177
178 }
179
180 public static void main(String[] args) throws IOException, InterruptedException {
181
182 tokenTest();
183  }
184
185 private static void arrayTest() {
186  ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
187  tokenQueue.offer(1);
188  tokenQueue.offer(1);
189  tokenQueue.offer(1);
190  System.out.println(tokenQueue.size());
191  System.out.println(tokenQueue.remainingCapacity());
192  }
193
194 private static void tokenTest() throws InterruptedException, IOException {
195  TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
196
197 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
198  String data = "xxxx";// 四个字节
199  for (int i = 1; i <= 1000; i++) {
200
201 Random random = new Random();
202  int i1 = random.nextInt(100);
203  boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
204  TimeUnit.MILLISECONDS.sleep(100);
205  if (tokens) {
206  bufferedWriter.write("token pass --- index:" + i1);
207  System.out.println("token pass --- index:" + i1);
208  } else {
209  bufferedWriter.write("token rejuect --- index" + i1);
210  System.out.println("token rejuect --- index" + i1);
211  }
212
213 bufferedWriter.newLine();
214  bufferedWriter.flush();
215  }
216
217 bufferedWriter.close();
218  }
219
220 }

参考:

http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/

http://redisdoc.com/string/incr.html

http://www.cnblogs.com/zhengyun_ustc/archive/2012/11/17/topic1.html

服务接口API限流 Rate Limit相关推荐

  1. Java互联网架构-如何设计服务接口API限流功能

    API 概念的出现,远远早于个人计算机的诞生,更不用说网络的诞生了.在公用数据处理的早期,为了一个应用能够与其它系统交互,开发者便已开始设计可公开访问并描述清晰的"接入点".早在那 ...

  2. 设计一个基于用户的API限流策略 Rate Limit

    设计一个基于用户的API限流策略 Rate Limit 应用场景 API接口的流量控制策略:缓存.降级.限流.限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的.限流策略 ...

  3. 分布式高并发服务三种常用限流方案简介

    以下文章来源于技术岁月 ,作者贺鹏Kavin 在高并发大流量系统中,由于并发大造成服务资源不足,负载过高,进而引发致一系列问题,这里的流量一般都是突发性的,由于系统准备不足,很难短期扩容来应对 ,进行 ...

  4. 【限流02】限流算法实战篇 - 手撸一个单机版Http接口通用限流框架

    本文将从需求的背景.需求分析.框架设计.框架实现几个层面一步一步去实现一个单机版的Http接口通用限流框架. 一.限流框架分析 1.需求背景 微服务系统中,我们开发的接口可能会提供给很多不同的系统去调 ...

  5. gateway sentinel 熔断 不起作用_Spring Cloud Alibaba集训营第五天(服务熔断和限流)

    前言:为什么需要流控降级 我们的生产环境经常会出现一些不稳定的情况,如: 大促时瞬间洪峰流量导致系统超出最大负载,load 飙高,系统崩溃导致用户无法下单 "黑马"热点商品击穿缓存 ...

  6. java api限流_Java 9:流API的增强

    java api限流 Java 9向Stream接口添加了4种新方法: 1. dropWhile dropWhile方法类似于skip方法,但使用Predicate而不是固定的整数值. 当Predic ...

  7. Service Mesh微服务熔断、限流的骚操作

    在微服务架构中,随着服务调用链路变长,为了防止出现级联雪崩,在微服务治理体系中,熔断.限流作为服务自我保护的重要机制,是确保微服务架构稳定运行的关键手段之一. 那么什么是熔断.限流?在传统Spring ...

  8. 玩转Service Mesh微服务熔断、限流骚操作

    在微服务架构中,随着服务调用链路变长,为了防止出现级联雪崩,在微服务治理体系中,熔断.限流作为服务自我保护的重要机制,是确保微服务架构稳定运行的关键手段之一. 那么什么是熔断.限流?在传统Spring ...

  9. Google Guava缓存实现接口的限流

    一.项目背景 最近项目中需要进行接口保护,防止高并发的情况把系统搞崩,因此需要对一个查询接口进行限流,主要的目的就是限制单位时间内请求此查询的次数,例如1000次,来保护接口. 参考了 开涛的博客聊聊 ...

最新文章

  1. Symantec BE 12.5 备份Exchange错误排除
  2. classmethod和staticmethod
  3. 012-简单辅助元素
  4. 秒杀业务架构优化之路--转
  5. Python基础day04【函数(定义与调用、文档说明、传参函数、全局变量、返回值、嵌套调用)】
  6. 第七章_类_7.4 类的作用域(加了自己的总结部分)
  7. SVM推导过程及SMO详细求解过程(转载+自己笔记)
  8. 关于错误Resource interpreted as Script but transferred with MIME type text_html
  9. Unity3D之UGUI基础3:Image图片
  10. 计算机系统的性能能被准确测量吗,信息技术 基于计算机的软件系统的性能测量与评级...
  11. 18、弱电锦集:网络视频监控知识问答100例
  12. 原相机怎么拍出网图_教你如何用iPhone原相机拍照也能网红风
  13. CentOS7使用yum方式安装Docker(根据官方文档安装)
  14. 计算机图形学实验报告几何变换,计算机图形学实验报告几何变换.doc
  15. 数据库简介、SQL 语的增加删除修改查询命令
  16. 谁说手工测试不香了?25k不比二流的开发美滋滋
  17. CentOS7.0如何查看MAC地址
  18. python项目作业_期末作业.py · 光辉/python练手小项目 - Gitee.com
  19. 文档在线预览产品系列-解决方案篇
  20. 恶意进程 - 挖矿程序处理

热门文章

  1. 笔记本计算机管理打不开怎么办,NVIDIA控制面板打不开怎么办解决方案
  2. 就业报告显示,近七成受访者向往互联网行业,互联网行业前景如何?
  3. 【论文十问】基于数据库日志的变化数据捕获研究
  4. Qoo10趣天jp入驻条件VS亚马逊Amazon日本站 どちらがいいの?
  5. 2022年9月青少年软件编程C语言一级真题答案
  6. 在Linux系统中运行.net程序——Mono介绍
  7. C语言实现医院诊疗管理系统
  8. 【Life系列】【转】一个人最好的生活状态
  9. python统计闰年的个数_闰年python_闰年 python_python 闰年数 - 云+社区 - 腾讯云
  10. 移动云天元“易行”迁移工具亮相ODD 2023,助力全场景业务规模迁移