消息队列会丢失消息吗?

答案是肯定的,所以对于业务严谨的数据,我们要确保其在消息队列中的安全,不能丢。

要想解决不丢的问题,首先要弄清楚 消息是怎么丢的呢?

丢消息的关键点有3个:

  • Producer 发送消息的过程
  • 消息队列的消息存储
  • Consumer 消费消息的过程

下面挨个看看都是怎么丢的,以及解决方案。

会以 RabbitMQ 和 Kafka 这两个常用的消息系统来说明。

1. Producer 弄丢消息

Producer 向 MQ 发消息,很简单,发过去就完事儿了。

但是,在发送图中是存在危险的,例如网络问题等等,导致 MQ 没有正常收到。

怎么解决呢? 思路很简单,让 MQ 发一个 接受确认声明(ack) 就行了,就像快递需要签收一样。

例如 RabbitMQ,有两种方式可以确保发送消息的安全。

1)事务消息

Producer 发送消息之前,先开启事务,然后再发送。

如果 RabbitMQ 没有正常收到消息,Producer 会收到异常信息,回滚事务。

如果正常接收了,Producer 就提交事务。

很可靠,但效率低,因为这个事务模式是同步的,会产生阻塞。

2)confirm 确认模式

Producer 开启 confirm 模式,发送消息的时候,RabbitMQ 会给这个消息分配一个唯一的 ID。

成功写入队列之后,RabbitMQ 会向 Producer 发送一个 ack 消息,说明此 ID 的消息已经成功发送。

confirm 模式还有一个回调机制,Producer 可以准备一个失败的接口,供 RabbitMQ 在接收失败时调用。

Producer 收到失败通知,或者超时了,可以执行相应的处理逻辑,例如重发。

confirm 模式是异步的,比事务消息更高效,使用更为广泛。

Kafka 也是使用的 ack 方式,使用方式很简单,只要配置:

ack=all

确保 Kafka 在完全接收成功后才发送确认通知,这样就一定不会发丢了。

2. MQ 在存储期间弄丢消息

MQ 成功接收消息之后,需要保存起来,等着 Consumer 消费。

在这个保存期间,也可以能丢失消息。

这通常是由 MQ 故障引起的。

RabbitMQ 想要保障消息不丢,需要开启持久化,消息就会写入磁盘。

即使 RabbitMQ 宕机了,只要磁盘没事儿,重启之后还可以重新把消息加载进来。

如果想进一步的保障消息安全,就需要配置 RabbitMQ 的镜像集群了,来确保高可用。

Kafka 是天然的分布式系统,Topic 分为多个 Partition,每个 Partition 又有多个副本。

Partition 的多个副本,分为 Leader 和 Follower。

Leader 负责处理消息的读写,Follower 负责备份。

前面说的 Kafka 配置 ack=all,就是告诉Kafka,Leader 和所有 Follower 全都接收到了,才算发送 ack 确认,只有 Leader 自己接收成功是不算的。

否则的话,如果 Leader 接收完成就告诉 Producer OK 了,在 Leader 同步给 Follower 之前,Leader 宕机了,Kafka 会从 Follower 中选举出新的 Leader。那么,老 Leader 在临终前没有同步的消息就丢失了。

为了保障消息的安全,这 4 个参数要设置好:

replication.factor

用于指定 Partition 副本的数量,必须大于 1,就是至少要有 2 个副本,一个 Leader 一个 Follower。

min.insync.replicas

用于指定几个副本成功写入才提交消息,只有提交之后的消息才能被 Consumer 消费。

此值至少大于 1,这样就保障 Leader 之外至少有一个副本同步到了这条消息,不怕 Leader 宕掉了。

acks=all

用于指定几个副本接收到消息之后向 Producer 发送 ack。例如值为 1,表示 Leader 收到就可以了,“all” 表示 “所有副本”,也可以写 “-1”,等同于 “all”。

retries=999

用于指定 Producer 发送失败后的重试次数,可以设为一个很大的数,表示失败了就重试,提升发送成功几率。

3. Consumer 弄丢消息

例如 Consumer 成功接收到了消息 “123”,MQ 就会移除这条消息。

但在 Consumer 处理完这条消息之前,宕机了。

Consumer 重启之后继续从 MQ 拿消息,这次拿到的就是下一条消息 “124”,那么 “123” 就丢了。

所以,Consumer 只是接收到消息是不够的,成功处理完成才行。

这就和 MQ 的消费确认机制有关了。

RabbitMQ 默认是 Consumer 成功接收消息之后就发送 ack 确认,RabbitMQ 就认为消费成功了。

关闭自动的 Consumer ack 就行,改为手动发送确认通知。

Kafka 的 Consumer 发送的不是 ack 确认,而是 offset,告诉 Kafka 已经消费到哪个位置了。

默认是 Consumer 接收后自动提交 offset,所以也需要关闭,改为手动提交。

小结一下,要想消息不丢,需要发消息的时候确认发送成功了,MQ 存储的时候要是高可靠的,Consumer 消费的时候,不能接收之后就确认,真正处理完成才行。

推荐阅读

OAuth2 图解

轻松理解 Kubernetes 的核心概念

开发者必须要了解的架构技术趋势:Service Mesh

消息队列把消息弄丢了怎么办?相关推荐

  1. 消息队列把消息弄丢了怎么办

    消息队列把消息弄丢了怎么办 消息队列会丢失消息吗? 答案是肯定的,所以对于业务严谨的数据,我们要确保其在消息队列中的安全,不能丢. 要想解决不丢的问题,首先要弄清楚 消息是怎么丢的呢? 丢消息的关键点 ...

  2. 消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点?

    消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点? 面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区 ...

  3. 面试题:为什么使用消息队列?消息队列有什么优缺点?

    目录 1. 面试题 2. 面试官心理分析 3. 面试题剖析 3.1. 为什么使用消息队列 3.2. 消息队列有什么优缺点 3.3. Kafka.ActiveMQ.RabbitMQ.RocketMQ 有 ...

  4. 阿里Java面试题剖析:为什么使用消息队列?消息队列有什么优点和缺点?

    面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区别,以及适合哪些场景? 面试官心理分析 其实面试官主要是想看看: ...

  5. 消息队列的消息大量积压怎么办

    目录 消息积压简介 生产端 消费端 已经消息积压,如何处理 总结 注意:本文参考  消息队列的消息大量积压怎么办?_JavaEdge.的博客-CSDN博客_消息队列积压了大量消息怎么处理 消息积压简介 ...

  6. 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 )

    文章目录 一.MessageQueue 消息队列存储消息 二.MessageQueue 消息队列取出消息 三.消息队列完整代码 一.MessageQueue 消息队列存储消息 Message 链表 : ...

  7. 消息队列_消息队列:kafka

    概念 kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域. 要理解kafka首先要有分布式的概念,要有消息队列的概念.分布式系统最大的优势就是解耦和削峰,这种情况下,A系 ...

  8. [转载]用消息队列和消息应用状态表来消除分布式事务

    由于数据量的巨大,大部分Web应用都需要部署很多个数据库实例.这样,有些用户操作就可能需要去修改多个数据库实例中的数据.传统的解决方法是使用分布式事务保证数据的全局一致性,经典的方法是使用两阶段提交协 ...

  9. linux 消息队列_Linux消息队列

    消息队列,Unix的通信机制之一,可以理解为是一个存放消息(数据)容器.将消息写入消息队列,然后再从消息队列中取消息,一般来说是先进先出的顺序.可以解决两个进程的读写速度不同(处理数据速度不同),系统 ...

  10. Threadx 消息队列-发送消息_tx_queue_send

    消息队列-发送消息_tx_queue_send 1,发送消息会插入到队列尾部. 2,如果消息队列有挂起的接收线程,发送消息时,可以直接把消息放到接收线程的缓冲中,这可以降低消息传递延时. TX_THR ...

最新文章

  1. JFET直耦级联放大电路:MPF102,2SK102
  2. Google上面有自己给你标注好的数据集
  3. Eclipse中看java源代码
  4. 有关python方面的论文_一篇文章可以带你理解python中的类
  5. [vue] 你有写过自定义指令吗?自定义指令的生命周期(钩子函数)有哪些?
  6. *【ZOJ - 3703】Happy Programming Contest(带优先级的01背包)
  7. 程序员的算法课(10)-字符串排序算法实例(纯代码)
  8. 怎么更改wifi频段_wifi信号差?网速慢?这样做不浪费每一兆宽带
  9. 专注计算机专业知识讲授,计算机一级考试MS Office上机指导
  10. C# 多线程同步和线程通信
  11. 如何使得窗口最大化?
  12. 电线直径对照表_电线平方与直径对照表
  13. 分享多年收集的40款免费开源源码
  14. C# ZipArchive 文件末端错误 的解决方案
  15. 使用Python爬虫爬取淘宝商品并分析
  16. 计算机信息安全技术分为两个层次,李某将玉佩以合理价格转让给善意第三人朱某时,下列说法正确的是?()。...
  17. 关于校园招聘 - 秋招和春招
  18. python如何裁剪图像
  19. 【Markdown笔记】数学公式 三角函数
  20. FPGA协同验证方法-资料整理

热门文章

  1. 嵌入式成长轨迹27 【Linux应用编程强化】【中嵌第二阶段】【进程管理】
  2. rsync报错:some files/attrs were not transferred (see previous errors) (code 23) at main.c(1052) [sende
  3. UART 串口通信实验
  4. 计算机毕设选题推荐之智慧考试系统 在线批卷阅卷系统 随机组卷系统 考试题库系统 springboot考试系统 作业管理系统 在线作业批改系统(源代码+文档+调试+讲解)
  5. 【数据压缩】WAV文件格式
  6. github中如何删除已有项目仓库
  7. 趋势科技对ShellshockBash漏洞的解决方案说明
  8. C语言 编程判断花瓶是谁打碎的,C语言解决是谁打碎花瓶的问题
  9. 视频:搜狗CEO王小川终于把区块链讲通透了
  10. 使用IBM SPSS Statistics检验变量间是否存在共线性