Resilience4j是一个轻量级、易于使用的容错库,其灵感来自Netflix Hystrix,但专为Java 8和函数式编程设计。轻量级,因为库只使用Vavr,它没有任何其他外部库依赖项。相比之下,Netflix Hystrix对Archaius有一个编译依赖关系,Archaius有更多的外部库依赖关系,如Guava和Apache Commons。

Resilience4j提供高阶函数(decorators)来增强任何功能接口、lambda表达式或方法引用,包括断路器、速率限制器、重试或舱壁。可以在任何函数接口、lambda表达式或方法引用上使用多个装饰器。优点是您可以选择所需的装饰器,而无需其他任何东西。

有了Resilience4j,你不必全力以赴,你可以选择你需要的。

https://resilience4j.readme.io/docs/getting-started

概览

Resilience4j提供了两种舱壁模式**(Bulkhead)**,可用于限制并发执行的次数:

  • SemaphoreBulkhead(信号量舱壁,默认),基于Java并发库中的Semaphore实现。
  • FixedThreadPoolBulkhead(固定线程池舱壁),它使用一个有界队列和一个固定线程池。

本文将演示在Spring Boot2中集成Resilience4j库,以及在多并发情况下实现如上两种舱壁模式。

引入依赖

在Spring Boot2项目中引入Resilience4j相关依赖

<dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-spring-boot2</artifactId><version>1.4.0</version>
</dependency>
<dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-bulkhead</artifactId><version>1.4.0</version>
</dependency>

由于Resilience4j的Bulkhead依赖于Spring AOP,所以我们需要引入Spring Boot AOP相关依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId>
</dependency>

我们可能还希望了解Resilience4j在程序中的运行时状态,所以需要通过Spring Boot Actuator将其暴露出来

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

实现SemaphoreBulkhead(信号量舱壁)

resilience4j-spring-boot2实现了对resilience4j的自动配置,因此我们仅需在项目中的yml/properties文件中编写配置即可。

SemaphoreBulkhead的配置项如下:

属性配置 默认值 含义
maxConcurrentCalls 25 舱壁允许的最大并行执行量
maxWaitDuration 0 尝试进入饱和舱壁时,应阻塞线程的最长时间。

添加配置

示例(使用yml):

resilience4j.bulkhead:configs:default:maxConcurrentCalls: 5maxWaitDuration: 20msinstances:backendA:baseConfig: defaultbackendB:maxWaitDuration: 10msmaxConcurrentCalls: 20

如上,我们配置了SemaphoreBulkhead的默认配置为maxConcurrentCalls: 5,maxWaitDuration: 20ms。并在backendA实例上应用了默认配置,而在backendB实例上使用自定义的配置。这里的实例可以理解为一个方法/lambda表达式等等的可执行单元。

编写Bulkhead逻辑

定义一个受SemaphoreBulkhead管理的Service类:

@Service
public class BulkheadService {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate BulkheadRegistry bulkheadRegistry;@Bulkhead(name = "backendA")public JsonNode getJsonObject() throws InterruptedException {io.github.resilience4j.bulkhead.Bulkhead.Metrics metrics = bulkheadRegistry.bulkhead("backendA").getMetrics();logger.info("now i enter the method!!!,{}<<<<<<{}", metrics.getAvailableConcurrentCalls(), metrics.getMaxAllowedConcurrentCalls());Thread.sleep(1000L);logger.info("now i exist the method!!!");return new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis());}
}

如上,我们将@Bulkhead注解放到需要管理的方法上面。并且通过name属性指定该方法对应的Bulkhead实例名字(这里我们指定的实例名字为backendA,所以该方法将会利用默认的配置)。

定义接口类:

@RestController
public class BulkheadResource {@Autowiredprivate BulkheadService bulkheadService;@GetMapping("/json-object")public ResponseEntity<JsonNode> getJsonObject() throws InterruptedException {return ResponseEntity.ok(bulkheadService.getJsonObject());}
}

编写测试:

首先添加测试相关依赖

<dependency><groupId>io.rest-assured</groupId><artifactId>rest-assured</artifactId><version>3.0.5</version><scope>test</scope>
</dependency>
<dependency><groupId>org.awaitility</groupId><artifactId>awaitility</artifactId><version>4.0.2</version><scope>test</scope>
</dependency>

这里我们使用rest-assured和awaitility编写多并发情况下的API测试

public class SemaphoreBulkheadTests extends Resilience4jDemoApplicationTests {@LocalServerPortprivate int port;@BeforeEachpublic void init() {RestAssured.baseURI = "http://localhost";RestAssured.port = port;}@Testpublic void 多并发访问情况下的SemaphoreBulkhead测试() {CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {statusList.add(given().get("/json-object").statusCode());}));await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);System.out.println(statusList);assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(5);assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(3);}
}

可以看到所有请求中只有前五个顺利通过了,其余三个都因为超时而导致接口报500异常。我们可能并不希望这种不友好的提示,因此Resilience4j提供了自定义的失败回退方法。当请求并发量过大时,无法正常执行的请求将进入回退方法。

首先我们定义一个回退方法

private JsonNode fallback(BulkheadFullException exception) {return new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis());}

注意:回退方法应该和调用方法放置在同一类中,并且必须具有相同的方法签名,并且仅带有一个额外的目标异常参数。

然后在@Bulkhead注解中指定回退方法:@Bulkhead(name = "backendA", fallbackMethod = "fallback")

最后修改API测试代码:

@Test
public void 多并发访问情况下的SemaphoreBulkhead测试使用回退方法() {CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {statusList.add(given().get("/json-object").statusCode());}));await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);System.out.println(statusList);assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

运行单元测试,成功!可以看到,我们定义的回退方法,在请求过量时起作用了。

实现FixedThreadPoolBulkhead(固定线程池舱壁)

FixedThreadPoolBulkhead的配置项如下:

配置名称 默认值 含义
maxThreadPoolSize Runtime.getRuntime().availableProcessors() 配置最大线程池大小
coreThreadPoolSize Runtime.getRuntime().availableProcessors() - 1 配置核心线程池大小
queueCapacity 100 配置队列的容量
keepAliveDuration 20ms 当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间

添加配置

示例(使用yml):

resilience4j.thread-pool-bulkhead:configs:default:maxThreadPoolSize: 4coreThreadPoolSize: 2queueCapacity: 2instances:backendA:baseConfig: defaultbackendB:maxThreadPoolSize: 1coreThreadPoolSize: 1queueCapacity: 1

如上,我们定义了一段简单的FixedThreadPoolBulkhead配置,我们指定的默认配置为:maxThreadPoolSize: 4,coreThreadPoolSize: 2,queueCapacity: 2,并且指定了两个实例,其中backendA使用了默认配置而backendB使用了自定义的配置。

编写Bulkhead逻辑

定义一个受FixedThreadPoolBulkhead管理的方法:

@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture<JsonNode> getJsonObjectByThreadPool() throws InterruptedException {io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();logger.info("now i enter the method!!!,{}", metrics);Thread.sleep(1000L);logger.info("now i exist the method!!!");return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

如上定义和SemaphoreBulkhead的方法大同小异,其中@Bulkhead显示指定了type的属性为Bulkhead.Type.THREADPOOL,表明其方法受FixedThreadPoolBulkhead管理。由于@Bulkhead默认的BulkheadSemaphoreBulkhead,所以在未指定type的情况下为SemaphoreBulkhead。另外,FixedThreadPoolBulkhead只对CompletableFuture方法有效,所以我们必创建返回CompletableFuture类型的方法。

定义接口类方法

@GetMapping("/json-object-with-threadpool")
public ResponseEntity<JsonNode> getJsonObjectWithThreadPool() throws InterruptedException, ExecutionException {return ResponseEntity.ok(bulkheadService.getJsonObjectByThreadPool().get());
}

编写测试代码

@Test
public void 多并发访问情况下的ThreadPoolBulkhead测试() {CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {statusList.add(given().get("/json-object-with-threadpool").statusCode());}));await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);System.out.println(statusList);assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(6);assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(2);
}

测试中我们并行请求了8次,其中6次请求成功,2次失败。根据FixedThreadPoolBulkhead的默认配置,最多能容纳maxThreadPoolSize+queueCapacity次请求(根据我们上面的配置为6次)。

同样,我们可能并不希望这种不友好的提示,那么我们可以指定回退方法,在请求无法正常执行时使用回退方法。

private CompletableFuture<JsonNode> fallbackByThreadPool(BulkheadFullException exception) {return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis()));
}
@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL, fallbackMethod = "fallbackByThreadPool")
public CompletableFuture<JsonNode> getJsonObjectByThreadPoolWithFallback() throws InterruptedException {io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();logger.info("now i enter the method!!!,{}", metrics);Thread.sleep(1000L);logger.info("now i exist the method!!!");return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

编写测试代码

@Test
public void 多并发访问情况下的ThreadPoolBulkhead测试使用回退方法() {CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {statusList.add(given().get("/json-object-by-threadpool-with-fallback").statusCode());}));await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);System.out.println(statusList);assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

由于指定了回退方法,所有请求的响应状态都为正常了。


总结

本文首先简单介绍了Resilience4j的功能及使用场景,然后具体介绍了Resilience4j中的Bulkhead。演示了如何在Spring Boot2项目中引入Resilience4j库,使用代码示例演示了如何在Spring Boot2项目中实现Resilience4j中的两种Bulkhead(SemaphoreBulkhead和FixedThreadPoolBulkhead),并编写API测试验证我们的示例。

**本文示例代码地址:**https://github.com/cg837718548/resilience4j-demo


Spring Boot2+Resilience4j实现容错之Bulkhead相关推荐

  1. spring boot2.3.1版本导入spring-boot-starter-web没有validation校验框架的解决办法

    导入 我们发现下面的包飘红 再查看spring boot2.3.1版本导入spring-boot-starter-web底下确实没有validation 解决办法 需要手动加入validation的依 ...

  2. Spring Boot2.x-13前后端分离的跨域问题解决方法之Nginx

    文章目录 概述 浏览器同源策略 后台搭建 pom.xml interceptor 配置 Controller 启动测试 浏览器和session 后端工程发布到服务器上 问题复现 通过Nginx反向代理 ...

  3. Spring Boot2.x-12 Spring Boot2.1.2中Filter和Interceptor 的使用

    文章目录 Interceptor 拦截器 拦截器中方法的执行流程 传统项目拦截器的配置 Spring Boot2.1.2整合拦截器Interceptor 示例 Step1 实现HandlerInter ...

  4. Spring Boot2.x-11 使用@ControllerAdvice和@ExceptionHandler实现自定义全局异常

    文章目录 概述 未使用全局异常且未显式捕获异常的情况 使用全局异常 Step1. 自定义异常类 Step2. 封装异常信息模板 Step3. 全局异常处理类 Step4. 使用全局异常 小结 概述 我 ...

  5. Spring Boot2.x-10 基于Spring Boot 2.1.2 + Mybatis 2.0.0实现多数据源,支持事务

    文章目录 概述 思路 步骤 Step1 多数据源配置文件applicaiton.yml Step2 初始化多个数据源 Step3 配置多个数据源 验证测试 支持事务 Step1 配置类中通过@Bean ...

  6. Spring Boot2.x-09 基于Spring Boot 2.1.2 + Mybatis使用自定义注解实现数据库切换

    文章目录 概述 场景说明:读写分离 操作步骤 工程结构 Step1 自定义注解 Step2 数据源定义 Step3 配置文件配置数据源 Step4 数据源实例化DatasourceConfig Ste ...

  7. Spring Boot2.x-08Spring Boot2.1.2 整合 Mybatis1.3.2 + 通用Mapper2.1.4 + PageHelper1.2.10 + Druid 1.1.10

    文章目录 概述 整合 MyBatis 整合 通用Mapper2.1.4及 PageHelper1.2.10 添加依赖 通用mapper 编写 application.yml增加配置 集成验证测试 整合 ...

  8. Spring Boot2.x-05Spring Boot基础-使用注解完成依赖注入

    文章目录 概述 @Autowired注解 @Autowired的匹配原则 @Autowired的 required 属性 使用@Primary 和@Qualifier消除@Autowired的歧义 @ ...

  9. Spring Boot2.0+中,自定义配置类扩展springMVC的功能

    在spring boot1.0+,我们可以使用WebMvcConfigurerAdapter来扩展springMVC的功能,其中自定义的拦截器并不会拦截静态资源(js.css等). @Configur ...

最新文章

  1. JDK11使用IDEA,配置JavaFX
  2. 范进中举,读个博士到底有多难?看看就知道了!
  3. 三天打工生活终于结束了
  4. OpenLayers加载搜狗地图
  5. 获得Google搜索字符串中的关键字
  6. mysql创建表关联_MySQL创建高级联表教程
  7. java compare equla_Java中的equals,==,compareTo和compare的比较
  8. 2021中国跨境电商发展报告
  9. P5725 【深基4.习8】求三角形(python3实现)
  10. Python List sort方法无效
  11. C/C++编程笔记:C++中的atol(),atoll()和atof()函数
  12. copy的过去式_copy的过去式和用法例句
  13. Jmeter如何控制取样器执行顺序
  14. 「Python条件结构」嵌套if:根据星期英文字母输出相应的星期
  15. 信用卡逾期怎么办,如何让信用卡不逾期?
  16. 硬件设备计算存储及数据交互杂谈
  17. 模板方法设计模式两种实现方案
  18. python 对excel的函数操作(2)
  19. 天宫之印服务器基于ARM架构的欧拉系统搭建
  20. 绿联扩展坞拆解_拆解报告:绿联USB-C多功能拓展坞2A1C

热门文章

  1. 夏季来了,市政排水管道检测要注意以下几点
  2. 「关羽斩废」! 垃圾处理与发电参加「护家担当」概念雄安跨界斜杠派展区!AiHiX
  3. 【跨境电商】Zendesk最佳替代品,5款客服插件推荐
  4. 快印通软件通过360软件安全认证中心认证
  5. 原谅帽大作战网页版服务器连接失败,原谅帽大作战网页PC版
  6. Simulink电力电子仿真——(一)概述2
  7. python照片转化为漫画_巧用python实现图片转换成素描和漫画格式
  8. 你的微信头像:该换了
  9. Windows11连接共享打印机指定的网络名不再可用
  10. 如何选择真空机械制造 真空电极 真空穿通件 真空穿通密封件 真空腔 真空腔室 真空腔新材料 真空腔组件 超高压 超高真空材料 铝制CF组件