RabbitMQ,手动ACK情况下,消费消息的时候出现异常,如何手动ACK或NACK
目录
环境信息
问题背景
常用异常处理机制
自定义注解@MqConsumer
AOP拦截器RabbitInterceptor
消费方RabbitConsumer
异常处理类 RabbitListenerErrorHandlerImpl
说明
SpringBoot结合RabbitMQ异常处理,有多种方式:
1.AbstractRabbitListenerContainerFactory里的ErrorHandler
2.AbstractMessageListenerContainer里的ErrorHandler
3.@RabbitListener里的errorHandler(实际是RabbitListenerErrorHandler)
4.RabbitTemplate里的ErrorHandler
解决方案
环境信息
Spring Boot:2.0.8.RELEASE
Spring Cloud:2.0.4.RELEASE
RabbitMQ,用的是spring-boot-starter-amqp:2.0.8.RELEASE
问题背景
RabbitMQ,使用的是消息手动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在处理消息出现异常之后,根据情况手动进行ACK或者NACK处理。
常用异常处理机制
使用AOP拦截消息处理方法,统一进行日志的打印和异常的处理:
自定义注解@MqConsumer
可以标记在类上或者方法上
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqConsumer {String value() default "";
}
AOP拦截器RabbitInterceptor
拦截器,拦截使用@MqConsumer的类或者方法。
在拦截器里打印了消息内容、耗时等,并根据情况手动ACK或者NACK
这里也可以进行异常下的重试或者存表等处理
package com.xxx.mq.interceptor;import com.rabbitmq.client.Channel;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;@Aspect
@Component
public class RabbitInterceptor {private static final Log log = LogFactory.getLog(RabbitInterceptor.class);@Value("${spring.rabbitmq.listener.simple.acknowledge-mode:auto}")private String acknowledgeMode;@Pointcut("@within(com.xxx.mq.support.MqConsumer) || @annotation(com.xxx.mq.support.MqConsumer)")public void consumerPointCut() {}@Around("consumerPointCut()")public Object consumerListenerAround(ProceedingJoinPoint joinPoint) throws Throwable {String className = joinPoint.getTarget().getClass().getSimpleName();String methodName = joinPoint.getSignature().getName();Object[] args = joinPoint.getArgs();Channel channel = null;Message amqpMessage = null;String correlationId = "";long deliveryTag = -1L;for (Object arg : args) {if (arg instanceof Message) {amqpMessage = Message.class.cast(arg);deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();correlationId = amqpMessage.getMessageProperties().getCorrelationId();} else if (arg instanceof org.springframework.messaging.Message<?>) {org.springframework.messaging.Message message = org.springframework.messaging.Message.class.cast(arg);deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");correlationId = (String) message.getHeaders().get("amqp_correlationId");} else if (arg instanceof Channel) {channel = (Channel) arg;}}if (log.isInfoEnabled()) {log.info("MQ_HDL > {}.{}(), parameters: {}", className, methodName, args);}long start = System.nanoTime();Object obj = null;if ("auto".equalsIgnoreCase(acknowledgeMode)) {obj = joinPoint.proceed(args);} else {if (channel == null) {throw new RuntimeException("手动确认消息,方法参数需要有Channel");}try {obj = joinPoint.proceed(args);// 手动签收channel.basicAck(deliveryTag, false);} catch (Exception e) {// 是否重新投递到队列channel.basicNack(deliveryTag, false, false);throw e;}}if (log.isInfoEnabled()) {log.info("MQ_HDL < [" + (System.nanoTime() - start) / 1000000 + "]ms");}return obj;}}
消费方RabbitConsumer
在消费方里,类上标记了@MqConsumer注解,在方法上配置了监听队列的名称,以及异常处理类rabbitListenerErrorHandlerImpl
@Component
@MqConsumer
public class RabbitConsumer {Log log = LogFactory.getLog(RabbitConsumer.class);/*** 这里按照实际情况配置,请求参数** @param msg* @param amqpMessage* @param channel*/@RabbitListener(queues = {"${tfb.rabbitmq.properties.configs[0].queues[0].name}"}, errorHandler = "rabbitListenerErrorHandlerImpl")public void receiveQueue1Msg(@Payload String msg, Message amqpMessage, Channel channel) {// 业务逻辑// 模拟异常// int i = 1/0; }}
异常处理类 RabbitListenerErrorHandlerImpl
这个类是实现了RabbitListenerErrorHandler,出现异常之后,handle方法的参数里可以获取得到amqp的Message,org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception,
从而根据具体场景进行处理,比如说异常日志的打印,重试或者存表处理
@Component
public class RabbitListenerErrorHandlerImpl extends ConditionalRejectingErrorHandler implements RabbitListenerErrorHandler {private static final Log log = LogFactory.getLog(RabbitListenerErrorHandlerImpl.class);private final FatalExceptionStrategy exceptionStrategy = new DefaultExceptionStrategy();@Overridepublic Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {log.error("Execution of Rabbit message listener failed. amqpMessag[{}]", amqpMessage, exception);// 目前这里拿不到channel,但是从spring-amqp 2.1.7开始,可以从message的header里获取到。有了channel就能手动nack或ackif (!this.causeChainContainsARADRE(exception) && this.exceptionStrategy.isFatal(exception)) {ThreadCacheUtil.cleanAllThreadCache();throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", exception);}ThreadCacheUtil.cleanAllThreadCache();return null;}
}
说明
AOP拦截器和消费方配置的异常处理类,有功能重复的地方,比如异常情况下的处理。可以根据实际情况选择其中一个,或者AOP里不处理异常,在异常处理类那边再统一处理。
不同点
AOP拦截器,需要在执行onMessage具体逻辑的时候才会拦截到。如果消息在进入onMessage具体逻辑之前就报错了,那么无法进入拦截器里的异常处理。
而消费方配置的异常处理类都可以处理得到异常,除非异常在AOP里被拦截并且没有抛出。
SpringBoot结合RabbitMQ异常处理,有多种方式:
1.AbstractRabbitListenerContainerFactory里的ErrorHandler
#setErrorHandler(ErrorHandler errorHandler)
这里的ErrorHandler是Spring框架异常处理接口,参数只有一个简单的Throwable t,因此无法获取到一些具体的内容,比如消息体等,也无法对消息进行持久化、手动ACK或NACK。
/*** @param errorHandler The error handler.* @see AbstractMessageListenerContainer#setErrorHandler(org.springframework.util.ErrorHandler)*/public void setErrorHandler(ErrorHandler errorHandler) {this.errorHandler = errorHandler;}
AbstractRabbitListenerContainerFactory是抽象类,是用于创建消息监听容器的,有两个实现类:
SimpleRabbitListenerContainerFactory
DirectRabbitListenerContainerFactory
2.AbstractMessageListenerContainer里的ErrorHandler
AbstractMessageListenerContainer是个抽象类,常见的实现类有以下两个:
SimpleMessageListenerContainer
DirectMessageListenerContainer
这里的ErrorHandler和AbstractRabbitListenerContainerFactory里的是一样的。
参数只有一个简单的Throwable t,因此无法获取到一些具体的内容,比如消息体等,也无法对消息进行持久化、手动ACK或NACK。
例子:
@Beanpublic ErrorHandler errorHandler() {// 自定义异常实现类return new MqErrorHandler();}@BeanSimpleMessageListenerContainer containerReset(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setDefaultRequeueRejected(false);container.setErrorHandler(errorHandler());container.setMessageConverter(jsonConverter());container.setQueueNames(getQueueAlert());container.setMessageListener(listenerAlertAdapter);return container;}
3.@RabbitListener里的errorHandler(实际是RabbitListenerErrorHandler)
@RabbitListener
/*** Set an {@link org.springframework.amqp.rabbit.listener.RabbitListenerErrorHandler}* to invoke if the listener method throws an exception.* @return the error handler.* @since 2.0*/String errorHandler() default "";
这里的errorHandler是org.springframework.amqp.rabbit.listener.RabbitListenerErrorHandler
/*** An error handler which is called when a {code @RabbitListener} method* throws an exception. This is invoked higher up the stack than the* listener container's error handler.** @author Gary Russell* @since 2.0**/
@FunctionalInterface
public interface RabbitListenerErrorHandler {/*** Handle the error. If an exception is not thrown, the return value is returned to* the sender using normal {@code replyTo/@SendTo} semantics.* @param amqpMessage the raw message received.* @param message the converted spring-messaging message.* @param exception the exception the listener threw, wrapped in a* {@link ListenerExecutionFailedException}.* @return the return value to be sent to the sender.* @throws Exception an exception which may be the original or different.*/Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,ListenerExecutionFailedException exception) throws Exception;}
handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,ListenerExecutionFailedException exception)
参数有原生的message,还有转换后的spring-messaging的message,还有包装后的ListenerExecutionFailedException异常信息,于是可以根据这些信息进行消息的处理,包括收到的消息内容是什么,异常信息是什么,以及在这里进行消息的持久化等操作。
如果需要对消息进行手动ACK或NACK,那么就需要获取到Channel才能进行,Channel是和MQ连接的通道,deliveryTag可以从message里获取到。
源码:
Channel接口里的basicAck和basicNack方法。
/*** Acknowledge one or several received* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method* containing the received message being acknowledged.* @see com.rabbitmq.client.AMQP.Basic.Ack* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}* @param multiple true to acknowledge all messages up to and* including the supplied delivery tag; false to acknowledge just* the supplied delivery tag.* @throws java.io.IOException if an error is encountered*/void basicAck(long deliveryTag, boolean multiple) throws IOException;/*** Reject one or several received messages.** Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.* @see com.rabbitmq.client.AMQP.Basic.Nack* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}* @param multiple true to reject all messages up to and including* the supplied delivery tag; false to reject just the supplied* delivery tag.* @param requeue true if the rejected message(s) should be requeued rather* than discarded/dead-lettered* @throws java.io.IOException if an error is encountered*/void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;
这里需要注意,spring-amqp版本在2.1.7以前,这里的org.springframework.messaging.Message<?> message里无法获取到Channel,因此无法进行手动ACK、NACK处理,详见Stack Overflow和官方github里的升级记录:
spring boot - How to requeue or reject in RabbitListenerErrorHandler on MANUAL ack mode? - Stack Overflow
Add AmqpHeaders.CHANNEL in error hander · garyrussell/spring-amqp@b314a5f (github.com)
坑的是,本次使用的环境信息里,spring-boot-starter-amqp:2.0.8.RELEASE里包含的spring-amqp版本是2.0.11.RELEASE,不支持!!
实例:
RabbitListenerErrorHandlerImpl.java
@Component
public class RabbitListenerErrorHandlerImpl extends ConditionalRejectingErrorHandler implements RabbitListenerErrorHandler {private static final Log log = LogFactory.getLog(RabbitListenerErrorHandlerImpl.class);private final FatalExceptionStrategy exceptionStrategy = new DefaultExceptionStrategy();@Overridepublic Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {log.error("Execution of Rabbit message listener failed. amqpMessag[{}]", amqpMessage, exception);// 这里可以根据异常的类型等进行精细的判断,决定是否需要ack,以及是否需要重新投递//if (!this.causeChainContainsARADRE(exception) && this.exceptionStrategy.isFatal(exception)) {//}message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class).basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);}
RabbitConsumer.java
@Component
@MqConsumer
@RabbitListener(queues = {"queueName}"}, errorHandler = "rabbitListenerErrorHandlerImpl")
public class RabbitConsumer {Log log = LogFactory.getLog(RabbitConsumer.class);@RabbitHandler(isDefault = true)public void receiveQueue3Msg(@Payload MqRequestDto msg, org.springframework.messaging.Message<?> message, Channel channel, @Headers Map headers) {log.info(msg);}@RabbitHandler()public void receiveQueue31Msg(@Payload String msg, org.springframework.messaging.Message<?> message, Channel channel, @Headers Map headers) {log.info(message.getPayload());}}
4.RabbitTemplate里的ErrorHandler
/*** When using a direct reply-to container for request/reply operations, set an error* handler to be invoked when a reply delivery fails (e.g. due to a late reply).* @param replyErrorHandler the reply error handler* @since 2.0.11* @see #setUseDirectReplyToContainer(boolean)*/public void setReplyErrorHandler(ErrorHandler replyErrorHandler) {this.replyErrorHandler = replyErrorHandler;}
解决方案
采用上方的@RabbitListener里的errorHandler(实际是RabbitListenerErrorHandler)的方式来处理,并把spring-amqp的版本提升到2.1.7以上
之所以不采用AOP方式,是因为上面提到的异同点:
AOP拦截器,需要在执行onMessage具体逻辑的时候才会拦截到。如果消息在进入@RabbitListener的具体逻辑之前就报错了,那么无法进入拦截器里的异常处理。
而消费方配置的异常处理类都可以处理得到异常,除非异常在AOP里被拦截并且没有抛出。
比如:如果消费方配置错了,导致消息无法进入@RabbitListener的具体处理逻辑,不能成功消费或者拒绝,一直停留在Unacked状态
错误示例:
@RabbitListener(queues = {"queueName"}, errorHandler = "rabbitListenerErrorHandlerImpl")// 参数里有个MessageProperties,导致消息转换异常,无法进入方法里public void receiveQueue2Msg(String msg, MessageProperties messageProperties, Channel channel, @Headers Map headers) {log.info(msg);// 业务逻辑}
收到消息之后,抛出异常:
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.eternalinfo.framework.mq.consumer.RabbitConsumer.receiveQueue2Msg(java.lang.String,org.springframework.amqp.core.MessageProperties,com.rabbitmq.client.Channel,java.util.Map)]
Bean [com.eternalinfo.framework.mq.consumer.RabbitConsumer@c088be]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:191)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126)
... 9 common frames omittedCaused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.amqp.core.MessageProperties] for GenericMessage
RabbitMQ,手动ACK情况下,消费消息的时候出现异常,如何手动ACK或NACK相关推荐
- RabbitMQ(四) Work模式下的消息产生以及消费代码实现示例
在工作队列中,我们有多个消息的消费者,每个消费者都会进行消息消费,在默认情况下,RabbitMQ会进行消息轮询发送给每一个消费者,因此每个消费者处理的消息数量是一致的.下面直接看我们的主要文件代码 一 ...
- java rabbitmq 并发_RabbitMQ消息中间件 高级篇二 高并发情况下保障消息投递可靠性...
RabbitMQ消息中间件技术精讲9 高级篇二 高并发场景下,消息的延迟投递做二次确认进行回调检查来保障生产者消息投递成功的可靠性 在上一篇文章中,我们介绍了BAT大厂中一种方式保障生成者消息投递可靠 ...
- #rabbitMQ #重复消费 #可靠投递 #延时投递 #rabbitMQ交换机类型#重复消费#消息积压#消息丢失
exchange类型: 1, direct 指定direct后, 消息会根据你设置的routeing key(路由键), 发送到对应的队列中 1,新建direct交换机 2,添加队列, 并且绑定路由键 ...
- rabbitmq 手动提交_RabbitMQ系列(四)RabbitMQ事务和Confirm发送方消息确认——深入解读 - 王磊的博客 - 博客园...
RabbitMQ事务和Confirm发送方消息确认--深入解读 RabbitMQ系列文章 引言 根据前面的知识( 深入了解RabbitMQ工作原理及简单使用 . Rabbit的几种工作模式介绍与实践 ...
- 消息中间件 RabbitMQ 之 工作队列(2)—消息应答
3.2 消息应答 3.2.1 概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了一部分,如果它突然挂掉了,会发生什么情况? RabbitMQ一旦向消费者传递了一条 ...
- 消费消息删除_【进阶之路】可靠消息最终一致性解决方案
导言 大家好,我是南橘,从接触java到现在也有差不多两年时间了,两年时间,从一名连java有几种数据结构都不懂超级小白,到现在懂了一点点的进阶小白,学到了不少的东西.知识越分享越值钱,我这段时间总结 ...
- RabbitMQ事务和Confirm发送方消息确认——深入解读
引言 根据前面的知识(深入了解RabbitMQ工作原理及简单使用.Rabbit的几种工作模式介绍与实践)我们知道,如果要保证消息的可靠性,需要对消息进行持久化处理,然而消息持久化除了需要代码的设置之外 ...
- RocketMQ消费失败如何处理?如何保证消费消息的幂等性?
文章目录 1. 消息消费失败如何处理? 2. 如何保证消费消息的幂等性? 1. 消息消费失败如何处理? 当消费者从Broker获取到消息后会进行消费,并返回消费状态.如下代码所示 //broker推消 ...
- 宿舍限电情况下的台式机装机指南、使用对策
背景 笔者也研究生开学了,去了学校才发现宿舍限电,只有我一台台式机在用电的情况下也有可能跳闸(比如进入 彩虹六号.或者跑一些神经网络),更别提和舍友同时使用电脑了.我的台式机装机的初衷是合理运用我的预 ...
- @transactional 接口_Spring事物(@transactional注解)在什么情况下会失效,为什么?...
一.@transactional 的使用 1.一般在service里加@Transactional注解,不建议在接口上添加 2.加了此注解后每个业务方法执行时,都会开启一个事务,不过都是按照相同的管理 ...
最新文章
- 为什么磁场强度大了呢?
- zookeeper3.4集群搭建
- [LeetCode] Binary Tree Postorder Traversal 二叉树的后序遍历
- 你最喜欢的一张美女图片?
- 请求分页系统中页面分配策略与页面置换策略的关系
- java例子:九九乘法表
- Oracle杀事务数据库崩溃,关于pl/sql dev窗口崩溃导致锁表
- cmd echo写入shell_渗透技巧——通过cmd上传文件的N种方法
- sql server 事务_如何使用显式SQL Server事务回滚
- 前端系列——jquery.i18n.properties前端国际化解决方案“填坑日记”
- wait()、notify()、notifyAll()原理用法详解sleep()与wait()区别
- 九、瞰景Smart3D Viewer浏览器
- 抽象代数的代码实现(1) 置换群
- python中的snip用法_Ubuntu系统中安装SNIP
- 记录一次解决后端接口设置cookie设置不上去经过,一级域名可以设置上去cookie,二级域名设置不上cookie
- 制作Win7多合一原版系统光盘镜像
- 在线点餐APP开发前景如何?
- iOS 网络传输数据安全以及常用的加密算法使用
- STC单片机简单控制直流电机正反转
- 帆软FineReport本地部署springboot