在企业实践RocketMQ时基本上80%都是不消费问题,而由于消费进度问题导致不消费的问题又是最难确认的和排查的。RocketMQ的消费进度分为本地消费进度管理和远程消费进度管理,分别对应的消费模式是广播消费和集群消费。

本文选自《RocketMQ分布式消息中间件:核心原理与最佳实践》一书,带你层层揭秘RocketMQ如何管理消费进度。

什么是消费进度

消费进度,也就是由Broker管理每一个消费者消费Topic的进度,包含正常提交消费进度和重置消费进度,如下:

上图表示一个消费者组A,部署了2个消费者实例consumer instance1和consumer instance2。

- consumer instance1消费queue1和queue2

- consumer instance2消费queue3和queue4

这里的消费进度是指consumer instance1分别消费到queue1和queue2第多少条消息,consumer instance2分别消费到queue3和queue4第多少条消息。

在集群消费时,消费进度由消费者主动“上报”给Broker,广播消费时由消费者自己本地保存。

为什么需要消费进度

消费进度管理的目的是保证消费者在正常运行状态、重启、异常关闭等状态下都能准确续接“上一次”未处理的消息。

在RocketMQ中,实现的消费语义叫“至少投递一次”,也就是所有的消息至少有一次机会消费不用担心会丢消息。用户需要实现消费幂等来避免重复投递对业务实际数据的影响。

什么时候“上报”消费进度

消费者一般在两种情况下“上报”消费进度,消费成功后(包含正常消费成功、重试消费成功)和重置消费进度。如下图2展示了,图3展示了:

消费成功后提交消费进度的过程

重置消费进度的过程

二者共同点:

• 都是由Broker统一管理消费者的消费进度

• 都需要由消费者“主动上报”最新的消费进度

二者的差异点:

• 正常消费时提交消费进度,一般消费进度是向前推进

• 重置消费进度时提交消费进度,消费进度可能向前推进,也可能向后回溯

消费进度管理代码分析

在RocketMQ中,将消费进度管理抽象为消费进度管理接口OffsetStore,该接口有两个实现类: RemoteBrokerOffsetStore和LocalFileOffsetStore,他们分别实现了集群消费、广播消费的消费进度管理。

下图描述了OffsetStore、RemoteBrokerOffsetStore和LocalFileOffsetStore三者的类图关系:

OffsetStore接口定义了消费进度管理的基本方法,具体方法列表如下(方法参数已省略):

load(): 加载全部消费者的消费进度信息

updateOffset(): 更新一个queue的消费进度

readOffset(): 读取一个queue的消费进度

persistAll(): 持久化全部消费进度

persist(): 持久化一个queue的消费进度

removeOffset(): 移除一个queue的消费进度

cloneOffsetTable(): 克隆一个topic的消费进度

updateConsumeOffsetToBroker(): 更新消费进度到Broker

RemoteBrokerOffsetStore的实现是将消费进度信息保存到Broker中;LocalFileOffsetStore的实现是将消费进度信息保存到本地文件中。

/ 彩蛋1 /

updateConsumeOffsetToBroker() 这个方法是将消费进度更新到Broker中,想必在LocalFileOffsetStore是没有实现该方法的。通过看源码,也印证了我们的猜想:

接来下以用Push的方式消费普通消息(非顺序消息)为例,具体讲解如何消费成功、重置消费位点整个过程是如何的。

▊ 消费成功,如何提交消费进度?

在RocketMQ中,消费者是一批一批的消费的,Push消费方式默认每批16条消息,消费完成后会调用ConsumeMessageConcurrentlyService。

processConsumeResult()方法处理消费结果,该方法会更新这批消息中对应Topic的queue的消费进度,具体核心代码片段如下:

1long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
2        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
3this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
4        }

以上代码主要涉及3个核心方法removeMessage()、isDropped()、updateOffset()。

removeMessage()方法是将成功消费的消息从本地缓queue中删除,并返回这个queue的消费位点。

isDropped()这个方法是判断这些消息所在的本地queue是否被drop了,如果被drop了消费进度就不更新。一般由于有消费者上线、下线、broker宕机等引发消费者负载均衡,导致这个queue已经分配给其他消费者。

updateOffset(): 更新本地内存中的消费位点。

实现代码如下:

 1public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly){2    if (mq != null) {3        AtomicLong offsetOld = this.offsetTable.get(mq);4        if (null == offsetOld) {5           offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));6        }7    if (null != offsetOld) {8        if (increaseOnly) {9            MixAll.compareAndIncreaseOnly(offsetOld, offset);
10        } else {
11            offsetOld.set(offset);
12               }
13        }
14    }
15}

代码中this.offsetTable的类型是ConcurrentMap<MessageQueue,AtomicLong>,表示一个本地queue和其消费位点的对应关系,看到这里大家不禁心中会冒起疑问: 不是更新位点到Broker中嘛? 是的,确实不是。在RocketMQ的设计中,本地消费位点和Broker位点同步是异步的。大家如果顺着persistAll()方法找调用关系,会发现RocketMQ客户端在启动时会初始化一个定时任务调用persistAll()方法,将offsetTable中的本地位点信息更新到Broker中。

persistAll()方法主要是通过调用updateConsumeOffsetToBroker()方法将消费进度更新到Broker的,核心代码片段如下:

 1public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,2        MQBrokerException, InterruptedException, MQClientException {3       ...4            if (isOneway) {5                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(6                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);7            } else {8                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(9                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
10            }
11        }
12...
13    }

updateConsumeOffsetToBroker()方法将一个queue的消费进度信息封装为一个RPC请求的requestHeader,再加上请求代码RequestCode.UPDATE_CONSUMER_OFFSET一起封装成为一个RPC的请求命令RemotingCommand,最后调用网络层方法invokeOneway()将该RPC请求发送给Broker。

/ 彩蛋2 /

这里特别注意,RocketMQ默认是通过invokeOneway()方法将该请求发送出去的,也就是说客户端只管发请求。不管Broker的返回结果。如果网络不好或者Broker处理慢,可能发现一个现象: 消费者一直在正常消费,而Broker的消费进度信息更新很慢。

▊ 重置消费进度如何生效?

RocketMQ目前支持重置消费进度到某个具体时间,重置消费位点逻辑中客户端部分和正常消费一致,只是消费进度更新发起者是RocketMQ Console,具体过程如下图6所示:

第一步,用户可以在RocketMQ Console的Topic页面,重置一个Topic的某一个消费者组的消费进度到某个时刻。

第二步,当Broker收到Console发送的重置消费进度请求后,会根据重置时间查找该时间对应的每个queue的消费位点,然后将这些信息封装后发送给每一个消费者实例。

第三步,消费者收到Broker发送的重置位点请求后,更新本地消费进度。

/ 彩蛋3 /

这里有个坑,除了java客户端之外,如果是CPP/Python/Go等基于CPP客户端封装的多语言客户端会重置失败,原因时Broker在封装请求时,只是按照java协议封装了请求包,该包其他语言会解析失败,导致重置位点失败。目前笔者已经提PR(pr id=1930)处理了。

第四步,消费者本地的定时任务定时将本地位点信息同步到Broker。(逻辑和成功消费时一致)

通过我们大量的实践发现,何时提交消费进度、如何提交消费进度是排查问题的主要依据,在掌握了这两点后,问题基本迎刃而解。

想要了解更多关于RocketMQ的原理实现可以阅读《RocketMQ分布式消息中间件:核心原理与最佳实践》一书。

这是一本讲解RocketMQ最佳实践的系统化书籍,作者有在RocketMQ在线高可靠场景下的深度开发和运营经验,踩过很多坑,总结出宝贵的经验。内容清晰易懂,又结合了最佳实践的经验,可以当作RocketMQ初学的参考书,也可以当作在线深度大规模使用的工具书。

关于作者

Apache RocketMQ北京社区联合发起人,RocketMQ项目Commiter,RocketMQ社区Python客户端项目负责人。目前就职于北京某在线教育公司,担任高级大数据工程师,曾负责公司消息与数据流平台,目前主要负责OLAP团队,对分布式存储计算系统设计有丰富经验,热衷于知识分享和社区活动。

大揭秘!RocketMQ如何管理消费进度相关推荐

  1. RocketMQ(十)——Consumer消费进度(Offset)的管理

    文章目录 Consumer消费进度(Offset)的管理 Offset本地管理模式 Offset远程管理模式 offset用途 重试队列 offset的同步提交与异步提交 Consumer消费进度(O ...

  2. 【高项】项目整体管理、范围管理与进度管理(十大管理)

    [高项]项目整体管理与范围管理 文章目录 1.项目整体管理 1.1 整体管理的过程 1.2 制定项目章程(启动) 1.3 制订项目管理计划(规划) 1.4 指导与管理项目执行(执行) 1.5 监控项目 ...

  3. 康复治疗学可以考计算机吗,【大揭秘】2018“人机对话”康复医学治疗技术专业技术资格考试...

    原标题:[大揭秘]2018"人机对话"康复医学治疗技术专业技术资格考试 昨天,关于"2018年康复医学治疗技术专业技术资格考试采用人机对话考试方式"的通知一经发 ...

  4. 计算机制片管理系统,Agile Shot:新一代影视制片流程管理系统大揭秘

    原标题:Agile Shot:新一代影视制片流程管理系统大揭秘 制片管理是影视行业的一个重要概念.一个剧组小到几十人,大到成百上千人,涉及的工作领域非常宽泛,工作纷繁复杂,制片管理贯穿影视生产的全过程 ...

  5. 技术破局,业绩狂飙十倍:亿级电商平台重构大揭秘

    重构助力业务腾飞:一个年增长率10倍的亿级电商平台的技术故事 引言 这篇文章,将聊一聊:在业绩年增10倍的狂飙式增长下(年GMV5亿到年50亿),一个累计超过百亿GMV的电商平台,是如何进行从0到1的 ...

  6. kafka_消费者组消费进度监控实现

    对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag). 所谓滞后程度,就是指消费者当前落后于生产者的程度.Lag 的单 ...

  7. python就业方向及工资-【行情分享】python就业方向与薪资大揭秘

    原标题:[行情分享]python就业方向与薪资大揭秘 学python,我们要首先问自己,是为了转行?提升自己?还是什么,有了明确的目标,才会沉下心来学习.我学习python的目标是想要转行,可以跟大家 ...

  8. 深入理解RocketMQ:Consumer消费消息原理

    前言 RocketMQ版本:4.8 Consumer类型:DefaultMQPushConsumer 原理解析 consumer 启动时做了哪些事情? 定时从NameSrv获取最新的Topic+Que ...

  9. 技术系列课回顾 | 网易云信线上万人连麦技术大揭秘

    导读:本文根据网易云信资深音视频服务端开发工程师陈策在<MCtalk Live#5:网易云信线上万人连麦技术大揭秘>线上直播分享整理. 文|陈策 网易云信资深音视频服务端开发工程师 大家好 ...

  10. 【CDS技术揭秘系列 02】阿里云CDS-SLS大揭秘

    简介:CDS-SLS 作为云化的日志平台,将组件进行高内聚低耦合,线下用户最低可以在6台规模的机器上将上述所有的功能自动化部署,在运维.运营.财务管理.数据分析报表等大数据场景领域以低代码模式有效解决 ...

最新文章

  1. java web程序示例_想要建立一些有趣的东西吗? 这是示例Web应用程序创意的列表。...
  2. im和音视频开发哪个更好_如何阅读成为更好的开发者的方式
  3. cmd进入Oracle的sql*plus
  4. 机器学习的教训:5家公司分享的错误经验
  5. Hibernate映射关系
  6. CSU 1328: 近似回文词
  7. IOS中UITableView异步加载图片的实现
  8. C#判断点和直线的位置关系
  9. SMP、NUMA、MPP体系结构介绍
  10. linux percpu机制解析
  11. 建立自己的voc数据集_Mac上 制作自己的VOC数据集
  12. C4996 'GetVersionExW': 被声明为已否决 TTS_one f:\vs2015\speechsdk\include\sphel
  13. 【大牛分享】人机工程简史
  14. Facebook主页照片和封面照片的尺寸要求
  15. Linux下c语言的图形编程
  16. 创建一个整型变量toes,并将toes设置为10.
  17. innodb buffer pool管理--free list
  18. java优化代码常见套路
  19. 正点原子OLED显示实验
  20. MySQL图形界面创建数据库

热门文章

  1. BZOJ1938: [CROATIAN2010] ALADIN
  2. bootstrap源码学习:辅助(1)
  3. oracle中decode方法使用
  4. 网页布局02 盒子模型
  5. python scipy stats学习笔记
  6. 深入理解的JavaScript函数编程
  7. 17秋 SDN课程 第五次上机作业
  8. 浣溪沙·江畔芦花【两首】
  9. LightOJ - 1050 (唯一分解+推公式+乘法逆元)
  10. 前端面试题之手写事件模型及事件代理/委托