RocketMQ 消息丢失场景分析及如何解决!
本文来源:https://blog.csdn.net/LO_YUN/article/details/103949317
既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题。在一些涉及到了金钱交易的场景下,消息丢失还是很致命的。那么在RocketMQ中存在哪几种消息丢失的场景呢?
先来一张最简单的消费流程图:
上图中大致包含了这么几种场景:
生产者产生消息发送给RocketMQ
RocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失
消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束
这三种场景都可能会产生消息的丢失,如下图所示:
1、场景1中生产者将消息发送给Rocket MQ的时候,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失
2、场景2中消息需要持久化到磁盘中,这时会有两种情况导致消息丢失
RocketMQ为了减少磁盘的IO,会先将消息写入到os cache中,而不是直接写入到磁盘中,消费者从os cache中获取消息类似于直接从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失
如果消息已经被刷入了磁盘中,但是数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失
3、消费者成功从RocketMQ中获取到了消息,还没有将消息完全消费完的时候,就通知RocketMQ我已经将消息消费了,然后消费者宕机,但是RocketMQ认为消费者已经成功消费了数据,所以数据依旧丢失了。
那么如何保证消息的零丢失呢?
1、场景1中保证消息不丢失的方案是使用RocketMQ自带的事务机制来发送消息,大致流程为:
首先生产者发送half消息到RocketMQ中,此时消费者是无法消费half消息的,若half消息就发送失败了,则执行相应的回滚逻辑
half消息发送成功之后,且RocketMQ返回成功响应,则执行生产者的核心链路
如果生产者自己的核心链路执行失败,则回滚,并通知RocketMQ删除half消息
如果生产者的核心链路执行成功,则通知RocketMQ commit half消息,让消费者可以消费这条数据
其中还有一些RocketMQ长时间没有收到生产者是要commit/rollback操作的响应,回调生产者接口的细节,感兴趣的可以参考:
https://blog.csdn.net/LO_YUN/article/details/101673893
在使用了RocketMQ事务将生产者的消息成功发送给RocketMQ,就可以保证在这个阶段消息不会丢失
2、在场景2中要保证消息不丢失,首先需要将os cache的异步刷盘策略改为同步刷盘,这一步需要修改Broker的配置文件,将flushDiskType改为SYNC_FLUSH同步刷盘策略,默认的是ASYNC_FLUSH异步刷盘。
一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了;为了保证磁盘损坏不会丢失数据,我们需要对RocketMQ采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障。
3、在场景3中,消息到达了消费者,RocketMQ在代码中就能保证消息不会丢失
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ //对消息进行处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
上面这段代码中,RocketMQ在消费者中注册了一个监听器,当消费者获取到了消息,就会去回调这个监听器函数,去处理里面的消息
当你的消息处理完毕之后,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS 只有返回了CONSUME_SUCCESS,消费者才会告诉RocketMQ我已经消费完了,此时如果消费者宕机,消息已经处理完了,也就不会丢失消息了
如果消费者还没有返回CONSUME_SUCCESS时就宕机了,那么RocketMQ就会认为你这个消费者节点挂掉了,会自动故障转移,将消息交给消费者组的其他消费者去消费这个消息,保证消息不会丢失
为了保证消息不会丢失,在consumeMessage方法中就直接写消息消费的业务逻辑就可以了,如果非要搞一些骚操作,比如下面的代码
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ //开启子线程异步处理消息new Thread() {public void run() {//对消息进行处理}}.start(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
如果新开子线程异步处理消息的话,就有可能出现消息还没有被消费完,消费者告诉RocketMQ消息已经被消费了,结果宕机丢失消息的情况。
使用上面一整套的方案就可以在使用RocketMQ时保证消息零丢失,但是性能和吞吐量也将大幅下降
使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能
同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级
主从机构的话,需要Leader将数据同步给Follower
消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成
消息零丢失是一把双刃剑,要想用好,还是要视具体的业务场景而定,选择合适的方案才是最好的
RocketMQ 消息丢失场景分析及如何解决!相关推荐
- 消息中间件学习总结(21)——RocketMQ 消息丢失场景分析及如何解决!
项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢?先来一张最简单的消费流程图: 上图中大 ...
- Java性能优化推荐书!RocketMQ消息丢失场景及解决办法
毫不夸张的说,这份SpringBoot学习指南能解决你遇到的98%的问题 给跪了!这套万人期待的 SQL 成神之路PDF,终于开源了 既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题. ...
- RocketMQ消息丢失场景及解决办法
作者:霁云HYY 来源:https://blog.csdn.net/LO_YUN/article/details/103949317 既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一 ...
- RocketMQ 消息丢失场景及解决办法
点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | blog.csdn.net/LO_YUN/ar ...
- RocketMQ消息丢失场景及解决办法,已拿offer入职
前言 当前我们都会说SpringBoot是Spring框架对"约定优先于配置理念的最佳实践的产物,一个典型的SpringBoot应用本质上其实就是一个基于Spring框架的应用,而如果大家对 ...
- RocketMq消息丢失问题解决
分享一个链接:https://www.zhihu.com/tardis/sogou/art/166426241 这个图很好,基本上就是这个问题的解决思路.我这里总结一下: 为什么会消息丢失? 存储在队 ...
- 【Java 并发编程】线程操作原子性问题 ( 问题业务场景分析 | 使用 synchronized 解决线程原子性问题 )
文章目录 总结 一.原子性问题示例 二.线程操作原子性问题分析 三.使用 synchronized 解决线程原子性问题 总结 原子操作问题 : 线程中 , 对变量副本 count 进行自增操作 , 不 ...
- twitter storm源码走读之2 -- tuple消息发送场景分析
欢迎转载,转载请注明出处源自徽沪一郎.本文尝试分析tuple发送时的具体细节,本博的另一篇文章<bolt消息传递路径之源码解读>主要从消息接收方面来阐述问题,两篇文章互为补充. worke ...
- rabbitmq可靠性投递_解决RabbitMQ消息丢失问题和保证消息可靠性(一)
工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消 ...
最新文章
- 【推荐】Flex+asp.net上传文件
- 基因组行业重大事件介绍
- appium1.6在mac上环境搭建启动ios模拟器上Safari浏览器 转自:上海-悠悠
- python学精通要多久-学习Python从入门到精通需要多长时间
- PHP极其强大的图片处理库Grafika详细教程(3):图像属性处理
- Bitcoin0.21版 公链开发(4) Apache windows上安装
- NYOJ 683 Jim的实验
- KDE Applications 19.04 发布,包含大量更新
- spring boot 带远程调试启动方式
- MUI APP关于页面之间的传值,plusready和自定义事件
- 【Shiro第八篇】SpringBoot + Shiro使用Shiro标签
- RubyonRails on linux配置
- Java 常量字符串过长
- BIP学习第一课,创建一个应用和实体并设计页面(图文)
- android微信卡,微信无响应怎么弄 让别人微信卡死的小技巧(PC/安卓适用)
- python中的max(x,key=str)
- ES6 -- 简单笔记总结
- matlab 斜坡响应,二阶系统的斜坡响应解读.docx
- 200行代码,10大知识点,3天做游戏!让你“爽爽爽”的学习利器!
- IDEA常用操作总结(长期更新)
热门文章
- oracle加as报错,【Oracle】-【sqlplus / as sysdba登录报错问题】-新用
- python语法知识_Python基础语法知识汇总(学习党的最爱!)
- php导包,Thinkphp5.1 导入第三方包的问题
- [SCOI2008]着色方案
- 二维树状数组 ---- codeforces341D
- 流网络的最小割问题c语言,「网络流24题」最小路径覆盖问题
- CF498C Array and Operations(数论 + 最大流)
- 模板 - 拓扑排序
- http 里面的post和get区别
- centos 7 mysql 创建用户_【CentOS 7MySQL常用操作4】,MySQL创建用户以及授权#180116