1.引言

这篇文章将向您展示在使用JMS异步接收消息期间,使用者执行过程中的错误如何导致消息丢失。 然后,我将解释如何使用本地事务解决此问题。

您还将看到这种解决方案在某些情况下可能导致消息重复(例如,当它将消息保存到数据库中,然后侦听器执行失败时)。 发生这种情况的原因是因为JMS事务独立于其他事务资源(如DB)。 如果您的处理不是幂等的,或者您的应用程序不支持重复消息检测,那么您将不得不使用分布式事务。

分布式事务超出了此职位的范围。 如果您对处理分布式事务感兴趣,可以阅读这篇有趣的文章。

我已经实现了一个再现以下情况的测试应用程序:

  1. 发送和接收消息:使用者将处理收到的消息,并将其存储到数据库中。

    生产者将消息发送到队列:

    使用者从队列中检索消息并进行处理:

  2. 消息处理之前发生错误:使用者检索消息,但是在将消息存储到DB之前执行失败。

  3. 处理消息后发生错误:使用者检索消息,将其存储到DB,然后执行失败。

  • 该应用程序的源代码可以在github上找到。

2.测试应用

测试应用程序执行两个测试类TestNotTransactedMessagingTestTransactedMessaging 。 这些类都将执行上述三种情况。

让我们看看在没有事务的情况下执行应用程序时的配置。

app-config.xml

应用程序配置。 基本上,它会在指定的软件包内进行检查以自动检测应用Bean:生产者和使用者。 它还配置了将在其中存储处理后的通知的内存数据库。

<context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/><bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/>
</bean><jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schema.sql" />
</jdbc:embedded-database>

notx-jms-config.xml

配置JMS基础结构,该基础结构是:

  • 经纪人联系
  • JmsTemplate
  • 将通知发送到的队列
  • 侦听器容器,它将发送通知给侦听器以处理它们
<!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
</bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/>
</bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="incomingQueue"/>
</bean><!-- Destinations -->
<bean id="incomingQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="incoming.queue"/>
</bean><!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory"><jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

生产者仅使用jmsTemplate发送通知。

@Component("producer")
public class Producer {private static Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate JmsTemplate jmsTemplate;public void convertAndSendMessage(String destination, Notification notification) {jmsTemplate.convertAndSend(destination, notification);logger.info("Sending notification | Id: "+notification.getId());}
}

侦听器负责从队列中检索通知,并将其存储到数据库中。

@Component("notificationProcessor")
public class NotificationProcessor implements MessageListener {private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(Message message) {try {Notification notification = (Notification) ((ObjectMessage) message).getObject();logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));checkPreprocessException(notification);saveToBD(notification);checkPostprocessException(message, notification);} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}} ...
}

当id = 1的通知到达时, checkPreprocessException方法将抛出运行时异常。 这样,在将消息存储到数据库之前,我们将导致错误。

如果到达id = 2的通知, checkPostprocessException方法将引发异常,从而在将其存储到数据库之后立即引起错误。

getDeliveryNumber方法返回发送消息的次数。 这仅适用于事务,因为在侦听器处理失败导致回滚之后,代理将尝试重新发送消息。

最后, saveToDB方法非常明显。 它将通知存储到数据库。

您始终可以通过单击本文开头的链接来检查此应用程序的源代码。

3.测试没有交易的消息接收

我将启动两个测试类,一个不包含事务,另一个在本地事务中。 这两个类都扩展了一个基类,该基类加载了常见的应用程序上下文并包含一些实用程序方法:

@ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"})
@DirtiesContext
public class TestBaseMessaging {protected static final String QUEUE_INCOMING = "incoming.queue";protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";@Autowiredprotected JdbcTemplate jdbcTemplate;@Autowiredprotected JmsTemplate jmsTemplate;@Autowiredprotected Producer producer;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from Notifications");}protected int getSavedNotifications() {return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);}protected int getMessagesInQueue(String queueName) {return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {@Overridepublic Integer doInJms(Session session, QueueBrowser browser) throws JMSException {Enumeration<?> messages = browser.getEnumeration();int total = 0;while (messages.hasMoreElements()) {messages.nextElement();total++;}return total;}});}
}

实用方法说明如下:

  • getSavedNotifications :返回存储到数据库的通知数。 我使用了queryForObject方法,因为自版本3.2.2开始建议使用该方法。 queryForInt方法已被弃用。
  • getMessagesInQueue :允许您检查指定队列中哪些消息仍在等待处理。 对于此测试,我们有兴趣知道仍有多少通知等待处理。

现在,让我向您展示第一个测试的代码( TestNotTransactedMessaging )。 此测试启动本文开头指示的3种情况。

@Test
public void testCorrectMessage() throws InterruptedException {Notification notification = new Notification(0, "notification to deliver correctly");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}@Test
public void testFailedAfterReceiveMessage() throws InterruptedException {Notification notification = new Notification(1, "notification to fail after receiving");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(0, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}@Test
public void testFailedAfterProcessingMessage() throws InterruptedException {Notification notification = new Notification(2, "notification to fail after processing");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}private void printResults() {logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));logger.info("Total items in DB: "+getSavedNotifications());
}

4,执行测试

好的,让我们执行测试,看看结果是什么:

testCorrectMessage输出:

Producer|Sending notification | Id: 0
NotificationProcessor|Received notification | Id: 0 | Redelivery: 1
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 1

此处没有问题,因为消息已正确接收并存储到数据库,所以队列为空。

testFailedAfterReceiveMessage输出:

Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 0

由于它在事务外部执行,因此使用确认模式(默认为自动)。 这意味着一旦调用onMessage方法并因此将其从队列中删除,就认为该消息已成功传递。 因为侦听器在将消息存储到数据库之前失败,所以我们丢失了消息!

testFailedAfterProcessingMessage输出:

2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2
2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1

在这种情况下,在执行失败之前,该消息已从队列(AUTO_ACKNOWLEDGE)中删除并存储到DB。

5,添加本地交易

通常,我们不允许像测试的第二种情况那样丢失消息,因此我们要做的是在本地事务中调用侦听器。 所做的更改非常简单,并不意味着从我们的应用程序中修改一行代码。 我们只需要更改配置文件。

为了测试这3种涉及事务的情况,我将以下配置文件notx-jms-config.xml替换为:

tx-jms-config.xml

首先,我添加了在发生回滚的情况下进行的重新传递的数量(由于侦听器执行中的错误导致):

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/><property name="redeliveryPolicy"><bean class="org.apache.activemq.RedeliveryPolicy"><property name="maximumRedeliveries" value="4"/></bean></property>
</bean>

接下来,我指示侦听器将在事务内执行。 这可以通过修改侦听器容器定义来完成:

<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted"><jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

这将导致在本地JMS事务中执行对侦听器的每次调用。 收到消息后,事务将开始。 如果侦听器执行失败,则消息接收将回滚。

这就是我们要做的一切。 让我们使用此配置启动测试。

6,测试交易中的消息接收

来自TestTransactedMessaging类的代码实际上与先前的测试相同。 唯一的区别是,它向DLQ(死信队列)添加了查询。 在事务内执行时,如果回退消息接收,则代理会将消息发送到此队列(在所有重新传递失败之后)。

我跳过了成功接收的输出,因为它不会带来任何新的变化。

testFailedAfterReceiveMessage输出:

Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 2
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
...
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 5
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 1
TestTransactedMessaging|Total items in DB: 0

如您所见,第一次接收失败,并且代理尝试将其重新发送四次(如maximumRedeliveries属性中所示)。 由于情况持续存在,因此消息已发送到特殊DLQ队列。 这样,我们不会丢失消息。

testFailedAfterProcessingMessage输出:

Producer|Sending notification | Id: 2
NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
NotificationProcessor|Received notification | Id: 2 | Redelivery: 2
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 0
TestTransactedMessaging|Total items in DB: 2

在这种情况下,这是发生的情况:

  1. 侦听器检索到消息
  2. 它将消息存储到数据库
  3. 侦听器执行失败
  4. 代理重新发送消息。 由于情况已解决,因此侦听器将消息再次存储到DB。 该消息已重复。

7,结论

将本地事务添加到消息接收中可避免丢失消息。 我们必须考虑的是,可能会出现重复的消息,因此我们的侦听器将不得不检测到它,否则我们的处理必须是幂等的才能再次进行处理。 如果这不可能,我们将不得不进行分布式事务,因为它们支持涉及不同资源的事务。

参考: Spring JMS:在XavierPadró的Blog博客上处理来自JCG合作伙伴 Xavier Padro的事务内的消息 。

翻译自: https://www.javacodegeeks.com/2014/02/spring-jms-processing-messages-within-transactions.html

Spring JMS:处理事务中的消息相关推荐

  1. mysql事务管理及spring声明式事务中主动异常抛出使数据库回滚

    mysql事务管理及spring声明式事务中主动异常抛出使数据库回滚 参考文章: (1)mysql事务管理及spring声明式事务中主动异常抛出使数据库回滚 (2)https://www.cnblog ...

  2. spring jms 事务_Spring JMS:处理事务中的消息

    spring jms 事务 1.引言 这篇文章将向您展示使用JMS异步接收消息期间使用者执行过程中的错误如何导致消息丢失. 然后,我将解释如何使用本地事务解决此问题. 您还将看到这种解决方案在某些情况 ...

  3. Spring 如何在一个事务中开启另一个事务?

    Spring项目,需要在一个事务中开启另一个事务. 上面提到的情景可能不常见,但是还是会有的,一旦遇到,如果业务比较复杂,就会很麻烦,但是还是有解决的方案的,比如将一个service方法拆成两个方法, ...

  4. Spring声明式事务中属性解释

    本文内容节选自北京尚学堂张佳明老师培训视频 1. name="" 哪些方法需要有事务控制 支持*通配符 2. readonly="boolean" 是否是只读事 ...

  5. spring+mybatis 一个事务中两次查询结果不一样的问题

    最近搞了一波事情,把一个接口给重构了一番,感觉还不错,同时也遇到了一些问题,这个就是其中一个. 因为重构,我在这个接口上面加了一个事务,然后就发现之前的代码跑出来的结果就不一样了,两次一模一样的查询, ...

  6. Spring AOP在事务中的应用典范

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.sp ...

  7. spring boot注释_使用Spring Boot和注释支持配置Spring JMS应用程序

    spring boot注释 1.简介 在以前的文章中,我们学习了如何使用Spring JMS配置项目. 如果查看有关使用Spring JMS进行消息传递的文章介绍 ,您会注意到它是使用XML配置的. ...

  8. 使用Spring Boot和注释支持配置Spring JMS应用程序

    1.简介 在以前的文章中,我们学习了如何使用Spring JMS配置项目. 如果查看有关使用Spring JMS进行消息传递的文章介绍 ,您会注意到它是使用XML配置的. 本文将利用Spring 4. ...

  9. Spring JMS 整合 ActiveMQ

    1.使用maven管理依赖包 <dependencies><dependency><groupId>junit</groupId><artifac ...

最新文章

  1. devc++鼠标变成了光标_游戏鼠标到底能不能提升你的实力?
  2. Fluid 0.5 版本:开启数据集缓存在线弹性扩缩容之路
  3. CSS魔法堂:深入理解line-height和vertical-align
  4. Portal-Basic Java Web 应用开发框架:应用篇(八) —— 整合 Freemarker
  5. 高可用Redis服务架构分析与搭建
  6. 华为云数据库内核专家为您揭秘:GaussDB(for MySQL)并行查询有多快?
  7. 《http权威指南》读书笔记14
  8. 第三回 Bootstrap3.x 起步
  9. Unity Android 动态更新 Assembly-CSharp.dll
  10. 笔记-as/400的CL命令
  11. 朱丹超级搜索术笔记:百度搜索相关的技巧
  12. ubuntu装指定分区_安装Ubuntu16.04系统步骤详细加分区
  13. 斐波那契(黄金分割法)查找算法
  14. 用自己的域名配置动态域名解析(DDNS)
  15. Chrome添加扩展程序
  16. opencv中图像失焦检测
  17. 人工智能知识体系大全
  18. 用Socket实现点对点的文件传输
  19. VC有什么用?该如何学习VC?
  20. 【高级篇 / FortiGate-VM】(6.4) ❀ 04. 虚拟 PC 通过 FortiGate VM 上网 ❀ FortiGate 防火墙

热门文章

  1. oracle 10741 trace,RedHat5.3上安装Oracle 10.2.0.1
  2. delphi7存取配置文件与sqlserver数据库连接_SQL Server基础知识概念要点详细讲解
  3. python queue 生产者 消费者_【python】-- 队列(Queue)、生产者消费者模型
  4. python 虚拟环境_理解Python虚拟环境
  5. java动态代理和cglib动态代理
  6. read cache_通过READ-BEHIND CACHE控制您的慢速生产者
  7. 使用实例工厂方法实例化_一些工厂实例
  8. neo4j 嵌入式_在嵌入式Neo4j中使用Neo4j浏览器
  9. Quarkus的其他(非标准)CDI功能
  10. 使用Spring Boot和Project Reactor处理SQS消息