RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失
1. 消息丢失源头
RabbitMQ
消息丢失的源头主要有以下三个:
- 生产者丢失消息
RabbitMQ
丢失消息- 消费者丢失消息
下面主要从 3 个方面进行说明并提供应对措施
2. 生产者丢失消息
RabbitMQ
生产者将数据发送到 rabbitmq
的时候,可能数据在网络传输中搞丢了,这个时候 RabbitMQ
收不到消息,消息就丢了。
解决方法:
2.1 事务方式
在生产者发送消息之前,通过 channel.txSelect
开启一个事务,接着发送消息,
- 如果消息没有成功被
RabbitMQ
接收到,生产者会收到异常,此时就可以进行事务回滚channel.txRollback
然后重新发送; - 假如
RabbitMQ
收到了这个消息,就可以提交事务channel.txCommit
;
但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。
2.2 confirm 机制
confirm
机制是在生产者设置的,就是每次写消息的时候会分配一个唯一的 id
,然后 RabbitMQ
收到之后会回传一个 ack
,告诉生产者这个消息 ok
了。如果 rabbitmq
没有处理到这个消息,那么就回调一个 nack
的接口,这个时候生产者就可以重发。
事务机制和 confirm
机制最大的不同点:
- 事务机制是同步的,提交一个事务之后会阻塞在那儿,它性能太差,官方主动弃用;
- 但是
confirm
机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息rabbitmq
接收了之后会异步回调你一个接口通知你这个消息接收到了;
所以一般在生产者这块避免数据丢失,都是用 confirm
机制的。
在 confirm
机制下,我们可以将 channel
设置成 confirm
模式,一旦 channel
进入 confirm
模式,所有在该 channel
上面发布的消息都将会被指派一个唯一的 ID
(从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ
就会发送一个确认给生产者(包含消息的唯一 ID
),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,RabbitMQ
回传给生产者的确认消息中 delivery-tag
域包含了确认消息的序列号,此外 RabbitMQ
也可以设置 basic.ack
的 multiple
域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm
模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ
因为自身内部错误导致消息丢失,就会发送一条 nack
消息,生产者应用程序同样可以在回调方法中处理该 nack
消息。
confirm
机制和 transaction
事务模式是不能够共存的,已经处于 transaction
事务模式的 channel
不能被设置为 confirm
模式,同理,反过来也一样。
通常我们可以通过调用 channel
的 confirmSelect
方法将 channel
设置为 confirm
模式。如果没有设置 no-wait
标志的话,RabbitMQ
会返回 confirm.select-ok
表示同意生产者当前 channel
信道设置为 confirm
模式。
客户端生产者侧:生产者将消息发送到 RabbitMQ
然后写入到磁盘后通知生成者已收到生产者消息,保证生产者发送的消息不会丢失。
支持两种通知方式:
- 同步方式,即每发一条消息生成者等待
RabbitMQ
确认后再继续发送消息; - 异步方式,即生产者提供回调函数入口,生产者发送完消息后不等待
RabbitMQ
回应继续发送消息,RabbitMQ
会回调通知生产者是否收到消息,一般实际生产环境用此方式比较多。
import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;public class ConfirmSend {private static String exchange_name = "";private static String queue_name = "tx_queue";/*** confirm机制:确认publisher发送消息到broker,由broker进行应答(不能确认是否被有效消费)* confirmSelect,进入confirm消息确认模式* ,确认方式:1、异步ConfirmListener;2、同步waitForConfirms* ConfirmListener、waitForConfirms均需要配合confirm机制使用* @param mes* @throws Exception*/public static void txSend(Serializable mes) throws Exception {Connection conn = MqManager.newConnection();Channel channel = conn.createChannel();// 开启confirm机制channel.confirmSelect();channel.queueDeclare(queue_name, false, false, true, null);// 异步实现发送消息的确认(此部分的消息确认是指发送消息到队列,并非确认消息的有效消费)channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// multiple:测试发现multiple随机true或false,原因未知System.out.println("Nack deliveryTag:" + deliveryTag + ",multiple:" + multiple);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("Ack deliveryTag:" + deliveryTag + ",multiple:" + multiple);}});for (int i = 0; i < 10; i++) {System.out.println("---------消息发送-----");channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString() + i));}// channel.waitForConfirms();//同步实现发送消息的确认System.out.println("-----------");channel.close();conn.close();}public static void main(String[] args) throws Exception {txSend("hello world!");}}
3. RabbitMQ 丢失消息
RabbitMQ
集群也会弄丢消息,就是说在消息发送到 RabbitMQ
之后,默认是没有保存到磁盘的,万一 RabbitMQ
宕机了,这个时候消息就丢失了。
所以为了解决这个问题,RabbitMQ
提供了一个持久化的机制,消息写入之后会持久化到磁盘,哪怕是宕机了,恢复之后也会自动恢复之前存储的数据,这样的机制可以确保消息不会丢失。
设置持久化步骤:
- 创建
queue
的时候将其设置为持久化的,这样就可以保证rabbitmq
持久化queue
的元数据,但是不会持久化queue
里的数据; - 发送消息的时候将消息的
deliveryMode
设置为 2,就是将消息设置为持久化的,此时rabbitmq
就会将消息持久化到磁盘上去。
但是这样一来可能会有人说:万一消息发送到 RabbitMQ
之后,还没来得及持久化到磁盘就挂掉了,数据也丢失了。
对于这个问题,其实是配合上面的 confirm
机制一起来保证的,就是在消息持久化到磁盘之后才会给生产者发送 ack
消息。
RabbitMQ
主要是采用持久化的方式保证消息不丢,启用队列、交换机、消息的持久化,确保不会因为 RabbitMQ
服务器的宕机导致消息丢失。
channel.basicPublish(exchange_name, "routingKey",true, MessageProperties.PERSISTENT_BASIC, "xiao ming".getBytes());
MessageProperties.PERSISTENT_BASIC 即可表示消息是要进行持久化的。
消息持久化成功的条件:
- 投递消息的时候
durable
设置为true
,消息持久化,代码:
channel.queueDeclare(x, true, false, false, null)
参数 2 设置为 true
持久化;
- 设置投递模式
deliveryMode
设置为 2(持久),代码:
channel.basicPublish(x, x, MessageProperties.PERSISTENTTEXTPLAIN,x)
参数 3 设置为存储纯文本到磁盘;
- 消息已经到达持久化交换器上;
- 消息已经到达持久化的队列;
四个条件都需要满足。
持久化工作原理:
RabbitMQ
会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,RabbitMQ
会把这条消息标识为等待垃圾回收。
持久化的缺点:
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用 SSD
硬盘可以使事情得到缓解,但他仍然吸干了 RabbitMQ
的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
4. 消费者丢失消息
RabbitMQ
消费者在消费消息的时候,刚拿到消息,结果进程挂了,这个时候 RabbitMQ
就会认为你已经消费成功了,这条数据就丢了。
RabbitMQ
提供了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知一下消息中间件(RabbitMQ
),这个可以是系统自动 autoACK
的也可以由处理消息的应用操作。
当“消息确认”被启用的时候,RabbitMQ
不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。
为了解决这个问题,RabbitMQ
提供了 2 种处理模式来解决这个问题:
- 自动确认模式:当
RabbbitMQ
将消息发送给应用后,消费者端自动回送一个确认消息。(使用 方法:basic.deliver
或basic.get-ok
)。 - 显式确认模式:
RabbitMQ
不会完全将消息从队列中删除,直到消费者发送一个确认回执(acknowledgement)后再删除消息。(使用方法:basic.ack
)。
在显式确认模式下,消费者可以自由选择什么时候发送确认回执(acknowledgement)。消费者可以在收到消息后立即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执。
如果一个消费者在尚未发送确认回执的情况下挂掉了,那 RabbitMQ
会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
消费者在获取队列消息时,可以指定 autoAck
参数,采用显式确认模式,需要指定 autoAck = false
,在显式确认模式,RabbitMQ
不会为未 ack
的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。如果断开连接,RabbitMQ
也没有收到 ACK
,则 RabbitMQ
会安排该消息重新进入队列,等待投递给下一个消费者。
但是默认情况下这个发送 ack
的操作是自动提交的,也就是说消费者一收到这个消息就会自动返回 ack
给 RabbitMQ
,所以会出现丢消息的问题。
所以针对这个问题的解决方案就是:关闭 RabbitMQ
消费者的自动提交 ack
,在消费者处理完这条消息之后再手动提交 ack
。
import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class ConsumerTest {private static String queue_name = "tx_queue";/*** @param args*/public static void main(String[] args) {Connection conn;try {conn = MqManager.newConnection();Channel channel = conn.createChannel();// 消费消息boolean autoAck = false;channel.basicConsume(queue_name, autoAck, "myConsumer Tag", new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {String routingKey = envelope.getRoutingKey();String convernType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body));channel.basicAck(deliveryTag, false);}});}catch (Exception e) {e.printStackTrace();}}}
5. 业务逻辑
在该流程中,一个分布式事务由 A 和 B 两个服务共同完成,在 A 和 B 都执行成功时,分布式事务结果不会出现以外,但是如果该流程中某一个步骤出现问题,很可能就会导致 AB 的数据不一致的问题。接下来,我们仔细分析一下该流程中的问题所在。
在 A 服务中,由于是本地事务控制,可以保证 a、b 操作的原子性。这里要特别说明的是 b 操作所涉及到的内容:
- 服务 A 向 MQ 服务发送一个消息;
- MQ 持久化保存消息;
- MQ 向服务 A 发送 ack 确认。
待这一系列操作完成后,A 认为所有操作完成,提交事务保存。
在 B 服务中,由于是本地事务控制,可以保证 c、d 操作的原子性。这里要特别说明的是 c 操作所涉及到的内容:
- MQ 向 B 服务发送消息(B 接收消息);
- B 向 MQ 发送 ack 确认消息(该步骤也可以是在 d 操作完成后返回给 MQ)。
本章节部分参考
https://gitbook.cn/books/5d65124b2b27dd24ed390665/index.html
https://gitbook.cn/books/5f30bfb0be80f0592e70f206/index.html
RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失相关推荐
- RabbitMQ 入门系列(10)— RabbitMQ 消息持久化、不丢失消息
消息要保持"持久化",即不丢失,必须要使得消息.交换器.队列,必须全部 "持久化". 1. 生产者怎么确认 RabbitMQ 已经收到了消息? # 打开通道的确 ...
- RabbitMQ 入门系列(4)— RabbitMQ 启动、停止节点和应用程序、用户管理、权限配置
1. 服务器管理 我们使用 "节点" 来指代 RabbitMQ 实例,当我们谈到 RabbitMQ 节点时指的是 RabbitMQ 应用程序和其所在的 Erlang 节点. 1.1 ...
- RabbitMQ 入门系列(11)— RabbitMQ 常用的工作模式(simple模式、work模式、publish/subscribe模式、routing模式、topic模式)
1. simple 模式 simple 模式是最简单最常用的模式 2. work 模式 work 模式有多个消费者 消息产生者将消息放入队列.生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到 ...
- RabbitMQ 入门系列(5)— RabbitMQ 使用场景优缺点
1. 为什么要使用 MQ 1.1 异步 怎么理解异步这个概念呢? 举个简单的例子,假设有个业务需要写数据库,但是呢写数据库需要一定的网络开销,可能是几毫秒或者几十毫秒,这对于延时要求很高的业务来说是不 ...
- 【原创】OllyDBG 入门系列(五)-消息断点及 RUN 跟踪
标 题: [原创]OllyDBG 入门系列(五)-消息断点及 RUN 跟踪 作 者: CCDebuger 时 间: 2006-02-19,16:02:46 链 接: http://bbs.pediy. ...
- OllyDBG 入门系列(五)-消息断点及 RUN 跟踪
标 题: [原创]OllyDBG 入门系列(五)-消息断点及 RUN 跟踪 作 者: CCDebuger 时 间: 2006-02-19,16:02:46 链 接: http://bbs.pediy. ...
- rocketmq怎么保证消息一致性_从入门到入土(三)RocketMQ 怎么保证的消息不丢失?...
精彩推荐 一百期Java面试题汇总SpringBoot内容聚合IntelliJ IDEA内容聚合Mybatis内容聚合 接上一篇:RocketMQ入门到入土(二)事务消息&顺序消息 面试官常常 ...
- 从入门到入土(三)RocketMQ 怎么保证的消息不丢失?
精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:RocketMQ入门到入土(二)事务消息&顺序消息 面试 ...
- RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现
生产者消费者代码示例 上一章节中对消息通信概念做了详细的说明,本章节我们对 RabbitMQ 生产者和消费者代码分别做一示例说明. 1. 生产者代码 #!/usr/bin/env python # c ...
最新文章
- ITS智能交通监控系统技术解析
- 13位PM告诉你:「陌生人社交」如何逃离互加微信“魔咒”?
- java地图瓦片_百度地图瓦片层级范围对照表
- mysql同时满足升序和降序_MySQL性能优化(三):索引
- (一)html5中的新增元素和废除元素
- Core Audio音频基础概述
- 如何测试GPS的RAIM功能-->如何使用GSS7000测试RAIM
- Windows7 关闭UAC_频繁提示的权限放行窗口
- Excel进行数据分析的常用知识的学习整理
- [经验教程]谷歌浏览器google chrome如何设置默认百度搜索引擎?
- html文件中top什么意思,margin-top在html中的意思是什么
- vmware虚拟机linux重置密码
- Excel编程环境搭建
- Hackintosh Dell vostro 5460 alpha v1.0 版本
- Python自定义豆瓣电影种类,排行,点评的爬取与存储(高阶上)
- 【RuoYi框架】RuoYi框架学习超简单案例 - 新闻管理系统(附源码)
- 查询日历的综合性C语言程序的设计,C语言程序设计实验指导书060522.doc
- LeetCode 197. 上升的温度
- MySQL知识点简述
- 每日优鲜Q2财报:净收入同比强劲增长41%,单季度收入创新高