文章目录

  • 1. 消息可能出现丢失的情况
  • 2. 生产者如何保证消息的可靠性投递
    • 2.1 消息落库打标 + confirm机制
    • 2.2 消息幂等性如何保证?
    • 2.3 延时消息确认
  • 3. rabbitMQ服务器如何防止消息丢失
  • 4. 消费者如何防止消息丢失

1. 消息可能出现丢失的情况


消息可能出现丢失的情况如上图所示,针对生产者、MQ、消费者三个维度都可能出现消息丢失

  1. 生产者在向MQ服务器Broker发送message时,可能由于网络原因,消息发送失败,在传输过程中丢失,此时消息还未到达MQ服务器

  2. RabbitMq服务器接收到消息,此时RabbitMq服务器突然宕机,造成消息丢失

  3. 消费端拿到消息后,还未来得及处理就宕机或者被重启了,造成消息丢失

2. 生产者如何保证消息的可靠性投递

2.1 消息落库打标 + confirm机制

针对生产者在向Broker发送message时的消息丢失,可以使用消息入库打标记并配合mq的confirm机制来保证消息可靠性,首先要了解什么是mq 的confirm机制?生产者向mq服务端发送消息时,mq服务器会根据消息的接收情况给生产者一个应答,生产者根据应答情况来确保该条消息是否成功的发送到了mq服务器!而这个应答的过程就是mq的confirm机制。

confirm机制的现实步骤如下:

  1. 在生产者的channel 上开启confirm机制channel.confirmSelect();
  2. 在生产者的channel上添加监听,用来监听mq-server返回的应答

伪代码如下:

     //开启confirm机制channel.confirmSelect();//设置confirm 监听channel.addConfirmListener(new AngleConfirmListerner());//......发送消息 //注意:生产者连接不能断开,否则无法监听回调================= 消息监听器AngleConfirmListerner =================public class AngleConfirmListerner implements ConfirmListener {//broker正常签收@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息deliveryTag" + deliveryTag + "被正常签收");}//broker异常签收@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息deliveryTag" + deliveryTag + "没被签收");}}

消息入库打标解决思路

以订单系统向mq发消息为例

发消息链路流程

  1. 把订单数据入库,同时构建消息数据,消息数据同样入库,记录在message表中,消息状态初始标记为0
  2. 生产者(订单服务)发送消息到mq服务器
  3. mq会通过confirm机制返回确认消息,订单服务监听回调的confirm结果
  4. 正常情况下,mq接收消息成功并返回ACK,订单服务监听到此ACK,并更新message表中的消息状态标记为1,表示发送与接收正常!
  5. 异常情况下,由于网络闪断,导致消费端监控mq服务访问的确认消息 没有收到,那么在msg_db中的那条消息的 状态永远就是0状态。这个时候,我们需要对这种情况下做出补偿
  6. 根据消费者(库存服务)消费情况(消费正常、消费异常),分别修改message表中的消息状态为3或4!

异常情况下的补偿机制

启动一个定时任务,去扫描message这个消息记录表,针对消息状态为0的消息,根据业务来设置重发规则

  • ①:插入message表中的消息,如果3分钟后状态还是0,代表未发送或发送失败,那么进行消息重发,并记录消息重发次数
  • ②:如果消息重发次数大于5次,且消息状态还是0的时候,就把这条消息状态设置为2,代表消息发送不成功,此时人工介入,调查未成功原因!

注意: 消息落库打标,这种方式与RokcetMQ的事务消息的思想非常类似,只不过这里使用的是本地定时扫描发送失败的消息,而RokcetMQ则是在Broker中轮询扫描失败消息。两种方式的触发位置不同!

2.2 消息幂等性如何保证?

幂等性简而言之,就是对接口发起的一次调用和多次调用,所产生的结果都是一致的。某些接口具有天然的幂等性: 比如查询接口,不管是查询一次还是多次,返回的结果都是一致的。

但对于标题2.1的发消息链路中,mq返回成功的ACK时,如果因为网络原因ACK发送失败,就导致消息生产者(订单服务)无法修改message表中的消息状态,又因为补偿机制的存在,回轮询扫描、判断并重发,如果没有幂等性保证,就会造成订单的重复提交! 再或者用户多次点击提交订单,如果没有接口幂等性,也会造成订单重复提交!

订单重复提交幂等性解决方案

如果使用户由于网络卡顿而心急,不断点击提交订单造成的订单重复提交,可以为订单生成一个全局唯一性ID(订单号+业务类型),并把该唯一id使用setnx命令保存在redis,在第一次保存的时候,由于redis中没有该key,那么就会把全局唯一ID 设置上,此时订单入库保存。若出现前端重复点击按钮, 由于第一步已经setnx上了 ,就会阻止后面的保存

mq服务端是如何保证幂等性的?

mq服务端在接受消息时,会对每一条消息都生成一个全局唯一的与业务无关的ID(inner_msg_id),先根据inner_msg_id 是否需要重复发送,再决定消息是否落地 。这样保证每条消息都只会在mq服务端落地一次。如果由于网络原因mq落地成功,但返回ack失败,生产者由于补偿机制重发消息,mq服务端会对比新消息的inner_msg_id,由于此条消息在mq服务器已落地,所以id相等,不予处理!

2.3 延时消息确认

消息入库打标存在缺点:在消息入库打标第一步的过程中,既插入了业务数据表,也同时插入了消息记录表,进行了二次db操作,在分布式环境下,可能还要保证分布式事务。延时消息确认机制相比消息入库打标,减少了一次message入库操作,不用加分布式事务,系统速度显著提高。延时消息的思路如下:


步骤如下:

  1. 订单服务首先将业务代码入库,注意:消息并没有入库
  2. 发送业务消息给mq
  3. 发送第二个延迟确认消息,与业务消息发送时间有一定间隔(1分钟),保证消费端处理完毕并消息入库之后才发送!
  4. 库存服务监听第二步发送的业务消息进行消费
  5. 消费端(库存服务)发送确认消息ack到mq,此时第三步还没有执行,间隔时间未到!
  6. 回调服务监听到这个确认消息
  7. 把这个消费端完成消费的确认消息入库
  8. 回调服务检查到延迟确认消息,会在数据库查询是否有这条消息
  9. 如果没有查到这条消息,说明第五步库存服务消费消息失败了。此时回调服务通过RPC给一个重新发送命令到上游系统

3. rabbitMQ服务器如何防止消息丢失

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。所以就要对消息进行持久化处理。如何持久化?要想做到消息持久化,必须满足以下三个条件,缺一不可。下面具体说明下:

  1. Exchange 设置持久化
/*** exchangeName:交换机名称* exchangeType:交换机类型* true: 开启消息持久化* false:代表连接停掉后不自动删除掉* null:其他参数*/
channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);
  1. Queue 设置持久化
/**
* queueName:队列名称
* true: 开启消息持久化
* false:代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
* false:代表连接停掉后不自动删除掉
* null:其他参数
*/
channel.queueDeclare(queueName,true,false,false,null);
  1. Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
//消息属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.contentEncoding("UTF-8").correlationId(UUID.randomUUID().toString()).headers(infoMap).build();//生产者发送消息
channel.basicPublish(exchangeName,routingKey,basicProperties,(msgBody+i).getBytes());

4. 消费者如何防止消息丢失

首先看一下,生产者、mq服务器、消费者之间的消息流转过程

  1. 第一步:消息生产者向Mq服务端发送消息
  2. 第二步:mq 服务端把消息进行落地
  3. 第三步:mq 服务端向消息生产者发送ack
  4. 第四步:消息消费者从mq服务端拉取消息消费
  5. 第五步:消费者向mq服务端发送ack
  6. 第六步:mq服务端将落地消息删除

第四步消费者获取到消息之后,没有来得及处理完毕,自己直接宕机了,因为消息者默认采用的是自动ack,此时RabbitMQ的自动ack机制会通知MQ Server这条消息已经处理好了,此时消息就丢了,并不是预期的。

那么我们可以采用手动ack机制来解决这个问题,消费端处理完逻辑之后再通知MQ Server,这样消费者没处理完消息不会发送ack,如果在消费者拿到消息,没来得及处理的情况下自己挂了,此时MQ集群会自动感知到,它就会自觉的重发消息给其他的消费者服务实例。

根据上面的思路你需要完成下面的两步操作:

①:消费者关闭自动ack

 //第二个参数为false代表不自动ackchannel.basicConsume(queueName,false,new TulingAckConsumer(channel));

②:消费完成才ack,否则不ack

  try{//模拟业务Integer mark = (Integer) properties.getHeaders().get("mark");if(mark != 0 ) {//模拟消息消费System.out.println("消费消息:"+new String(body));//消费完成,发出ack通知channel.basicAck(envelope.getDeliveryTag(),false);}else{//否则抛异常throw new RuntimeException("模拟业务异常");}}catch (Exception e) {System.out.println("异常消费消息:"+new String(body));//捕捉异常,消息重回队列,让其他集群节点消费//channel.basicNack(envelope.getDeliveryTag(),false,true);//不重回队列,杀死这个消息channel.basicNack(envelope.getDeliveryTag(),false,false);}

Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失相关推荐

  1. RabbitMQ消息中间件(二) RabbitMQ如何保证消息的可靠性投递

    RabbitMQ如何保证消息投递的准确性? 生产端的可靠性投递: 1.保证消息成功发送 2.保证MQ节点成功接收 3.发送端收到MQ节点(Broker)确认应答 4.完善的消息补偿机制 BAT等大厂解 ...

  2. 【RabbitMQ】消息的可靠性投递与签收

    消息的可靠性投递--Porducer Confirm 确认模式 发生在从Producer发送到Exchange时,发送成功/失败都会自动调用ConfirmCallBack回调方法. 步骤: (1)开启 ...

  3. php定时发送生日模块消息_RabbitMQ之消息的可靠性投递

    生产端的可靠性投递: 1.保障消息成功发送出去 2.保障mq节点成功接收消息 3.消息发送端需要收到mq服务的确认应答 4.完善的消息补偿机制(百分百成功成功,需要该步骤) 消息落库方案 订单服务调用 ...

  4. 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

    目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...

  5. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  6. Rabbitmq专题:springboot如何整合Rabbitmq?Rabbitmq有哪些工作模式?

    文章目录 1. Rabbitmq的安装 2. Rabbitmq的基本概念 3. RabbitMQ的工作模式 3.1 "Hello World!" 简单模式 3.2 Work que ...

  7. IM即时通讯-N-如何保证消息的可靠性展示

    结论先行 客户端如何在推拉结合的模式下保证消息的可靠性展示? 原则: server拉取的消息一定是连续的 原则: 端侧记录的消息的连续段有两个作用: 1. 记录消息的连续性, 即起始中间没有断层, 2 ...

  8. udp怎么保证不丢包_MQ不丢消息,究竟是怎么实现的?

    前几天有水友提问:通过消息队列(MsgQueue,MQ)发送任务和消息,万一MQ重启了怎么办?能否保证MQ不丢消息?今天就聊聊MQ的消息必达性架构与流程.不丢消息,MQ架构设计的核心方向是什么?MQ要 ...

  9. RabbitMQ(十):RabbitMQ 如何保证消息的可靠性

    一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...

最新文章

  1. 现代永磁电机交流伺服系统_【每日文献】2020-002 永磁同步电机模糊自整定自适应积分反步控制...
  2. 阿里云自定义监控tomcat进程数
  3. 谈谈招聘时我喜欢见到的特质
  4. Redis源码剖析(十二)有序集合跳表实现
  5. 高等数学下-赵立军-北京大学出版社-题解-练习12.4
  6. session、token、jwt、oauth2 傻傻分不清
  7. AcWing1082. 数字游戏
  8. Cry On My Shoulder (背景音乐)
  9. 在钢筋混泥土的城市,打铁还需身体硬
  10. funcode坐标c语言,01 FunCode C 入门.doc
  11. 林子雨大数据软件安装和编程指南导航
  12. OpenWrt-19.07.2 For HC5861(极路由3) /HiWiFi/Gee最新固件,极路由3刷openwrt
  13. 亲爱的,别把上帝缩小了 ---- 读书笔记1
  14. win8专业版和win8.1专业版安装密钥key及其永久激活工具
  15. python系列13:python中Path常用功能
  16. SLG变现强劲,却易遭黑产侵袭,如何破局?SLG游戏安全方案定制做到出奇制胜!
  17. 数据科学必备用Python进行描述性统计数据分析详解
  18. 上海联通第一家冰激凌无限店正式开业,拥抱新零售时代!
  19. 谷歌浏览器显示不安全内容
  20. PS初体验:熟悉快捷键

热门文章

  1. Windows 2012 英文版系统安装中文语言包及时间格式设置
  2. [MySQL] mysql 的行级显式锁定和悲观锁
  3. ffmpeg合并音频(转)
  4. 【转载】Java程序设计入门 (二)
  5. 第四次作业——测试作业
  6. java.io 相关tips
  7. IOS程序中打开自己的程序,测试机器中是否装了自己的程序(openURL:,canOpenURL:)
  8. IOC容器特性注入第六篇:利用MVC注入点,把容器启动
  9. Python中的类属性和实例属性以及静态方法和类方法
  10. python五行代码解决滑块验证的缺口距离识别,破解滑块验证...