1. 什么是Hystrix

Hystrix是Netflix的一个开源框架,地址如下:https://github.com/Netflix/Hystrix

中文名为“豪猪”,即平时很温顺,在感受到危险的时候,用刺保护自己;在危险过去后,还是一个温顺的肉球。

所以,整个框架的核心业务也就是这2点:

  1. 何时需要保护

  2. 如何保护

2. 何时需要保护

对于一个系统而言,它往往承担着2层角色,服务提供者与服务消费者。对于服务消费者而言最大的痛苦就是如何“明哲保身”,做过网关项目的同学肯定感同身受

上面是一个常见的系统依赖关系,底层的依赖往往很多,通信协议包括 socket、HTTP、Dubbo、WebService等等。当通信层发生网络抖动以及所依赖的系统发生业务响应异常时,我们业务本身所提供的服务能力也直接会受到影响。

这种效果传递下去就很有可能造成雪崩效应,即整个业务联调发生异常,比如业务整体超时,或者订单数据不一致。

那么核心问题就来了,如何检测业务处于异常状态?

成功率!成功率直接反映了业务的数据流转状态,是最直接的业务表现。

当然,也可以根据超时时间做判断,比如 Sentinel 的实现。其实这里概念上可以做一个转化,用时间做超时控制,超时=失败,这依然是一个成功率的概念。

3. 如何保护 

如同豪猪一样,“刺”就是他的保护工具,所有的攻击都会被刺无情的怼回去。

在 Hystrix 的实现中,这就出现了“熔断器”的概念,即当前的系统是否处于需要保护的状态。

当熔断器处于开启的状态时,所有的请求都不会真正的走之前的业务逻辑,而是直接返回一个约定的信息,即 FallBack。通过这种快速失败原则保护我们的系统。

但是,系统不应该永远处于“有刺”的状态,当危险过后需要恢复正常。

于是对熔断器的核心操作就是如下几个功能:

  1. 如果成功率过低,就打开熔断器,阻止正常业务

  2. 随着时间的流动,熔断器处于半打开状态,尝试性放入一笔请求

  熔断器的核心 API 如下图:

4. 限流、熔断、隔离、降级

这四个概念是我们谈起微服务会经常谈到的概念,这里我们讨论的是 Hystrix 的实现方式。

限流

  • 这里的限流与 Guava 的 RateLimiter 的限流差异比较大,一个是为了“保护自我”,一个是“保护下游”

  • 当对服务进行限流时,超过的流量将直接 Fallback,即熔断。而 RateLimiter 关心的其实是“流量整形”,将不规整流量在一定速度内规整

熔断

  • 当我的应用无法提供服务时,我要对上游请求熔断,避免上游把我压垮

  • 当我的下游依赖成功率过低时,我要对下游请求熔断,避免下游把我拖垮

降级

  • 降级与熔断紧密相关,熔断后业务如何表现,约定一个快速失败的 Fallback,即为服务降级

隔离

  • 业务之间不可互相影响,不同业务需要有独立的运行空间

  • 最彻底的,可以采用物理隔离,不同的机器部

  • 次之,采用进程隔离,一个机器多个 Tomcat

  • 次之,请求隔离

  • 由于 Hystrix 框架所属的层级为代码层,所以实现的是请求隔离,线程池或信号量

5. 源码分析

先上一个 Hystrix 的业务流程图

可以看到 Hystrix 的请求都要经过 HystrixCommand 的包装,其核心逻辑在 AbstractComman.java 类中。

下面的源码是基于 RxJava 的,看之前最好先了解下 RxJava 的常见用法与逻辑,否则看起来会很迷惑。

简单的说,RxJava 就是基于回调的函数式编程。通俗的说,就等同于策略模式的匿名内部类实现。

5.1 熔断器

首先看信号量是如何影响我们请求的:

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {// 自定义扩展executionHook.onStart(_cmd);//判断熔断器是否允许请求过来if (circuitBreaker.attemptExecution()) {//获得分组信号量,如果没有采用信号量分组,返回默认通过的信号量实现final TryableSemaphore executionSemaphore = getExecutionSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);//调用终止的回调函数final Action0 singleSemaphoreRelease = new Action0() {@Overridepublic void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();}}};//调用异常的回调函数final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {@Overridepublic void call(Throwable t) {eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);}};//根据信号量尝试竞争信号量if (executionSemaphore.tryAcquire()) {try {//竞争成功,注册执行参数executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());return executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} catch (RuntimeException e) {return Observable.error(e);}} else {//竞争失败,进入fallbackreturn handleSemaphoreRejectionViaFallback();}} else {//熔断器已打开,进入fallbackreturn handleShortCircuitViaFallback();}}

什么时候熔断器可以放请求进来:

@Overridepublic boolean attemptExecution() {//动态属性判断,熔断器是否强制开着,如果强制开着,就不允许请求if (properties.circuitBreakerForceOpen().get()) {return false;}//如果强制关闭,就允许请求if (properties.circuitBreakerForceClosed().get()) {return true;}//如果当前是关闭,就允许请求if (circuitOpened.get() == -1) {return true;} else {//如果当前开着,就看是否已经过了"滑动窗口",过了就可以请求,不过就不可以if (isAfterSleepWindow()) {//only the first request after sleep window should execute//if the executing command succeeds, the status will transition to CLOSED//if the executing command fails, the status will transition to OPEN//if the executing command gets unsubscribed, the status will transition to OPEN//这里使用CAS的方式,只有一个请求能过来,即"半关闭"状态if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {return true;} else {return false;}} else {return false;}}}}

这里有个重要概念就是"滑动窗口":

private boolean isAfterSleepWindow() {final long circuitOpenTime = circuitOpened.get();final long currentTime = System.currentTimeMillis();final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();//滑动窗口的判断就是看看熔断器打开的时间与现在相比是否超过了配置的滑动窗口return currentTime > circuitOpenTime + sleepWindowTime;}

5.2 隔离

如果将业务请求进行隔离?

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {//判断隔离策略是什么,是线程池隔离还是信号量隔离    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)//线程池隔离的运行逻辑如下return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {executionResult = executionResult.setExecutionOccurred();if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));}//按照配置生成监控数据metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {// the command timed out in the wrapping thread so we will return immediately// and not increment any of the counters below or other such logicreturn Observable.error(new RuntimeException("timed out before executing run()"));}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {//we have not been unsubscribed, so should proceedHystrixCounters.incrementGlobalConcurrentThreads();threadPool.markThreadExecution();// store the command that is being runendCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());executionResult = executionResult.setExecutedInThread();/*** If any of these hooks throw an exception, then it appears as if the actual execution threw an error*/try {//执行扩展点逻辑executionHook.onThreadStart(_cmd);executionHook.onRunStart(_cmd);executionHook.onExecutionStart(_cmd);return getUserExecutionObservable(_cmd);} catch (Throwable ex) {return Observable.error(ex);}} else {//command has already been unsubscribed, so return immediatelyreturn Observable.empty();}}//注册各种场景的回调函数}).doOnTerminate(new Action0() {@Overridepublic void call() {if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {handleThreadEnd(_cmd);}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {//if it was never started and received terminal, then no need to clean up (I don't think this is possible)}//if it was unsubscribed, then other cleanup handled it}}).doOnUnsubscribe(new Action0() {@Overridepublic void call() {if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {handleThreadEnd(_cmd);}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {//if it was never started and was cancelled, then no need to clean up}//if it was terminal, then other cleanup handled it}//将逻辑放在线程池的调度器上执行,即将上述逻辑放入线程池中}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {@Overridepublic Boolean call() {return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;}}));} else {//走到这里就是信号量隔离,在当前线程中执行,没有调度器return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {executionResult = executionResult.setExecutionOccurred();if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));}metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);// semaphore isolated// store the command that is being runendCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());try {executionHook.onRunStart(_cmd);executionHook.onExecutionStart(_cmd);return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw} catch (Throwable ex) {//If the above hooks throw, then use that as the result of the run methodreturn Observable.error(ex);}}});}}

5.3 核心运行流程

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();//执行发生的回调final Action1<R> markEmits = new Action1<R>() {@Overridepublic void call(R r) {if (shouldOutputOnNextEvents()) {executionResult = executionResult.addEvent(HystrixEventType.EMIT);eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);}if (commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());circuitBreaker.markSuccess();}}};//执行成功的回调,标记下状态,熔断器根据这个状态维护熔断逻辑final Action0 markOnCompleted = new Action0() {@Overridepublic void call() {if (!commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());circuitBreaker.markSuccess();}}};//执行失败的回调final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {@Overridepublic Observable<R> call(Throwable t) {circuitBreaker.markNonSuccess();Exception e = getExceptionFromThrowable(t);executionResult = executionResult.setExecutionException(e);//各种回调进行各种fallbackif (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);} else if (t instanceof HystrixTimeoutException) {return handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);} else {/** Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.*/if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);return Observable.error(e);}return handleFailureViaFallback(e);}}};final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {@Overridepublic void call(Notification<? super R> rNotification) {setRequestContextIfNeeded(currentRequestContext);}};Observable<R> execution;if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));} else {execution = executeCommandWithSpecifiedIsolation(_cmd);}//注册各种回调函数return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext);}

6. 小结

  • Hystrix 是基于单机应用的熔断限流框架

  • 根据熔断器的滑动窗口判断当前请求是否可以执行

  • 线程竞争实现“半关闭”状态,拿一个请求试试是否可以关闭熔断器

  • 线程池隔离将请求丢到线程池中运行,限流依靠线程池拒绝策略

  • 信号量隔离在当前线程中运行,限流依靠并发请求数

  • 当信号量竞争失败/线程池队列满,就进入限流模式,执行 Fallback

  • 当熔断器开启,就熔断请求,执行 Fallback 

  • 整个框架采用的 RxJava 的编程模式,回调函数满天飞 

关于“豪猪”,你理解的透彻吗?【Hystrix是个什么玩意儿】相关推荐

  1. 对javascript匿名函数的理解(透彻版)

    Query片段: view plain copy to clipboard print ? (function(){ //这里忽略jQuery所有实现 })(); 半年前初次接触jQuery的时候,我 ...

  2. 徒手撸了一个API网关,理解更透彻了,代码已上传github,自取~

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 一.背景 最近在github上看了soul网关的设计,突然 ...

  3. 手写一个网关服务,理解更透彻!

    点击关注公众号,实用技术文章及时了解 背景介绍 我们在工作中经常会需要处理http请求,通常都是基于SpringBoot应用直接接受外界的http请求,就如同下方的流程图所示: 但是随着后台应用的增加 ...

  4. 手写一个RPC框架,理解更透彻(附源码)

    作者:烟味i www.cnblogs.com/2YSP/p/13545217.html 一.前言 前段时间看到一篇不错的文章<看了这篇你就会手写RPC框架了>,于是便来了兴趣对着实现了一遍 ...

  5. 突然发现到今天已经很难找到对底层理解这么透彻的人

    为什么80%的码农都做不了架构师?>>>    链接: http://blog.csdn.net/elssann/archive/2004/10/25/150088.aspx 原文: ...

  6. 徒手撸了一个 API 网关,理解更透彻了,代码已上传github,自取~

    code小生 一个专注大前端领域的技术平台 公众号回复Android加入安卓技术群 来源 | cnblogs.com/2YSP/p/14223892.html 一.背景 最近在github上看了sou ...

  7. js原型和原型链的理解(透彻)

    1,原型 function Fn() {} 1)Fn是一个构造函数,每个构造函数都会自动生成一个prototype属性,指向一个空对象,这个空对象就是原型.每一个实例对象都会从原型继承属性和方法. 2 ...

  8. 【SpringCloud】Hystrix断路器【五】

    1. Hystrix理解 1.1 概述 1.2 背景 1.3 雪崩效应常见场景 1.4 雪崩效应应对策略 1.5 初探Hystrix 1.6 Hystrix流程图 2. 构建问题项目 2.2 构建cl ...

  9. 全方位,多角度理解ThreadLocal

    欢迎关注方志朋的博客,回复"666"获面试宝典 来源:https://blog.csdn.net/zzg1229059735/article/details/82715741 本次 ...

最新文章

  1. Python速度提升
  2. spark编程基础--5.2键值对RDD
  3. 嵌入式的我们为什么要学ROS
  4. 如何使用 Python 创建一名可操控的角色玩家
  5. char *a 与 char a[] 的区别
  6. java import class_@class vs. #import
  7. [已经验证通过]xp sp2 不支持WPA协议的解决办法
  8. node.js第一步
  9. 微信开发者工具 出现 Error:unable to verify the first cert?
  10. (八)linux驱动之ioctl的使用
  11. 超轻简洁个人引导页网站源码
  12. Delphi 7下最小化到系统托盘
  13. 无盘服务器性能测试,无盘系统性能测试及结语
  14. 用Intellij Idea创建一个普通的Java工程并用JDBC连接数据库
  15. okhttp配置缓存策略_一网打尽OkHttp中的缓存问题
  16. 学校新机房装系统——联想机房网络同传
  17. 易语言怎么给手机发短信,对接验证码短信接口DEMO示例
  18. 中国思想和柏拉图哲学( 转载)
  19. Online Judge系统大全
  20. Joda - 日期时间

热门文章

  1. vs2008界面查看
  2. HDU - 4292 Food(最大流+思维建边)
  3. boolean类型_JS核心理论之《数据类型、类型转换、深浅拷贝与参数传递》
  4. kicad最小布线宽度默认是多少_你想知道建仓库时叉车通道宽度留多少吗?
  5. 小数在内存中的存储表示
  6. HDU4405(概率DP求期望)
  7. 34.rust宏.txt
  8. ffmpeg推送摄像头rtmp流
  9. 遍历Windows系统的内核模块(源码)
  10. 漫游Kafka设计篇之数据持久化