RabbitMQ系列【8】消息可靠性之ACK机制
有道无术,术尚可求,有术无道,止于术。
文章目录
- 前言
- 自动确认
- 1. 配置
- 2. 演示
- 手动确认
- 1. 配置
- 2. 代码
- 3. 测试
前言
在之前分析了对于生产者来说,可以使用消息发布确认及退回机制,保证消息被成功发送到MQ
中。
但对于消费者来说,消息传递过来,可能会丢失,也有可能接收到消息,但还未处理完,发生宕机或者异常,导致消息没有被成功消费。
为了保证消息在消费过程中的可靠性,RabbitMQ
引入消息确认机制(ACK(Acknowledge))
,消费者在接收到消息并且处理该消息之后,告诉RabbitMQ
它已经处理,RabbitMQ
再讲该消息删除。
消费端收到消息后的确认方式有三种:
自动确认:当消息一旦被消费者接收到,则自动确认收到,并将相应消息从
RabbitMQ
的消息缓存中移除手动确认:将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后才会被认为消息传递成功了,然后才会将内存中的消息删除。
根据异常情况确认:根据侦听器检测是正常返回、还是抛出异常来确认
自动确认
其中自动确认是指,当消息一旦被消费者接收到,则自动确认收到,并将相应其从 RabbitMQ
的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。实际开发时,不推荐使用这种方式。
1. 配置
添加配置,设置 acknowledge-mode
为none
,该配置项共有三种:
- none:自动确认
- manual:手动确认
- auto:根据异常情况确认
spring:rabbitmq:username: guestpassword: guesthost: localhostport: 5672# 消息监听器配置listener:# 消息监听容器类型,默认 simpletype: simplesimple:# 消息确认模式,none、manual和autoacknowledge-mode: none# 应用启动时是否启动容器,默认trueauto-startup: true# listener最小消费者数concurrency: 10# listener最大消费者数max-concurrency: 100# 一个消费者最多可处理的nack消息数量prefetch: 10# 被拒绝的消息是否重新入队,默认truedefault-requeue-rejected: true# 如果容器声明的队列不可用,是否失败;或如果在运行时删除一个或多个队列,是否停止容器,默认truemissing-queues-fatal: true# 空闲容器事件应多久发布一次idle-event-interval: 10# 重试配置retry:# 是否开启消费者重试,默认falseenabled: true# 第一次和第二次尝试发送消息的时间间隔,默认1000msinitial-interval: 1000ms# 最大重试次数,默认3max-attempts: 3# 最大重试间隔,默认10000msmax-interval: 10000ms# 应用于前一个重试间隔的乘数multiplier: 1# 重试是无状态还是有状态,默认truestateless: true
2. 演示
添加一个消费者,接收到消息后抛出异常,模拟没有正常消费:
@Component
public class RabbitConsumer {@RabbitListener(queues = {"bootQueue"})public void rabbitListener(Message message) {System.out.println("收到消息===" + message);// 发生异常int i=5/0;}
}
直接发送一条消息:
@SpringBootTest
public class MqTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testRabbitPub() {rabbitTemplate.convertAndSend("bootExchange", "boot.key", "HELLO SPRING BOOT");}
}
运行程序,可以看到发生了异常:
由于开启了重试机制,异常时,会进行重试消费:
查看控制台,发现消息没有被成功消费,但是 RabbitMQ
已经将该消息移除。
手动确认
手动确认
只有当消费正确消费掉之后,再手动告诉RabbitMQ
该消息已经被成功接收并消费,这时RabbitMQ
才会将消息从队列中删除掉。
1. 配置
设置acknowledge-mode
为manual
:
spring:rabbitmq:# 省略....# 消息监听器配置listener:# 消息监听容器类型,默认 simpletype: simplesimple:# 消息确认模式,none、manual和auto,默认autoacknowledge-mode: manual
2. 代码
如果消息成功处理,需要调用channel.basicAck()
方法进行签收:
void basicAck(long deliveryTag, boolean multiple) throws IOException {}
basicAck()
方法需要两个参数:
- deliveryTag(唯一标识 ID):当一个消费者向
RabbitMQ
注册后,会建立起一个Channel
,向消费者推送消息,这个方法携带了一个deliveryTag
, 它代表了RabbitMQ
向该Channel
投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag
的范围仅限于当前Channel
。 - multiple:为了减少网络流量,手动确认可以被批处理,当该参数为
true
时,则可以一次性确认deliveryTag
小于等于传入值的所有消息
如果消息处理失败,调用channel.basicNack()
方法拒绝签收:
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {}
basicNack()
方法需要三个参数:
- deliveryTag:同
basicAck
- multiple:同
basicAck
- requeue:重回队列。如果设置为
true
,则消息重新回到queue
,服务端会重新发送该消息给消费端
消费者代码如下:
@Component
public class RabbitConsumer {@RabbitListener(queues = {"bootQueue"})public void receiveMessage(Message message, Channel channel) throws IOException {// 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channellong deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("收到消息===" + new String(message.getBody()));System.out.println("处理业务逻辑");// 发生异常// int i = 5 / 0;// 处理完成后,确认channel.basicAck(deliveryTag, true);} catch (Exception e) {// 发生异常,拒绝签收e.printStackTrace();channel.basicNack(deliveryTag, true, true);}}
}
3. 测试
没有异常时,消息被成功消费:
打开异常代码注释,运行程序,此时控制台显示有一个消息未被确认状态:
并且程序一直在死循环接收=》拒绝签收=》返回队列=》接收=》。
死循环
问题是因为,在basicNack
方法中我们设置了重回队列,这样会有问题,一般需要设置为不重回到队列:
channel.basicNack(deliveryTag, true, false);
RabbitMQ系列【8】消息可靠性之ACK机制相关推荐
- RabbitMQ学习之消息可靠性及特性
转载自 https://blog.csdn.net/zhu_tianwei/article/details/53971296 下面主要从队列.消息发送.消息接收方面了解消息传递过的一些可靠性处理. ...
- (转)RabbitMQ学习之消息可靠性及特性
http://blog.csdn.net/zhu_tianwei/article/details/53971296 下面主要从队列.消息发送.消息接收方面了解消息传递过的一些可靠性处理. 1.队列 ...
- 生产环境中,RabbitMQ 持续积压消息不进行ack ,发生什么了?
问题:生产环境 rabbitmq 部分客户端 channel 持续积压消息不进行ack. 0. 服务配置 rabbitmq 集群(普通集群模式) 消费者 三台 消费线程各消费者 10 消费者配置 使用 ...
- RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列
搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...
- 用redis实现消息队列(实时消费+ack机制)【转】
用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...
- 我的架构梦:(七十) 消息中间件之RabbitMQ的消息可靠性
一.案例 你用支付宝给商家支付,如果是个仔细的人,会考虑我转账的话,会不会把我的钱扣了,商家没有收到我的钱? 一般我们使用支付宝或微信转账支付的时候,都是扫码,支付,然后立刻得到结果,说你支付了多少钱 ...
- Kafka 生产者数据安全(ACK机制,ACK时机,ACK应答机制,故障处理,Exactly Once)
目录 生产者数据安全 一.数据分区 图解 分区原因 分区原则 二.数据可靠性保证 ACK机制 ACK时机 ACK应答机制 故障处理 Exactly Once 语义 生产者数据安全 一.数据分区 图解 ...
- 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战
目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...
- Storm编程入门API系列之Storm的可靠性的ACK消息确认机制
概念,见博客 Storm概念学习系列之storm的可靠性 什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...
最新文章
- 程序员的自我救赎---13.1:职场招聘与面试心得
- 人工智能尴尬的2019:需要钱却没钱可烧了
- 使用Cacti监测系统与网络性能(3)
- RHEL6基础四十三之RHEL文件共享②Samba简介
- 网络基准测试Netperf
- 跟我一起写 Makefile(八)
- HTML5制作斑马线表格,JavaScript实现的斑马线表格效果【隔行变色】
- 口罩告急,全民互助!“口罩互助”小程序重磅上线!
- kali linux 2019教程,[教程]KALI LINUX 2.0 2019 更新国内源
- 天猫广告业务独立运营
- 前端开发者常用的9个JavaScript图表库
- Spring 在xml文件中配置Bean
- 高速公路综合运行监测与管控平台(HOCC)
- python编写小程序、模拟实现自动按下键盘_Python 实现键盘鼠标按键模拟
- matplotlib中堆积图、分块图、气泡图的绘制
- 【labelme软件】使用指南
- 一次性搞懂css中的clamp函数,max函数,min函数,vmax,vmin
- flashpaper java_FlashPaper API 说明
- ValueError: `generator` yielded an element of shape (2,) where an element of shape (?, ?) was expect
- 自动化代码审查平台: 基于Docker Compose整合Jenkins + SonarQube