Spring Cloud Hystrix 源码系列:工作原理
Hystrix 译为 "豪猪",豪猪的棘刺能保护自己不受天敌伤害,代表了强大的防御能力。Hystrix 基于 RxJava 进行实现,RxJava 是一种基于观察者模式的响应式编程框架。Spring Cloud Hystrix 基于 Netflix Hystrix 实现,具备服务降级、服务熔断、线程与信号隔离、请求缓存、请求合并以及服务监控等强大功能。本文基于hystrix-core 1.5.18(近年来几乎很少更新,建议升级)。
目录
1. 由来
2. 功能介绍
2.1 HystrixCommand/HystrixObservableCommand
2.2 基本用法
3. 工作原理
3.1 构建命令
3.2 执行命令
3.3 检查缓存
3.4 断路器是否打开
3.5 检查线程池/信号量情况
3.6 执行任务
3.7 断路器健康检查
3.8 失败时执行 Fallback
3.9 返回执行结果
1. 由来
在单体应用中,一类服务、一个线程、一个Bug等局部因素压垮整个系统也是屡见不鲜。微服务中,服务间依赖重重,通过隔离,很好的控制住风险范围,再结合请求拒绝和超时控制,有效剔除 “老鼠屎”,避免坏了一锅粥。总之,隔离设计是绝妙的防护罩。Netflix 将该组件取名为 Hystrix,宣言为 "defend your app",寓意应该是:当系统受到伤害时,能够像豪猪的棘刺一样保护系统。
2. 功能介绍
Hystrix主要提供了以下功能点:
- 熔断器(Circuit Breaker)
- 隔离(Isolation),提供璧仓模式,实现了线程池隔离和信号量隔离
- 回退(fallback),Hystrix会在run()执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况时进行降级处理,有default fallback、单级fallback、多级fallback。
- 请求合并(Request Collapsing),@HystrixCollapser,适用于请求的合并,通过指定时间窗口@HystrixProperty(name = "timerDelayInMilliseconds", value = "50")及@HystrixProperty(name = "maxRequestsInBatch", value = "200")来执行批量方法,暂时不展开讲。
- 请求缓存(Request Caching)
- 仪表盘
2.1 HystrixCommand/HystrixObservableCommand
Hystrix有两个请求命令 HystrixCommand、HystrixObservableCommand。
HystrixCommand用在依赖服务返回单个操作结果的时候:
- execute():同步执行。从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。
- queue():异步执行。直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。
HystrixObservableCommand用在依赖服务返回多个操作结果的时候:
- observe():返回Obervable对象,他代表了操作的多个结果,它是一个Hot Observable
- toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。
2.2 基本用法
有两种:手动自定义command和使用注解,手动自定义这里就不介绍了。这里介绍下注解需要两步(对破析源码有用):
第一步:隐式模式(用户不需要做什么,但你要知道),spirng boot会自动加载Feign的配置类HystrixAutoConfiguration(spring-cloud-netflix-core-1.4.4.RELEASE.jar/META-INF/spring.factories)
第二步:应用系统启动类中添加@EnableHystrix,它的作用是将spring.cloud.circuit.breaker.enabled设为true。
Hystrix默认配置都在HystrixCommandProperties类中,更多
###全集配置(设置熔断超时时间)
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000
###局部配置
hystrix.command.hello.execution.isolation.thread.timeoutInMilliseconds=10000
public class CommandHelloWorld extends HystrixCommand<String> {private final String name;public CommandHelloWorld(String name) {super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));this.name = name;}@Overrideprotected String run() throws Exception {int i = 1/0;return "Hello " + name + "!";}/*** 降级* */@Overrideprotected String getFallback() {return "faild";}
}
public class ObservableCommandHelloWorld extends HystrixObservableCommand<String> {private final String name;public ObservableCommandHelloWorld(String name) {super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));this.name = name;}@Overrideprotected Observable<String> construct() {return Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {try {if(!subscriber.isUnsubscribed()) {subscriber.onNext("Hello");int i = 1 / 0; //模拟异常subscriber.onNext(name + "!");subscriber.onCompleted();}} catch (Exception e) {subscriber.onError(e);}}}).subscribeOn(Schedulers.io());}/*** 服务降级*/@Overrideprotected Observable<String> resumeWithFallback() {return Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {try {if (!subscriber.isUnsubscribed()) {subscriber.onNext("失败了!");subscriber.onNext("找大神来排查一下吧!");subscriber.onCompleted();}} catch (Exception e) {subscriber.onError(e);}}}).subscribeOn(Schedulers.io());}
}
public class CommandHelloWorldTest {@Testpublic void testAll() {//同步new CommandHelloWorld("World").execute();//异步Future<String> fWorld = new CommandHelloWorld("World").queue();//代码1,Hot Observable不论 “事件源” 是否有“订阅者”都会在创建后对事件进行发布。每一个“订阅者”都有可能从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程Observable<String> ho = new CommandHelloWorld("World").observe();ho.subscribe(new Action1<String>() {@Overridepublic void call(String s) {System.out.println("call:" + s);}});//代码2,Cold Observable在没有 “订阅者” 的时候并不会发布,而是进行等待,知道有 “订阅者” 之后才发布事件Observable<String> co = new CommandHelloWorld("World").toObservable();System.out.println(co.toBlocking().single()); Observable<String> observable= new ObservableCommandHelloWorld("World").observe();Iterator<String> iterator = observable.toBlocking().getIterator();while(iterator.hasNext()) {System.out.println(iterator.next());}}
}
思考:HystrixCommand已具备了observe()和toObservable()的功能,和HystrixObservableCommand有和不同?
是的,但它的实现有一定的局限性,它返回的Observable只能发射一次数据,而HystrixObservableCommand实现的命令可以获取能发多次的Observable。
@HystrixCommand(fallbackMethod = "error")
public String hello() {return restTemplate.getForEntity("http://serviceName/hello", String.class).getBody();
}
public String error() {//多级降级return new FirstLevelFallbackCommand(tag).execute();
}
/*** LAZY参数表示使用toObservable()方式执行*/
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY, fallbackMethod = "toObserbableError")
public Observable<String> getUserByName(final String name) {return Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {try {if(!subscriber.isUnsubscribed()) {subscriber.onNext("找到");subscriber.onNext(name);int i = 1/0; 抛异常,模拟服务降级subscriber.onNext("了");subscriber.onCompleted();}} catch (Exception e) {subscriber.onError(e);}}});
}
private static class FirstLevelFallbackCommand extends HystrixCommand<String> { private String tag; public FirstLevelFallbackCommand(String tag) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("FirstLevelFallbackCommand")));this.tag = tag;} @Overrideprotected String run() throws Exception {if ("error".equals(tag)) {throw new Exception("一级降级失败,二级降级处理");}return "成功";} @Overrideprotected String getFallback() {// 实际项目中,通常会在这里做残缺降级。System.out.println("二级降级执行成功");return "成功";}
}
思考:多级降级的时候,为何将降级command单独做一个线程池?
如果主流程的command都失败了,可能线程池都已经被占满了,降级command必须用自己的独立的线程池。
3. 工作原理
Hystrix 工作原理,当需要完成某项任务时,通过 Hystrix 将任务包裹起来,交由 Hystrix 来完成任务,从而享受 Hystrix 带来保护。这和古代镖局生意有点类似,将任务委托给镖局,以期安全完成任务。官方 Wiki 中对每一步都做了详细的描述,可以直接参考。下面 流程图 来源于 Hystrix Wiki
3.1 构建命令
前面讲过Hystrix 提供了两个Command,可以使用这两个对象来包裹待执行的任务。 注解@HystrixCommand标记方法,Hystrix 将利用AOP自动将目标方法包装成HystrixCommand来执行,也可以继承他们来创建Command。任务委托给 Hystrix 后,Hystrix 可以应用自己的一系列保护机制,在执行用户任务的各节点(执行前、执行后、异常、超时等)做一系列的事情。
3.2 执行命令
有四种方式执行command:
- R execute():同步执行,从依赖服务得到单一结果对象,实现为 queue().get()
- Future queue():异步执行,返回一个 Future 以便获取执行结果,也是单一结果对象,实现为 toObservable().toBlocking().toFuture()
- Observable observe():hot observable,创建Observable后会订阅Observable,可以返回多个结果
- Observable toObservable():cold observable,返回一个Observable,只有订阅时才会执行,可以返回多个结果
public R execute() {return queue().get();// 利用queue()拿到Future, 执行 get()同步等待拿到执行结果
}
public Future<R> queue() {// 实现为 toObservable().toBlocking().toFuture()final Future<R> delegate = toObservable().toBlocking().toFuture();return delegate;
}
//利用toObservable()得到Observable并直接订阅它,立即执行命令
public Observable<R> observe() {ReplaySubject<R> subject = ReplaySubject.create();final Subscription sourceSubscription = toObservable().subscribe(subject);...
}
3.3 检查缓存
第3到9步骤构成了 Hystrix 的保护能力,通过这一些列步骤来执行任务,从而起到保护作用。
如果启用了 Hystrix Cache,任务执行前将先判断是否有相同命令执行的缓存。如果有则直接返回缓存的结果;如果没有缓存的结果,但启动了缓存,将缓存本次执行结果以供后续使用。
3.4 断路器是否打开
断路器(circuit-breaker)和保险丝类似,保险丝在发生危险时将会烧断以保护电路,而断路器可以在达到我们设定的阀值时触发短路(比如请求失败率达到50%),拒绝执行任何请求。如果断路器被打开,Hystrix 将不会执行命令,直接进入Fallback处理逻辑。
3.5 检查线程池/信号量情况
Hystrix 隔离方式有线程池隔离和信号量隔离。当使用Hystrix线程池时,Hystrix 默认为每个依赖服务分配10个线程,当10个线程都繁忙时,将拒绝执行命令。信号量同理。
3.6 执行任务
通过HystrixObservableCommand.construct()或者 HystrixCommand.run()
来运行用户真正的任务。
3.7 断路器健康检查
每次开始执行command、结束执行command以及发生异常等情况时,都会记录执行情况,例如:成功、失败、拒绝以及超时等情况,会定期处理这些数据,再根据设定的条件来判断是否开启断路器。
3.8 失败时执行 Fallback
在命令失败时执行用户指定的 Fallback 逻辑。上图中的断路、线程池拒绝、信号量拒绝、执行执行、执行超时都会进入 Fallback 处理。
3.9 返回执行结果
原始结果将以Observable形式返回,在返回给用户之前,会根据调用方式的不同做一些处理。下面是 Hystrix Return flow
如果你对Hystrix 的源码比较感兴趣,可以看下一篇“HystrixCommandAspect入口解析”。
Spring Cloud Hystrix 源码系列:工作原理相关推荐
- Spring Cloud Gateway源码系列之路由配置加载过程
当前章节主要是讲解配置文件中定义的路由配置被gateway加载,同时转为可以直接操作的路由对象 引入pom坐标 <dependency><groupId>org.springf ...
- hystrix 源码 线程池隔离_Spring Cloud Hystrix 源码学习合集
# Spring Cloud Hystrix 源码学习合集 **Hystrix: Latency and Fault Tolerance for Distributed Systems** ![](h ...
- Spring Cloud脚手架源码
Spring Cloud脚手架源码 @(SpringCloud)[Spring Cloud,框架,组成] Spring Cloud脚手架源码 基本介绍 思维导图 源码 基本介绍 Spring Clou ...
- Java微服务组件Spring cloud ribbon源码分析
微服务组件Spring Cloud Ribbon源码分析_哔哩哔哩_bilibili Ribbon源码分析 | ProcessOn免费在线作图,在线流程图,在线思维导图 | 1.什么是ribbon? ...
- Spring Cloud部分源码分析Eureka,Ribbon,Feign,Zuul
Eureka SpringCloud Eureka使用NetFlix Eureka来实现的,它包括了服务端组件和客户端组件,并且都是用java 编写的. Eureka服务端就是服务注册中心, Eure ...
- Spring Cloud Gateway 源码解析(1) —— 基础
目录 Gateway初始化 启用Gateway GatewayClassPathWarningAutoConfiguration GatewayLoadBalancerClientAutoConfig ...
- 【SpringClould】Spring Cloud Eureka源码分析
文章目录 1.概述 1.1 Eureka的一些概念 2.源码分析 2.1 Eureka Server源码 2.1.1 `@EnableEurekaServer`注解 2.1.2 EurekaServe ...
- Spring Cloud Eureka 源码分析(一) 服务端启动过程
2019独角兽企业重金招聘Python工程师标准>>> 一. 前言 我们在使用Spring Cloud Eureka服务发现功能的时候,简单的引入maven依赖,且在项目入口类根据服 ...
- spring cloud | Hystrix断路器是如何工作的
Hystrix是什么 Hystrix是一个java类库,提供了服务容错保护 遇到的问题 请求响应时间过长,造成资源不能被及时释放.短时巨量请求造成资源耗尽,最终造成系统无法响应. 系统中一个服务服务出 ...
最新文章
- 集合objectjava_collection
- T型加速算法fpga实现思想研究
- KubeMeet 深圳站完整议题出炉
- 北京点击科技有限公司董事长兼总裁——王志东经典语录2
- 数据库连接池原理及常用连接池介绍
- Windows Phone本地数据库(SQLCE):3、[table]attribute(翻译) (转)
- 如何给微软提反馈建议以及bug
- 阿里云物联网平台体验(NetGadgeteer+C#篇)
- 最有效的更改linux 系统时区的方法
- [JSON]2017年最新县及县以上行政区划代码
- AppStore 预览图制作
- Pr 音频效果参考:其它
- mysql常见练习题45题
- cab和ocx什么区别_CAB的完整形式是什么?
- android n换行格式,Android 写文件生成器的时候换行请用\r\n
- Carla学习(一) 小车简单直线行走
- MySQL是什么?它有什么优势?
- 很久没有更新这边了。
- Window安装Prometheus
- [VB.NET]想做一个小界面,不知用什么做
热门文章
- idea使用git如何合并本地及其远程分支
- 西瓜书学习笔记——第十一章:特征选择与稀疏学习
- matlab整理符号表达式,[2018年最新整理]MATLAB符号运算与符号方程求解.ppt
- Bresenham算法详解
- Qt打包生成exe: 无法定位程序输入点
- mysql insert into 语句卡住?原因?
- python线程卡死问题解决_Python中的多线程:最后一个线程卡住了
- Android 实现压缩图片到任意尺寸
- 三星s20 android auto,Automagic一个更简单的方式来自动化您的Android手机 | MOS86
- cocos2dx多线程以及线程同步 与 cocos2dx内存管理与多线程问题