一次 RocketMQ 顺序消费延迟的问题定位

问题背景与现象

昨晚收到了应用报警,发现线上某个业务消费消息延迟了 54s 多(从消息发送到MQ 到被消费的间隔):

2021-06-30T23:12:46.756 message processing is incredibly delayed! (Current delay time: 54725, incredible delay count in 10 seconds: 5677)

查看 RocketMQ 的监控,发现确实发生了比较多的消息积压:

从 RocketMQ-Console 上面查看 Topic 的消费者:

这个 Topic,业务要求是需要有序的。所以在发送的时候,指定了业务 Key,并且消费的时候,使用的是顺序消费模式

我们使用了 RocketMQ 集群,有三个 Broker,对于这个 Topic,每个 Broker 上面都有 8 个 ReadQueue 和 WriteQueue。这里简单提一下 ReadQueue 和 WriteQueue 的意思:

在 RocketMQ 中,消息发送时使用 WriteQueue 个数返回路由信息而消息消费时按照 ReadQueue 个数返回路由信息。在物理文件层面,只有 WriteQueue 才会创建文件。举个例子:设置 WriteQueueNum = 8,ReadQueueNum = 4,会创建 8 个文件夹,代表 0 1 2 3 4 5 6 7 这 8 个队列,但在消息消费时,路由信息只返回 4,在具体拉取消息时,就只会消费0 1 2 3 这 4 个队列中的消息,4 5 6 7 压根就没有被消费。反过来,如果设置 WriteQueueNum = 4,ReadQueueNum = 8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7 所有的队列中消费,当然 4 5 6 7中压根就没有消息 ,假设消费是 Group 消费,Group 中有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息(4 5 6 7)。一般我们都会设置这两个值相同,只有在需要缩容 topic 的队列数量的时候,才会设置他们不同。

问题分析

首先联想到的是,是否是消费线程卡住了呢线程卡住一般因为

  1. 发生了 Stop-the-wolrd:
  2. GC 导致
  3. 其他 safepoint 原因导致(例如 jstack,定时进入 safepoint 等等,参考我的这篇文章JVM相关 - SafePoint 与 Stop The World 全解)
  4. 线程处理消息时间过长,可能有锁获取不到,可能卡在某些 IO

采集当时的 JFR(关于 JFR,请参考我的另一系列JFR全解),发现:

  1. 在这个时间段并没有发生停滞时间很长的 GC 以及其他 Stop-the-world 的 safepoint 事件:

  1. 在这段时间,线程是 park 的,并且堆栈显示是消费线程并没有消息可以消费

既然应用并没有什么问题,我们来看看 RocketMQ 是否有什么问题。一般的 RocketMQ Broker 的日志我们关心:

  1. 消息持久化的时间消耗统计,如果这里发生异常,我们需要调优 Java MMAP 相关的参数,请参考:
  2. 消息持久化异常,查看 storeerr.log
  3. 锁异常,查看 lock.log

那究竟应该去看哪一个 broker 呢?之前提到了,发送到这个 Topic 是指定了 hashKey 的,通过消息的 hashKey 我们可以定位到是哪个 broker:

int hashCode = "我们的hashKey".hashCode();
log.info("{}", Math.abs(hashCode % 24));

我们找到了消息的 hashKey,通过上面的代码,结果是 20,也就是队列 20,通过前面的描述,我们知道每个 broker 是 8 个队列,20 对应的就是 broker-2 上面的队列,也就是 broker-2 queueId = 5 这个队列。我们来查看 broker-2 上面的日志定位问题。

我们发现 lock.log 里面有异常,如下所示,类似的有很多条,并且持续了 54s 左右,和线程 park 时间比较吻合,也和消息延迟比较吻合:

2021-07-01 07:11:47 WARN AdminBrokerThread_10 - tryLockBatch, message queue locked by other client. Group: 消费group OtherClientId: 10.238.18.6@29 NewClientId: 10.238.18.122@29 MessageQueue [topic=消息topic, brokerName=broker-2, queueId=5]

这个日志的意思是,10.238.18.122@29 这个实例尝试锁住 queueId = 5 失败,因为 10.238.18.6@29 正在持有这个锁。那么为什么会发生这种情况呢?

RocketMQ 多队列顺序消费的原理

RocketMQ 想要实现多队列顺序消费,首先需要指定 hashKey,通过 hashKey 消息会被放入特定的队列,消费者消费这个队列的时候,如果指定了顺序消费,是单线程消费的,这样就保证了同一队列内有序。

那么是如何保证每个队列是单线程消费的呢?每个 Broker 维护一个:

private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

他是一个 ConcurrentMap<消费组名称, ConcurrentHashMap<消息队列, 锁对象>>。锁对象 LockEntry 包括:

RebalanceLockManager.java:

//读取 rocketmq.broker.rebalance.lockMaxLiveTime 这个环境变量,默认 60s
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty("rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
static class LockEntry {//RocketMQ 客户端唯一 idprivate String clientId;private volatile long lastUpdateTimestamp = System.currentTimeMillis();//省略getter setterpublic boolean isLocked(final String clientId) {boolean eq = this.clientId.equals(clientId);return eq && !this.isExpired();}public boolean isExpired() {// 在 REBALANCE_LOCK_MAX_LIVE_TIME 这么长时间后过期boolean expired =(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;return expired;}
}

RocketMQ 客户端发送 LOCK_BATCH_MQ 请求到 Broker 上面,Broker 会将客户端请求封装成为 LockEntry 并尝试更新这个 Map,如果更新成功就是获取到了锁,如果失败则没有获取这个锁。Broker 的详细更新逻辑是(感兴趣可以查看,也可以直接跳过,不影响理解,后面有便于理解的图片):

public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {//判断没有已经锁住if (!this.isLocked(group, mq, clientId)) {try {//获取锁,这个锁是实例内的,因为每个 broker 维护自己的队列锁表,并不共享this.lock.lockInterruptibly();try {//尝试获取,判断是否存在,存在就判断是否过期ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);if (null == groupValue) {groupValue = new ConcurrentHashMap<>(32);this.mqLockTable.put(group, groupValue);}LockEntry lockEntry = groupValue.get(mq);if (null == lockEntry) {lockEntry = new LockEntry();lockEntry.setClientId(clientId);groupValue.put(mq, lockEntry);log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",group,clientId,mq);}if (lockEntry.isLocked(clientId)) {lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());return true;}String oldClientId = lockEntry.getClientId();if (lockEntry.isExpired()) {lockEntry.setClientId(clientId);lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());log.warn("tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",group,oldClientId,clientId,mq);return true;}//这里就是我们刚刚看到的日志log.warn("tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",group,oldClientId,clientId,mq);return false;} finally {this.lock.unlock();}} catch (InterruptedException e) {log.error("putMessage exception", e);}} else {}return true;
}
//判断是否是已经锁住了
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {//通过消费组名称获取ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);//如果不为 nullif (groupValue != null) {//尝试获取 lockEntry,看是否存在LockEntry lockEntry = groupValue.get(mq);if (lockEntry != null) {//如果存在,判断是否过期boolean locked = lockEntry.isLocked(clientId);if (locked) {lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());}return locked;}}return false;
}

每个 MQ 客户端,会定时发送 LOCK_BATCH_MQ 请求,并且在本地维护获取到锁的所有队列:

ProcessQueue.java:

//定时发送 **LOCK_BATCH_MQ** 间隔
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));

ConsumeMessageOrderlyService.java:

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}

流程图如下所示

ConsumeMessageOrderlyService 在关闭的时候,会 unlock 所有的队列:

public void shutdown() {this.stopped = true;this.scheduledExecutorService.shutdown();this.consumeExecutor.shutdown();if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {this.unlockAllMQ();}
}

问题出现原因

我们这里客户端定时发送 LOCK_BATCH_MQ 间隔是默认的 20s, Broker 端锁过期的时间也是默认的 60s。

我们的集群容器编排使用了 k8s,并且有实例迁移的功能。在集群压力大的时候,自动扩容新的 Node (可以理解为虚拟机)并将创建新的服务实例部署上去。集群某些服务压力小的时候,某些服务实例会缩容下去,这时候就不需要那么多 Node 了,就会回收一部分 Node,但是被回收的 Node 上面还有不能缩容的服务实例,这时候就需要将这些服务实例迁移到其他 Node 上面。这里我们的业务实例就是发生了这个情况。

在问题出现的时候,发生了迁移,老的实例被关闭,但是没有等待 ConsumeMessageOrderlyService#shutdown 的执行,导致锁没有被主动释放,而是等待 60s 的锁过期时间后,新的实例才拿到队列锁开始消费

问题解决

  1. 在下个版本,加入针对 RocketMQ 客户端的优雅关闭逻辑
  2. 所有服务实例(RocketMQ 客户端)配置 rocketmq.client.rebalance.lockInterval 缩短心跳时间(5s),RocketMQ Broker 配置 rocketmq.broker.rebalance.lockMaxLiveTime 缩短过期时间(例如 15s),但是保持过期时间是心跳时间的 3 倍(集群中的 3 倍设计公理)

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

一次 RocketMQ 顺序消费延迟的问题定位相关推荐

  1. RocketMQ——顺序消费(代码)

    关于rocketmq顺序消费的理解和图示可以查看该博文:RocketMQ--顺序消费和重复消费 本博客主要是以代码示例来了解顺序消费的相关内容,建议在此之前先了解下顺序消费的原理. 注:RocketM ...

  2. rocketmq 顺序消费_必须先理解的RocketMQ入门手册,才能再次深入解读

    推荐阅读一下下 2020年后想跳槽?MQ.ZK.Nginx.Kafk等分布式技术你都掌握了? 阿里架构师推荐学习的<RabbitMQ实战指南>,渣渣的你都看过吗? RocketMQ入门手册 ...

  3. rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ

    RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...

  4. rocketmq 顺序消费_RocketMQ核心概念扫盲

    在正式进入RocketMQ的学习之前,我觉得有必要梳理一下RocketMQ核心概念,为大家学习RocketMQ打下牢固的基础. 1.RocketMQ部署架构 在RocketMQ主要的组件如下: Nam ...

  5. 一文理清RocketMQ顺序消费、重复消费、消息丢失问题

    前言 在使用消息队列时不可避免的会遇到顺序消费.重复消费.消息丢失三个问题.在一次面试字节的时候,面试官问到如何保证顺序消费,当时回答不太准确,特意此文回顾如何解决顺序消费.重复消费.消息丢失三个问题 ...

  6. RocketMQ 顺序消费只消费一次 坑

    rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一 ...

  7. rocketMq 顺序消费

    什么是顺序消费? 消息有序指的是一类消息消费时,能按照发送的顺序来消费.例如:一个订单产生了 3 条消息,分别是订单创建.订单付款.订单完成.消费时,要按照这个顺序消费才有意义.与此同时多笔订单之间又 ...

  8. 顺序消费可没你想的这么简单,队列数量的变更往往无法保证同一个账号的消息发送到同一个分区,怎么解决?

    掌握一到两门java主流中间件,是敲开BAT等大厂必备的技能,送给大家一个Java中间件学习路线,助力大家实现职场的蜕变. Java进阶之梯,成长路线与学习资料,助力突破中间件领域 在金融行业中,如果 ...

  9. RocketMQ(十二)消息堆积与消费延迟

    RocketMQ(十二)消息堆积与消费延迟 产生背景 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多,这部分消息就被称为 堆积消息.消息出 ...

最新文章

  1. simple_html_dom meta,HTML DOM Meta content 属性
  2. moss2007 权限体系分析(深入到数据库结构的分析)
  3. python的openpyxl库如何读取特定列_Excelize 2.3.2 发布,Go 语言 Excel 文档基础库,2021 年首个更新...
  4. 傲游浏览器---自定义 UserAgent 字符串
  5. ios 多线程 -- NSOperation 常用方法
  6. lucene 各个版本介绍
  7. linux账号和权限管理思维导图,Linux系统支持的思维导图软件有哪些?
  8. leetcode16 3-Sum
  9. F6-预编译编译安装
  10. 关于ie浏览器的问题
  11. 目标检测(Object Detection)的整理
  12. 使用TypeScript两年后,还值得吗?
  13. RL01: An Introduction to Deep Reinforcement Learning
  14. 常用编码与为何要encode和decode
  15. 向武 清华大学 计算机,哥哥保送弟弟全市第二 双胞胎同时上清华(组图)
  16. 【转】ARM经典300问
  17. PwC普华永道信息技术2023助理软件工程师面经总结
  18. mysql中如何选中数据库_MySQL 选择数据库
  19. 李狗蛋和二狗子因为HTTP or RPC打起来了No.135
  20. 引发流水线堵塞的原因

热门文章

  1. 辛巴投资翻车,投资的A股st起步公司浮亏超7000万,打算清空
  2. window和python查看和调用exe的对照,结合pyqt5应用查找指定程序,并结束应用程序(酒馆战棋整活必备)
  3. 运用大数据技术揪出的犯罪分子居然是某知名电台女主持人
  4. 让企业各取所需:惠普发布36款“传奇”系列激光打印产品
  5. 2023年西安电子科技大学机械专业考研上岸经验分享
  6. 用户登录的详细流程(三)Shiro框架
  7. CorelDRAW名片设计
  8. tcpdump如何抓接口包_tcpdump抓包命令
  9. Java酒瓶瓶盖_一种白酒瓶瓶盖组件的制作方法
  10. 微营销四剑客-微营销学习笔记3:微营销工具简介