resilience4j-ratelimiter:限流器,用作流控

依赖

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><resilience4j.version>0.16.0</resilience4j.version>
</properties>
<dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-ratelimiter</artifactId><version>${resilience4j.version}</version>
</dependency>

application.yml配置内容如下

resilience4j:ratelimiter:limiters:limiterA: # #RateLimiter名称limit-for-period: 1 # 每时间单位可执行处理数limit-refresh-period-in-millis: 10000 # 单位时间(毫秒)timeout-in-millis: 10000 # 获取令牌的等待超时时间, 超时为获取到令牌将抛出异常limiterB:limit-for-period: 1 # 每时间单位可执行处理数limit-refresh-period-in-millis: 10000 #单位时间(毫秒)timeout-in-millis: 10000 # 获取令牌的等待超时时间, 超时为获取到令牌将抛出异常

api方式使用

// 创建限流器配置
RateLimiterConfig config =  RateLimiterConfig.custom().limitRefreshPeriod(Duration.ofMillis(1000)).limitForPeriod(1).timeoutDuration(Duration.ofMillis(1000)).build();
// 创建限流器注册器RateLimiterRegistry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
// 通过RateLimiterRegistry来创建限流器
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry.rateLimiter("backend1");
RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry.rateLimiter("backend2", config);
// 通过RateLimiter创建限流器
RateLimiter rateLimiter = RateLimiter.of("emailSendRateLimiter", config);
// 限流器指标:等待线程数,可用令牌数
RateLimiter.Metrics metrics = rateLimiter.getMetrics();
rateLimiter.getEventPublisher().onSuccess(event -> {System.out.println(event.getEventType() + ":::可用令牌数: " + metrics.getAvailablePermissions() + ", 等待线程数: "+ metrics.getNumberOfWaitingThreads());
}).onFailure(event -> {System.out.println(event.getEventType() + ":::可用令牌数: " + metrics.getAvailablePermissions() + ", 等待线程数: "+ metrics.getNumberOfWaitingThreads());
});
// 创建一个受RateLimiter限制的runnable。
Runnable task = RateLimiter.decorateRunnable(rateLimiter, new Runnable() {@Overridepublic void run() {// TODO Auto-generated method stub}});
// 装饰并执行装饰的Runnable。
rateLimiter.executeRunnable(new Runnable() {@Overridepublic void run() {}});
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
ThreadPoolExecutorFactoryBean threadFactory = new ThreadPoolExecutorFactoryBean();
threadFactory.setThreadNamePrefix("emailSendThreadPool");
poolExecutor.setThreadFactory(threadFactory);
int id = 1;
while (true) {try {Runnable task = RateLimiter.decorateRunnable(rateLimiter, new SendThread(id++));poolExecutor.execute(task);Thread.sleep(500L);} catch (RequestNotPermitted | InterruptedException e) {System.out.println(e.getMessage());}
}

测试代码:

package cn.sino.emailserver;import java.time.Duration;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;import org.junit.Before;
import org.junit.Test;
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.vavr.control.Try;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RateLimiterTest {private RateLimiterConfig config;private AtomicInteger count = new AtomicInteger(0);private CountDownLatch latch = new CountDownLatch(100);@Beforepublic void init() {config = RateLimiterConfig.custom().limitRefreshPeriod(Duration.ofMillis(1000)).limitForPeriod(10).timeoutDuration(Duration.ofMillis(10000)).build();}@Testpublic void test() {// 创建限流器RateLimiter rateLimiter = RateLimiter.of("emailSendRateLimiter", config);RateLimiter.Metrics metrics = rateLimiter.getMetrics();rateLimiter.getEventPublisher().onSuccess(event -> {System.out.println(event.getEventType() + ":::可用令牌数: " + metrics.getAvailablePermissions() + ", 等待线程数: "+ metrics.getNumberOfWaitingThreads());}).onFailure(event -> {System.out.println(event.getEventType() + ":::可用令牌数: " + metrics.getAvailablePermissions() + ", 等待线程数: "+ metrics.getNumberOfWaitingThreads());});      ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));ThreadPoolExecutorFactoryBean threadFactory = new ThreadPoolExecutorFactoryBean();threadFactory.setThreadNamePrefix("emailSendThreadPool-");poolExecutor.setThreadFactory(threadFactory);long start = System.currentTimeMillis();IntStream.rangeClosed(1, 100).parallel().forEach(i -> {Runnable task = RateLimiter.decorateRunnable(rateLimiter, new SendThread(i));poolExecutor.execute(task);System.out.println(poolExecutor.getActiveCount()+"::"+poolExecutor.getTaskCount());});try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}log.info("耗时: {} ms, count={}", (System.currentTimeMillis() - start), count.get());}class SendThread implements Runnable {private int id;public SendThread(int id) {this.id = id;}@Overridepublic void run() {log.info("发送邮件, id={}", id);count.incrementAndGet();latch.countDown();}}
}

测试代码:

package cn.sino.emailserver;import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;import org.junit.Before;
import org.junit.Test;
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.vavr.control.Try;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RateLimiterTest {private RateLimiterConfig config;private AtomicInteger count = new AtomicInteger(0);private CountDownLatch latch = new CountDownLatch(100);private RateLimiter rateLimiter;@Beforepublic void init() {config = RateLimiterConfig.custom().limitRefreshPeriod(Duration.ofMillis(1000)).limitForPeriod(100).timeoutDuration(Duration.ofMillis(100000000)).build();rateLimiter = RateLimiter.of("emailSendRateLimiter", config);}@Testpublic void test() {// 创建限流器RateLimiter.Metrics metrics = rateLimiter.getMetrics();rateLimiter.getEventPublisher().onEvent(event -> {log.info("{}:::可用令牌数={}, 等待线程数={}", event.getEventType(), metrics.getAvailablePermissions(), metrics.getNumberOfWaitingThreads());});ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(50));ThreadPoolExecutorFactoryBean threadFactory = new ThreadPoolExecutorFactoryBean();threadFactory.setThreadNamePrefix("emailSendThreadPool-");poolExecutor.setThreadFactory(threadFactory);Bulkhead bulkhead = Bulkhead.of("email-send-executor-",BulkheadConfig.custom().maxConcurrentCalls(10).maxWaitTimeDuration(Duration.ofMillis(3000000L)).build());bulkhead.getEventPublisher().onEvent(event -> {log.info("{}", event.getEventType());});long start = System.currentTimeMillis();IntStream.rangeClosed(1, 100).parallel().forEach(i -> {//          Runnable task = RateLimiter.decorateRunnable(rateLimiter, new SendThread(i));Runnable task = Bulkhead.decorateRunnable(bulkhead, new SendThread(i));
//          poolExecutor.execute(task);Try.runRunnable(task);
//          System.out.println(poolExecutor.getActiveCount()+"::"+poolExecutor.getTaskCount());});try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}log.info("耗时: {} ms, count={}", (System.currentTimeMillis() - start), count.get());}class SendThread implements Runnable {public void send(Integer i) {log.info("email send succ: {}", i);}public void send() {log.info("email send succ: {}");}private int id;public SendThread(int id) {this.id = id;}@Overridepublic void run() {rateLimiter.acquirePermission();send(id);log.info("发送邮件, id={}", id);count.incrementAndGet();latch.countDown();}}
}

resilience4j-ratelimiter:限流器相关推荐

  1. google的RateLimiter限流器的使用

    背景:A系统需要调用B,C,D系统,B,C,D系统没有能力做限流,因此需要A系统针对B,C,D系统做限流,每秒发送对象要求的请求数.可以使用下面的组件进行控制 import com.google.co ...

  2. 基于令牌桶算法的限流器RateLimiter分析

    RateLimiter限流源码分析 主要从以下几个方面进行分析 RateLimiter工作原理的总体介绍 RateLimiter的继承关系(基类与子类的关系图) 关键属性字段分析 核心方法分析(创建- ...

  3. Resilience4j

    微服务容错简介 在高并发访问下,比如天猫双11,流量持续不断的涌入,服务之间的相互调用频率突然增加,引发系统负载过高,这时系统所依赖的服务的稳定性对系统的影响非常大,而且还有很多不确定因素引起雪崩,如 ...

  4. 第四章 微服务容错Resilience4j

    4.1 微服务容错简介 在⾼并发访问下,⽐如天猫双11,流量持续不断的涌⼊,服务之间的相互调⽤频率突然增加,引发系统负载过⾼,这时系统所依赖的服务的稳定性对系统的影响⾮常⼤,⽽且还有很多不确定因素引起 ...

  5. 四,微服务容错Resilience4j(待改)

    写在前面的话:本文大部分(说全部也没问题)都是抄写的,课堂的资料给的特别好.感觉耻于提笔,但不写上就影响整个博客内容了.以后会改/写出我的笔记的. 感谢你的路过,希望学生的笔记能给你一点微不足道的参考 ...

  6. Soring Cloud -- Resilience4j简介

    参考: GitHub - resilience4j/resilience4j: Resilience4j is a fault tolerance library designed for Java8 ...

  7. matlab熔断器,Resilience4j 熔断器

    主要讲解 resilience4j-spring-boot2: 1.依赖导入 dependencies { compile "io.github.resilience4j:resilienc ...

  8. 实战 Spring Cloud Gateway 之限流篇

    来源:https://www.aneasystone.com/archives/2020/08/spring-cloud-gateway-current-limiting.html 话说在 Sprin ...

  9. 这可能是全网Spring Cloud Gateway限流最完整的方案了!

        作者:aneasystone     https://www.aneasystone.com/ 话说在 Spring Cloud Gateway 问世之前,Spring Cloud 的微服务世 ...

最新文章

  1. pipe 函数 (C语言)
  2. UNIX环境高级编程——无名管道和有名管道
  3. 分类模型的性能评估——以SAS Logistic回归为例(3): Lift和Gain
  4. NYOJ 303 序号转换 数学题
  5. PAT (Advanced Level) 1016 Phone Bills(恶心模拟)
  6. WorkerMan 入门学习之(二)基础教程-Connection类的使用
  7. LogSegment分析
  8. MyBatis3与Spring3的整合配置(初级篇)
  9. 教育学考研跨考计算机,某985计算机专业,想要三跨北师大教育学,会不会很难?...
  10. WPF 控件 深度克隆
  11. QT_仿王者荣耀抽奖
  12. OpenRefine数据清洗实战
  13. 微pe工具箱 系统安装教程_通用PE工具箱装系统(V4.0)——安装原版WIN7系统
  14. 8051单片机驱动TM1620任意字符循环显示程序(详细注释版)
  15. 8/14 二维高斯函数
  16. 一键启动多应用(windows版)
  17. PowerBI账户免费注册
  18. 用python画佩奇_用python画小猪票佩奇
  19. 论文精读《LSS: Lift, Splat, Shoot: Encoding Images from Arbitrary Camera Rigs by Implicitly Unprojecting》
  20. Graph4Rec: 基于图神经网络的推荐系统通用工具包

热门文章

  1. Vue - 音频播放器插件(vue-aplayer)
  2. spingMVC 引用实体类绑定,中文乱码过滤器,时间类型转换器 。案例配置步骤
  3. 字、字节、位(word、byte、bit)的关系
  4. 狐言:王阳明心学、量子物理、心外无物的乱弹
  5. 关于jsp网页弹出窗口[很多种方法......]
  6. Carla 启动时初始化到75%就崩溃,出现“Fatal error”
  7. 【笔记】wlan - 基础概念(无线、wifi、常见协议、频谱、信道、ap部署、案例)
  8. Error: listen EADDRINUSE: address already in use 127.0.0.1:8888
  9. Boston Dog
  10. 《进化心理学》《伊斯坦布尔假期》