摘要:本文通过案例,发现了一个Camel Multicast组件聚合策略相关的问题。通过查看Camel源代码,找到了问题原因并给出了解决方案。希望本文可以帮助到遇到同样问题的Camel用户。

本文分享自华为云社区《使用Apache Camel Multicast组件遇到的一个问题》,作者:中间件小哥。

1 前言

本文翻译自华为加拿大研究所的Reji Mathews发表于Apache Camel社区的《ROUTING MULTICAST OUTPUT AFTER ENCOUNTERING PARTIAL FAILURES》一文。在征得原作者同意后,本文对原文的部分内容作了少许修改。

2 Multicast组件简介

Multicast是Apache Camel(以下简称“Camel”)中一个功能强大的EIP组件,可以将消息发送至多条子路径,然后并行地执行它们。

参考官网文档,我们可以使用两种方式配置Multicast组件:

  • 独立执行所有子路径,并将最后响应的子路径的结果作为最终输出。这也是Multicast组件的默认配置。
  • 通过实现Camel的聚合策略(Aggregation Strategy),使用自定义的聚合器来处理所有子路径的输出。

3 问题描述

本文使用案例如下:使用Jetty组件发布一个API,调用该API后,消息会分别发送至"direct:A"和"direct:B"两条子路径。在使用自定义的聚合策略处理后,继续执行后续步骤。其中在"direct:A"中抛出一个异常,来模拟运行失败;"direct:B"正常运行。同时在onException中定义了异常处理策略。

本文使用的Camel版本为3.8.0

@Override
public void configure() throws Exception {onException(Exception.class).useOriginalMessage().handled(true).log("Exception handler invoked").transform().constant("{\"data\" : \"err\"}").end();from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET").log("received request").log("Entering multicast").multicast(new SimpleFlowMergeAggregator()).parallelProcessing().to("direct:A", "direct:B").end().log("Aggregated results ${body}").log("Another log").transform(simple("{\"result\" : \"success\"}")).end();from("direct:A").log("Executing PATH_1 - exception path").transform(constant("DATA_FROM_PATH_1")).log("Starting exception throw").throwException(new Exception("USER INITIATED EXCEPTION")).log("PATH_1").end();from("direct:B").log("Executing PATH_2 - success path").delayer(1000).transform(constant("DATA_FROM_PATH_2")).log("PATH_2").end();
}

自定义聚合器SimpleFlowMergeAggregator定义如下,其中我们将所有子路径的结果放入一个list对象。

public class SimpleFlowMergeAggregator implements AggregationStrategy {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());@Overridepublic Exchange aggregate(Exchange oldExchange, Exchange newExchange) {LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());if(oldExchange == null) {String data = newExchange.getIn().getBody(String.class);List<String> aggregatedDataList = new ArrayList<>();aggregatedDataList.add(data);newExchange.getIn().setBody(aggregatedDataList);return newExchange;}List<String> oldData = oldExchange.getIn().getBody(List.class);oldData.add(newExchange.getIn().getBody(String.class));oldExchange.getIn().setBody(oldData);return oldExchange;}
}

基于对Multicast组件执行逻辑的理解,我们认为存在多个子路径时,其运行结果应该为:如果其中有一条子路径能运行成功,则使用聚合的结果继续执行后续步骤;如果所有子路径都运行失败,则停止整个路由(route)。本案例中,由于子路径"direct:A"运行异常,子路径"direct:B"运行正常,则应该正常执行后续两个步骤日志(log)和转换(transform)。

运行上述案例,日志信息如下:

2021-05-06 12:43:18.565 INFO 13956 --- [qtp916897446-42] route1 : received request
2021-05-06 12:43:18.566 INFO 13956 --- [qtp916897446-42] route1 : Entering multicast
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Starting exception throw
2021-05-06 12:43:18.578 INFO 13956 --- [ #4 - Multicast] route2 : Exception handler invoked
2021-05-06 12:43:18.579 INFO 13956 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:43:19.575 INFO 13956 --- [ #3 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] route3 : PATH_2
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2

观察上述日志,我们发现完成两条子路径结果的聚合后,后续的两个步骤日志(log)和转换(transform)并未执行。这并不符合我们期望的结果。

经过多次测试,我们还发现,只有当到达聚合器SimpleFlowMergeAggregator的第一个子路径("direct:A")执行异常时,便会发生这种后续步骤未执行的情况;而如果第一个子路径("direct:A")执行成功,即使另一个子路径("direct:B")执行失败,也会继续执行后续的步骤。

4 问题分析

接下来,我们通过查看Camel源代码,来找出上述现象的原因。

在camel-core-processors模块的Pipeline.java 中,其run()方法中有这样一段代码:

@Override
public void run() {boolean stop = exchange.isRouteStop();int num = index;boolean more = num < size;boolean first = num == 0;if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {// prepare for next runif (exchange.hasOut()) {exchange.setIn(exchange.getOut());exchange.setOut(null);}// get the next processorAsyncProcessor processor = processors.get(index++);processor.process(exchange, this);} else {// copyResults is needed in case MEP is OUT and the message is not an OUT messageExchangeHelper.copyResults(exchange, exchange);// logging nextExchange as it contains the exchange that might have altered the payload and since// we are logging the completion if will be confusing if we log the original instead// we could also consider logging the original and the nextExchange then we have *before* and *after* snapshotsif (LOG.isTraceEnabled()) {LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);}AsyncCallback cb = callback;taskFactory.release(this);reactiveExecutor.schedule(cb);}
}

其中,这个if判断决定了是否继续执行后续步骤:

if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG)))

可以看出,在如下三种情况下,后续步骤将不会被执行:

1. 之前的步骤已经将exchange 对象标记为停止状态。

boolean stop = exchange.isRouteStop();

2. 后续没有步骤可执行。

boolean more = num < size;

3. continueProcessing()方法返回false。

我们来看看continueProcessing()方法的代码。

public final class PipelineHelper {public static boolean continueProcessing(Exchange exchange, String message, Logger log) {ExtendedExchange ee = (ExtendedExchange) exchange;boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()|| (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled());if (stop) {if (log.isDebugEnabled()) {StringBuilder sb = new StringBuilder();sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange);if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {sb.append(" Marked as rollback only.");}if (exchange.getException() != null) {sb.append(" Exception: ").append(exchange.getException());}if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {sb.append(" Handled by the error handler.");}log.debug(sb.toString());}return false;}if (ee.isRouteStop()) {if (log.isDebugEnabled()) {log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);}return false;}return true;}
}

可以看出,当执行过程发生异常并且被异常处理器捕获时,continueProcessing()方法将返回false。

再回到我们的案例,第一个到达聚合器SimpleFlowMergeAggregator的子路径("direct:A"),会作为后续聚合的基础,其它子路径("direct:B")会在此基础上追加各自的body数据。实际上,很多Camel用户都会采用这种方式来实现自定义聚合策略。但这样做存在一个问题:在异常处理时,子路径"direct:A"的exchange对象会被设置一个状态标识,而此状态标识会被传递到下游,用于判断是否继续执行后续步骤。由于作为聚合基础的"direct:A"子路径的exchange对象状态为“异常”,最终continueProcessing()方法将返回false,后续的步骤也就不会再执行。

5 解决方案

对于上述问题,用户可以使用多种方式来设置异常处理时exchange对象的状态。本文采用如下解决方案:如果第一个子路径执行正常,则继续执行后续步骤;如果第一个子路径执行异常,则将其与其它执行成功的子路径交换,然后继续执行后续步骤。

更新后的自定义聚合器SimpleFlowMergeAggregator如下:

public class SimpleFlowMergeAggregator implements AggregationStrategy {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());@Overridepublic Exchange aggregate(Exchange oldExchange, Exchange newExchange) {LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());if(oldExchange == null) {String data = newExchange.getIn().getBody(String.class);List<String> aggregatedDataList = new ArrayList<>();aggregatedDataList.add(data);newExchange.getIn().setBody(aggregatedDataList);return newExchange;}if(hadException(oldExchange)) {if(!hadException(newExchange)) {// aggregate and swap the baseLOGGER.info("Found new exchange with success. swapping the base exchange");List<String> oldData = oldExchange.getIn().getBody(List.class);oldData.add(newExchange.getIn().getBody(String.class));// swapped the base herenewExchange.getIn().setBody(oldData);                 return newExchange;}}List<String> oldData = oldExchange.getIn().getBody(List.class);oldData.add(newExchange.getIn().getBody(String.class));oldExchange.getIn().setBody(oldData);return oldExchange;}private boolean hadException(Exchange exchange) {if(exchange.isFailed()) {return true;}if(exchange.isRollbackOnly()) {return true;}if(exchange.isRollbackOnlyLast()) {return true;}if(((ExtendedExchange)exchange).isErrorHandlerHandledSet()&& ((ExtendedExchange)exchange).isErrorHandlerHandled()) {return true;}return false;}
}

再次运行上述案例,日志信息如下:

2021-05-06 12:46:19.122 INFO 2576 --- [qtp174245837-45] route1 : received request
2021-05-06 12:46:19.123 INFO 2576 --- [qtp174245837-45] route1 : Entering multicast
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Starting exception throw
2021-05-06 12:46:19.134 INFO 2576 --- [ #3 - Multicast] route2 : Exception handler invoked
2021-05-06 12:46:19.135 INFO 2576 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:46:20.130 INFO 2576 --- [ #4 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] route3 : PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Found new exchange with success. swapping the base exchange
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Aggregated results {"data" : "err"},DATA_FROM_PATH_2
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Another log

可以看出,使用新的自定义聚合策略后,后续的日志(log)和转换(transform)步骤都成功执行。

6 结语

本文通过案例,发现了一个Camel Multicast组件聚合策略相关的问题。通过查看Camel源代码,找到了问题原因并给出了解决方案。

希望本文可以帮助到遇到同样问题的Camel用户。

点击关注,第一时间了解华为云新鲜技术~

一个Camel Multicast组件聚合策略问题的解决过程相关推荐

  1. Apache Camel日志组件示例

    Apache Camel日志组件示例 您要将消息记录到底层的记录机制,请使用骆驼的log:组件. Camel使用sfl4j作为记录器API,然后允许您配置记录器实现. 在本文中,我们将使用Log4j作 ...

  2. 适用于ActiveMQ 5.9的Apache Camel Broker组件

    将Apache Camel嵌入ActiveMQ代理可以为使用Camel的集成功能扩展消息代理提供极大的灵活性. Apache Camel路由的另一个好处是,如果使用activemq组件 ,则可以避免远 ...

  3. 2020-12-24 如何编写一个简单的双均线策略

    如何编写一个简单的双均线策略 目的:编写一个无需写入函数.类的简单策略,对比文字与代码表达的区别,了解编写逻辑与原理 策略类型:双均线策略 策略原理: 双均线策略,指的是运用两条不同周期的移动平均线, ...

  4. java拖动组件,[小娱乐] 一个能拖动组件、改变组件大小的容器

    [小娱乐] 一个能拖动组件.改变组件大小的容器 /* * JDragpullPane.java * * Created on 2007年3月20日, 上午12:31 */ package javax. ...

  5. 构建你的第一个Vue.js组件

    我记得当时我拿起CakePHP,我很喜欢,开始使用它是多么容易.这些文档不仅结构合理,详尽无遗,而且用户友好.多年以后,这正是我在Vue.js中感受到的.然而,与Cake相比,Vue文档仍然缺少一件事 ...

  6. animation基础练习源码_用vue简单写一个音乐播放组件「附源码」

    作者:vipbic 转发链接:https://segmentfault.com/a/1190000022980992 前言 上次小编也分享一个关于Vue 开发过音乐播放对项目: 基于 electron ...

  7. 快速实现一个Http回调组件

    2019独角兽企业重金招聘Python工程师标准>>> 快速实现一个Http回调组件 一.前情回顾 ​ 我们平时在使用一些库的时候,会遇到一些看起来很舒服的写法,用起来感觉很简单,而 ...

  8. apache camel_Apache Camel日志组件示例

    apache camel Apache Camel日志组件示例 您要将消息记录到底层的记录机制中,请使用骆驼的log:组件. Camel使用sfl4j作为记录器API,然后允许您配置记录器实现. 在本 ...

  9. 如何对第一个Vue.js组件进行单元测试

    by Sarah Dayan 通过莎拉·达扬 In Build Your First Vue.js Component we made a star rating component. We've c ...

最新文章

  1. 综述:AI系统安全的实用方法介绍
  2. python下拉菜单_python-web自动化:下拉列表操作
  3. Java 8 Stream Api 中的 skip 和 limit 操作
  4. 索尼将成立200亿日元基金 投资机器人、人工智能等新兴企业
  5. android 6.0权限封装,Android6.0------权限申请管理(单个权限和多个权限申请)
  6. luoguP4755 Beautiful Pair
  7. uni-ui介绍uni-api
  8. 各类 动态获取时间,计算距离,贝塞尔曲线等 的方法
  9. Ubuntu12.04 安装(无法将 grub-efi 软件包安装到/target/中,如果没有 GRUB 启动引导期,所安装的系统无法启动)...
  10. NLog 在winform和asp.net下使用快速攻略 .
  11. 上粱正,下粱不歪——网吧母盘制作流程(转)
  12. 编程到底难在哪里?—— 《人月神话》阅读分享
  13. 【设计】同步降压型DC-DC转换器驱动电路设计
  14. oppo手机热点Android,OPPO手机怎么开启热点?OPPO手机共享网络的三种方法
  15. “朋友”两个字好辛苦
  16. Twitter Inc.(TWTR)2020年第三季度收益电话会议记录
  17. asp.net mvc如何设置起始页
  18. 基于Delphi7openGauss2.0开发社区信息管理系统-设计文档
  19. libevent库bufferevent事件实现socket通信
  20. 一些免费实用的接口,调用次数无限制

热门文章

  1. 前端:JS/26/实例:随机显示小星星
  2. 中止请求和超时 跨域的HTTP请求 认证方式 JSONP
  3. mysql交给spring管理_Mysql事务结合spring管理
  4. mysql 多行 连续_mysql多表连续查询的问题
  5. mysql中文乱码解决_Stata 中文乱码顽疾解决方法
  6. 二等水准数据平差_二等水准复测平差成果表
  7. net 架构师-数据库-sql server-003-T-SQL 基本语句
  8. react-native ios打包和Android打包
  9. oracle备份表和数据
  10. JSP技术模型(五)JSP隐含变量