一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失。

一 消息生产者没有把消息成功发送到MQ

1.1 事务机制

AMQP协议提供了事务机制,在投递消息时开启事务支持,如果消息投递失败,则回滚事务。

自定义事务管理器

@Configuration
public class RabbitTranscation {@Beanpublic RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){return new RabbitTemplate(connectionFactory);}
}

修改yml

spring:rabbitmq:# 消息在未被队列收到的情况下返回publisher-returns: true

开启事务支持

rabbitTemplate.setChannelTransacted(true);

消息未接收时调用ReturnCallback

rabbitTemplate.setMandatory(true);

生产者投递消息

@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublic void init(){// 设置channel开启事务rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setReturnCallback(this);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("这条消息发送失败了"+message+",请处理");}@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")public void publishMessage(String message) throws Exception {rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("javatrip",message);}
}

但是,很少有人这么干,因为这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ-Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

1.2 发送方确认机制

发送消息时将信道设置为confirm模式,消息进入该信道后,都会被指派给一个唯一ID,一旦消息被投递到所匹配的队列后,RabbitMQ就会发送给生产者一个确认。

开启消息确认机制

spring:rabbitmq:# 消息在未被队列收到的情况下返回publisher-returns: true# 开启消息确认机制publisher-confirm-type: correlated

消息未接收时调用ReturnCallback

rabbitTemplate.setMandatory(true);

生产者投递消息

@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setReturnCallback(this);rabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("确认了这条消息:"+correlationData);}else{System.out.println("确认失败了:"+correlationData+";出现异常:"+cause);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("这条消息发送失败了"+message+",请处理");}public void publisMessage(String message){rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("javatrip",message);}
}

如果消息确认失败后,我们可以进行消息补偿,也就是消息的重试机制。当未收到确认信息时进行消息的重新投递。设置如下配置即可完成。

spring:rabbitmq:# 支持消息发送失败后重返队列publisher-returns: true# 开启消息确认机制publisher-confirm-type: correlatedlistener:simple:retry:# 开启重试enabled: true# 最大重试次数max-attempts: 5# 重试时间间隔initial-interval: 3000

二 消息发送到MQ后,MQ宕机导致内存中的消息丢失

消息在MQ中有可能发生丢失,这时候我们就需要将队列和消息都进行持久化。

@Queue注解为我们提供了队列相关的一些属性,具体如下:

  1. name: 队列的名称;

  2. durable: 是否持久化;

  3. exclusive: 是否独享、排外的;

  4. autoDelete: 是否自动删除;

  5. arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:

  • x-message-ttl:消息的过期时间,单位:毫秒;

  • x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;

  • x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;

  • x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;

  • x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;

  • x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;

  • x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值

  • x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

  • x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;

  • x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;

  • x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

持久化队列

创建队列的时候将持久化属性durable设置为true,同时要将autoDelete设置为false

@Queue(value = "javatrip",durable = "true",autoDelete = "false")

持久化消息

发送消息的时候将消息的deliveryMode设置为2,在Spring Boot中消息默认就是持久化的。

三 消费者消费消息的时候,未消费完毕就出现了异常

消费者刚消费了消息,还没有处理业务,结果发生异常。这时候就需要关闭自动确认,改为手动确认消息。

修改yml为手动签收模式

spring:rabbitmq:listener:simple:# 手动签收模式acknowledge-mode: manual# 每次签收一条消息prefetch: 1

消费者手动签收

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {@RabbitHandlerpublic void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{System.out.println(message);// 唯一的消息IDLong deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);// 确认该条消息if(...){channel.basicAck(deliverTag,false);}else{// 消费失败,消息重返队列channel.basicNack(deliverTag,false,true);}}
}

四 总结

消息丢失的原因?

生产者、MQ、消费者都有可能造成消息丢失

如何保证消息的可靠性?

  • 发送方采取发送者确认模式

  • MQ进行队列及消息的持久化

  • 消费者消费成功后手动确认消息

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

给你1分钟,回答下RabbitMQ如何保证消息不丢?相关推荐

  1. RabbitMQ消息中间件(二) RabbitMQ如何保证消息的可靠性投递

    RabbitMQ如何保证消息投递的准确性? 生产端的可靠性投递: 1.保证消息成功发送 2.保证MQ节点成功接收 3.发送端收到MQ节点(Broker)确认应答 4.完善的消息补偿机制 BAT等大厂解 ...

  2. Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失

    文章目录 1. 消息可能出现丢失的情况 2. 生产者如何保证消息的可靠性投递 2.1 消息落库打标 + confirm机制 2.2 消息幂等性如何保证? 2.3 延时消息确认 3. rabbitMQ服 ...

  3. SpringBoot+RabbitMQ ,保证消息100%投递成功并被消费(附源码)

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 来源:rrd.me/f2cxz 一.先扔一张图 说明: 本文涵盖了 ...

  4. RabbitMQ(十):RabbitMQ 如何保证消息的可靠性

    一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...

  5. rabbitmq消费固定个数消息_SpringBoot+RabbitMQ (保证消息100%投递成功并被消费)

    作者:wangzaiplus https://www.jianshu.com/p/dca01aad6bc8 一.先扔一张图 说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如: 消息发送确认机 ...

  6. SpringBoot + RabbitMQ (保证消息100%投递成功并被消费)

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | jianshu.com/p/dca01aad6 ...

  7. springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费)

    前言: RabbitMQ相关知识请参考: https://www.jianshu.com/p/cc3d2017e7b3 Linux安装RabbitMQ请参考: https://www.jianshu. ...

  8. 消费流程图_SpringBoot+RabbitMQ ,保证消息100%投递成功并被消费(附源码)

    来自:简书,作者:wangzaiplus 链接:https://www.jianshu.com/p/dca01aad6bc8 一.先扔一张图 说明: 本文涵盖了关于RabbitMQ很多方面的知识点, ...

  9. RabbitMQ 如何保证消息的可靠性

    一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...

最新文章

  1. 八年级计算机网络公开课,计算机网络公开课教案.doc
  2. idea不区分大小写设置_我的 IntelliJ IDEA 一直都是这么设置的,效果很棒!
  3. python 调用c++ 回调函数
  4. python科学计数法转换_柳小白Python学习笔记35 Excel之科学计数法类型转换及数据选取1...
  5. vss6 forgot admin password
  6. 分库分表下极致的优化
  7. NOI.AC#2007-light【根号分治】
  8. 转document.documentElement和document.body的区别
  9. 矩阵论思维导图_《实变函数论》 江泽坚 3rd 思维导图与笔记整理
  10. 你不是一个人在战斗!
  11. 如何激活Microsoft Office 2010?
  12. 一步步完成FastDFS + Spring MVC上传下载整合示例
  13. 数据通信技术初级工程师证题库
  14. Java枚举类配合Switch
  15. 模拟电子_热敏电阻PTC和NTC的区别与作用
  16. QT---创建桌面快捷方式
  17. vue+elementUI 怎么上传图片至阿里云
  18. win系统的阿里云服务器部署IDEASpringBoot项目保姆级教程
  19. H2教程系列(二) 创建数据库
  20. 计算机无误的英语,“开电脑”的英语正确表示是哪个?说错了就尴尬

热门文章

  1. FastDFS的介绍
  2. java mongodb 模糊查询_Java操作MongoDB插入数据进行模糊查询与in查询功能的方法
  3. 计算机考研310分什么水平,知乎工学考研310是什么水平
  4. Android横向滚动卡片,Android实现横向滑动卡片效果
  5. AVB中将公钥转换成字符数组头文件的实现
  6. andriod 自写的view 获得屏幕大小和 获得自写view大小的不同写法
  7. 计算机网络之网络层:5、DHCP协议、ICMP协议、网络地址转换NAT
  8. Python Twisted介绍
  9. HookProc 和 CallNextHookEx
  10. AttributeError: module ‘urllib‘ has no attribute ‘urlopen‘错误