rocketmq中consumer设计与实现
0、结构图
1、类层次图
2、Consumer消息队列分配
消息队列分配使用模板方法模式,在ReblanceImpl中定义处理框架,对于变动部分提炼出抽象方法,交给子类来实现
是通过RebalanceImpl#doRebalance来重新分配消息队列的。首先根据当前订阅数据来根据主题重新分配,在集群模式下,根据主题订阅信息表的主题获取对应的消息队列信息集,然后根据主题路由表的主题获取主题路由数据,随机从broker地址中随机选取一个,向broker发送GET_CONSUMER_LIST_BY_GROUP命令获取消费者列表,接着根据分配消息队列策略重新分配消息队列。同时更新处理队列。
3、consumer消息拉取
DefaultPushMQConsumer消息拉取主要是通过PullMessageService线程来拉取的
4、消息消费
类结构
ConsumeMessageConcurrentlySevice消费
(1)检查processQueue是否dropped,如果为true,停止消费
(2)重置重试主题defaultMQPushConsumerImpl.resetRetryAndNamespace。这是因为消息重试机制决定 的。如果发现消息的延时级别delayTimeLevel大于0,会首先将重试主题存入在消息的属性中,然后设置主题名称为SCHEDULE_TOPIC,以便时间到后重装参考消息消费。
(3)执行消息消费钩子函数defaultMQPushConsumerImpl.executeHookBefore
(4)执行具体的消息消费listener.consumeMessage
(5)执行消息消费钩子函数defaultMQPushConsumerImpl.executeHookAfter
(6)执行业务消息消费后,在处理结果前再次验证dropped,如果为true,不对结果处理
(7)执行消息处理结果this.processConsumeResult。计算ackIndex,如果返回consume_success,ackIndex设置为msgs.size()-1,如果返回reconsume_later,ackIndex=-1,为发送msg back(ack)消息作准备。从processQueue中移除这批消息,更新消息消费进度。
时序图
当消费失败时,会向broker同步发送CONSUMER_SEND_MSG_BACK类型的消息,其请求头为ConsumerSendMsgBackRequestHeader
ConsumeMessageOrderlyService消费
在分配到新的消息队列时,首先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成功则创建该消息队列的拉取任务,否则将跳过,等待其他消费者释放该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。
如果消息处理队列没有被锁定,则延迟3s后再将PullReques对象放入到拉取任务中,如果该处理队列是第一次拉取任务时,则首先计算拉取偏移量,然后向消息服务端拉取消息。
if (processQueue.isLocked()) {if (!pullRequest.isLockedFirst()) {final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setLockedFirst(true);pullRequest.setNextOffset(offset);}} else {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}
服务启动时,如果消费模式为集群模式,默认每隔20s执行一次锁定分配给自己的消息消费队列。通过-Drocketmq.client.reblance.lockInterval=20000设置间隔,该值建议与一次消息负载频率设置相同。集群模式下顺序消息消费在创建拉取任务时并未将ProcessQueue的locked状态设置为true,在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService以每20s的频率对分配给自己消息队列进行自动加锁,从而消费加锁成功的消息消费队列。
//ConsumeMessageOrderlyService.lockAll
Set<MessageQueue> lockOKMQSet =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);for (MessageQueue mq : lockOKMQSet) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}
6、消费者启动
启动逻辑主要是在DefaultMQPushConsumerImpl中的start()中
- 在集群模式下,会创建订阅重试主题的订阅数据(copySubscription),根据消费组创建重试主题 。构建主题订阅信息SubscriptionData并加入到RebalanceImpl的订阅消息中(subScriptionInner)。订阅关系主要有两个:通过调用DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression)方法,另外一个是订阅重试主题消息。消息重试是以消费组为单位,而不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动的时候会自动订阅该主题 ,参与该主题的消息队列负载。
- 集群模式下,修改实例名为进程id
- 创建MQClientInstance
设置RebalanceImpl,包含消费组,消息模式,分配消息队列策略以及mqClient实例
- 创建PullAPIWrapper,设置FilterMessageHook
初始化消息进度OffsetStore,广播模式下使用LocalFileOffsetStore,消息消费进度保存在consumer端,集群模式下使用RemoteBrokerOffsetStore,消息消费进度保存在broker端
根据消息监听器类型创建消费端消费消息线程服务,ConsumeMessageService主要负责消息消费,内部维护一个线程池。
向MQClientInstance注册消费者,并启动MQClientInstance
- updateTopicSubscribeInfoWhenSubscriptionChanged更新topic的订阅消息。根据RebalanceImpl中的subscriptionInner调用MQClientInstance#updateTopicRouteInfoFromNameServer从NameServer中获取TopicRouteData(主要是发命令GET_ROUTEINFO_BY_TOPIC),更新RebalanceImpl中的topicSubscribeInfoTable和MQClientInstance中的topicRouteTable和brokerAddrTable
- 检查消费客户端,主要是通过MQClientInstance#checkClientInBroker,根据订阅数据中的topic,在主题路由表中找到主题跟踪数据,随机选择一个broker地址,向broker中发送命令CHECK_CLIENT_CONFIG
- 发送心跳给所有的Broker,同时发送命令REGISTER_MESSAGE_FILTER_CLASS。准备心跳数据,向broker发送命令HEART_BEAT
- 唤醒RebalanceService线程
6.1 MQClientInstance启动
是通过start()来启动,当中包含
- MQClientAPIImpl的启动,主要是NettyClient启动,注册ClientRemotingProcessor。
- 启动定时任务,包含获取NameServer地址(初始延迟为10s,周期为2分钟),从NameServer中获取主题路由数据并且更新(初始延迟为10ms,周期为30s),发送心跳数据给所有的Broker(初始延迟为1s,周期为30s),持久化所有的消费偏移(初始延迟为10s,周期为5s),修改线程池参数(初始延迟为1m,周期为1m)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}}, 1, 1, TimeUnit.MINUTES);
- PullMessageService线程启动
- RebalanceService线程启动
rocketmq中consumer设计与实现相关推荐
- rocketmq中producer设计与实现
1.类层次结构 2.producer的启动 首先设置组,及NameServer,然后调用start启动.启动关键逻辑主要在MQClientInstance中. (1)启动NettyRemotingCl ...
- 详解RocketMQ中的consumer
RocketMQ的Consumer创建启动流程 (1) Push和Pull这两种方式,本质都是Pull实现的,所以DefaultMQPullConsumerImpl的start方法启动 (2) 配置通 ...
- 消息中间件系列(九):详解RocketMQ的架构设计、关键特性、与应用场景
内容大纲: RocketMQ的简介与演进 RocketMQ的架构设计 RocketMQ的关键特性 RocketMQ的应用场景 RocketMQ的简介 RocketMQ一个纯java.分布式.队列模型的 ...
- 深入理解RocketMQ中的NameServer
本文来说下RocketMQ中的NameServer 文章目录 NameServer介绍 NameServer的作用 为什么要使用NameServer NameServer如何保证数据的最终一致 路由注 ...
- RocketMQ的架构设计详解
本文来说下RocketMQ的架构设计 文章目录 RocketMQ的简介 RocketMQ的架构设计 RocketMQ的核心组件 RocketMQ的消息领域模型 RocketMQ的关键特性 消息的顺序 ...
- 消息中间件学习总结(5)——RocketMQ之Apache RocketMQ背后的设计思路与最佳实践
摘要:为了更好地让开发者们更加深入了解阿里开源,阿里云云栖社区在3月1号了举办"阿里开源项目最佳实践"在线技术峰会,直播讲述了当前阿里新兴和经典开源项目实战经验以及背后的开发思路. ...
- RocketMQ 中Topic、Tag、GroupName基本概念介绍
本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...
- Consumer设计-high/low Level Consumer
1 Producer和Consumer的数据推送拉取方式 Producer Producer通过主动Push的方式将消息发布到Broker n Consumer Consumer通过Pull从Br ...
- RocketMQ:Consumer概述及启动流程与消息拉取源码分析
文章目录 Consumer 概述 消费者核心类 消费者启动流程 消息拉取 PullMessageService实现机制 ProcessQueue实现机制 消息拉取基本流程 客户端发起消息拉取请求 消息 ...
最新文章
- rbac 一个用户对应多个账号_电商后台系统:管理后台之账号管理(一)
- 对于有A[0…N]有序的数组,判断是否存在A[i]=I 如果存在的话返回Index,
- linux设置关闭省电模式
- python一球从100米高度自由落下,一球从100米高度自由落下,每次落地后反跳回原高度的一半;再落下,求它在 第10次落地时,......
- 视觉研究的前世今生(上)王天珍(武汉理工大学)
- php 获取京东交易账号,PHP爬虫爬取京东列表
- 【计算机组成原理】控制器
- 一道说难不难的js题目
- 外媒:伊朗政府封锁加密通讯应用Signal
- 模糊测试——强制发掘安全漏洞的利器(Jolt 大奖精选丛书)
- 卡特彼勒CAT SIS 售后服务系统3D零件图系统软件 2019年最新版
- 学习c语言,踏上新征程
- 流程图 FlowChart
- 基于ITIL的医院信息化服务管理实践(客户说)
- Centos内核升级-Yum方式_1
- 使用批处理恢复被病毒隐藏的文件和目录
- Manifest merger failed
- 那些年啊,那些事——一个程序员的奋斗史 ——127
- Oracle存储架构
- 有哪些互联网运营方面的书值得推荐?
热门文章
- [UWP小白日记-14]正则表达式
- ADO.NET 快速入门(一):ADO.NET 概述
- 华为面试题之大整数相加
- c#操作xml实例 2009-03-13 20:00
- 自学python需要买书吗-我的孩子需要学习Python吗?几岁开始学?有什么书籍推荐?...
- python装饰器实例-Python函数装饰器--实例讲解
- python程序员招聘信息-IT行业程序员招聘分析
- python有相关的证书可以考吗-python的证书
- python爬虫入门代码-如何开始写你的第一个爬虫脚本——简单爬虫入门!
- python散点图拟合曲线-【python常用图件绘制#01】线性拟合结果图