降级熔断框架 Hystrix 源码解析:滑动窗口统计

概述

Hystrix 是一个开源的降级熔断框架,用于提高服务可靠性,适用于依赖大量外部服务的业务系统。什么是降级熔断呢?

降级

业务降级,是指牺牲非核心的业务功能,保证核心功能的稳定运行。简单来说,要实现优雅的业务降级,需要将功能实现拆分到相对独立的不同代码单元,分优先级进行隔离。在后台通过开关控制,降级部分非主流程的业务功能,减轻系统依赖和性能损耗,从而提升集群的整体吞吐率。

降级的重点是:业务之间有优先级之分。降级的典型应用是:电商活动期间关闭非核心服务,保证核心买买买业务的正常运行。

熔断

老式电闸都安装了保险丝,一旦有人使用超大功率的设备,保险丝就会烧断以保护各个电器不被强电流给烧坏。同理我们的接口也需要安装上“保险丝”,以防止非预期的请求对系统压力过大而引起的系统瘫痪,当流量过大时,可以采取拒绝或者引流等机制。

同样在分布式系统中,当被调用的远程服务无法使用时,如果没有过载保护,就会导致请求的资源阻塞在远程服务器上耗尽资源。很多时候,刚开始可能只是出现了局部小规模的故障,然而由于种种原因,故障影响范围越来越大,最终导致全局性的后果。这种过载保护,就是熔断器。

在 hystrix 中,熔断相关的配置有以下几个:滑动窗口长度,单位毫秒

hystrix.command.HystrixCommandKey.circuitBreaker.sleepWindowInMilliseconds

滑动窗口滚动桶的长度,单位毫秒

hystrix.command.HystrixCommandKey.metrics.rollingPercentile.bucketSize

触发熔断的失败率阈值

hystrix.command.HystrixCommandKey.circuitBreaker.errorThresholdPercentage

触发熔断的请求量阈值

hystrix.command.HystrixCommandKey.circuitBreaker.requestVolumeThreshold

从配置信息里可以看出来,熔断逻辑判断里使用了滑动窗口来统计服务调用的成功、失败量。那么这里的滑动窗口是如何实现的呢?下面我们深入源码来研究一下。

注:使用的源码版本是 2017-09-13 GitHub 上 master 分支最新代码。

滑动窗口

在 hystrix 里,大量使用了 RxJava 这个响应式函数编程框架,滑动窗口的实现也是使用了 RxJava 框架。

源码分析

一个滑动窗口有两个关键要素组成:窗口时长、窗口滚动时间间隔。通常一个窗口会划分为若干个桶 bucket,每个桶的大小等于窗口滚动时间间隔。也就是说,滑动窗口统计数据时,分两步:统计一个 bucket 内的数据;

统计一个窗口,即若干个 bucket 的数据。

bucket 统计的代码位于 BucketedCounterStream 类中,其关键的代码如下所示:// 这里的代码并非全部,只展示了和 bucket 统计相关的关键代码public abstract class BucketedCounterStream {    protected final int numBuckets;    protected final Observable bucketedStream;    protected final AtomicReference subscription = new AtomicReference(null);    private final Func1, Observable> reduceBucketToSummary;    protected BucketedCounterStream(final HystrixEventStream inputEventStream, final int numBuckets, final int bucketSizeInMs,                                    final Func2 appendRawEventToBucket) {        this.numBuckets = numBuckets;        this.reduceBucketToSummary = new Func1, Observable>() {            @Override

public Observable call(Observable eventBucket) {                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);

}

};        final List emptyEventCountsToStart = new ArrayList();        for (int i = 0; i

emptyEventCountsToStart.add(getEmptyBucketSummary());

}        this.bucketedStream = Observable.defer(new Func0>() {            @Override

public Observable call() {                return inputEventStream

.observe()

.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext

.flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types

.startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)

}

});

}    abstract Bucket getEmptyBucketSummary();

}

首先我们看这几行代码,这几行代码功能是:将服务调用级别的输入数据流 inputEventStream 以 bucketSizeInMs 毫秒为一个桶进行了汇总,汇总的结果输入到桶级别数据流 bucketedStream。this.bucketedStream = Observable.defer(new Func0>() {            @Override

public Observable call() {                return inputEventStream

.observe()

.window(bucketSizeInMs, TimeUnit.MILLISECONDS) // window 窗函数汇聚 bucketSizeInMs 毫秒内的数据后,每隔 bucketSizeInMs 毫秒批量发送出去

.flatMap(reduceBucketToSummary)                // flatMap 方法接收到 window 窗函数发来的数据,使用 reduceBucketToSummary 函数进行汇总统计

.startWith(emptyEventCountsToStart);           // 给 bucketedStream 发布源设定一个起始值

}

});

RxJava 基于观察者模式,又叫“发布-订阅”模式。inputEventStream 是 HystrixEventStream 对象,其 observe() 方法返回的是一个被观察者 Observable 对象,也可以说是一个发布源 Publisher。public interface HystrixEventStream {    Observable observe();

}

在 Hystrix 中有多种数据发布源,与服务调用的熔断相关的是 HystrixCommandCompletionStream:每一次服务调用结束,调用 write 方法记录成功、失败等信息;

write 方法调用了 writeOnlySubject.onNext,writeOnlySubject 是一个线程安全的发布源 PublishSubject,用于发布 HystrixCommandCompletion 类型的数据,onNext 功能是发布一个事件或数据;

observe 方法返回的可订阅数据源 readOnlyStream 是 writeOnlySubject 的只读版本。public class HystrixCommandCompletionStream implements HystrixEventStream {    private final HystrixCommandKey commandKey; // 服务调用标记 key

private final Subject writeOnlySubject;    private final Observable readOnlyStream;

HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {        this.commandKey = commandKey;        this.writeOnlySubject = new SerializedSubject(PublishSubject.create());        this.readOnlyStream = writeOnlySubject.share();

}    public void write(HystrixCommandCompletion event) {

writeOnlySubject.onNext(event);

}    @Override

public Observable observe() {        return readOnlyStream;

}

}

上面分析了 bucket 统计和事件发布源相关的代码,下面我们再看一下 window 统计的代码。滑动窗口统计的代码在 BucketedRollingCounterStream 类中,window 统计和 bucket 统计原理是一样的,只是维度不同:bucket 统计的维度是时间,比如 bucketSizeInMs 毫秒;

window 统计的维度是若干数据,在这里是 numBuckets 个 bucket。

注意:numBuckets 的值等于 hystrix.command.HystrixCommandKey.circuitBreaker.sleepWindowInMilliseconds 除以 hystrix.command.HystrixCommandKey.metrics.rollingPercentile.bucketSize,numBuckets 是整数,所以 sleepWindowInMilliseconds 必须是 bucketSize 的整数倍,否则 Hystrix 就会抛出异常。public abstract class BucketedRollingCounterStream extends BucketedCounterStream {    private Observable sourceStream;    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);    protected BucketedRollingCounterStream(HystrixEventStream stream, final int numBuckets, int bucketSizeInMs,                                           final Func2 appendRawEventToBucket,                                           final Func2 reduceBucket) {        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);

Func1, Observable> reduceWindowToSummary = new Func1, Observable>() {            @Override

public Observable call(Observable window) {                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);

}

};        this.sourceStream = bucketedStream      //stream broken up into buckets

.window(numBuckets, 1)          //emit overlapping windows of buckets

.flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary

.doOnSubscribe(new Action0() {                    @Override

public void call() {

isSourceCurrentlySubscribed.set(true);

}

})

.doOnUnsubscribe(new Action0() {                    @Override

public void call() {

isSourceCurrentlySubscribed.set(false);

}

})

.share()                        // multiple subscribers should get same data

.onBackpressureDrop();          // 如果消费者处理数据太慢导致数据堆积,就丢弃部分数据

}    @Override

public Observable observe() {        return sourceStream;

}

}

接下来我们介绍一下 BucketedRollingCounterStream 构造函数的主要参数:HystrixEventStream stream:数据发布源;

int numBuckets:每个窗口内部 bucket 个数;

int bucketSizeInMs:bucket 时长,也是窗口滚动时间间隔;

appendRawEventToBucket:bucket 内部统计函数,其功能是起始值 Bucket 加上 Event 后,输出 Bucket 类型值,对对个数据的处理具有累积的效果;

reduceBucket:和 appendRawEventToBucket 类似,用于 window 统计。

BucketedRollingCounterStream 提供了完整的滑动窗口统计的服务,想要使用滑动窗口来统计数据的继承实现 BucketedRollingCounterStream 即可。 接下来我们看一下用于滑动统计服务调用成功、失败次数的 RollingCommandEventCounterStream 类:public class RollingCommandEventCounterStream extends BucketedRollingCounterStream {    private static final ConcurrentMap streams = new ConcurrentHashMap();    private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;    public static RollingCommandEventCounterStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {

RollingCommandEventCounterStream initialStream = streams.get(commandKey.name());        if (initialStream != null) {            return initialStream;

} else {            synchronized (RollingCommandEventCounterStream.class) {

RollingCommandEventCounterStream existingStream = streams.get(commandKey.name());                if (existingStream == null) {

RollingCommandEventCounterStream newStream = new RollingCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,

HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);

streams.putIfAbsent(commandKey.name(), newStream);                    return newStream;

} else {                    return existingStream;

}

}

}

}    private RollingCommandEventCounterStream(HystrixCommandKey commandKey, int numCounterBuckets, int counterBucketSizeInMs,

Func2 reduceCommandCompletion,

Func2 reduceBucket) {        super(HystrixCommandCompletionStream.getInstance(commandKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);

}

}

RollingCommandEventCounterStream 构造函数是私有的,需要通过 getInstance 方法来获取实例,这么做是为了确保每个依赖服务 HystrixCommandKey 只生成一个 RollingCommandEventCounterStream 实例。我们看一下构造 BucketedRollingCounterStream 的时候传入的参数,appendRawEventToBucket、reduceBucket 的实现分别是 HystrixCommandMetrics.appendEventToBucket、HystrixCommandMetrics.bucketAggregator,其主要功能就是一个对各种 HystrixEventType 事件的累加求和。public class HystrixCommandMetrics extends HystrixMetrics {    private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();    public static final Func2 appendEventToBucket = new Func2() {        @Override

public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {

ExecutionResult.EventCounts eventCounts = execution.getEventCounts();            for (HystrixEventType eventType: ALL_EVENT_TYPES) {                switch (eventType) {                    case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here

default:

initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);                        break;

}

}            return initialCountArray;

}

};    public static final Func2 bucketAggregator = new Func2() {        @Override

public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {            for (HystrixEventType eventType: ALL_EVENT_TYPES) {                switch (eventType) {                    case EXCEPTION_THROWN:                        for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {

cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];

}                        break;                    default:

cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];                        break;

}

}            return cumulativeEvents;

}

};

}

这个滑动窗口是在 Hystrix 哪里使用的呢?必然是熔断逻辑里啊。熔断逻辑位于 HystrixCircuitBreaker 类中,其使用滑动窗口的关键代码如下。主要是调用了 BucketedRollingCounterStream 的 observe 方法,对统计数据的发布源进行了订阅,收到统计数据后,对熔断器状态 circuitOpened 进行更新。/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {        private final HystrixCommandProperties properties;        private final HystrixCommandMetrics metrics;        enum Status {

CLOSED, OPEN, HALF_OPEN;

}        private final AtomicReference status = new AtomicReference(Status.CLOSED);        private final AtomicLong circuitOpened = new AtomicLong(-1);        private final AtomicReference activeSubscription = new AtomicReference(null);        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {            this.properties = properties;            this.metrics = metrics;            //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur

Subscription s = subscribeToStream();

activeSubscription.set(s);

}        private Subscription subscribeToStream() {            return metrics.getHealthCountsStream()

.observe()

.subscribe(new Subscriber() {                        @Override

public void onCompleted() {

}                        @Override

public void onError(Throwable e) {

}                        @Override

public void onNext(HealthCounts hc) {                            // 判断请求次数,是否达到阈值。毕竟请求量太小,熔断的意义也就不大了

if (hc.getTotalRequests()

} else {                                // 判断失败率是否达到阈值

if (hc.getErrorPercentage()

} else {                                    // 失败率达到阈值,则修改熔断状态为 OPEN

if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {

circuitOpened.set(System.currentTimeMillis());

}

}

}

}

});

}

}

手动写一个示例

前面解析了 Hystrix 中滑动窗口的实现,由于考虑了各种细节其实现非常复杂,所以我们写了一个简易版本的滑动窗口统计,方便观察学习。import org.slf4j.Logger;import org.slf4j.LoggerFactory;import rx.Observable;import rx.functions.Func1;import rx.functions.Func2;import rx.subjects.PublishSubject;import rx.subjects.SerializedSubject;import java.util.concurrent.TimeUnit;/**

* 模拟滑动窗口计数

* Created by albon on 17/6/24.

*/public class RollingWindowTest {    private static final Logger logger = LoggerFactory.getLogger(WindowTest.class);    public static final Func2 INTEGER_SUM =

(integer, integer2) -> integer + integer2;    public static final Func1, Observable> WINDOW_SUM =

window -> window.scan(0, INTEGER_SUM).skip(3);    public static final Func1, Observable> INNER_BUCKET_SUM =

integerObservable -> integerObservable.reduce(0, INTEGER_SUM);    public static void main(String[] args) throws InterruptedException {

PublishSubject publishSubject = PublishSubject.create();

SerializedSubject serializedSubject = publishSubject.toSerialized();

serializedSubject

.window(5, TimeUnit.SECONDS) // 5秒作为一个基本块

.flatMap(INNER_BUCKET_SUM)           // 基本块内数据求和

.window(3, 1)              // 3个块作为一个窗口,滚动布数为1

.flatMap(WINDOW_SUM)                 // 窗口数据求和

.subscribe((Integer integer) ->

logger.info("[{}] call ...... {}", // 输出统计数据到日志

Thread.currentThread().getName(), integer));        // 缓慢发送数据,观察效果

for (int i=0; i<100; ++i) {            if (i

serializedSubject.onNext(1);

} else {

serializedSubject.onNext(2);

}

Thread.sleep(1000);

}

}

}

总结

一个滑动窗口统计主要分为两步:bucket 统计,bucket 的大小决定了滑动窗口滚动时间间隔;

window 统计,window 的时长决定了包含的 bucket 的数目。

Hystrix 实现滑动窗口利用了 RxJava 这个响应式函数编程框架,主要是其中的几个函数:window:根据指定时间或指定数量对数据流进行聚集,相当于 1 对 N 的转换;

flatMap:将输入数据流,转换成另一种格式的数据流,在滑动窗口统计中起到了数据求和的功能(当然其功能并不限于求和)。

Hystrix 最核心的基础组件,当属提供观察者模式(发布-订阅模式)的 RxJava。

参考文献

作者:albon

链接:https://www.jianshu.com/p/c1b6497889b4

Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计相关推荐

  1. 网络请求框架:Okhttp:Call对象实现请求源码解析【四】

    OKHttp3--调用对象RealCall源码解析[四]_没有鱼了的博客-CSDN博客 一:概述,当我们封装好 Request后需要执行这个请求,但是 OkHttp并不是直接执行 Request ,而 ...

  2. java直接内存为什么快_直接内存与 JVM 源码分析

    直接内存(堆外内存) 直接内存有一种叫法,堆外内存. 直接内存(堆外内存)指的是 Java 应用程序通过直接方式从操作系统中申请的内存.这个差别与之前的堆.栈.方法区,那些内存都是经过了虚拟化.所以严 ...

  3. 【Java深入研究】2、JDK 1.8 LinkedList源码解析

    LinkedList是一个实现了List接口和Deque接口的双端链表.  有关索引的操作可能从链表头开始遍历到链表尾部,也可能从尾部遍历到链表头部,这取决于看索引更靠近哪一端.  LinkedLis ...

  4. [源码解析] 深度学习分布式训练框架 horovod (11) --- on spark --- GLOO 方案

    [源码解析] 深度学习分布式训练框架 horovod (11) - on spark - GLOO 方案 文章目录 [源码解析] 深度学习分布式训练框架 horovod (11) --- on spa ...

  5. [源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark

    [源码解析] 深度学习分布式训练框架 horovod (10) - run on spark 文章目录 [源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark ...

  6. dubbo源码解析之框架粗谈

    dubbo框架设计 一.dubbo框架整体设计 二.各层说明 三.dubbo工程模块分包 四.依赖关系 五.调用链 文章系列 [一.dubbo源码解析之框架粗谈] [二.dubbo源码解析之dubbo ...

  7. java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战

    Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...

  8. java ssm框架的点歌系统的设计与实现源码

    项目名称 java ssm框架的点歌系统的设计与实现源码 下载地址 下载地址 系统说明 4.2 系统功能 4.2.1 登录与注册功能 系统的登录分为了前台登录和后台登录两个模块,都分别处在不同的界面上 ...

  9. Java集合类框架源码分析 之 LinkedList源码解析 【4】

    上一篇介绍了ArrayList的源码分析[点击看文章],既然ArrayList都已经做了介绍,那么作为他同胞兄弟的LinkedList,当然必须也配拥有姓名! Talk is cheap,show m ...

最新文章

  1. java获取下一季末_java取当前周期、月初至月末、季度初至季度末日期。
  2. Poj 1151-Atlantis 矩形切割
  3. php网站标签加小图标,在htmltitle/title标签添加图标,网页title左边显示网页的logo图标...
  4. VueJS组件之全局组件与局部组件
  5. php把时间戳改为时间格式,php怎么把时间格式转换为时间戳?
  6. jsp中java代码if_jsp中jstl标签的类似 if - else 语句 的语法
  7. MySQL统计信息收集
  8. jdbc连接mysql数据库 工作流程_jdbc连接数据库流程图
  9. java某校在积极推行无人监考,结构化面试题:高校无人监考你怎么看?
  10. 项目管理学习 ---- 项目管理沟通技巧
  11. 稀奇古怪--JAVA篇
  12. MATLAB程序中常见的语法错误,Matlab常见语法错误及解决方法
  13. 网盘修复版新增qq支付仿城通网盘115网盘源码下载
  14. 电脑怎么录制屏幕?分享电脑录制屏幕的3个方法
  15. Android ScrollView、NestedScrollView、Horizo​​ntalScrollView 等
  16. ubuntu 的 arm 版本及其仿真
  17. 去掉RadioButton前面的小圆圈的两种方法
  18. 成就感和尊严,给你快乐
  19. python--爬虫scrapy框架
  20. 图像分割-种子区域生长

热门文章

  1. 深入理解Windows消息循环
  2. C++动态(显式)调用 C++ dll示例
  3. MFC自定义消息的实现方法
  4. MFC的Main函数跑哪去了
  5. Android—多版本主要适配内容
  6. linux加载内核后如何运行app,Android app启动过程
  7. 框架鲜花商城系统测试_分销、团购、秒杀、优惠券小程序商城源码免费分享(Java语言)...
  8. 中山市区电信5g覆盖地图_2020中山数字经济发展论坛举行,上线工业互联网平台...
  9. virtual box虚拟机分区后下一步看不见解决
  10. php中ci的session自动加载报错