前言

随着业务的越来越复杂,保证程序的健壮性对程序猿来说也变得更加的重要,毕竟不写Bug的程序猿不是一个好的程序猿。但怎样尽可能的保证咱们的程序能够稳定的运行,以及出错后能够进行相应的补偿,这里就需要咱们使用熔断机制了。

PS:在进入正文之前,不妨思考一下两个问题:
①熔断机制究竟为我们解决了什么问题?
②我们怎样去自己实现一个简单的熔断?


自定义熔断的实现

这里咱们简单的实现了一个超时后进行熔断的例子,这里有用到AspectJ的相关知识,对于熟悉Spring AOP知识的同学应该没什么问题。

主要分为两步:

  1. 使用Future控制是否超时,超时后将任务cancel掉。
  2. 调用咱们自己定义好的fallback方法进行处理。在这里需要注意的是,fallback方法参数应该要与原方法相同,这样咱们才能进行补偿措施。例如:咱们可以在fallback方法借助消息中间件将这些参数进行存储,然后在适当的时候从消息中间件中读取出来进行补偿消费处理。
 1@RestController 2public class HelloController { 3    private Random random = new Random(); 4 5    @MyHystrixCommand(fallback="errorMethod") 6    @RequestMapping("/hello") 7    public String hello(@RequestParam("name") String message) throws InterruptedException { 8        int time = random.nextInt(200); 9        System.out.println("spend time : " + time + "ms");10        Thread.sleep(time);11        System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh");12        return "hello world:" + message;13    }1415    public String errorMethod(String message) {16        return "error message";17    }18}复制代码
1@Target(ElementType.METHOD)2@Retention(RetentionPolicy.RUNTIME)3@Documented4public @interface MyHystrixCommand {5    int value() default 100;6    String fallback() default "";7}复制代码
 1@Aspect 2@Component 3public class MyHystrixCommandAspect { 4 5    ExecutorService executor = Executors.newFixedThreadPool(10); 6 7    @Pointcut(value = "@annotation(MyHystrixCommand)") 8    public void pointCut() { 910    }1112    @Around(value = "pointCut()&&@annotation(hystrixCommand)")13    public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable {14        int timeout = hystrixCommand.value();15        Future future = executor.submit(() -> {16            try {17                return joinPoint.proceed();18            } catch (Throwable throwable) {19            }20            return null;21        });22        Object returnValue = null;23        try {24            returnValue = future.get(timeout, TimeUnit.MILLISECONDS);25        } catch (InterruptedException | ExecutionException | TimeoutException e) {26            future.cancel(true);27            if (StringUtils.isBlank(hystrixCommand.fallback())){28                throw new Exception("fallback is null");29            }30            returnValue = invokeFallbackMethod(joinPoint, hystrixCommand.fallback());31        }32        return returnValue;33    }3435    private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception {36        Method method = findFallbackMethod(joinPoint, fallback);37        if (method == null) {38            throw new Exception("can not find fallback :" + fallback + " method");39        } else {40            method.setAccessible(true);41            try {42                Object invoke = method.invoke(joinPoint.getTarget(), joinPoint.getArgs());43                return invoke;44            } catch (IllegalAccessException | InvocationTargetException e) {45                throw e;46            }47        }48    }495051    private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) {52        Signature signature = joinPoint.getSignature();53        MethodSignature methodSignature = (MethodSignature) signature;54        Method method = methodSignature.getMethod();55        Class<?>[] parameterTypes = method.getParameterTypes();56        Method fallbackMethod = null;57        try {58        //这里通过判断必须取和原方法一样参数的fallback方法59            fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes);60        } catch (NoSuchMethodException e) {61        }62        return fallbackMethod;63    }6465}复制代码

当然,上述例子只是一个简单的超时后熔断处理的实现方式。咱们在实际应用中,还有可能并发超过指定阈值后咱们也需要进行降级处理,一个最普通的场景:秒杀案例。这些东西在Hystrix中都有相应的处理,它提供了线程池和信号量这两种方式去解决并发的问题。


什么是Hystrix?

咱们看一下官方介绍

In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, and providing fallback options, all of which improve your system’s overall resiliency.

在分布式环境中,调用一些服务不可避免的会出现失败,Hystrix帮助咱们添加了一些容忍策略,并且将服务进行隔离处理,防止一个服务的失败影响到了另一个服务的调用,这些都提高了咱们系统的弹性。


Hystrix的处理流程

这里咱们结合一下Spring Cloud Hystrix进行说明,从HystrixCommandAspect开始分析:

 1@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") 2    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { 3        Method method = getMethodFromTarget(joinPoint); 4        ... 5        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);//第一步 6        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);//第二步 7        ... 8        Object result; 9        try {10            //第三步11            if (!metaHolder.isObservable()) {12                result = CommandExecutor.execute(invokable, executionType, metaHolder);13            } else {14                result = executeObservable(invokable, executionType, metaHolder);15            }16        } 17        ....18        return result;19    }复制代码

这个切面主要针对HystrixCommandHystrixCollapser这两个注解,前者用于进行熔断降级处理,后者用来根据配置进行合并请求(类比数据库操作,将多个insert语句合并成一个insert batch语句)。咱们侧重进行HystrixCommand这一块的分析。

第一步:获取元数据(MetaHolder)

这段代码对应上面的MetaHolder metaHolder = metaHolderFactory.create(joinPoint);,里面封装了比如调用方法method,参数args,方法所属对象target,动态代理对象proxy,回调方法fallbackMethod等等一些元数据的封装。这些数据在创建命令对象时会被使用。

第二步:获取调用者(HystrixInvokable)

它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑,针对HystrixCommand上述的命令对象就是GenericObservableCommandGenericCommand的一种,这里命令对象的选择和方法的返回值有关,如果返回值为Observable类型,则创建GenericObservableCommand命令,否则创建GenericCommand命令。

第三步:执行命令(execute)
 1    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { 2        ... 3        switch (executionType) { 4            case SYNCHRONOUS: { 5                return castToExecutable(invokable, executionType).execute(); 6            } 7            case ASYNCHRONOUS: { 8                HystrixExecutable executable = castToExecutable(invokable, executionType); 9                if (metaHolder.hasFallbackMethodCommand()10                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {11                    return new FutureDecorator(executable.queue());12                }13                return executable.queue();14            }15            case OBSERVABLE: {16                HystrixObservable observable = castToObservable(invokable);17                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();18            }19            ...20        }21    }复制代码

从上面的代码段中,可以很容易的看出共有三种策略,同步、异步、OBSERVABLE,而Observable又分为Cold Observable(observable.toObservable())Hot Observable(observable.observe())。所以说总共有四种执行方式。但是底层都会调用到AbstractCommand.toObservable()方法。

  • execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
  • queue():异步执行,返回一个Future对象,包含着执行结束后返回的单一结果。
  • observe():这个方法返回一个Observable对象,它代表操作的多个结果,但是已经被订阅者消费掉了。
  • toObservable():这个方法返回一个Observable对象,它代表操作的多个结果,需要咱们自己手动订阅并消费掉。

在执行逻辑中,大量用到了RxJava,各种回调处理,看的着实头晕,感兴趣的同学可以自行阅读源码,我这里只是介绍一些关键的流程点。

①首先会检查是否命中缓存(toObservable方法中),命中缓存则直接返回:

1/* try from cache first */2 if (requestCacheEnabled) {3      HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);4       if (fromCache != null) {5           isResponseFromCache = true;6           return handleRequestCacheHitAndEmitValues(fromCache, _cmd);7        }8}复制代码

②检查断路器是否打开,如果断路器打开,则通过handleShortCircuitViaFallback直接进行fallback处理:

 1private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { 2        executionHook.onStart(_cmd); 3 4        /* determine if we're allowed to execute */ 5        if (circuitBreaker.allowRequest()) { 6        }else { 7            return handleShortCircuitViaFallback(); 8        } 9        ...10}复制代码

③检查是否用了信号量,如果用了,则判断是否被占满,占满后则抛出异常,通过handleSemaphoreRejectionViaFallback直接转到fallback中进行执行,不执行后面的逻辑。如果没用,则会返回一个默认的TryableSemaphoreNoOp.DEFAULT,在进行executionSemaphore.tryAcquire()时始终返回true。

 1if (executionSemaphore.tryAcquire()) { 2  try { 3    /* used to track userThreadExecutionTime */ 4    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); 5    return executeCommandAndObserve(_cmd) 6            .doOnError(markExceptionThrown) 7            .doOnTerminate(singleSemaphoreRelease) 8            .doOnUnsubscribe(singleSemaphoreRelease); 9    } catch (RuntimeException e) {10        return Observable.error(e);11    }12} else {13    return handleSemaphoreRejectionViaFallback();14}复制代码

④执行命令中的逻辑

通过重写AbstractCommand中的getExecutionObservable()方法使得下面两个命令类中的相应逻辑被调用。

  • GenericCommand中的run()方法
  • GenericObservableCommand中的construct()方法

如果run或者construct中设置了超时时间,如果执行时间超过了阈值,则会抛出TimeoutException,或者在执行过程中抛出其他异常,都会进入fallback中进行处理逻辑。

⑤发生异常后执行fallback

1   private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, 2         final HystrixEventType eventType,3         final FailureType failureType, 4         final String message,5         final Exception originalException) {6}复制代码

最终都会调用到这个方法,咱们看看FailureType具体有哪几种类型。

  • COMMAND_EXCEPTION:执行run方法或者construct方法抛出异常时。
  • TIMEOUT:超时情况下。
  • SHORTCIRCUIT:断路器直接打开时,直接执行handleShortCircuitViaFallback方法。
  • REJECTED_THREAD_EXECUTION:线程池、请求队列被占满的情况下。
  • REJECTED_SEMAPHORE_EXECUTION:信号量占满情况下。
  • BAD_REQUEST_EXCEPTION:
  • REJECTED_SEMAPHORE_FALLBACK:

总结

Hystrix中大量用了RxJava,阅读源码看起来不免会觉得头晕,可以考虑在关键点打几个断点看看,不然各种回调会让你绕圈圈。不过个人觉得RxJava代码看起来还是蛮优美的,只不过有些许不适应而已,后面有时间会研究一下RxJava


END

转载于:https://juejin.im/post/5cbc92135188257ab6106004

如何实现一个简单的熔断以及Hystrix原理分析相关推荐

  1. 一个简单的IPmsg程序源码分析(二)

    离上篇一个简单的IPmsg程序源码分析(一)已经基本半个月(上篇最初发布在点点上面,后边纠结了一下还是选择了博客园),利用空闲的时间终于把源码的构架和一些细节基本都搞清楚了,总的来说是很简单的一个客户 ...

  2. 一个简单的时间片轮转多道程序内核代码分析

    郑斌 原创作品转载请注明出处 <Linux内核分析>MOOC课程http://mooc.study.163.com/course/USTC-1000029000 第二周的实验内容分析 1. ...

  3. Linux内核分析2:一个简单的时间片轮转多道程序内核代码分析

    Lab2:一个简单的时间片轮转多道程序内核代码 席金玉   <Linux内核分析>MOOC课程http://mooc.study.163.com/course/USTC-100002900 ...

  4. linux实验:基于mykernel的一个简单的时间片轮转多道程序内核代码分析

    学号后三位:288 原创作品转载请注明出处 + https://github.com/mengning/linuxkernel/ 1.mykernel mykernel是由中科大孟宁老师建立的用于开发 ...

  5. 一个简单判等例子的深度分析

    今儿给网友讲解了一个判等的问题,看似简单,其实还是蛮有意思的: object s = 1, t = 1; Console.WriteLine( "s == t -> {0}" ...

  6. 一个简单进程调度器的实现和分析

    主体代码文件有三个,mypcb.h,myinterupt.h, mymain.h,mypcb定义了进程控制块结构,myinterupt实现了中断处理程序,mymain是实际入口点,以下代码省去了头文件 ...

  7. 一个简单的Android木马病毒的分析

    一.样本信息 文件名称: 一个安卓病毒木马.apk 文件大小:242867 byte 文件类型:application/jar 病毒名称:Android.Trojan.SMSSend.KS 样本MD5 ...

  8. 一个简单的死锁demo以及死锁分析

    废话不多说,直接贴上demo代码: import java.util.concurrent.TimeUnit;public class DeadlockTest {public static void ...

  9. feign直接走熔断_四、Spring Cloud之熔断处理 Hystrix

    前言 熔断处理什么呢?在微服务项目中,有很多的服务,在服务消费者调用服务提供者的时候可能会出现网络异常或者请求超时或者阻塞等等一系列问题,不过不进行处理的话,就可能导致,长时间等待,进程阻塞,最终导致 ...

  10. flask post json_Flask 和 requests 搭建一个简单的API服务

    (点击上方快速关注并设置为星标,一起学Python) 路由器为腾达路由器,使用requests来进行数据的获取,使用flask来进行实现api的搭建 requests我就不介绍了,这个大家都很熟悉了, ...

最新文章

  1. 9月机器学习开源项目Top10
  2. tensorflow 利用索引获取tensor特定元素
  3. 查找表的原理与结构 什么是竞争与冒险现象?怎样判断?如何消除?
  4. head first java原文_Head First Java
  5. 玛雅Maya 2022 for Mac(三维动画制作软件)
  6. 关于oracle存储微信表情emoji问题 “[[%F0%9F%A4%AA]]“
  7. 用cube移植PS2手柄--HAL库
  8. MySQL安装步骤(ZIP版)
  9. 吃冬瓜对宝宝有什么好处?
  10. fastboot 操作
  11. python学习之心路历程
  12. Win32应用程序开发:完整的开发流程
  13. 为什么5G能比4G快十倍?
  14. ubuntu18.04关闭输入法浮动窗
  15. 明辰智航网络一点通网络性能测试仪可以做什么
  16. 阿里云服务器域名备案
  17. ToB和ToC的产品经理的区别
  18. python图形化方式模块安装_(怎么安装python模块,如何安装python模块,常用安装方式)python助手安装教程视频...
  19. HECKTOR2020第三名论文研读
  20. Python:把一个数组按指定数组大小size分割为多个数组

热门文章

  1. Spinnaker:云原生多云环境持续部署的未来
  2. 入门佳作《例解Python》来了!案例丰富尽显风度ƪ(´▽`ƪ)
  3. Scala中的Apply方法与伴生对象
  4. Git版本控制:Github的使用之 多人协作及参与项目
  5. 改进3D/2D U-NET--添加深度监督deep supervision【Keras】
  6. python hbase_python 操作 hbase
  7. python中scrapy框架项目_Python -- Scrapy 框架简单介绍(Scrapy 安装及项目创建)
  8. python有比赛吗_python编程比赛到底应不应该让孩子参加?有好处
  9. php计算数字的立方,玩疯了!这回是人类发现了把3写成3个整数立方和的第3种写法!...
  10. python窗口怎么显示我输入的_Python分别用两个窗口显示和输入