一、简介

在RabbitMQ消息传输过程中,由于网络、服务器本身等问题,有时候会发生消息丢失的现象,在生产环境中,我们应该尽可能保证消息不会丢失,当然不是说百分百保证。下面列举一些发生消息丢失的场景:

  • 【a】如果在声明交换机、队列、发送消息的时候没有指定消息持久化的话,那么在RabbitMQ服务器重启的情况下,所有未持久化的交换机、队列、消息都将会丢失;
  • 【b】如果生产者发送完消息之后,如果在发送到MQ服务器的过程中,发生了异常情况,那么消息将不能在正确到达MQ服务器,此时消息将会发生丢失;
  • 【c】如果发送到交换机的消息没有找到匹配的队列,此时队列中的消息将会发生丢失;
  • 【d】如果我们采用RabbitMQ自动确认机制的话,只要生产者发送完消息,就会认为消费者已经成功处理了,而实际情况是,在消费者消费过程可能发生异常情况,这时候如果自动确认,那么未被消费者消费的消息将会发生丢失;

以上总结的四点,都是会影响消息可靠性传输的一些因素,下面针对上面提到的四点分别总结一下如何提供消息可靠性。

二、方法总结

【a】如果在声明交换机、队列、发送消息的时候没有指定消息持久化的话,那么在RabbitMQ服务器重启的情况下,所有未持久化的交换机、队列、消息都将会丢失;

针对这种情况,我们可以对交换机、队列、已经在发送消息的时候,指定durable=true,将交换机、队列、消息都进行持久化,这样在MQ重启之后,队列中的消息依旧能够被消费者继续消费。

  • 交换机持久化:声明交换机的时候指定参数durable=true,实现交换机持久化。如果交换器不设置持久化,那么在RabbitMQ服务器重启之后,相关的交换器元数据会丢失,消息不会丢失,只是不能将消息发送到这个交换器中。
public TopicExchange(String name, boolean durable, boolean autoDelete) {super(name, durable, autoDelete);}public AbstractExchange(String name) {this(name, true, false);}public AbstractExchange(String name, boolean durable, boolean autoDelete) {this(name, durable, autoDelete, (Map)null);}
  • new TopicExchange(DURABLE_EXCHANGE_NAME,true,false)
  • 队列持久化:声明交换机的时候指定参数durable=true,实现队列持久化,队列的持久化不能保证内存存储的消息不会丢失。
       public Queue(String name) {this(name, true, false, false);}public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)//不指定durable的话默认好像也是true//public Queue(String name, boolean durable)//durable:是否将队列持久化 true表示需要持久化 false表示不需要持久化public Queue(String name, boolean durable) {this(name, durable, false, false, (Map)null);}
  • new Queue(DURABLE_QUEUE_NAME, true);
  • 消息持久化:
* 使用convertAndSend方式发送消息,消息默认就是持久化的,下面是源码:* new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2;

只有实现了交换机、队列与消息的持久化,才能保证消息不会丢失。

【b】如果生产者发送完消息之后,如果在发送到MQ服务器的过程中,发生了异常情况,那么消息将不能在正确到达MQ服务器,此时消息将会发生丢失;

针对这种情况,我们可以使用生产者发送消息确认机制,每当生产者发送消息到MQ服务时,会监听一个confirmListener,触发下面的方法,生产者可以根据返回的状态进行后续操作。

/*** 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;* 如果消息正确到达交换机,则该方法中isSendSuccess = true;*/@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {logger.info("confirm回调方法>>>回调消息ID为: " + correlationData.getId());if (isSendSuccess) {logger.info("confirm回调方法>>>消息发送到交换机成功!");} else {logger.info("confirm回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);}}
  • 实现方法示例如下:实现ConfirmCallback接口,实现: public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error)方法。
/*** @Description 自定义消息发送确认的回调* @Author weishihuai* @Date 2019/6/27 10:42* <p>* 实现接口:implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback* ConfirmCallback:只确认消息是否正确到达交换机中,不管是否到达交换机,该回调都会执行;* ReturnCallback:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行;*/
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {private static final Logger logger = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class);@Autowiredprivate RabbitTemplate rabbitTemplate;/*** PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.*/@PostConstructpublic void init() {//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}/*** 消息从交换机成功到达队列,则returnedMessage方法不会执行;* 消息从交换机未能成功到达队列,则returnedMessage方法会执行;*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);}/*** 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;* 如果消息正确到达交换机,则该方法中isSendSuccess = true;*/@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {logger.info("confirm回调方法>>>回调消息ID为: " + correlationData.getId());if (isSendSuccess) {logger.info("confirm回调方法>>>消息发送到交换机成功!");} else {logger.info("confirm回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);}}}

当然除了这种生产者confirm确认机制外,还要一种事务的方式,通过channel.txSelect() / channel.txCommit() / channel.txRollback()来控制事务的提交和回滚操作。生产者确认机制相对于事务机制,最大的好处就是可以异步处理提高吞吐量,不需要额外等待消耗资源。实际生产环境推荐使用生产者确认机制。

【c】如果发送到交换机的消息没有找到匹配的队列,此时队列中的消息将会发生丢失;

针对这种未被正确路由的消息,主要有两种方法:

第一种方法:   可以通过监听RabbitTemplate.ReturnCallback返回回调spring.rabbitmq.publisher-returns=true,同时设置spring.rabbitmq.template.mandatory = true,这样的话就算消息没有匹配到合适的队列,RabbitMQ也会调用将消息返回给生产者,消息将不会丢失。

  • 实现方法示例:实现RabbitTemplate.ReturnCallback接口,实现
  • public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {}方法
  /*** 消息从交换机成功到达队列,则returnedMessage方法不会执行;* 消息从交换机未能成功到达队列,则returnedMessage方法会执行;*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);}

.第二种方法:通过指定备份交换机实现:在声明交换机的时候添加alternate-exchange参数,将没有被路由的消息存储于RabbitMQ中。

 @Beanpublic Exchange exchange() {Map<String, Object> arguments = new HashMap<>(10);//声明备份交换机arguments.put("alternate-exchange", MESSAGE_BAK_EXCHANGE_NAME);return new DirectExchange(EXCHANGE_NAME, true, false, arguments);}
  • arguments.put("alternate-exchange", MESSAGE_BAK_EXCHANGE_NAME);

【d】如果我们采用RabbitMQ自动确认机制的话,只要生产者发送完消息,就会认为消费者已经成功处理了,而实际情况是,在消费者消费过程可能发生异常情况,这时候如果自动确认,那么未被消费者消费的消息将会发生丢失;

针对这种情况,我们可以采用消费者手动确认消息机制,即basicAck = false,只有rabbitmq接收到消费者发送的ack应答之后,才会发送下一条消息到该消费者,才会将消息从内存中移除。强烈推荐使用手动确认机制,因为使用手动确认有足够的时间处理消息,不需要担心消费者进程挂掉之后消息丢失问题。

RabbitMQ如果一直没收到消费者的ack消息应答确认,并且此时消费者连接已断开,那么MQ会重新将消息进入队列等待发送给下一个消费者进行消费。

  • 实现方式示例:
@Component
public class Consumer {private static final Logger logger = LoggerFactory.getLogger(Consumer.class);@RabbitListener(queues = "test_dlx_queue_name")public void receiveMessage(String receiveMessage, Message message, Channel channel) {try {logger.info("【Consumer】接收到消息:[{}]", receiveMessage);//这里模拟随机拒绝一些消息到死信队列中if (new Random().nextInt(10) < 5) {logger.info("【Consumer】拒绝一条信息:[{}],该消息将会被转发到死信交换器中", receiveMessage);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {logger.info("【Consumer】接消息后的处理发生异常", e);try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e1) {logger.error("手动确认消息异常.", e1);}}}}
  • channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

三、总结

以上就是关于在RabbitMQ中,对如何提高消息传输可靠性进行了总结,当然并没有哪一种方式能够百分百保证消息可靠性传输,我们只能够尽可能的保证消息不会发生丢失。其实,总结的以上四种方法,在前面的博客中都分别进行了讲解,大家可能选择性的进行阅读总结,希望能对大家有所帮助。

RabbitMQ提升消息传输可靠性方法总结相关推荐

  1. RabbitMQ 可靠消息传输实战--云平台技术栈12

    导读:之前发布了云平台技术栈(ps:点击可查看),本文主要说一下其中的RabbitMQ! 作者:极客慧 https://my.oschina.net/jikeh/blog/2207127 可能是缓存架 ...

  2. rabbitmq丢消息的处理方法

    最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制. 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订阅队列 3 ...

  3. 【RabbitMQ】消息的可靠性投递与签收

    消息的可靠性投递--Porducer Confirm 确认模式 发生在从Producer发送到Exchange时,发送成功/失败都会自动调用ConfirmCallBack回调方法. 步骤: (1)开启 ...

  4. RabbitMQ如何保证消息的可靠性

    在了解RabbitMQ消息可靠性之前,先来了解一下RabbitMQ整个消息投递的路径: producer --> exchange --> queue --> consumer Ra ...

  5. rabbitmq接收不到消息_分布式消息队列:如何保证消息的可靠性传输

    rabbitmq (1)生产者弄丢了数据 生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能. 此时可以选择用rabbitmq提供的事务功能,就是生产者发送 ...

  6. 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

    微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...

  7. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  8. 消息队列面试 - 如何保证消息的可靠性传输?

    消息队列面试 - 如何保证消息的可靠性传输? 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条 ...

  9. Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失

    文章目录 1. 消息可能出现丢失的情况 2. 生产者如何保证消息的可靠性投递 2.1 消息落库打标 + confirm机制 2.2 消息幂等性如何保证? 2.3 延时消息确认 3. rabbitMQ服 ...

  10. 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

    目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...

最新文章

  1. vscode wecode的配置_使用体验神似VS Code?三步带你了解华为云CloudIDE前世今生
  2. docker操作语句
  3. Codeforces Round #318 (Div. 2) B Bear and Three Musketeers (暴力)
  4. 基于PCA的人脸特征抽取
  5. HTML作业-潮流服装网页
  6. 名词解释 JDK JRE JVM
  7. Flutter通过MethodChannel实现Flutter 与Android iOS 的双向通信
  8. 深度学习:波士顿房价预测
  9. java中json获取key值_如何获取JsonObject中key的值
  10. 数据库表锁死的解决方法
  11. win10经常无法复制粘贴
  12. 自学python书籍_自学python看什么书
  13. python eel vue_张莽子—
  14. win 32学习笔记(三) 消息队列
  15. 帝国cms 留言反馈 问题
  16. 2021-12-11每日刷题打卡
  17. 结对-爬取大麦网演唱会信息-设计文档
  18. STC89C52RC软件IIC驱动
  19. GPS秒转北京时间(年月日时分秒)+ gps 周、周内秒转gps时间戳(单位秒) C++ 代码
  20. 获取百度开放平台Access Token,调用百度统计接口

热门文章

  1. java中对事件的监听事件,详谈Java中的事件监听机制
  2. 自动驾驶的Pipline -- 如何打造自动驾驶的数据闭环?(中)
  3. 算法:eight Queens 8皇后问题
  4. 谷歌浏览器禁止右滑返回历史_早报:拼多多加大百亿补贴力度;嫦娥五号择机返回地球;贾跃亭再成被执行人;微信订阅号页面改版...
  5. python笔记:统计字符串里各种字符的个数 + pandas删除某列
  6. 2021-09-03101. 对称二叉树
  7. 2021-08-3116. 最接近的三数之和 排序+双指针
  8. 180.连续出现的数字
  9. 【PRML 学习笔记】第二章 - 概率分布 (Probability Distributions)
  10. 【BZOJ 1257】余数之和【整除优化】