pigeon熔断降级

当服务调用在短时间内出现大量的失败且失败率超过一定阀值时,可以通过配置手动或自动触发降级,调用端直接返回默认对象或抛出异常,不会将调用请求发到服务提供方,如果服务提供方恢复可用,客户端可以自动或手工解除降级。

pigeon降级开关

pigeon提供三种降级开关,来分别支持不同的降级策略:

  1. 强制降级开关:在远程服务大量超时或其他不可用情况时,紧急时候进行设置,开启后,调用端会根据上述降级策略直接返回默认值或抛出降级异常,当远程服务恢复后,建议关闭此开关。对应配置pigeon.invoker.degrade.force=true,默认为false
  2. 失败降级开关:失败降级开关便于客户端在服务端出现非业务异常(比如网络失败,超时,无可用节点等)时进行降级容错,而在出现业务异常(比如登录用户名密码错误)时不需要降级。对应配置pigeon.invoker.degrade.failure=true,默认为false
  3. 自动降级开关:自动降级开关是在调用端设置,开启自动降级后,调用端如果调用某个服务出现连续的超时或不可用,当一段时间内(10秒内)失败率超过一定阀值(默认1%)会触发自动降级,调用端会根据上述降级策略直接返回默认值或抛出降级异常;当服务端恢复后,调用端会自动解除降级模式,再次发起请求到远程服务。对应配置pigeon.invoker.degrade.auto=true,默认为false

若同时开启了多个开关,会根据下面优先级使用相应降级策略:强制降级 > 自动降级 > 失败降级,其中自动降级包含失败降级策略。

pigeon降级处理策略配置

通过配置pigeon.invoker.degrade.methods为不同的服务方法指定不同的降级策略,如:

http://service.dianping.com/com.dianping.pigeon.demo.EchoService#echo=a,http://service.dianping.com/com.dianping.pigeon.demo.EchoService#getUserDetail=b,http://service.dianping.com/com.dianping.pigeon.demo.EchoService#getUserDetailArray=c
上述配置内容包含多个方法的降级策略a、b、c。如果某此调用需要降级,而降级策略没有配置则不降级,进行正常调用流程。

配置解析定义在DegradationFilter#parseDegradeMethodsConfig方法中,

对于a、b、c这些降级策略,可以通过诸如pigeon.invoker.degrade.method.return.a等配置来定义具体的降级处理策略。
在触发降级后,pigeon支持4种降级处理策略:

  1. 指定默认返回值,可以为一个复杂对象
  2. 抛出指定异常
  3. 执行groovy脚本
  4. Mock方式

下面对这几种降级处理策略举例分析:

指定默认返回值

如策略a有pigeon.invoker.degrade.method.return.a配置值为:

{"returnClass": "java.lang.String","content": "echo,input"
}

这里意思是降级返回一个字符串"echo,input"。
而对于复杂对象,可参照json格式配置,如:

{"returnClass": "com.dianping.pigeon.demo.User","content": "{\"username\":\"user-1\"}"
}

如果反序列化是Map类型,还可以配置keyClass和valueClass属性来指定键值类型,如果是Collection类型,可以配置getComponentClass来指定元素类型。

抛出指定异常

如果想在降级后抛出指定异常,可以配置如pigeon.invoker.degrade.method.return.b为:

{"throwException": "true","returnClass": "Exception"
}

执行groovy脚本

如配置pigeon.invoker.degrade.method.return.c值为:

{"useGroovyScript": "true","content": "throw new RuntimeException('test groovy degrade');"
}

pigeon可以根据配置的content,动态执行groovy脚本,这里需注意脚本的最后一条语句必须返回方法的返回值类型或抛出异常。

mock方式

除了上述几种使用lion配置降级策略的方式,pigeon还提供了一种使用mock类的降级配置方式。

例如我们想修改pigeon-test.pigeon.invoker.degrade.method.return.a的降级策略方式为mock方式,只需修改配置为:{"useMockClass":"true"}
打开mock开关,然后在spring的xml配置中添加mock类的引用对象:

<bean id="echoService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init"><property name="url" value="com.dianping.pigeon.benchmark.service.EchoService" /><property name="interfaceName" value="com.dianping.pigeon.benchmark.service.EchoService" /><property name="mock" ref="echoServiceMock" /><!-- 添加mock类的引用 -->
</bean><!-- 必须实现EchoService接口 -->
<bean id="echoServiceMock" class="com.dianping.pigeon.benchmark.service.EchoServiceMock"/

对于上面几种策略,可以通过配置enable=true|false来确定是否启动策略,不填写默认为true,如{“useMockClass”:“true”,“enable”:“false”}。
如果对于同一个服务方法启动了多种降级策略,会根据以下优先级执行策略:
mock方式>groovy脚本>抛出异常>返回默认对象。

分析完以上策略配置,来看看pigeon解析配置的代码实现,定义在DegradationFilter#parseDegradeMethodsConfig方法中:

private static void parseDegradeMethodsConfig(String degradeMethodsConfig) throws Throwable {if (StringUtils.isNotBlank(degradeMethodsConfig)) {ConcurrentHashMap<String, DegradeAction> map = new ConcurrentHashMap<String, DegradeAction>();// 格式如"key1=value1,key2=value2",其中key为url + "#" + methodName// 可以从配置"pigeon.invoker.degrade.method.return." + value中获取具体方法的DegradeActionConfig JSON字符串String[] pairArray = degradeMethodsConfig.split(",");for (String str : pairArray) {if (StringUtils.isNotBlank(str)) {String[] pair = str.split("=");if (pair != null && pair.length == 2) {String key = pair[1].trim();DegradeAction degradeAction = new DegradeAction();if (StringUtils.isNotBlank(key)) {// 获取指定的degradeActionConfig,并装配DegradeAction对象String config = configManager.getStringValue(KEY_DEGRADE_METHOD + key);if (StringUtils.isNotBlank(config)) {// 反序列化DegradeActionConfigconfig = config.trim();config = "{\"@class\":\"" + DegradeActionConfig.class.getName() + "\","+ config.substring(1);DegradeActionConfig degradeActionConfig = (DegradeActionConfig) jacksonSerializer.toObject(DegradeActionConfig.class, config);// 解析配置,初始化degradeActiondegradeAction.setUseMockClass(degradeActionConfig.getUseMockClass());degradeAction.setUseGroovyScript(degradeActionConfig.getUseGroovyScript());degradeAction.setThrowException(degradeActionConfig.getThrowException());degradeAction.setEnable(degradeActionConfig.getEnable());String content = degradeActionConfig.getContent();Object returnObj = null;// 解析具体的降级方案if (degradeAction.isUseMockClass()) {// use mock class} else if (degradeAction.isUseGroovyScript()) {degradeAction.setGroovyScript(GroovyUtils.getScript(content));} else if (degradeAction.isThrowException()) {if (StringUtils.isNotBlank(degradeActionConfig.getReturnClass())) {// 反序列化成指定异常returnObj = jacksonSerializer.toObject(Class.forName(degradeActionConfig.getReturnClass()), content);if (!(returnObj instanceof Exception)) {throw new IllegalArgumentException("Invalid exception class:" + degradeActionConfig.getReturnClass());}degradeAction.setReturnObj(returnObj);}} else {if (StringUtils.isNotBlank(degradeActionConfig.getKeyClass())&& StringUtils.isNotBlank(degradeActionConfig.getValueClass())) {// 反序列化map对象returnObj = jacksonSerializer.deserializeMap(content,Class.forName(degradeActionConfig.getReturnClass()),Class.forName(degradeActionConfig.getKeyClass()),Class.forName(degradeActionConfig.getValueClass()));} else if (StringUtils.isNotBlank(degradeActionConfig.getComponentClass())) {// 反序列化collection对象returnObj = jacksonSerializer.deserializeCollection(content,Class.forName(degradeActionConfig.getReturnClass()),Class.forName(degradeActionConfig.getComponentClass()));} else if (StringUtils.isNotBlank(degradeActionConfig.getReturnClass())) {// 反序列化普通java对象returnObj = jacksonSerializer.toObject(Class.forName(degradeActionConfig.getReturnClass()), content);}degradeAction.setReturnObj(returnObj);}}}map.put(pair[0].trim(), degradeAction);}}}// 重置缓存degradeMethodActions.clear();degradeMethodActions = map;} else {// 重置缓存degradeMethodActions.clear();}groovyMocks.clear();
}

降级配置和请求数据统计

在方法实现中,如果开启了强制降级开关,会直接走降级策略,否则如果开启了自动降级开关,会有一个较为复杂的计算逻辑根据自动降级配置及当前的调用情况来判断是否需要降级以及是否从降级中恢复为正常访问。
下面主要看看和自动降级策略相关的配置和数据统计。

在DegradationFilter获取到后面拦截器链调用的相应结果后,会根据结果统计调用数据,来判断后续是否需要走降级策略。
结果数据统计主要通过以下代码实现:

 // 统计失败调用情况,包括返回异常、捕获异常,降级调用异常等情况会走到
DegradationManager.INSTANCE.addFailedRequest(context, failedException);// 统计降级调用情况,在降级调用处理里,如果不是一个失败降级,则会走到当前统计
DegradationManager.INSTANCE.addDegradedRequest(context, null);// 统计成功调用情况
DegradationManager.INSTANCE.addNormalRequest(context);

对应的方法实现:

public void addFailedRequest(InvokerContext context, Throwable t) {if (t instanceof ServiceUnavailableException || t instanceof RequestTimeoutException|| t instanceof RemoteInvocationException || t instanceof RejectedException|| t instanceof ServiceFailureDegreadedException || isCustomizedDegradeException(t)) {// 非业务异常或自定义降级异常addRequest(context, t, false);}
}public void addDegradedRequest(InvokerContext context, Throwable t) {addRequest(context, null, true);if (isLogDegrade && !(t instanceof ServiceDegradedException)) {// 启动记录降级日志开关,且不是降级异常ServiceDegradedException ex = new ServiceDegradedException(getRequestUrl(context), t);ex.setStackTrace(new StackTraceElement[]{});monitor.logError(ex);}
}public void addNormalRequest(InvokerContext context) {addRequest(context, null, false);
}

上面三个方法都统一调用了addRequest方法:

private void addRequest(InvokerContext context, Throwable t, boolean degraded) {// 自动降级或强制降级才进行统计if (isAutoDegrade || isForceDegrade) {int currentSecond = Calendar.getInstance().get(Calendar.SECOND);String requestUrl = getRequestUrl(context);// 获取指定url的每秒请求数统计ConcurrentHashMap<Integer, Count> secondCount = requestSecondCountMap.get(requestUrl);if (secondCount == null) {// 没有则初始化一个secondCount = new ConcurrentHashMap<Integer, Count>();ConcurrentHashMap<Integer, Count> last = requestSecondCountMap.putIfAbsent(requestUrl, secondCount);if (last != null) {// 这里考虑并发处理,可能会有其他线程初始化了,则直接覆盖使用secondCount = last;}}// 获取当前秒的计数Count count = secondCount.get(currentSecond);if (count == null) {// 为空则初始化一个count = new Count(0, 0, 0);// 考虑并发处理Count last = secondCount.putIfAbsent(currentSecond, count);if (last != null) {count = last;}}// 总计数+1count.total.incrementAndGet();if (t != null) {// 存在异常,则失败数+1,仅addFailedRequest会触发这个分支count.failed.incrementAndGet();}if (degraded) {// 存在降级,则降级数+1,仅addDegradedRequest会触发这个分支count.degraded.incrementAndGet();}}
}

从上面我们看到,最终针对每个服务方法,会有三个纬度的秒级统计,为总请求数、降级数、失败数。并最终存储到DegradationManager的requestSecondCountMap成员变量中。但实际上,在进行降级策略触发判断时,用到的是requestCountMap。中间的转化由DegradationManager的静态内部类Checker完成,Checker实现了Runnable接口。DegradationManager会在静态初始化时启动另开一个子线程来运行Checker定时任务,每秒根据requestSecondCountMap重新初始化requestCountMap。
下面看看Checker关于数据汇总的实现,同时历史计数的清理也是在这个定时器中完成:

static class Checker implements Runnable {@Overridepublic void run() {while (true) {try {// 每隔一秒进行Thread.sleep(1000 * degradeCheckInterval);checkRequestSecondCount();} catch (Exception e) {logger.error("", e);}}}private void checkRequestSecondCount() {Map<String, Count> countMap = new ConcurrentHashMap<String, Count>();// 最近需要统计的秒数数据,默认为10,可以理解位一个滑动的统计窗口final int recentSeconds = degradeCheckSeconds;final int currentSecond = Calendar.getInstance().get(Calendar.SECOND);// 遍历每隔服务方法for (String url : requestSecondCountMap.keySet()) {Map<Integer, Count> secondCount = requestSecondCountMap.get(url);int total = 0, failed = 0, degraded = 0;// 统计过去recentSeconds秒的调用情况,并汇总到countMap中for (int i = 1; i <= recentSeconds; i++) {int prevSec = currentSecond - i;// 60秒循环prevSec = prevSec >= 0 ? prevSec : prevSec + 60;Count ct = secondCount.get(prevSec);if (ct != null) {total += ct.getTotalValue();failed += ct.getFailedValue();degraded += ct.getDegradedValue();}}countMap.put(url, new Count(total, failed, degraded));// 如recentSeconds=20,则清空过去20~40秒的计数器计数for (int i = recentSeconds + 1; i <= recentSeconds + 20; i++) {int prevSec = currentSecond - i;prevSec = prevSec >= 0 ? prevSec : prevSec + 60;Count ct = secondCount.get(prevSec);if (ct != null) {ct.clear();}}}Map<String, Count> old = requestCountMap;requestCountMap = countMap;// 清理无用数据,防止不必要的存在引用引起内存泄漏if (old != null) {old.clear();old = null;}// 复用降级统计和清空的线程,用于服务质量统计和清空(窗口默认为10秒)// ……// 省略这部分和降级无关的统计代码}}
}

根据requestCount中的三个汇总统计量,下面看看在自动降级策略下,降级触发和恢复的相关策略。

降级触发和恢复策略

在DegradationFilter#invoker执行拦截器逻辑开始,有两处可能会触发降级策略:

  1. 在开始会先调用DegradationManager.INSTANCE.needDegrade(context)来判断是否需要进行降级。如果满足降级条件直接走降级策略。
  2. 在完成拦截器链的上游调用回来时,会根据返回结果为下面3类异常时,进一步结合自动降级或失败降级配置判断是否走降级策略。
    1. 异常非业务异常
    2. 自定义可降级的业务异常
    3. 在调用过程中抛出而捕获到异常,

降级触发逻辑具体看needDegrade实现:

public boolean needDegrade(InvokerContext context) {// 是否有指定方法降级配置,且配置开关开启if (degradationIsEnable(context)) {// 是否强制降级if (isForceDegrade) {return true;}// 是否开启自动降级开关if (isAutoDegrade) {if (!CollectionUtils.isEmpty(requestCountMap)) {String requestUrl = getRequestUrl(context);// 请求量统计,基于一个滑动窗口Count count = requestCountMap.get(requestUrl);if (count != null) {// 一、 请求总量达到指定阈值if (count.getTotalValue() >= degradeTotalThreshold) {// 二、非降级请求量(成功+失败)达到指定阈值,且失败率(非降级请求量/失败量)小于降级恢复比率// 这意味会尝试从降级中恢复过来if ((count.getTotalValue() - count.getDegradedValue()) > degradeInvokeThreshold&& count.getFailedPercent() < degradeRecoverPercent) {// (降级比率-恢复比率)发生降级,如果请求正常,则降级比率逐渐降低,会慢慢恢复正常return random(count.getDegradedPercent() - degradeRecoverInterval);} else if (count.getFailedPercent() >= degradeRecoverPercent) {// 三、失败率大于恢复率,最大降级比率(默认99.9%)发生降级return random(degradePercentMax);}}}}}// 是否失败降级if (isFailureDegrade) {// 在调用失败后才触发降级,直接返回false表示不降级return false;}}return false;
}// 失败比率计算方法
public float getFailedPercent() {int m = (total.get() - degraded.get());if (total.get() > 0 && m > 0) {return failed.get() * 100 / m;} else {return 0;}
}// 降级比率计算方法
public float getDegradedPercent() {if (total.get() > 0) {return degraded.get() * 100 / total.get();} else {return 0;}
}// 根据概率返回真假,如percent=80,则表示80%返回true,20%返回false
private boolean random(float percent) {return random.nextInt(10000) < percent * 100;
}

自动降级部分逻辑较为复杂,针对注视标注的3个条件判断,这里分情况说明:

  1. 窗口内请求量较少(不满足条件一),不降级,否则继续往下走:
  2. 请求总数增多(满足条件一),但请求基本正常(满足条件二,但负概率降级),即不降级
  3. 请求出现大量失败,失败率升高(满足条件一,三),则大概率降级。
  4. 第3点中,会漏出少部分不降级的请求,如果这部分的请求的基本正常,会触发条件二,即非降级请求达到请求阈值,且失败率低于恢复阈值,会尝试从降级请求中漏出一定比率用于试探正常调用
  5. 在第4点基础上,如果试探继续出现大量失败,则回到第3点,
  6. 在第4点基础上,如果试探请求正常,则调用比率降低,即越来越多的请求会恢复为正常调用,最终知道完全恢复。

在自动降级判断中,有几个关键配置:

  1. degradeTotalThreshold:窗口时间内最少请求总数阈值
  2. degradeInvokeThreshold:降级恢复阈值,当非降级请求量达到指定阈值后,开始尝试恢复。
  3. degradeRecoverPercent:失败率阈值百分比,非降级请求中,成功率超过这个比率阈值会触发恢复。
  4. degradeRecoverInterval: 起始恢复比率,如为10%则表示会从降级请求的比率中恢复10%尝试进行正常请求。
  5. degradePercentMax: 最大降级百分比如99%,表示失败率达到特定阈值后,会有99%的请求直接走降级,其余1%走正常请求用于试探恢复。

降级拦截器总体实现逻辑

基于上面分析,我们对降级策略的解析和处理时机以及具体触发、恢复策略有了较完整的了解,下面来看看DegradationFilter的invke方法完整实现:

public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext context) throws Throwable {context.getTimeline().add(new TimePoint(TimePhase.D));InvocationResponse degradeResponse;// 如果需要降级,则进行降级处理if (DegradationManager.INSTANCE.needDegrade(context)) {degradeResponse = degradeCall(context);if (degradeResponse != null) {// 返回自动降级熔断的降级结果return degradeResponse;}}boolean failed = false;Throwable failedException = null;try {// 继续拦截器链调用InvocationResponse response = handler.handle(context);Object responseReturn = response.getReturn();if (responseReturn != null) {int messageType = response.getMessageType();if (messageType == Constants.MESSAGE_TYPE_EXCEPTION) {// 非业务异常,尝试走降级RpcException rpcException = InvokerUtils.toRpcException(response);if (rpcException instanceof RemoteInvocationException|| rpcException instanceof RejectedException) {// 进一步限制为RemoteInvocationException或者RejectedException等非业务异常failed = true;failedException = rpcException;// 是否启用了调用失败降级if (DegradationManager.INSTANCE.needFailureDegrade(context)) {context.getDegradeInfo().setFailureDegrade(true);context.getDegradeInfo().setCause(rpcException);// 失败调用降级degradeResponse = degradeCall(context);if (degradeResponse != null) {// 返回同步调用模式的失败降级结果return degradeResponse;}}}} else if (messageType == Constants.MESSAGE_TYPE_SERVICE_EXCEPTION) {// 如果捕捉到用户指定的业务异常,包装为降级异常捕捉Exception exception = InvokerUtils.toApplicationException(response);// 异常为自定义降级异常,并且启用了自动降级或失败降级的开关if (DegradationManager.INSTANCE.needFailureDegrade(context)&& DegradationManager.INSTANCE.isCustomizedDegradeException(exception)) {failed = true;failedException = exception;if (DegradationManager.INSTANCE.needFailureDegrade(context)) { // 这个判断是多余的?context.getDegradeInfo().setFailureDegrade(true);context.getDegradeInfo().setCause(exception);// 触发降级策略degradeResponse = degradeCall(context);if (degradeResponse != null) {// 返回同步调用模式的失败降级结果return degradeResponse;}}}}}// 开始统计// 先获取调用方式InvokerConfig<?> invokerConfig = context.getInvokerConfig();byte callMethodCode = invokerConfig.getCallMethod(context.getMethodName());CallMethod callMethod = CallMethod.getCallMethod(callMethodCode);// 添加统计打点信息if (CallMethod.SYNC == callMethod) {// 同步调用统计if (failed) {// 统计失败调用情况DegradationManager.INSTANCE.addFailedRequest(context, failedException);} else {// 统计成功调用情况DegradationManager.INSTANCE.addNormalRequest(context);}}return response;} catch (ServiceUnavailableException | RemoteInvocationException | RequestTimeoutException| RejectedException e) {// 仅捕捉非业务异常failed = true;// 满足下列异常条件,则直接走降级if (DegradationManager.INSTANCE.needFailureDegrade(context)) {context.getDegradeInfo().setFailureDegrade(true);context.getDegradeInfo().setCause(e);degradeResponse = degradeCall(context);if (degradeResponse != null) {// 返回同步调用模式的失败降级结果return degradeResponse;}}// 添加统计打点信息DegradationManager.INSTANCE.addFailedRequest(context, e);throw e;} finally {// 添加统计打点信息RequestQualityManager.INSTANCE.addClientRequest(context, failed);}
}

降级具体策略实现

在上面判断需要触发降级时,会调用DegradationFilter#degradeCall:

public static InvocationResponse degradeCall(InvokerContext context) {InvocationResponse resp = doDegradeCall(context);if (resp != null) {// 监控数据记录InvokerMonitorData monitorData = (InvokerMonitorData) context.getMonitorData();if (monitorData != null) {monitorData.degrade();}if (context.getDegradeInfo().isFailureDegrade()) {// 如果是失败降级,统计失败调用情况DegradationManager.INSTANCE.addFailedRequest(context, new ServiceFailureDegreadedException());} else {// 统计降级调用情况DegradationManager.INSTANCE.addDegradedRequest(context, null);}}return resp;
}

继续追踪降级策略的实现逻辑,在DegradationFilter#doDegradeCall方法:

private static InvocationResponse doDegradeCall(InvokerContext context) {// 获取调用方法策略配置InvokerConfig<?> invokerConfig = context.getInvokerConfig();byte callMethodCode = invokerConfig.getCallMethod(context.getMethodName());CallMethod callMethod = CallMethod.getCallMethod(callMethodCode);InvocationResponse response = null;// 获取调用超时时间配置int timeout = invokerConfig.getTimeout(context.getMethodName());Integer timeoutThreadLocal = InvokerHelper.getTimeout();if (timeoutThreadLocal != null) {timeout = timeoutThreadLocal;}// 监控统计InvokerMonitorData monitorData = (InvokerMonitorData) context.getMonitorData();if (monitorData != null) {monitorData.setCallMethod(invokerConfig.getCallMethod());monitorData.setSerialize(invokerConfig.getSerialize());monitorData.setTimeout(timeout);monitorData.add();}// 线程内设置的默认结果,类似缓存Object defaultResult = InvokerHelper.getDefaultResult();// 获取特定服务方法配置的降级策略String key = DegradationManager.INSTANCE.getRequestUrl(context);DegradeAction action = degradeMethodActions.get(key);if (callMethod == CallMethod.FUTURE && context.getDegradeInfo().isFailureDegrade()) {callMethod = CallMethod.SYNC;}switch (callMethod) {case SYNC:try {if (defaultResult != null) {// 存在默认结果配置,返回默认结果response = InvokerUtils.createDefaultResponse(defaultResult);} else if (action != null) {if (action.isUseMockClass()) {// 依赖注入的mock对象Object mockObj = context.getInvokerConfig().getMock();if (mockObj != null) {// 反射调用配置的mock对象的指定方法,方法参数和请求方法一致defaultResult = new MockProxyWrapper(mockObj).invoke(context.getMethodName(),context.getParameterTypes(), context.getArguments());response = InvokerUtils.createDefaultResponse(defaultResult);}} else if (action.isUseGroovyScript()) {// 传入groovy脚本,生成相应的调用对象,再通过代理进行包装,通过反射调用代理方法来实际完成groovy脚本方法的调用defaultResult = new MockProxyWrapper(getGroovyMockProxy(key, context, action)).invoke(context.getMethodName(), context.getParameterTypes(), context.getArguments());response = InvokerUtils.createDefaultResponse(defaultResult);} else if (action.isThrowException()) {Exception exception;// 如果指定了异常,则返回指定异常,否则返回ServiceDegradedExceptionif (action.getReturnObj() == null) {exception = new ServiceDegradedException(key);} else {exception = (Exception) action.getReturnObj();}throw exception;} else {// 降级配置中的默认返回结果defaultResult = action.getReturnObj();response = InvokerUtils.createDefaultResponse(defaultResult);}}} catch (Throwable t) {// 业务异常response = InvokerUtils.createDefaultResponse(t);response.setMessageType(Constants.MESSAGE_TYPE_SERVICE_EXCEPTION);} finally {if (response != null) {// 标志为降级调用context.getDegradeInfo().setDegrade(true);addCurrentTimeData(timeout);}}break;case CALLBACK:// 实现原理类似与SYNC,只是用callBack进行了一层封装try {if (defaultResult != null) {response = callBackOnSuccess(context, defaultResult);} else if (action != null) {if (action.isUseMockClass()) {Object mockObj = context.getInvokerConfig().getMock();if (mockObj != null) {defaultResult = new MockProxyWrapper(mockObj).invoke(context.getMethodName(),context.getParameterTypes(), context.getArguments());response = callBackOnSuccess(context, defaultResult);}} else if (action.isUseGroovyScript()) {defaultResult = new MockProxyWrapper(getGroovyMockProxy(key, context, action)).invoke(context.getMethodName(), context.getParameterTypes(), context.getArguments());response = callBackOnSuccess(context, defaultResult);} else if (action.isThrowException()) {Exception exception;if (action.getReturnObj() == null) {exception = new ServiceDegradedException(key);} else {exception = (Exception) action.getReturnObj();}throw exception;} else {defaultResult = action.getReturnObj();response = callBackOnSuccess(context, defaultResult);}}} catch (Throwable t) {if (t instanceof Exception) {response = callBackOnfailure(context, (Exception) t);} else {response = callBackOnfailure(context, new ApplicationException(t));}} finally {if (response != null) {context.getDegradeInfo().setDegrade(true);addCurrentTimeData(timeout);MonitorTransaction transaction = MonitorLoader.getMonitor().getCurrentCallTransaction();if (transaction != null) {DegradationManager.INSTANCE.monitorDegrade(context, transaction);}}}break;case FUTURE:// 实现原理类似与SYNC,只是用future进行了一层封装if (defaultResult != null) {DegradeServiceFuture future = new DegradeServiceFuture(context, timeout);FutureFactory.setFuture(future);response = InvokerUtils.createFutureResponse(future);future.callback(InvokerUtils.createDefaultResponse(defaultResult));future.run();} else if (action != null) {if (action.isUseMockClass()) {Object mockObj = context.getInvokerConfig().getMock();if (mockObj != null) {MockProxyWrapper mockProxyWrapper = new MockProxyWrapper(mockObj);MockCallbackFuture future = new MockCallbackFuture(mockProxyWrapper, context, timeout);FutureFactory.setFuture(future);response = InvokerUtils.createFutureResponse(future);future.callback(response);future.run();}} else if (action.isUseGroovyScript()) {MockProxyWrapper mockProxyWrapper = new MockProxyWrapper(getGroovyMockProxy(key, context, action));MockCallbackFuture future = new MockCallbackFuture(mockProxyWrapper, context, timeout);FutureFactory.setFuture(future);response = InvokerUtils.createFutureResponse(future);future.callback(response);future.run();} else if (action.isThrowException()) {Exception exception;if (action.getReturnObj() == null) {exception = new ServiceDegradedException(key);} else {exception = (Exception) action.getReturnObj();}DegradeServiceFuture future = new DegradeServiceFuture(context, timeout);FutureFactory.setFuture(future);response = InvokerUtils.createFutureResponse(future);future.callback(InvokerUtils.createDefaultResponse(exception));future.run();} else {defaultResult = action.getReturnObj();DegradeServiceFuture future = new DegradeServiceFuture(context, timeout);FutureFactory.setFuture(future);response = InvokerUtils.createFutureResponse(future);future.callback(InvokerUtils.createDefaultResponse(defaultResult));future.run();}}if (response != null) {context.getDegradeInfo().setDegrade(true);addCurrentTimeData(timeout);}break;case ONEWAY:// 不需要调用结果的调用,返回空对象context.getDegradeInfo().setDegrade(true);addCurrentTimeData(timeout);response = NO_RETURN_RESPONSE;break;default:break;}if (response != null) {((DefaultInvokerContext) context).setResponse(response);}return response;
}

在降级处理中,看看groovy脚本执行原理:
先生成一个脚本动态代理:

private static Object getGroovyMockProxy(String key, InvokerContext context, DegradeAction action) {// 获取脚本代理缓存Object interfaceProxy = groovyMocks.get(key);// 不存在则初始化if (interfaceProxy == null) {// 生成动态代理interfaceProxy = MockInvocationUtils.getProxy(context.getInvokerConfig(),new GroovyScriptInvocationProxy(action.getGroovyScript()));// 更新缓存,需要考虑并发Object oldInterfaceProxy = groovyMocks.putIfAbsent(key, interfaceProxy);if (oldInterfaceProxy != null) {interfaceProxy = oldInterfaceProxy;}}return interfaceProxy;
}

动态代理的封装调用非常简单:

public class MockInvocationUtils {public static Object getProxy(InvokerConfig invokerConfig, InvocationHandler proxyObject) {return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),new Class[]{invokerConfig.getServiceInterface()}, proxyObject);}
}

再看看GroovyScriptInvocationProxy实现:

public class GroovyScriptInvocationProxy implements InvocationHandler {private final Script script;public GroovyScriptInvocationProxy(Script script) {this.script = script;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();// 如果是Object的基本方法或重载方法,特殊处理,if (method.getDeclaringClass() == Object.class) {return method.invoke(script, args);}if ("toString".equals(methodName) && parameterTypes.length == 0) {return script.toString();}if ("hashCode".equals(methodName) && parameterTypes.length == 0) {return script.hashCode();}if ("equals".equals(methodName) && parameterTypes.length == 1) {return script.equals(args[0]);}// 运行脚本return script.run();}
}

最后看看MockProxyWrapper,传入一个代理对象,基于方法名,参数类型和具体参数值,调用具体的代理对象方法,mock对象也基于MockProxyWrapper进行调用:

public class MockProxyWrapper {private final Object proxy;public MockProxyWrapper(Object proxy) {if (proxy == null) {throw new IllegalArgumentException("proxy == null");}this.proxy = proxy;}public Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments)throws Throwable {// 根据方法名,参数类型列表反射获取方法对象Method method = proxy.getClass().getMethod(methodName, parameterTypes);try {// 反射调用方法return method.invoke(proxy, arguments);} catch (InvocationTargetException e) {// 反射异常处理,返回实际异常Throwable t = e.getTargetException();if (t instanceof UndeclaredThrowableException && t.getCause() != null) {throw t.getCause();}throw t;}}
}

【Pigeon源码阅读】高可用之熔断降级实现原理(十四)相关推荐

  1. JVM源码阅读-本地库加载流程和原理

    前言 本文主要研究OpenJDK中JVM源码中涉及到native本地库的加载流程和原理的部分.主要目的是为了了解本地库是如何被加载到虚拟机,以及是如何找到并执行本地库里的本地方法,以及JNI的 JNI ...

  2. 《Android源码设计模式解析与实战》读书笔记(十四)

    第十四章.迭代器模式 迭代器模式,又叫做游标模式,是行为型设计模式之一.我们知道对容器对象的访问必然会涉及遍历算法,我们可以将遍历的方法封装在容器中,或者不提供遍历方法,让使用容器的人自己去实现去吧. ...

  3. Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行

    在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRun ...

  4. 多线程与高并发(四):LockSupport,高频面试题,AQS源码,以及源码阅读方法论

    补充几道面试题 锁升级过程:无锁.偏向锁.轻量级锁.重量级锁 StampedLock 自己看一下 面试题:syn和Reentrantlock的区别? LockSupport LockSupport.p ...

  5. 源码 状态机_[源码阅读] 阿里SOFA服务注册中心MetaServer(1)

    [源码阅读] 阿里SOFA服务注册中心MetaServer(1) 0x00 摘要 0x01 服务注册中心 1.1 服务注册中心简介 1.2 SOFARegistry 总体架构 1.3 为什么要分层 0 ...

  6. 封装成jar包_通用源码阅读指导mybatis源码详解:io包

    io包 io包即输入/输出包,负责完成 MyBatis中与输入/输出相关的操作. 说到输入/输出,首先想到的就是对磁盘文件的读写.在 MyBatis的工作中,与磁盘文件的交互主要是对 xml配置文件的 ...

  7. Soul 网关源码阅读(一) 概览

    Soul 源码阅读(一) 概览 简介     阅读soul的官方文档,大致了解soul的功能和相关概念 心得     需要对网关的功能有个大致的了解,把soul官方文档读两遍(第一遍通读,能看懂多少是 ...

  8. 大神手把手教源码阅读的方法、误区以及三种境界

    丁威 中间件兴趣圈 读完需要 1 分钟 速读仅需 1 分钟 在技术职场中普遍存在如下几种现象: 对待工作中所使用的技术不需要阅读源码,只需在开发过程中能够熟练运用就行 看源码太费时间,而且容易忘记,如 ...

  9. [源码阅读] 阿里SOFA服务注册中心MetaServer(1)

    0x00 摘要 SOFARegistry 是蚂蚁金服开源的一个生产级.高时效.高可用的服务注册中心.本系列将带领大家一起分析其MetaServer的实现机制,本文为第一篇,介绍MetaServer总体 ...

  10. 【游戏编程扯淡精粹】EASTL源码阅读

    [游戏编程扯淡精粹]EASTL源码阅读 侯捷先生在<漫谈程序员与编程> 中讲到 STL 运用的三个档次:"会用 STL,是一种档次.对 STL 原理有所了解,又是一个档次.追踪过 ...

最新文章

  1. Blockchain区块链架构设计之四:Fabric多通道和下一代账本设计
  2. 32个最热CPLD-FPGA论坛
  3. JQuery Datatables —— 自定义导出列
  4. Linux——k8s命令别名修改
  5. Java面试之谈谈对Volatile的理解
  6. linux 连接两个异构网,用cheops-ng管理Linux异构网络(图)
  7. .NET2.0 事务处理
  8. php将中文编译成字符串,PHP将汉字字符串转换为数组
  9. 鼠标宏会不会封号_每天一个英雄联盟封号技巧:峡谷先锋可以连续撞塔两次,你会吗?...
  10. 软件加入使用时间_2020年,加入“midi音乐制作讲堂”内部会员,学音乐制作变得更简单...
  11. 如何修改dns服务器ip,如何修改DNS服务器IP地址
  12. esxi主机,提示“当前主机无管理网络冗余“报警。
  13. 四级口语计算机对话,2017大学英语四级口语场景对话练习(5)
  14. 3D场景搭建的隐秘功能——时间轴
  15. 如何生成带附加码的EAN13商品条码
  16. 强人工智能和弱人工智能的区别,你知道吗?
  17. shiro官方源码包下载
  18. 一文看懂“摩根系”,摩根士丹利、摩根大通、大摩、小摩到底有什么关系?
  19. 哈工大李治军老师操作系统笔记【27】:从生磁盘到文件(Learning OS Concepts By Coding Them !)
  20. 国内黑客组织及代表性人物

热门文章

  1. [论文评析]Density‑based weighting for imbalanced regression,Machine Learning,2021
  2. Intel处理器概述
  3. AutoIt的录制(AU3Record)
  4. 如何将多个mp3文件合并成一个?
  5. 关闭开机弹窗广告2345(其他弹窗也适用)
  6. 论《计算机网络技术》与素质教育
  7. 一场CF的台前幕后(下)
  8. sqlserver Month()函数取日期不足两位的加 0
  9. [论文笔记] ASFD 阅读笔记
  10. mysql字符串数值按数值排序问题