1、问题现象


首先接到项目反馈使用 RocketMQ 会出现如下错误:


错误信息关键点:MQBrokerException:CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE]broker busy,start flow control for a while,period in queue:205ms,size of queue:880。

由于项目组并没有对消息发送失败做任何补偿,导致丢失消息发送失败,故需要对这个问题进行深层次的探讨,并加以解决。

2、问题分析


首先我们根据关键字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查询,去探究在什么时候会抛出如上错误。根据全文搜索如下图所示:


该方法是在 BrokerFastFailure 中定义的,通过名称即可以看成其设计目的:Broker端快速失败机制。

Broker 端快速失败其原理图如下:

  • 消息发送者向 Broker 发送消息写入请求,Broker 端在接收到请求后会首先放入一个队列中(SendThreadPoolQueue),默认容量为 10000。

  • Broker 会专门使用一个线程池(SendMessageExecutor)去从队列中获取任务并执行消息写入请求,为了保证消息的顺序处理,该线程池默认线程个数为1。

如果 Broker 端受到垃圾回收等等因素造成单条写入数据发生抖动,单个 Broker 端积压的请求太多从而得不到及时处理,会极大的造成客户端消息发送的时间延长。

设想一下,如果由于 Broker 压力增大,写入一条消息需要500ms甚至超过1s,并且队列中积压了5000条消息,消息发送端的默认超时时间为3s,如果按照这样的速度,这些请求在轮到 Broker 执行写入请求时,客户端已经将这个请求超时了,这样不仅会造成大量的无效处理,还会导致客户端发送超时。

故 RocketMQ 为了解决该问题,引入 Broker 端快速失败机制,即开启一个定时调度线程,每隔10毫秒去检查队列中的第一个排队节点,如果该节点的排队时间已经超过了 200ms,就会取消该队列中所有已超过 200ms 的请求,立即向客户端返回失败,这样客户端能尽快进行重试,因为 Broker 都是集群部署,下次重试可以发送到其他 Broker 上,这样能最大程度保证消息发送在默认 3s 的时间内经过重试机制,能有效避免某一台 Broker 由于瞬时压力大而造成的消息发送不可用,从而实现消息发送的高可用。

从 Broker 端快速失败机制引入的初衷来看,快速失败后会发起重试,除非同一时刻集群内所有的 Broker 都繁忙,不然消息会发送成功,用户是不会感知这个错误的,那为什么用户感知了呢?难道 TIMEOUT_ CLEAN _ QUEUE 错误,Broker 不重试?

为了解开这个谜团,接下来会采用源码分析的手段去探究真相。接下来将以消息同步发送为例揭示其消息发送处理流程中的核心关键点。

MQ Client 消息发送端首先会利用网络通道将请求发送到 Broker,然后接收到请求结果后并调用 processSendResponse 方法对响应结果进行解析,如下图所示:

在这里返回的 code 为 RemotingSysResponseCode . SYSTEM_BUSY。

我们从 proccessSendResponse 方法中可以得知如果 code 为 SYSTEM_BUSY,该方法会抛出 MQBrokerException,响应 code 为 SYSTEM_BUSY,其错误描述为开头部分的错误信息。

那我们沿着该方法的调用链路,可以找到其直接调用方:DefaultMQProducerImpl 的 sendKernelImpl,我们重点考虑如果底层方法抛出  MQBrokerException 该方法会如何处理。

其关键代码如下图所示:

可以看出在 sendKernelImpl 方法中首先会捕捉异常,先执行注册的钩子函数,即就算执行失败,对应的消息发送后置钩子函数也会执行,然后再原封不动的将该异常向上抛出。

sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法调用,下面是其核心实现截图:

从这里可以看出 RocketMQ 消息发送高可用设计一个非常关键的点,重试机制,其实现是在 for 循环中 使用 try catch 将 sendKernelImpl 方法包裹,就可以保证该方法抛出异常后能继续重试。从上文可知,如果 SYSTEM_BUSY 会抛出 MQBrokerException,但发现只有上述几个错误码才会重试,因为如果不是上述错误码,会继续向外抛出异常,此时 for 循环会被中断,即不会重试。

这里非常令人意外的是连 SYSTEM_ERROR 都会重试,却没有包含 SYSTEM_BUSY,显然违背了快速失败的设计初衷,故笔者断定,这是 RocketMQ 的一个BUG,将 SYSTEM_BUSY 遗漏了,后续会提一个 PR,增加一行代码,将 SYSTEM_BUSY 加上即可。

问题分析到这里,该问题应该就非常明了。

3、解决方案


如果大家在网上搜索 TIMEOUT_CLEAN_QUEUE 的解决方法,大家不约而同提出的解决方案是增加 waitTimeMillsInSendQueue 的值,该值默认为 200ms,例如将其设置为 1000s 等等,以前我是反对的,因为我的认知里 Broker 会重试,但现在发现 Broker 不会重试,所以我现在认为该 BUG未解决的情况下适当提高该值能有效的缓解。

但这是并不是好的解决方案,我会在近期向官方提交一个PR,将这个问题修复,建议大家在公司尽量对自己使用的版本进行修改,重新打一个包即可,因为这已经违背了 Broker 端快速失败的设计初衷。

但在消息发送的业务方,尽量自己实现消息的重试机制,即不依赖 RocketMQ 本身提供的重试机制,因为受制于网络等因素,消息发送不可能百分之百成功,建议大家在消息发送时捕获一下异常,如果发送失败,可以将消息存入数据库,再结合定时任务对消息进行重试,尽最大程度保证消息不丢失。

特别推荐一个分享架构+算法的优质内容,还没关注的小伙伴,可以长按关注一下:长按订阅更多精彩▼如有收获,点个在看,诚挚感谢

RocketMQ 一行代码造成大量消息丢失相关推荐

  1. RocketMQ 一行代码造成大量消息发送失败

    作者 | 丁威 来源 | 中间件兴趣圈 问题现象 首先接到项目反馈使用 RocketMQ 会出现如下错误: 错误信息关键点:MQBrokerException:CODE:2DESC:[TIMEOUT_ ...

  2. 详解,最新整理,RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  3. RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  4. 【一封传话】一行代码实现微信消息推送

    一行代码实现微信消息推送 介绍 通过调用一个简单的接口,将要推送的消息提交后,您将在微信收到推送的消息,简单快捷 可以应用的场景设备上下线,量化交易,网站公告,服务器异常信息推送,网页更新提醒等各种可 ...

  5. RocketMQ一行代码造成消息发送失败

    这是我的第 198 期分享 作者 | 丁威 来源 | 中间件兴趣圈(ID:dingwpmz_zjj) 分享 | Java中文社群(ID:javacn666) 1.问题现象 首先接到项目反馈使用 Roc ...

  6. RocketMQ 消息丢失场景分析及如何解决!

    本文来源:https://blog.csdn.net/LO_YUN/article/details/103949317 既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱 ...

  7. RocketMQ消息丢失场景及解决办法

    作者:霁云HYY 来源:https://blog.csdn.net/LO_YUN/article/details/103949317 既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一 ...

  8. RocketMQ 消息丢失场景及解决办法

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | blog.csdn.net/LO_YUN/ar ...

  9. Java性能优化推荐书!RocketMQ消息丢失场景及解决办法

    毫不夸张的说,这份SpringBoot学习指南能解决你遇到的98%的问题 给跪了!这套万人期待的 SQL 成神之路PDF,终于开源了 既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题. ...

最新文章

  1. 编程之美2.7 最大公约数,最小公倍数
  2. 使用 xCAT 简化 AIX 集群的部署和管理
  3. [已解决] 日常开发中禁用Tomcat自动重启
  4. 参加工作第三个月的感悟
  5. 2019-08-12 计划与安排
  6. 论文笔记_SLAM_Simultaneous Localization And Mapping: A Survey of Current Trends in Autonomous Driving
  7. java 性能框架_Java Fork Join 框架(四)性能
  8. C++中 explicit的用法
  9. java 配置文件参数_从Java的配置文件中读取配置参数的最佳方法是什么?
  10. win10basic模式_BASIC的完整形式是什么?
  11. 计算机管理删除打印机驱动,彻底删除打印机驱动的方法
  12. Adobe Illustrator CS6 出现错误报告16
  13. dfuse for EOSIO v0.1.0-beta4 版本更新说明
  14. fly.io ruby on rails
  15. python实验报告实验目的_20192217 实验一《Python程序设计》实验报告
  16. 王牌竞速安装后显示服务器维护,王牌竞速怎么修车 王牌竞速怎么维修车 王牌竞速的车怎么维护...
  17. intptr_t详解
  18. CAD高版本转低版本怎么转?分享几种好用的转换方法
  19. RINEX 2.11 观测值文件格式说明
  20. 腾讯企业 html邮件模板,用腾讯企业邮做为邮件服务器来发送通知邮件的操作步骤...

热门文章

  1. codeforce708C:树形dp+二次扫描
  2. javamap的用法_Java Map常用的几种用法。
  3. docker启动odoo提示module没有安装_Windows Server 2019上的Docker 入门
  4. 云服务器怎么添加虚拟内存,云服务器怎么添加虚拟内存
  5. 测试报告-1.1组成和要点
  6. WebStorm配置本地测试服务器
  7. 无聊软件-GIT屏幕录制工具_已迁移
  8. 众创汇定制如何革新工业4.0?
  9. Post和Get方法区别
  10. MongDB-副本集搭建【MongDB系列一】