错误处理在Spring Integration中如何工作
1.引言
这篇文章的目标是向您展示将消息传递系统与Spring Integration结合使用时如何处理错误。 您将看到同步和异步消息传递之间的错误处理有所不同。 和往常一样,我将跳过聊天并继续进行一些示例。
- 您可以在github上获取源代码。
2,样品申请
我将使用一个基本示例,因为我想专注于异常处理。 该应用程序包含一个订单服务,该服务接收订单,处理订单并返回确认。
下面我们可以看到消息传递系统的配置方式:
int-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/><int:channel id="requestChannel"/><int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/><int:channel id="syncChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/>
</int:service-activator>
网关是消息传递系统的入口点。 它将接收订单并将其发送到直接通道“ requestChannel”,路由器将根据订单ID将其重定向到适当的通道:
- syncChannel:一个直接通道 ,它将订单发送到订阅该通道的订单处理器。
- asyncChannel:一个队列通道 ,订单处理器将从中主动检索订单。
处理订单后,订单确认将发送回网关。 这是代表此的图形:
好的,让我们从最简单的情况开始,使用直接通道进行同步发送。
3.与直接通道同步发送
订单处理器已订阅“ syncChannel”直接渠道。 “ processOrder”方法将在发送者的线程中调用。
public OrderConfirmation processOrder(Order order) {logger.info("Processing order {}", order.getId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}return new OrderConfirmation("confirmed");
}
现在,我们将执行一个测试,该测试将通过发送无效订单来引发异常。 此测试将向网关发送订单:
public interface OrderService {@Gatewaypublic OrderConfirmation sendOrder(Order order);
}
考试:
TestSyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestSyncErrorHandling {@Autowiredprivate OrderService service;@Testpublic void testCorrectOrder() {OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));Assert.assertNotNull(confirmation);Assert.assertEquals("confirmed", confirmation.getId());}@Testpublic void testSyncErrorHandling() {OrderConfirmation confirmation = null;try {confirmation = service.sendOrder(new Order(1, "an invalid order"));Assert.fail("Should throw a MessageHandlingException");} catch (MessageHandlingException e) {Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass());Assert.assertNull(confirmation);}}
}
我们运行测试,看看在订单处理器中如何引发异常并到达测试。 没关系; 我们想验证发送无效订单是否引发了异常。 发生这种情况是因为测试发送了订单,并阻止等待在同一线程中处理订单。 但是,当我们使用异步通道时会发生什么? 让我们继续下一节。
4,与队列通道异步发送
此部分的测试发送一个命令,该命令将由路由器重定向到队列通道。 网关如下所示:
public interface OrderService {@Gatewaypublic Future<OrderConfirmation> sendFutureOrder(Order order);
}
请注意,这次网关正在返回Future 。 如果我们不返回此值,则网关将阻止测试线程。 通过返回Future,网关将变为异步状态,并且不会阻塞发送方的线程。
考试:
TestKoAsyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestKoAsyncErrorHandling {@Autowiredprivate OrderService service;@Test(expected=MessageHandlingException.class)public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));}
}
好的,现在我们将启动测试并看到引发异常的信息…
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException
糟糕,测试失败,因为没有异常到达测试! 发生了什么? 好吧,解释如下:
<int:channel id="asyncChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/>
</int:service-activator>
由于我们使用的是异步通道(队列),因此发送者发送订单并继续前进。 然后,接收方从另一个线程轮询订单。 因此,不可能将Exception抛回到发送方。 让我们表现得好像什么都没发生吗? 好吧,您最好不要,还有其他选择。
5,异步错误处理
当使用异步消息传递时,Spring Integration通过将异常发布到消息通道来处理它们。 引发的异常将包装到MessagingException中,并成为消息的有效负载。
错误消息发送到哪个通道? 首先,它将检查请求消息是否包含名为“ errorChannel”的标头。 如果找到,错误消息将被发送到那里。 否则,该消息将被发送到所谓的全局错误通道。
5.1全局错误通道
默认情况下,Spring Integration创建一个名为“ errorChannel”的全局错误通道。 该频道是发布-订阅频道。 这意味着我们可以为该频道订阅多个端点。 实际上,已经有一个端点订阅了它:一个日志记录处理程序 。该处理程序将记录到达通道的消息的有效负载,尽管可以将其配置为不同的行为。
现在,我们将向该全局通道订阅一个新的处理程序,并通过将其存储到数据库中来测试它是否接收到异常消息。
首先,我们需要在配置中进行一些更改。 我创建了一个新文件,因此它不会干扰我们之前的测试:
int-async-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService" error-channel="errorChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/>
</int:service-activator><int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/><bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/>
网关 :我添加了一个错误通道。 如果调用失败,错误消息将发送到该通道。 如果我没有定义错误通道,则网关会将异常传播给调用者,但是在这种情况下,由于这是一个异步网关,因此无法正常工作。
错误处理程序 :我已经定义了一个新的端点,该端点已订阅了全局错误通道。 现在,任何发送到全局错误通道的错误消息都将传递给我们的处理程序。
我还添加了一个配置文件以配置数据库。 我们的错误处理程序会将收到的错误插入此数据库:
db-config.xml
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/>
</bean><!-- in-memory database -->
<jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schemas/schema.sql" />
</jdbc:embedded-database>
错误处理程序非常简单。 它收到错误消息,并将其信息插入数据库:
public class OrderErrorHandler {@Autowiredprivate JdbcTemplate jdbcTemplate;@ServiceActivatorpublic void handleFailedOrder(Message<MessageHandlingException> message) {Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();saveToBD(requestedOrder.getId(), message.getPayload().getMessage());}private void saveToBD(int orderId, String errorMessage) {String query = "insert into errors(orderid, message) values (?,?)";jdbcTemplate.update(query, orderId, errorMessage);}
}
好的,现在一切就绪。 让我们实施一个新的测试:
TestOkAsyncErrorHandlingTest.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml","/xpadro/spring/integration/config/db-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestOkAsyncErrorHandling {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate OrderService service;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from errors");}@Testpublic void testCorrectOrder() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order"));OrderConfirmation orderConfirmation = confirmation.get();Assert.assertNotNull(orderConfirmation);Assert.assertEquals("confirmed", orderConfirmation.getId());}@Testpublic void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));Thread.sleep(2000);Assert.assertEquals(1, getSavedErrors());validateSavedError(6);}private int getSavedErrors() {return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class);}private void validateSavedError(int orderId) {String query = "select * from errors where orderid=?";Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId);Assert.assertEquals(6, result.get("orderid"));assertThat((String)result.get("message"), containsString("Order ID is invalid"));}
}
这次测试成功,错误消息已存储到数据库。
5.2其他机制
自定义错误通道 :您可以定义错误通道并将其定义为队列通道,而不是默认的发布-订阅通道:
<int:poller id="defaultPoller" default="true" fixed-delay="5000" /><int:channel id="errorChannel"><int:queue capacity="10"/>
</int:channel>
ErrorMessageExceptionTypeRouter :这个Spring Integration专用路由器将解析将错误消息发送到的通道。 它基于错误的最具体原因做出决定:
<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel"><int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" /><int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" />
</int:exception-type-router>
六,结论
我们已经了解了使用Spring Integration时错误处理的不同机制是什么。 有了这个基础,您将能够通过实现转换器从错误消息中提取信息,使用标头扩展器设置错误通道或实现自己的路由器等来扩展它并配置错误处理。
翻译自: https://www.javacodegeeks.com/2014/02/how-error-handling-works-in-spring-integration.html
错误处理在Spring Integration中如何工作相关推荐
- Http基本身份验证在Spring Security中如何工作?
在上一篇文章中,您学习了如何在基于Spring安全性的Java应用程序中启用Http基本身份验证 ,现在,我们将进一步进一步了解http基本身份验证在Spring安全性中的工作原理. 如果您还记得的话 ...
- Spring Integration Java DSL示例
现在已经为Spring Integration引入了新的基于Java的DSL ,这使得可以使用基于纯Java的配置而不是基于Spring XML的配置来定义Spring Integration消息流. ...
- #翻译NO.4# --- Spring Integration Framework
为什么80%的码农都做不了架构师?>>> Part III. Core Messaging This section covers all aspects of the cor ...
- Spring Integration学习资料
Spring Integration学习资料 1.1 背景 Spring框架的一个重要主题是控制反转.从广义上来说,Spring处理其上下文中管理的组件的职责.只要组件减轻了职责,它们同时也被 ...
- 使用Spring Integration进行消息处理
Spring Integration提供了Spring框架的扩展,以支持著名的企业集成模式. 它在基于Spring的应用程序中启用轻量级消息传递,并支持与外部系统的集成. Spring Integra ...
- Spring Integration 实例讲解
Spring Integration 简介 组件介绍 实例演示 买饮料 jms做一个简单的示例. 实例三集成JDBC 简介 最近学习到的工具,资料很少,但还是要记录下自己目前的理解,官方的说发就不说了 ...
- Spring Integration 快速入门教程
本文通过小的实际示例介绍Spring Integration(SI)的核心概念.Spring Integration提供了许多功能强大的组件,这些组件可以极大地增强企业架构内系统和流程的互连互通. 它 ...
- Spring Integration入门
为什么使用Spring IntegrationSpring Integration是Spring框架创建的又一个API,面向企业应用集成(EAI).说到集成,并不缺"解决办法":硬 ...
- Spring Integration sftp 技术专栏
Spring Integration Sftp 文件传送 目前在国内项目开发中,使用Spring Integration技术的比较少,尤其是中文的参考文献和项目案例,更是罕有.鉴于此,本文详细介绍sp ...
最新文章
- 复盘一次服务安装失败问题
- Android通过Chrome Inspect调试WebView
- 哪些情况是友情链接作弊?总结了11种方法!
- Spring-JdbcTemplate(注入到spring容器)-01
- 大数据数据量估算_如何估算数据科学项目的数据收集成本
- 对永磁无刷电机的调速过程
- jquery 如何动态添加、删除class样式方法介绍_jquery_脚本之家
- 如何实现消息功能_如何实现微信小程序的轮盘抽奖功能
- abap判断包含字符当中包含小数点_剑指Offer整理3 -- 栈和队列 + 数学和字符串
- 和java_那些和Java的点滴
- xss测试工具xsstrike(基于python3)
- x的x分之一次方极限x趋于0_x分之e的x次方减一的极限
- 解决google浏览器自动填充密码问题
- Python中struct.pack()和struct.unpack()用法详细说明
- 蓝桥杯试题算法训练之删除数组零元素——Python满分解答
- un-app部署h5项目到普通云服务器--域名解析--OOS对象存储
- 接口技术七段数码管c语言,031 实例7-七段数码管绘制
- 2018-NIPS-论文网址
- STC11F04E——电子工艺实习
- 【Arduino+ESP32专题】案例:简单的实现NTC热敏电阻检测板卡温度
热门文章
- 2的负x次幂图像_数学| NO.2,3 函数 T15
- java本地监听zk服务器节点【动态上下线】
- java语言发展历史_Java编程语言的历史和未来
- java系统架构师有的特质_Java中特质模式的定义
- JavaFX的科幻用户界面第1部分
- java实现数据库内容修改_数据库更改到Java环境中实现可持续和平
- Java代码样式运算符换行格式
- Neo4j:Cypher – Neo.ClientError.Statement.TypeError:不知道如何添加Double和String
- 在EL表达式中引用ADF Faces组件
- JavaParser生成,分析和修改Java代码