大揭秘!RocketMQ如何管理消费进度
在企业实践RocketMQ时基本上80%都是不消费问题,而由于消费进度问题导致不消费的问题又是最难确认的和排查的。RocketMQ的消费进度分为本地消费进度管理和远程消费进度管理,分别对应的消费模式是广播消费和集群消费。
本文选自《RocketMQ分布式消息中间件:核心原理与最佳实践》一书,带你层层揭秘RocketMQ如何管理消费进度。
什么是消费进度
消费进度,也就是由Broker管理每一个消费者消费Topic的进度,包含正常提交消费进度和重置消费进度,如下:
上图表示一个消费者组A,部署了2个消费者实例consumer instance1和consumer instance2。
- consumer instance1消费queue1和queue2
- consumer instance2消费queue3和queue4
这里的消费进度是指consumer instance1分别消费到queue1和queue2第多少条消息,consumer instance2分别消费到queue3和queue4第多少条消息。
在集群消费时,消费进度由消费者主动“上报”给Broker,广播消费时由消费者自己本地保存。
为什么需要消费进度
消费进度管理的目的是保证消费者在正常运行状态、重启、异常关闭等状态下都能准确续接“上一次”未处理的消息。
在RocketMQ中,实现的消费语义叫“至少投递一次”,也就是所有的消息至少有一次机会消费不用担心会丢消息。用户需要实现消费幂等来避免重复投递对业务实际数据的影响。
什么时候“上报”消费进度
消费者一般在两种情况下“上报”消费进度,消费成功后(包含正常消费成功、重试消费成功)和重置消费进度。如下图2展示了,图3展示了:
消费成功后提交消费进度的过程
重置消费进度的过程
二者共同点:
• 都是由Broker统一管理消费者的消费进度
• 都需要由消费者“主动上报”最新的消费进度
二者的差异点:
• 正常消费时提交消费进度,一般消费进度是向前推进
• 重置消费进度时提交消费进度,消费进度可能向前推进,也可能向后回溯
消费进度管理代码分析
在RocketMQ中,将消费进度管理抽象为消费进度管理接口OffsetStore,该接口有两个实现类: RemoteBrokerOffsetStore和LocalFileOffsetStore,他们分别实现了集群消费、广播消费的消费进度管理。
下图描述了OffsetStore、RemoteBrokerOffsetStore和LocalFileOffsetStore三者的类图关系:
OffsetStore接口定义了消费进度管理的基本方法,具体方法列表如下(方法参数已省略):
load(): 加载全部消费者的消费进度信息
updateOffset(): 更新一个queue的消费进度
readOffset(): 读取一个queue的消费进度
persistAll(): 持久化全部消费进度
persist(): 持久化一个queue的消费进度
removeOffset(): 移除一个queue的消费进度
cloneOffsetTable(): 克隆一个topic的消费进度
updateConsumeOffsetToBroker(): 更新消费进度到Broker
RemoteBrokerOffsetStore的实现是将消费进度信息保存到Broker中;LocalFileOffsetStore的实现是将消费进度信息保存到本地文件中。
/ 彩蛋1 /
updateConsumeOffsetToBroker() 这个方法是将消费进度更新到Broker中,想必在LocalFileOffsetStore是没有实现该方法的。通过看源码,也印证了我们的猜想:
接来下以用Push的方式消费普通消息(非顺序消息)为例,具体讲解如何消费成功、重置消费位点整个过程是如何的。
▊ 消费成功,如何提交消费进度?
在RocketMQ中,消费者是一批一批的消费的,Push消费方式默认每批16条消息,消费完成后会调用ConsumeMessageConcurrentlyService。
processConsumeResult()方法处理消费结果,该方法会更新这批消息中对应Topic的queue的消费进度,具体核心代码片段如下:
1long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
2 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
3this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
4 }
以上代码主要涉及3个核心方法removeMessage()、isDropped()、updateOffset()。
removeMessage()方法是将成功消费的消息从本地缓queue中删除,并返回这个queue的消费位点。
isDropped()这个方法是判断这些消息所在的本地queue是否被drop了,如果被drop了消费进度就不更新。一般由于有消费者上线、下线、broker宕机等引发消费者负载均衡,导致这个queue已经分配给其他消费者。
updateOffset(): 更新本地内存中的消费位点。
实现代码如下:
1public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly){2 if (mq != null) {3 AtomicLong offsetOld = this.offsetTable.get(mq);4 if (null == offsetOld) {5 offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));6 }7 if (null != offsetOld) {8 if (increaseOnly) {9 MixAll.compareAndIncreaseOnly(offsetOld, offset);
10 } else {
11 offsetOld.set(offset);
12 }
13 }
14 }
15}
代码中this.offsetTable的类型是ConcurrentMap<MessageQueue,AtomicLong>,表示一个本地queue和其消费位点的对应关系,看到这里大家不禁心中会冒起疑问: 不是更新位点到Broker中嘛? 是的,确实不是。在RocketMQ的设计中,本地消费位点和Broker位点同步是异步的。大家如果顺着persistAll()方法找调用关系,会发现RocketMQ客户端在启动时会初始化一个定时任务调用persistAll()方法,将offsetTable中的本地位点信息更新到Broker中。
persistAll()方法主要是通过调用updateConsumeOffsetToBroker()方法将消费进度更新到Broker的,核心代码片段如下:
1public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,2 MQBrokerException, InterruptedException, MQClientException {3 ...4 if (isOneway) {5 this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(6 findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);7 } else {8 this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(9 findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
10 }
11 }
12...
13 }
updateConsumeOffsetToBroker()方法将一个queue的消费进度信息封装为一个RPC请求的requestHeader,再加上请求代码RequestCode.UPDATE_CONSUMER_OFFSET一起封装成为一个RPC的请求命令RemotingCommand,最后调用网络层方法invokeOneway()将该RPC请求发送给Broker。
/ 彩蛋2 /
这里特别注意,RocketMQ默认是通过invokeOneway()方法将该请求发送出去的,也就是说客户端只管发请求。不管Broker的返回结果。如果网络不好或者Broker处理慢,可能发现一个现象: 消费者一直在正常消费,而Broker的消费进度信息更新很慢。
▊ 重置消费进度如何生效?
RocketMQ目前支持重置消费进度到某个具体时间,重置消费位点逻辑中客户端部分和正常消费一致,只是消费进度更新发起者是RocketMQ Console,具体过程如下图6所示:
第一步,用户可以在RocketMQ Console的Topic页面,重置一个Topic的某一个消费者组的消费进度到某个时刻。
第二步,当Broker收到Console发送的重置消费进度请求后,会根据重置时间查找该时间对应的每个queue的消费位点,然后将这些信息封装后发送给每一个消费者实例。
第三步,消费者收到Broker发送的重置位点请求后,更新本地消费进度。
/ 彩蛋3 /
这里有个坑,除了java客户端之外,如果是CPP/Python/Go等基于CPP客户端封装的多语言客户端会重置失败,原因时Broker在封装请求时,只是按照java协议封装了请求包,该包其他语言会解析失败,导致重置位点失败。目前笔者已经提PR(pr id=1930)处理了。
第四步,消费者本地的定时任务定时将本地位点信息同步到Broker。(逻辑和成功消费时一致)
通过我们大量的实践发现,何时提交消费进度、如何提交消费进度是排查问题的主要依据,在掌握了这两点后,问题基本迎刃而解。
想要了解更多关于RocketMQ的原理实现可以阅读《RocketMQ分布式消息中间件:核心原理与最佳实践》一书。
这是一本讲解RocketMQ最佳实践的系统化书籍,作者有在RocketMQ在线高可靠场景下的深度开发和运营经验,踩过很多坑,总结出宝贵的经验。内容清晰易懂,又结合了最佳实践的经验,可以当作RocketMQ初学的参考书,也可以当作在线深度大规模使用的工具书。
关于作者
Apache RocketMQ北京社区联合发起人,RocketMQ项目Commiter,RocketMQ社区Python客户端项目负责人。目前就职于北京某在线教育公司,担任高级大数据工程师,曾负责公司消息与数据流平台,目前主要负责OLAP团队,对分布式存储计算系统设计有丰富经验,热衷于知识分享和社区活动。
大揭秘!RocketMQ如何管理消费进度相关推荐
- RocketMQ(十)——Consumer消费进度(Offset)的管理
文章目录 Consumer消费进度(Offset)的管理 Offset本地管理模式 Offset远程管理模式 offset用途 重试队列 offset的同步提交与异步提交 Consumer消费进度(O ...
- 【高项】项目整体管理、范围管理与进度管理(十大管理)
[高项]项目整体管理与范围管理 文章目录 1.项目整体管理 1.1 整体管理的过程 1.2 制定项目章程(启动) 1.3 制订项目管理计划(规划) 1.4 指导与管理项目执行(执行) 1.5 监控项目 ...
- 康复治疗学可以考计算机吗,【大揭秘】2018“人机对话”康复医学治疗技术专业技术资格考试...
原标题:[大揭秘]2018"人机对话"康复医学治疗技术专业技术资格考试 昨天,关于"2018年康复医学治疗技术专业技术资格考试采用人机对话考试方式"的通知一经发 ...
- 计算机制片管理系统,Agile Shot:新一代影视制片流程管理系统大揭秘
原标题:Agile Shot:新一代影视制片流程管理系统大揭秘 制片管理是影视行业的一个重要概念.一个剧组小到几十人,大到成百上千人,涉及的工作领域非常宽泛,工作纷繁复杂,制片管理贯穿影视生产的全过程 ...
- 技术破局,业绩狂飙十倍:亿级电商平台重构大揭秘
重构助力业务腾飞:一个年增长率10倍的亿级电商平台的技术故事 引言 这篇文章,将聊一聊:在业绩年增10倍的狂飙式增长下(年GMV5亿到年50亿),一个累计超过百亿GMV的电商平台,是如何进行从0到1的 ...
- kafka_消费者组消费进度监控实现
对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag). 所谓滞后程度,就是指消费者当前落后于生产者的程度.Lag 的单 ...
- python就业方向及工资-【行情分享】python就业方向与薪资大揭秘
原标题:[行情分享]python就业方向与薪资大揭秘 学python,我们要首先问自己,是为了转行?提升自己?还是什么,有了明确的目标,才会沉下心来学习.我学习python的目标是想要转行,可以跟大家 ...
- 深入理解RocketMQ:Consumer消费消息原理
前言 RocketMQ版本:4.8 Consumer类型:DefaultMQPushConsumer 原理解析 consumer 启动时做了哪些事情? 定时从NameSrv获取最新的Topic+Que ...
- 技术系列课回顾 | 网易云信线上万人连麦技术大揭秘
导读:本文根据网易云信资深音视频服务端开发工程师陈策在<MCtalk Live#5:网易云信线上万人连麦技术大揭秘>线上直播分享整理. 文|陈策 网易云信资深音视频服务端开发工程师 大家好 ...
- 【CDS技术揭秘系列 02】阿里云CDS-SLS大揭秘
简介:CDS-SLS 作为云化的日志平台,将组件进行高内聚低耦合,线下用户最低可以在6台规模的机器上将上述所有的功能自动化部署,在运维.运营.财务管理.数据分析报表等大数据场景领域以低代码模式有效解决 ...
最新文章
- java web程序示例_想要建立一些有趣的东西吗? 这是示例Web应用程序创意的列表。...
- im和音视频开发哪个更好_如何阅读成为更好的开发者的方式
- cmd进入Oracle的sql*plus
- 机器学习的教训:5家公司分享的错误经验
- Hibernate映射关系
- CSU 1328: 近似回文词
- IOS中UITableView异步加载图片的实现
- C#判断点和直线的位置关系
- SMP、NUMA、MPP体系结构介绍
- linux percpu机制解析
- 建立自己的voc数据集_Mac上 制作自己的VOC数据集
- C4996	'GetVersionExW': 被声明为已否决	TTS_one	f:\vs2015\speechsdk\include\sphel
- 【大牛分享】人机工程简史
- Facebook主页照片和封面照片的尺寸要求
- Linux下c语言的图形编程
- 创建一个整型变量toes,并将toes设置为10.
- innodb buffer pool管理--free list
- java优化代码常见套路
- 正点原子OLED显示实验
- MySQL图形界面创建数据库