2019独角兽企业重金招聘Python工程师标准>>>

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-timeout/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述
  • 2. HystrixObservableTimeoutOperator
  • 3. HystrixTimer
    • 3.1 ScheduledExecutor
    • 3.2 TimerListener
    • 3.3 TimerReference
  • 666. 彩蛋

???关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(三)之执行超时

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

开启执行超时功能,需要配置 :

  • HystrixCommandProperties.executionTimeoutEnabled :执行命令超时功能开关。

    • 值 :Boolean
    • 默认值 :true
  • HystrixCommandProperties.executionTimeoutInMilliseconds :执行命令超时时长。
    • 值 :Integer
    • 单位 :毫秒
    • 默认值 :1000 毫秒

在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(…)」 中,#executeCommandAndObserve(...) 方法的第 75 行 lift(new HystrixObservableTimeoutOperator<R>(_cmd)) ,实现了对执行命令超时的监控。

  • Observable#lift(Operator) 方法不熟悉的同学,在 《RxJava 源码解析 —— Observable#lift(Operator)》 有详细解析。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

2. HystrixObservableTimeoutOperator

HystrixObservableTimeoutOperator 类,代码如下 :

  1: private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {2: 3:     final AbstractCommand<R> originalCommand;4: 5:     public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {6:         this.originalCommand = originalCommand;7:     }8: 9:     @Override10:     public Subscriber<? super R> call(final Subscriber<? super R> child) {11:         // 创建 订阅12:         final CompositeSubscription s = new CompositeSubscription();13:         // 添加 订阅14:         // if the child unsubscribes we unsubscribe our parent as well15:         child.add(s);16: 17:         //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later18:         final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();19: 20:         TimerListener listener = new TimerListener() {21: 22:             @Override23:             public void tick() {24:                 // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath25:                 // otherwise it means we lost a race and the run() execution completed or did not start26:                 if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {27:                     // report timeout failure28:                     originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);29: 30:                     // shut down the original request31:                     s.unsubscribe();32: 33:                     final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {34: 35:                         @Override36:                         public void run() {37:                             child.onError(new HystrixTimeoutException());38:                         }39:                     });40: 41:                     timeoutRunnable.run();42:                     //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout43:                 }44:             }45: 46:             @Override47:             public int getIntervalTimeInMilliseconds() {48:                 return originalCommand.properties.executionTimeoutInMilliseconds().get();49:             }50:         };51: 52:         final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);53: 54:         // set externally so execute/queue can see this55:         originalCommand.timeoutTimer.set(tl);56: 57:         /**58:          * If this subscriber receives values it means the parent succeeded/completed59:          */60:         Subscriber<R> parent = new Subscriber<R>() {61: 62:             @Override63:             public void onCompleted() {64:                 if (isNotTimedOut()) {65:                     // stop timer and pass notification through66:                     tl.clear();67:                     // 完成68:                     child.onCompleted();69:                 } else {70:                     System.out.println("timeout: " + "onCompleted"); // 笔者调试用71:                 } 72:             }73: 74:             @Override75:             public void onError(Throwable e) {76:                 if (isNotTimedOut()) {77:                     // stop timer and pass notification through78:                     tl.clear();79:                     // 异常80:                     child.onError(e);81:                 } else {82:                     System.out.println("timeout: " + "onError"); // 笔者调试用83:                 } 84:             }85: 86:             @Override87:             public void onNext(R v) {88:                 if (isNotTimedOut()) {89:                     // 继续执行90:                     child.onNext(v);91:                 } else {92:                     System.out.println("timeout: " + "onNext"); // 笔者调试用93:                 }94:             }95: 96:             /**97:              * 通过 CAS 判断是否超时98:              *99:              * @return 是否超时
100:              */
101:             private boolean isNotTimedOut() {
102:                 // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
103:                 return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
104:                         originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
105:             }
106:
107:         };
108:
109:         // 添加 订阅
110:         // if s is unsubscribed we want to unsubscribe the parent
111:         s.add(parent);
112:
113:         return parent;
114:     }
115:
116: }
  • 第 12 行 :创建订阅 s

  • 第 15 行 :添加订阅 schild 的订阅。

  • 第 18 行 :获得 HystrixRequestContext 。因为下面 listener 的执行不在当前线程,HystrixRequestContext 基于 ThreadLocal 实现。

  • 第 20 至 50 行 :创建执行命令超时监听器 listener ( TimerListener ) 。当超过执行命令的时长( TimerListener#getIntervalTimeInMilliseconds() )时,TimerListener#tick() 方法触发调用。

    • 第 26 行 :通过 AbstractCommand.isCommandTimedOut 变量 CAS 操作,保证和下面第 60 行parent 有且只有一方操作成功。TimedOutStatus 状态变迁如下图 :
    • 第 28 行 :TODO 【2011】【Hystrix 事件机制】
    • 第 31 行 :取消订阅 s注意 :不同执行隔离策略此处的表现不同
      • ExecutionIsolationStrategy.THREAD :该策略下提供取消订阅( #unsubscribe() ),并且命令执行超时,强制取消命令的执行。在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「6.5 FutureCompleterWithConfigurableInterrupt」 有详细解析。
      • ExecutionIsolationStrategy.SEMAPHORE :该策略下提供取消订阅( #unsubscribe() )时,对超时执行命令的取消。所以,在选择执行隔离策略,要注意这块
    • 第 34 至 41 行 :执行 child#onError(e) 【Subscriber#onError(Throwable)】 方法,处理 HystrixTimeoutException 异常。该异常会被 handleFallback 处理,点击 链接 查看,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。
      • HystrixContextRunnable ,设置第 18 行获得的 HystrixRequestContext 到 Callable#run() 所在线程的 HystrixRequestContext ,并继续执行。点击 链接 查看。另外,HystrixContextRunnable 只有此处使用,独立成类的原因是测试用例使用到。
  • 第 52 行 :使用 TimerListener 到定时器,监听命令的超时执行。

  • 第 55 行 :设置 TimerListener 到 AbstractCommand.timeoutTimer 属性。用于执行超时等等场景下的 TimerListener 的清理( tl#clear() )。如下方法有通过该属性对 TimerListener 的清理 :

    • AbstractCommand#handleCommandEnd()
    • AbstractCommand#cleanUpAfterResponseFromCache()
  • 第 60 至 107 行 :创建的 Subscriber ( parent )。在传参的 child 的基础上,增加了对是否执行超时的判断( #isNotTimedOut() )和TimerListener的清理。

  • 第 111 行 :添加添加订阅 parents 的订阅。整体订阅关系如下 :

    • 这里看起来 s 有些“多余” ?因为 parentlistener 存在互相引用的情况,通过 s 解决。
  • 第 113 行 :返回 parent注意。如果不能理解,建议阅读下 《RxJava 源码解析 —— Observable#lift(Operator)》 。

3. HystrixTimer

com.netflix.hystrix.util.HystrixTimer ,Hystrix 定时器。

目前有如下场景使用 :

  • 执行命令超时任务,本文详细解析。
  • 命令批量执行,在 《Hystrix 源码解析 —— 命令合并执行》「5. CollapsedTask」 详细解析。

HystrixTimer 构造方法,代码如下 :

public class HystrixTimer {/*** 单例*/private static HystrixTimer INSTANCE = new HystrixTimer();/* package */ AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();private HystrixTimer() {// private to prevent public instantiation}public static HystrixTimer getInstance() {return INSTANCE;}}
  • INSTANCE 静态属性,单例。
  • executor 属性,定时任务执行器( ScheduledExecutor )。

调用 HystrixTimer#addTimerListener(TimerListener) 方法,提交定时监听器,生成定时任务,代码如下 :

  1: public Reference<TimerListener> addTimerListener(final TimerListener listener) {2:     startThreadIfNeeded();3:     // add the listener4: 5:     Runnable r = new Runnable() {6: 7:         @Override8:         public void run() {9:             try {10:                 listener.tick();11:             } catch (Exception e) {12:                 logger.error("Failed while ticking TimerListener", e);13:             }14:         }15:     };16: 17:     ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);18:     return new TimerReference(listener, f);19: }
  • 第 2 行 :调用 #startThreadIfNeeded() 方法,保证 executor 延迟初始化已完成。

    • #startThreadIfNeeded() 方法 ,比较简单,点击 链接 查看。
    • ScheduledExecutor 在 「3.1 ScheduledExecutor」 详细解析。
  • 第 5 至 15 行 :创建定时任务 Runnable 。在 Runnable#run() 方法里,调用 TimerListener#tick() 方法。在 「3.2 TimerListener」 详细解析。
  • 第 17 行 :提交定时监听器,生成定时任务 f ( ScheduledFuture )。
  • 第 18 行 :使用 listener + f 创建 TimerReference 返回。在 「3.3 TimerReference」 详细解析。

3.1 ScheduledExecutor

com.netflix.hystrix.util.HystrixTimer.ScheduledExecutor ,Hystrix 定时任务执行器。代码如下 :

/* package */ static class ScheduledExecutor {/*** 定时任务线程池执行器*//* package */ volatile ScheduledThreadPoolExecutor executor;/*** 是否初始化*/private volatile boolean initialized;/*** We want this only done once when created in compareAndSet so use an initialize method*/public void initialize() {// coreSizeHystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();// 创建 ThreadFactoryThreadFactory threadFactory = null;if (!PlatformSpecific.isAppEngineStandardEnvironment()) {threadFactory = new ThreadFactory() {final AtomicInteger counter = new AtomicInteger();@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());thread.setDaemon(true);return thread;}};} else {threadFactory = PlatformSpecific.getAppEngineThreadFactory();}// 创建 ScheduledThreadPoolExecutorexecutor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);// 已初始化initialized = true;}public ScheduledThreadPoolExecutor getThreadPool() {return executor;}public boolean isInitialized() {return initialized;}
}
  • 线程池大小( coreSize ),通过 HystrixTimerThreadPoolProperties.corePoolSize 配置。

3.2 TimerListener

com.netflix.hystrix.util.HystrixTimer.TimerListener ,Hystrix 定时任务监听器****接口。代码如下 :

public static interface TimerListener {/*** The 'tick' is called each time the interval occurs.* <p>* This method should NOT block or do any work but instead fire its work asynchronously to perform on another thread otherwise it will prevent the Timer from functioning.* <p>* This contract is used to keep this implementation single-threaded and simplistic.* <p>* If you need a ThreadLocal set, you can store the state in the TimerListener, then when tick() is called, set the ThreadLocal to your desired value.*/void tick();/*** How often this TimerListener should 'tick' defined in milliseconds.*/int getIntervalTimeInMilliseconds();
}
  • #tick() 方法 :时间到达( 超时 )执行的逻辑。
  • #getIntervalTimeInMilliseconds() 方法 :返回到达( 超时 )时间时长。

3.3 TimerReference

com.netflix.hystrix.util.HystrixTimer.TimerReference ,Hystrix 定时任务引用。代码如下 :

private static class TimerReference extends SoftReference<TimerListener> {private final ScheduledFuture<?> f;TimerReference(TimerListener referent, ScheduledFuture<?> f) {super(referent);this.f = f;}@Overridepublic void clear() {super.clear();// stop this ScheduledFuture from any further executionsf.cancel(false); // 非强制}}
  • 通过 #clear() 方法,可以取消定时任务的执行。

666. 彩蛋

顺畅~刚开始看 Hystrix 执行命令超时逻辑,一直想不通。现在整理干净了。

喵了个咪~

胖友,分享一波朋友圈可好!

转载于:https://my.oschina.net/sword4j/blog/1590758

熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时相关推荐

  1. Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计

    降级熔断框架 Hystrix 源码解析:滑动窗口统计 概述 Hystrix 是一个开源的降级熔断框架,用于提高服务可靠性,适用于依赖大量外部服务的业务系统.什么是降级熔断呢? 降级 业务降级,是指牺牲 ...

  2. Ardusub源码解析学习(三)——车辆类型

    APM_Sub源码解析学习(三)--车辆类型 一.前言 二.class AP_HAL::HAL 三.class AP_Vehicle 3.1 .h 3.2 .cpp 四.class Sub 4.1 . ...

  3. APM_ArduCopter源码解析学习(三)——无人机类型

    APM_ArduCopter源码解析学习(三)--无人机类型 一.前言 二.class AP_HAL::HAL 三.class AP_Vehicle 3.1 .h 3.2 .cpp 四.class C ...

  4. statement执行insert into语句_【图文并茂】源码解析MyBatis ShardingJdbc SQL语句执行流程详解...

    源码分析Mybatis系列目录: 1.源码分析Mybatis MapperProxy初始化[图文并茂] 2.源码分析Mybatis MappedStatement的创建流程 3.[图文并茂]Mybat ...

  5. Spring AOP源码解析-拦截器链的执行过程

    一.简介 在前面的两篇文章中,分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在得到了 bean 的代理对象,且通知也以合适的方式插在了目标方 ...

  6. 【OS xv6】1 万字详解shell源码解析命令(内含wsl+vscode调试xv6教程 文档第一章助读)

    现在前面的 嘻嘻几百年没写文了确实没时间,等搞完毕设可以一起重温重温.最近学os,读源码发现还挺多东西得整理的,尤其途中有必要找资料整理的时候,内容有点多有点乱,写在源码已经显得不现实了.用的vsco ...

  7. 【Java】【系列篇】【Spring源码解析】【三】【体系】【BeanFactory体系】

    BeanFactory体系 BeanFactory整体结构体系图 顶层接口-BeanFactory 1.1.描述 1.2.方法解析(15个) 1.2.1.属性 1.2.2.获取bean实例 1.2.3 ...

  8. 【JVM源码解析】模板解释器解释执行Java字节码指令(上)

    本文由HeapDump性能社区首席讲师鸠摩(马智)授权整理发布 第17章-x86-64寄存器 不同的CPU都能够解释的机器语言的体系称为指令集架构(ISA,Instruction Set Archit ...

  9. 激光SLAM源码解析S-LOAM(三)里程计图优化

    里程计,是通过累计帧间位姿变换得来的,因此会累积帧间误差. 如果想要纠正此累积误差,我们需要通过另一种方法得到可信位姿,以此校正相同时刻里程计位姿. SLAM图优化,是一种记录各帧时刻里程计,并通过可 ...

最新文章

  1. SpringCloud核心组件及其作用
  2. Android 项目结构说明
  3. 不要把游戏当游戏,要把游戏当明星
  4. Activity与Thread之间的通讯(old)
  5. CG CTF WEB pass check
  6. 三、“涤纶纤维和棉纤维两组分纤维在涤/棉混纺织物燃烧过程中有着明显的物理相互作用和化学相互作用”,解释这两种作用。
  7. nginx只能访问80端口_nginx 访问不了非80端口
  8. win10你的组织已关闭自动更新问题怎么解决?
  9. Structural Deep Clustering Network 基于GNN的深度聚类算法 WWW2020
  10. Android开发/源码资源汇总
  11. RK3288_Android7.1接eDP屏休眠之后led状态灯没有亮红色
  12. [转]Spring3 MVC + jQuery easyUI 做的ajax版本用户管理
  13. c语言中math的作用,C语言Math函数库简介
  14. 2016西安教师职称计算机考试,2016教师职称计算机考试模块.doc
  15. 背离、背驰的区别及简单的判断方法
  16. 利用beego开发网站(一)
  17. 怎么让图片铺满手机屏幕_手机版Photoshop怎么把图片铺满屏幕?
  18. NLP炼丹技巧:标签平滑label smoothing
  19. 和风天气:免费天气接口,包含天气预报及空气质量等实用信息,使用简单方便(划重点:免费)
  20. CTF初学笔记解题-密码1

热门文章

  1. 【ESP32教程】ESP32EEPROM的使用(使用示例中的eeprom class用法)
  2. SQL Server 数据分区管理
  3. 迅雷向链享云售让部分区块链业务:包括链克与链克商城
  4. 0x01.被动信息收集
  5. error processing condition on org.autoconfigure.transaction.TransactionAutoConfiguration$Transaction
  6. Cyprss串行铁电存储器64Kbit FM25CL64B-GTR
  7. 【Luat-air105】8.2 fatfs及照片存储
  8. Bugku-CTF之细心 (想办法变成admin)
  9. 【数据库】MySQL的sql语句详解
  10. 发光二极管和光敏二极管