Hystrix运行原理

  1. 构造一个HystrixCommand或HystrixObservableCommand对象
  2. 执行命令。
  3. 检查是否已命中缓存,如果命中直接返回。
  4. 检查断路器开关是否打开,如果打开,直接熔断,走fallback逻辑。
  5. 检查线程池/队列/信号量是否已满,如果已满,直接拒绝请求,走fallback逻辑。
  6. 上面条件都不满足,调用HystrixObservableCommand.construct()方法HystrixCommand.run()方法,执行业务逻辑。
  7. 判断运行业务逻辑方法是否出现异常或者超时,如果出现,直接降级,走fallback逻辑。
  8. 上报统计数据,用户计算断路器状态。
  9. 返回结果

从流程图可以发现,只有出现57两种情况时,才会上报错误统计数据。

断路器运行原理

断路器的开关控制逻辑如下:

  1. ***在一个统计时间窗口内(HystrixCommandProperties.metricsRollingStatisticalWindowInMilliseconds())***,处理的请求数量达到设定的最小阈值(HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()),并且错误百分比超过设定的最大阈值(HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()),这时断路器开关就会打开,断路器状态从转换CLOSED切换为OPEN
  2. 当断路器为打开状态时,它他会直接熔断所有请求(快速失败),走fallback逻辑。
  3. 经过一个睡眠窗口时间后(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()),Hystrix会放行一个请到后续服务,并将断路器开关切换为半开状态(HALF-OPEN)。如果该请求失败,则断路器会将熔断开关切换为打开状态(OPEN),继续熔断所有请求,直到下一个睡眠时间窗口的到来;如果该请求成功,则断路器会切换到关闭状态(CLOSED),这时将允许所有请求通过,直到出现1步骤的情况,断路器开关会切换为打开状态(OPEN)。

断路器源码

Hystrix断路器的实现类是HystrixCircuitBreaker,源码如下:

/*** Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold.* 断路器,在HystrixCommand执行时会调用断路器逻辑,如果故障超过定义的阈值,断路器熔断开关将会打开,这时将阻止任务执行。* <p>* The default (and only) implementation  will then allow a single retry after a defined sleepWindow until the execution* succeeds at which point it will again close the circuit and allow executions again.* <p>* 默认(且唯一)实现将允许在定义的sleepWindow之后进行单次重试,直到执行成功,此时它将再次关闭电路并允许再次执行。*/
public interface HystrixCircuitBreaker {/*** Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does* not modify any internal state, and takes into account the half-open logic which allows some requests through* after the circuit has been opened* <p>* 每个HystrixCommand请求都会询问是否允许继续(当断路器开关为OPEN和HALF_OPEN都时返回false,当断路器开关是CLOSE时或者到了下一个睡眠窗口时返回true)。* 它是幂等的,不会修改任何内部状态,并考虑到半开逻辑,当一个睡眠窗口到来时他会放行一些请求到后续逻辑** @return boolean whether a request should be permitted (是否应允许请求)*/boolean allowRequest();/*** Whether the circuit is currently open (tripped).* 判断熔断开关是否打开(有无副作用,不是幂等方式)。** @return boolean state of circuit breaker(返回断路器的状态)*/boolean isOpen();/*** Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.* <p>* 断路器在处于半开状态时,作为反馈机制的一部分,从HystrixCommand成功执行时调用。*/void markSuccess();/*** Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.* 断路器当处于半开状态时,作为反馈机制的一部分,从HystrixCommand执行不成功的调用。*/void markNonSuccess();/*** Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal* state.* <p>* 在命令执行开始时调用以尝试执行,主要所用时判断该请求是否可以执行。这是非幂等的 - 它可能会修改内部状态。*/boolean attemptExecution();
}

断路器的默认实现就是它的一个内部类:

/*** @ExcludeFromJavadoc* @ThreadSafe*/
class Factory {// String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)// key是HystrixCommandKey.name()(我们不能直接使用HystrixCommandKey,因为我们无法保证它正确实现hashcode / equals)private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();/*** 根据HystrixCommandKey获取HystrixCircuitBreaker* Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey}.* <p>* This is thread-safe and ensures only 1 {@link HystrixCircuitBreaker} per {@link HystrixCommandKey}.** @param key        {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}* @param group      Pass-thru to {@link HystrixCircuitBreaker}* @param properties Pass-thru to {@link HystrixCircuitBreaker}* @param metrics    Pass-thru to {@link HystrixCircuitBreaker}* @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}*/public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {// this should find it for all but the first time// 根据HystrixCommandKey获取断路器HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());if (previouslyCached != null) {return previouslyCached;}// if we get here this is the first time so we need to initialize// Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of// 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety// If 2 threads hit here only one will get added and the other will get a non-null response instead.// 第一次没有获取到断路器,那么我们需要取初始化它// 这里直接利用ConcurrentHashMap的putIfAbsent方法,它是原子操作,加入有两个线程执行到这里,将会只有一个线程将值放到容器中// 让我们省掉了加锁的步骤HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));if (cbForCommand == null) {// this means the putIfAbsent step just created a new one so let's retrieve and return itreturn circuitBreakersByCommand.get(key.name());} else {// this means a race occurred and while attempting to 'put' another one got there before// and we instead retrieved it and will now return itreturn cbForCommand;}}/*** 根据HystrixCommandKey获取HystrixCircuitBreaker,如果没有返回NULL* Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey} or null if none exists.** @param key {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}* @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}*/public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {return circuitBreakersByCommand.get(key.name());}/*** Clears all circuit breakers. If new requests come in instances will be recreated.* 清除所有断路器。如果有新的请求将会重新创建断路器放到容器。*//* package */static void reset() {circuitBreakersByCommand.clear();}
}/**1. 默认的断路器实现2. The default production implementation of {@link HystrixCircuitBreaker}.3.  4. @ExcludeFromJavadoc5. @ThreadSafe*/
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {private final HystrixCommandProperties properties;private final HystrixCommandMetrics metrics;enum Status {// 断路器状态,关闭,打开,半开CLOSED, OPEN, HALF_OPEN;}// 赋值操作不是线程安全的。若想不用锁来实现,可以用AtomicReference<V>这个类,实现对象引用的原子更新。// AtomicReference 原子引用,保证Status原子性修改private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);// 记录断路器打开的时间点(时间戳),如果这个时间大于0表示断路器处于打开状态或半开状态private final AtomicLong circuitOpened = new AtomicLong(-1);private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {this.properties = properties;this.metrics = metrics;//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur// 在定时器上,当命令执行发生时,这将在OPEN / CLOSED之间设置电路Subscription s = subscribeToStream();activeSubscription.set(s);}private Subscription subscribeToStream() {/** This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream* 此流将重新计算运行状况流中每个onNext上的OPEN / CLOSED状态*/return metrics.getHealthCountsStream().observe().subscribe(new Subscriber<HealthCounts>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(HealthCounts hc) {// check if we are past the statisticalWindowVolumeThreshold// 检查一个时间窗口内的最小请求数if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {// we are not past the minimum volume threshold for the stat window,// so no change to circuit status.// if it was CLOSED, it stays CLOSED// IF IT WAS HALF-OPEN, WE NEED TO WAIT FOR A SUCCESSFUL COMMAND EXECUTION// if it was open, we need to wait for sleep window to elapse// 我们没有超过统计窗口的最小音量阈值,所以我们不会去改变断路器状态,如果是closed状态,他将保持这个状态// 如果是半开状态,那么她需要等到一个成功的 Command执行// 如果是打开状态,那么它需要等到这个时间窗口过去} else {// 检查错误比例阀值if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {//we are not past the minimum error threshold for the stat window,// so no change to circuit status.// if it was CLOSED, it stays CLOSED// if it was half-open, we need to wait for a successful command execution// if it was open, we need to wait for sleep window to elapse} else {// our failure rate is too high, we need to set the state to OPEN// 我们的失败率太高,我们需要将状态设置为OPENif (status.compareAndSet(Status.CLOSED, Status.OPEN)) {circuitOpened.set(System.currentTimeMillis());}}}}});}@Overridepublic void markSuccess() {// 断路器是处理半开并且HystrixCommand执行成功,将状态设置成关闭if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {//This thread wins the race to close the circuit - it resets the stream to start it over from 0//该线程赢得了关闭电路的竞争 - 它重置流以从0开始metrics.resetStream();Subscription previousSubscription = activeSubscription.get();if (previousSubscription != null) {previousSubscription.unsubscribe();}Subscription newSubscription = subscribeToStream();activeSubscription.set(newSubscription);circuitOpened.set(-1L);}}@Overridepublic void markNonSuccess() {// 断路器是处理半开并且HystrixCommand执行成功,将状态设置成打开if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {//This thread wins the race to re-open the circuit - it resets the start time for the sleep window// 该线程赢得了重新打开电路的竞争 - 它重置了睡眠窗口的开始时间circuitOpened.set(System.currentTimeMillis());}}@Overridepublic boolean isOpen() {// 获取配置判断断路器是否强制打开if (properties.circuitBreakerForceOpen().get()) {return true;}// 获取配置判断断路器是否强制关闭if (properties.circuitBreakerForceClosed().get()) {return false;}return circuitOpened.get() >= 0;}@Overridepublic boolean allowRequest() {// 获取配置判断断路器是否强制打开if (properties.circuitBreakerForceOpen().get()) {return false;}// 获取配置判断断路器是否强制关闭if (properties.circuitBreakerForceClosed().get()) {return true;}if (circuitOpened.get() == -1) {return true;} else {// 如果是半开状态则返回不允许Command执行if (status.get().equals(Status.HALF_OPEN)) {return false;} else {// 检查睡眠窗口是否过了return isAfterSleepWindow();}}}private boolean isAfterSleepWindow() {final long circuitOpenTime = circuitOpened.get();final long currentTime = System.currentTimeMillis();// 获取配置的一个睡眠的时间窗口final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();return currentTime > circuitOpenTime + sleepWindowTime;}@Overridepublic boolean attemptExecution() {// 获取配置判断断路器是否强制打开if (properties.circuitBreakerForceOpen().get()) {return false;}// 获取配置判断断路器是否强制关闭if (properties.circuitBreakerForceClosed().get()) {return true;}if (circuitOpened.get() == -1) {return true;} else {if (isAfterSleepWindow()) {//only the first request after sleep window should execute//if the executing command succeeds, the status will transition to CLOSED//if the executing command fails, the status will transition to OPEN//if the executing command gets unsubscribed, the status will transition to OPEN// 只有一个睡眠窗口后的第一个请求会被执行// 如果执行命令成功,状态将转换为CLOSED// 如果执行命令失败,状态将转换为OPEN// 如果执行命令取消订阅,状态将过渡到OPENif (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {return true;} else {6. List itemreturn false;}} else {return false;}}}
}
  • isOpen():判断熔断开关是否打开(该方法是否幂等和Hystrix版本相关)。
  • allowRequest():每个HystrixCommand请求都会询问是否允许继续执行(当断路器开关为OPENHALF_OPEN都时返回false,当断路器开关是CLOSE或到了下一个睡眠窗口时返回true),它是幂等的,不会修改任何内部状态,并考虑到半开逻辑,当一个睡眠窗口到来时他会放行一些请求到后续逻辑。
  • attemptExecution():在命令执行开始时调用以尝试执行,主要所用时判断该请求是否可以执行。这是非幂等的,它可能会修改内部状态。

这里需要注意的是allowRequest()方法时幂等的,可以重复调用;attemptExecution()方法是有副作用的,不可以重复调用;isOpen()是否幂等和Hystrix版本有关。

Hystrix核心原理和断路器源码解析相关推荐

  1. MySQL核心参数含义的源码解析

    引言 你访问的网站,大部分使用Apache服务器;你访问的网站,大部分使用Linux或BSD操作系统:你访问的网站,大部分使用MySQL数据库;你提交DNS域名查询请求大多由BIND服务器分析处理;你 ...

  2. 稀疏多项式的运算用链表_用最简单的大白话聊一聊面试必问的HashMap原理和部分源码解析...

    HashMap在面试中经常会被问到,一定会问到它的存储结构和实现原理,甚至可能还会问到一些源码 今天就来看一下HashMap 首先得看一下HashMap的存储结构和底层实现原理 如上图所示,HashM ...

  3. matlabeig函数根据什么原理_vue3.0 源码解析二 :响应式原理(下)

    一 回顾上文 上节我们讲了数据绑定proxy原理,vue3.0用到的基本的拦截器,以及reactive入口等等.调用reactive建立响应式,首先通过判断数据类型来确定使用的hander,然后创建p ...

  4. Laravel开发:Laravel核心——Ioc服务容器源码解析(服务器绑定)

    服务容器的绑定 bind 绑定 bind 绑定是服务容器最常用的绑定方式,在 上一篇文章中我们讨论过,bind 的绑定有三种: 绑定自身 绑定闭包 绑定接口 今天,我们这篇文章主要从源码上讲解 Ioc ...

  5. 社区发现算法原理与louvain源码解析

    前言 社区发现(community detection),或者社区切分,是一类图聚类算法,它主要作用是将图数据划分为不同的社区,社区内的节点都是连接紧密或者相似的,而社区与社区之间的节点连接则是稀疏的 ...

  6. SpringBoot四大核心之自动装配——源码解析

    四大核心 1.自动装配:简单配置甚至零配置即可运行项目 2.Actuator:springboot程序监控器 3.starter:jar包的引入,解决jar版本冲突问题 4.CLI:命令行 初学体验 ...

  7. Scrapy分布式原理及Scrapy-Redis源码解析(待完善)

    1 Scrapy分布式原理 2 队列用什么维护 首先想到的可能是一些特定数据结构, 数据库, 文件等等. 这里推荐使用Redis队列. 3 怎样来去重 保证Request队列每个request都是唯一 ...

  8. 基于postCss的TaiWindCss源码解析

    基于postCss的TaiWindCss源码解析 前言 了解 postCss 什么 是 postCss? postCss的核心原理/工作流 TaiWindCss 源码解析 TaiWindCss 是什么 ...

  9. Java线程池源码解析及高质量代码案例

    引言 本文为Java高级编程中的一些知识总结,其中第一章对Jdk 1.7.0_25中的多线程架构中的线程池ThreadPoolExecutor源码进行架构原理介绍以及源码解析.第二章则分析了几个违反J ...

最新文章

  1. linux重定向文件不存在,shell 12 21 filename重定向的含义和区别
  2. 谷歌验证 (Google Authenticator) 的实现原理是什么?
  3. Mysql字符串数据插入转义处理
  4. 爬虫requests高阶篇详细教程
  5. UIActivityViewController实现系统原生分享
  6. 【学习笔记】【C语言】进制
  7. Python中的lamda表达式
  8. 遗传算法是机器学习算法嘛?_基于遗传算法的机器人控制器方法
  9. Java项目源码分享——适合新手练手的Java Web项目
  10. JS打开新页面的两种方式:当前页面打开和新页面打开
  11. 多组学联合分析整体思路
  12. android 连接tftp 服务器
  13. 分治法_乒乓球比赛赛程安排(C语言)
  14. keil4 #pragma anon_unions
  15. 视觉感知在数据可视化中的作用
  16. android全景图
  17. python小游戏毕设 俄罗斯方块小游戏设计与实现 (源码)
  18. 如何将多行和多列转换为行和行Excel
  19. 设置代理让github加速
  20. 5G(IMT-2020)简介

热门文章

  1. 怎么解除pdf的加密,建议收藏这几种方法
  2. esxi6.7虚拟机网卡连接第二个虚拟交换机_NAS部署指南 群晖篇六—— NAS兼做路由器,群晖虚拟机套件教程...
  3. SAP 银企直连付款流程
  4. 运筹学基础【七】 之 网络计划技术
  5. Redis+MySQL冷热数据交换
  6. 禁止Docker使用iptables防火墙改为使用Firewalld
  7. 浏览器如何访问FTP目录
  8. Electron桌面悬浮球工具,支持拖动及配置,提供了待办事项、快速笔记等功能。
  9. LPCXpresso54114基于MDK的初试
  10. 数据库原理及应用实验报告-实验8-参照完整性