2019独角兽企业重金招聘Python工程师标准>>>

摘要: 原创出处 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述
  • 2. HystrixCircuitBreaker
  • 3. HystrixCircuitBreaker.Factory
  • 4. HystrixCircuitBreakerImpl
    • 4.1 构造方法
    • 4.2 #subscribeToStream()
    • 4.3 #attemptExecution()
    • 4.4 #markSuccess()
    • 4.5 #markNonSuccess()
    • 4.6 #allowRequest()
    • 4.7 #isOpen()
  • 666. 彩蛋

???关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 断路器 HystrixCircuitBreaker

HystrixCircuitBreaker 有三种状态 :

  • CLOSED :关闭
  • OPEN :打开
  • HALF_OPEN :半开

其中,断路器处于 OPEN 状态时,链路处于非健康状态,命令执行时,直接调用回退逻辑,跳过正常逻辑。

HystrixCircuitBreaker 状态变迁如下图 :


  • 红线 :初始时,断路器处于 CLOSED 状态,链路处于健康状态。当满足如下条件,断路器从 CLOSED 变成 OPEN 状态:

    • 周期( 可配,HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms )内,总请求数超过一定( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 。
    • 错误请求占总请求数超过一定比例( 可配,HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50% ) 。
  • 绿线 :断路器处于 OPEN 状态,命令执行时,若当前时间超过断路器开启时间一定时间( HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms ),断路器变成 HALF_OPEN 状态,尝试调用正常逻辑,根据执行是否成功,打开或关闭熔断器【蓝线】。


推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

2. HystrixCircuitBreaker

com.netflix.hystrix.HystrixCircuitBreaker ,Hystrix 断路器接口。定义接口如下代码 :

public interface HystrixCircuitBreaker {/*** Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does* not modify any internal state, and takes into account the half-open logic which allows some requests through* after the circuit has been opened* * @return boolean whether a request should be permitted*/boolean allowRequest();/*** Whether the circuit is currently open (tripped).* * @return boolean state of circuit breaker*/boolean isOpen();/*** Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.*/void markSuccess();/*** Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.*/void markNonSuccess();/*** Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal* state.*/boolean attemptExecution();
}
  • #allowRequest()#attemptExecution() 方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态( CLOSE => HALF-OPEN ),而后者会。

HystrixCircuitBreaker 有两个子类实现 :

  • NoOpCircuitBreaker :的断路器实现,用于不开启断路器功能的情况。
  • HystrixCircuitBreakerImpl :完整的断路器实现。

在 AbstractCommand 创建时,初始化 HystrixCircuitBreaker ,代码如下 :

/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {/*** 断路器*/protected final HystrixCircuitBreaker circuitBreaker;protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {// ... 省略无关代码// 初始化 断路器this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);// ... 省略无关代码}private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,HystrixCommandProperties properties, HystrixCommandMetrics metrics) {if (enabled) {if (fromConstructor == null) {// get the default implementation of HystrixCircuitBreakerreturn HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);} else {return fromConstructor;}} else {return new NoOpCircuitBreaker();}}}
  • HystrixCommandProperties.circuitBreakerEnabled = true 时,即断路器功能开启,使用 Factory 获得 HystrixCircuitBreakerImpl 对象。在 「3. HystrixCircuitBreaker.Factory」 详细解析。
  • HystrixCommandProperties.circuitBreakerEnabled = false 时,即断路器功能关闭,创建 NoOpCircuitBreaker 对象。另外,NoOpCircuitBreaker 代码简单到脑残,点击 链接 查看实现。

3. HystrixCircuitBreaker.Factory

com.netflix.hystrix.HystrixCircuitBreaker.Factory ,HystrixCircuitBreaker 工厂,主要用于:

  • 创建 HystrixCircuitBreaker 对象,目前只创建 HystrixCircuitBreakerImpl 。

  • HystrixCircuitBreaker 容器,基于 HystrixCommandKey 维护了 HystrixCircuitBreaker 单例对象 的映射。代码如下 :

    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
    

整体代码灰常清晰,点击 链接 查看代码。

4. HystrixCircuitBreakerImpl

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl完整的断路器实现。

我们来逐个方法看看 HystrixCircuitBreakerImpl 的具体实现。

4.1 构造方法

构造方法,代码如下 :

/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {private final HystrixCommandProperties properties;private final HystrixCommandMetrics metrics;enum Status {CLOSED, OPEN, HALF_OPEN}private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);private final AtomicLong circuitOpened = new AtomicLong(-1);private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {this.properties = properties;this.metrics = metrics;//On a timer, this will set the circuit between OPEN/CLOSED as command executions occurSubscription s = subscribeToStream();activeSubscription.set(s);}
}
  • Status 枚举类,断路器的三种状态。
  • status 属性,断路器的状态。
  • circuitOpened 属性,断路器打开,即状态变成 OPEN 的时间。
  • activeSubscription 属性,基于 Hystrix Metrics 对请求量统计 Observable 的订阅,在 「4.2 #subscribeToStream()」 详细解析。

4.2 #subscribeToStream()

#subscribeToStream() 方法,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。代码如下 :

private Subscription subscribeToStream() {1: private Subscription subscribeToStream() {2:     /*3:      * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream4:      */5:     return metrics.getHealthCountsStream()6:             .observe()7:             .subscribe(new Subscriber<HealthCounts>() {8:                 @Override9:                 public void onCompleted() {10: 11:                 }12: 13:                 @Override14:                 public void onError(Throwable e) {15: 16:                 }17: 18:                 @Override19:                 public void onNext(HealthCounts hc) {20:                     System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用于调试21:                     // check if we are past the statisticalWindowVolumeThreshold22:                     if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {23:                         // we are not past the minimum volume threshold for the stat window,24:                         // so no change to circuit status.25:                         // if it was CLOSED, it stays CLOSED26:                         // if it was half-open, we need to wait for a successful command execution27:                         // if it was open, we need to wait for sleep window to elapse28:                     } else {29:                         if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {30:                             //we are not past the minimum error threshold for the stat window,31:                             // so no change to circuit status.32:                             // if it was CLOSED, it stays CLOSED33:                             // if it was half-open, we need to wait for a successful command execution34:                             // if it was open, we need to wait for sleep window to elapse35:                         } else {36:                             // our failure rate is too high, we need to set the state to OPEN37:                             if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {38:                                 circuitOpened.set(System.currentTimeMillis());39:                             }40:                         }41:                     }42:                 }43:             });44: }
  • 第 5 至 7 行 :向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。这里的 Observable 基于 RxJava Window 操作符。

    FROM 《ReactiveX文档中文翻译》「Window」
    定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据

    • 简单来说,固定间隔,#onNext() 方法将不断被调用,每次计算断路器的状态。
  • 第 22 行 :判断周期( 可配,HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms )内,总请求数超过一定( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 。

    • 这里要注意下,请求次数统计的是周期内,超过周期的不计算在内。例如说,00:00 内发起了 N 个请求,00:11 不计算这 N 个请求。
  • 第 29 行 :错误请求占总请求数超过一定比例( 可配,HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50% ) 。

  • 第 37 至 39 行 :满足断路器打开条件,CAS 修改状态( CLOSED => OPEN ),并设置打开时间( circuitOpened ) 。

  • 补充】第 5 至 7 行 :? 怕写在上面,大家有压力。Hystrix Metrics 对请求量统计 Observable 使用了两种 RxJava Window 操作符 :

    • Observable#window(timespan, unit) 方法,固定周期( 可配,HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds = 500 ms ),发射 Observable 窗口。点击 BucketedCounterStream 构造方法 查看调用处的代码。
    • Observable#window(count, skip) 方法,每发射一次(skip) Observable 忽略 count ( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 个数据项。为什么?答案在第 22 行的代码,周期内达到一定请求量是断路器打开的一个条件。点击 BucketedRollingCounterStream 构造方法 查看调用处的代码。

目前该方法有两处调用 :

  • 「4.1 构造方法」,在创建 HystrixCircuitBreakerImpl 时,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。固定间隔,计算断路器是否要关闭( CLOSE )。
  • 「4.4 #markSuccess()」,清空 Hystrix Metrics 对请求量统计 Observable 的统计信息,取消原有订阅,并发起新的订阅。

4.3 #attemptExecution()

如下是 AbstractCommand#applyHystrixSemantics(_cmd) 方法,对 HystrixCircuitBreakerImpl#attemptExecution 方法的调用的代码 :

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {// ...  省略无关代码/* determine if we're allowed to execute */if (circuitBreaker.attemptExecution()) {// 执行【正常逻辑】} else {// 执行【回退逻辑】}
}
  • 使用 HystrixCircuitBreakerImpl#attemptExecution 方法,判断是否可以执行正常逻辑

#attemptExecution 方法,代码如下 :

  1: @Override2: public boolean attemptExecution() {3:     // 强制 打开4:     if (properties.circuitBreakerForceOpen().get()) {5:         return false;6:     }7:     // 强制 关闭8:     if (properties.circuitBreakerForceClosed().get()) {9:         return true;10:     }11:     // 打开时间为空12:     if (circuitOpened.get() == -1) {13:         return true;14:     } else {15:         // 满足间隔尝试断路器时间16:         if (isAfterSleepWindow()) {17:             //only the first request after sleep window should execute18:             //if the executing command succeeds, the status will transition to CLOSED19:             //if the executing command fails, the status will transition to OPEN20:             //if the executing command gets unsubscribed, the status will transition to OPEN21:             if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {22:                 return true;23:             } else {24:                 return false;25:             }26:         } else {27:             return false;28:         }29:     }30: }
  • 第 4 至 6 行 :当 HystrixCommandProperties.circuitBreakerForceOpen = true ( 默认值 :false) 时,即断路器强制打开,返回 false 。当该配置接入配置中心后,可以动态实现打开熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。
  • 第 8 至 10 行 :当 HystrixCommandProperties.circuitBreakerForceClose = true ( 默认值 :false) 时,即断路器强制关闭,返回 true 。当该配置接入配置中心后,可以动态实现关闭熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。
  • 第 12 至 13 行 :断路器打开时间( circuitOpened ) 为"空",返回 true
  • 第 16 至 28 行 :调用 #isAfterSleepWindow() 方法,判断是否满足尝试调用正常逻辑的间隔时间。当满足,使用 CAS 方式修改断路器状态( OPEN => HALF_OPEN ),从而保证有且仅有一个线程能够尝试调用正常逻辑。

#isAfterSleepWindow() 方法,代码如下 :

private boolean isAfterSleepWindow() {final long circuitOpenTime = circuitOpened.get();final long currentTime = System.currentTimeMillis();final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();return currentTime > circuitOpenTime + sleepWindowTime;
}
  • 当前时间超过断路器打开时间 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds ( 默认值,5000 ms ),返回 true

4.4 #markSuccess()

当尝试调用正常逻辑成功时,调用 #markSuccess() 方法,关闭断路器。代码如下 :

  1: @Override2: public void markSuccess() {3:     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {4:         // 清空 Hystrix Metrics 对请求量统计 Observable 的**统计信息**5:         //This thread wins the race to close the circuit - it resets the stream to start it over from 06:         metrics.resetStream();7:         // 取消原有订阅8:         Subscription previousSubscription = activeSubscription.get();9:         if (previousSubscription != null) {10:             previousSubscription.unsubscribe();11:         }12:         // 发起新的订阅13:         Subscription newSubscription = subscribeToStream();14:         activeSubscription.set(newSubscription);15:         // 设置断路器打开时间为空16:         circuitOpened.set(-1L);17:     }18: }
  • 第 3 行 :使用 CAS 方式,修改断路器状态( HALF_OPEN => CLOSED )。
  • 第 6 行 :清空 Hystrix Metrics 对请求量统计 Observable 的统计信息
  • 第 8 至 14 行 :取消原有订阅,发起新的订阅。
  • 第 16 行 :设置断路器打开时间为"空" 。

如下两处调用了 #markNonSuccess() 方法 :

  • markEmits
  • markOnCompleted

4.5 #markNonSuccess()

当尝试调用正常逻辑失败时,调用 #markNonSuccess() 方法,重新打开断路器。代码如下 :

  1: @Override2: public void markNonSuccess() {3:     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {4:         //This thread wins the race to re-open the circuit - it resets the start time for the sleep window5:         circuitOpened.set(System.currentTimeMillis());6:     }7: }
  • 第 3 行 :使用 CAS 方式,修改断路器状态( HALF_OPEN => OPEN )。
  • 第 5 行 :设置设置断路器打开时间为当前时间。这样,#attemptExecution() 过一段时间,可以再次尝试执行正常逻辑。

如下两处调用了 #markNonSuccess() 方法 :

  • handleFallback
  • unsubscribeCommandCleanup

4.6 #allowRequest()

#allowRequest()#attemptExecution() 方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态( CLOSE => HALF-OPEN ),而后者会。点击 链接 查看代码实现。

4.7 #isOpen()

#isOpen() 方法,比较简单,点击 链接 查看代码实现。

666. 彩蛋

呼呼,相对比较干净的一篇文章,满足。

胖友,分享一波朋友圈可好!

转载于:https://my.oschina.net/sword4j/blog/1603318

熔断器 Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker相关推荐

  1. 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-ti ...

  2. Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计

    降级熔断框架 Hystrix 源码解析:滑动窗口统计 概述 Hystrix 是一个开源的降级熔断框架,用于提高服务可靠性,适用于依赖大量外部服务的业务系统.什么是降级熔断呢? 降级 业务降级,是指牺牲 ...

  3. 三. 微服务源码阅读-Hystrix 源码

    3. Hystrix 源码 1. 断路器开关 @SpringBootApplication(scanBasePackages = {"len.hgy"}) //注册到eureka ...

  4. RxJava 源码解析 —— Observable#defer(...)

    摘要: 原创出处 http://www.iocoder.cn/RxJava/observable-defer/ 「芋道源码」欢迎转载,保留摘要,谢谢! 本文主要基于 RxJava 1.2.X 版本 本 ...

  5. hystrix 源码 线程池隔离_Hystrix源码学习--线程池隔离

    分析你的系统 你所认识的分布式系统,哪些是可以进行垂直拆分的?拆分之后系统之间的依赖如何梳理?系统异构之后的稳定性调用如何保证?这些都是可能在分布式场景中面临的问题. 说个比较常见的问题,大家都知道秒 ...

  6. go-resiliency源码解析之-timeout

    go-resiliency源码解析之-timeout 1.go-resiliency简介 ​ 今天看到项目里用到了go-resiliency这个库,库整体比较简单,代码量不大.主要实现go中几种常见的 ...

  7. 【详解】Ribbon 负载均衡服务调用原理及默认轮询负载均衡算法源码解析、手写

    Ribbon 负载均衡服务调用 一.什么是 Ribbon 二.LB负载均衡(Load Balancer)是什么 1.Ribbon 本地负载均衡客户端 VS Nginx 服务端负载均衡的区别 2.LB负 ...

  8. hystrix 源码 线程池隔离_Spring Cloud Hystrix 源码学习合集

    # Spring Cloud Hystrix 源码学习合集 **Hystrix: Latency and Fault Tolerance for Distributed Systems** ![](h ...

  9. 谷歌BERT预训练源码解析(二):模型构建

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/weixin_39470744/arti ...

  10. 谷歌BERT预训练源码解析(三):训练过程

    目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...

最新文章

  1. idea上实现github代码同步
  2. java 移动页面中的图片上传_移动端图片操作——上传
  3. lbs的核心技术都有哪些?_哪些行业适合做小程序呢?
  4. Exchange Server 2016管理系列课件46.DAG管理之Powershell创建DAG
  5. 欠债3000亿,宣布破产!昔日民族品牌,为何总沦为反面教材?
  6. docker swarm k8s比较_Docker 图形化管理又有更新了
  7. java day41【JSP 、MVC开发模式 、EL表达式 、JSTL标签 、三层架构】
  8. Games102_lecture8几何建模与处理基础_离散微分几何,Utopia框架介绍
  9. 怎么制作升温曲线图_如何在EXcel做体温曲线图
  10. mysql 1556_mysqldump: Got error: 1556: You can't use locks with log tables. when doing LOCK TABLES
  11. 制造业数字化转型内涵和过程
  12. ASEMI整流二极管10A10参数,10A10压降,10A10作用
  13. URL编码和Base64编码
  14. 实战OpenPose项目4:实时准确的全身多人姿态估计和跟踪系统
  15. Java导出带有单选款(radio)和复选框(checkbox)选中效果的word doc文档-Freemarker实现方式
  16. at91sam9x5ek linux 4,为AT91SAM9X5-EK开发板建立linux目标文件
  17. 使用python生成crc对照表
  18. 美团大众点评 Hybrid 化建设
  19. (保姆教学)Failed to connect to github.com port 443 after 21094 ms: Timed out
  20. VMware6.0U3 VSAN配置

热门文章

  1. jsp+servlet+mysql的简单使用
  2. 踩坑日记(二):记一次线上业务—Redis 的缓存雪崩
  3. 架构运维篇(二):Centos7/Linux安装部署Tomcat环境
  4. 微信小程序微商城(六):动态API实现新品特卖商品流式布局
  5. 10月24号、25号、26号三天PC端云音乐项目总结
  6. 硬盘变成脱机状态(由于管理员设置的策略,该磁盘处于脱机状态)
  7. 在python3.X中执行python manage.py migrate命令的坑
  8. python怎么理解函数的参数_Python中函数参数理解
  9. 公司买的机器不能自己装系统,问对方几天没回一个字
  10. 使用码云下载github的代码