文章目录

  • 前言
  • 一、NameServer路由中心
  • 二、消息存储
    • 1.消息发送存储流程
    • 2.存储文件
  • 三、消息消费
    • 1、消息者启动流程
    • 2、消息拉取
      • 1、PullMessageService实现机制
      • 2、ProcessQueue实现机制
      • 3、消息拉取基本流程
        • 3.1客户端封装消息拉取请求
        • 3.2消息服务端Broker组装信息
        • 3.3消息拉取客户端处理消息
        • 3.4消息拉取长轮询机制分析
        • 3.5PullRequestHoldService
        • 3.6DefaultMessageStore$ReputMessageService详解
  • 总结

前言

疫情期间,学习研究了一下《RocketMQ技术内幕》这本书,记录一下自己的学习心得


一、NameServer路由中心

待续

二、消息存储

1.消息发送存储流程

代码如下(示例):

待续

2.存储文件

待续


三、消息消费

1、消息者启动流程

跟踪DefaultMQPushConsumer的启动流程,start()方法

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

DefaultMQPushConsumerImpl#start

1、构建主题订阅信息SubscriptionData并加入RebalanceImpl的订阅消息中。
2、初始化MQClientInstance、RebalanceImpl(消息重新负载实现类)等
3、初始化消息进度。集群模式进度保存在Broker上,广播模式季度存储在消费端。
4、根据是否顺序消费,创建消费端消费线程服务。ConsumeMessageService主要负责消息消费,内部维护一个线程池
5、向MQClientInstance注册消息消费者,并启动MQClientInstance,在一个JVM中的所有消费者、生产者持有同一个MQClientInstance、MQClientInstance只会启动一次

/*** 消费启动流程** @throws MQClientException*/public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;this.checkConfig();// 1、构建主题订阅信息SubscriptionData并加入RebalanceImpl的订阅消息中。this.copySubscription();// 2、初始化MQClientInstance、RebalanceImpl(消息重新负载实现类)等if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);// 3、初始化消息进度。集群模式进度保存在Broker上,广播模式季度存储在消费端。if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();// 4、根据是否顺序消费,创建消费端消费线程服务。ConsumeMessageService主要负责消息消费,内部维护一个线程池if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();// 5、向MQClientInstance注册消息消费者,并启动MQClientInstance,// 在一个JVM中的所有消费者、生产者持有同一个MQClientInstance、MQClientInstance只会启动一次boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();}

2、消息拉取

1、PullMessageService实现机制

承接上文,当启动MQClientInstance时,触发MQClientInstance.start()方法,其中就触发了消息拉取Runnable任务–this.pullMessageService.start();

轮询从pullRequestQueue阻塞队列获取PullRequest,然后执行pullMessage(pullRequest)拉取消息,进行消费

PullRequest核心属性

问题一:PullRequest是什么时候添加的?

问题二:PullRequest是什么时候创建的?

1、RocketMQ根据PullRequest拉取任务执行完一次消息拉取任务后,又将PullRequest对象放入到pullRequestQueue
2、RebalanceImpl中创建,是PullRequest对象真正创建的地方!!

2、ProcessQueue实现机制

ProcessQueue是MessageQueue在消费端的重新、快照。消息服务器默认每次拉取32条消息,按消息的队列偏移量顺序存在在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除

ProcessQueue核心属性

3、消息拉取基本流程

3.1客户端封装消息拉取请求

DefaultMQPushConsumerImpl#pullMessage

消息拉取的基本流程:
1、消息拉取客户端–消息拉取请求封装
2、消息服务器-----查找并返回信息
3、消息拉取客户端–处理返回信息

1、如果当前消费者被挂起,则将拉取任务延迟1s再次放入到PullMessageService的拉取任务队列中【pullRequestQueue】,结束本次消息拉取

2、进行消息拉取流控

3、拉取主题订阅信息,如果为空,结束本次消息拉取,下次任务延迟3s

4、构建消息拉取系统标记【PullSysFlag】

PullSysFlag核心属性

5、调用PullAPIWrapper.pullKernelImpl方法后与服务端交互。

pullKernelImpl方法核心参数

netty发送请求到Broker消息服务端

消息拉取命令Code:RequestCode.PULL_MESSAGE

3.2消息服务端Broker组装信息

根据RequestCode.PULL_MESSAGE找到对应request请求处理类,PullMessageProcessor#processRequest

1、根据订阅信息,构建消息过滤器。
2、MessageStore.getMessage 查找消息
3、根据主题名称与队列编号获取消息消费队列
4、消息偏移量异常情况校对下一次拉取偏移量
5、如果待拉去偏移量offset大于minOffset并且小于maxOffset,从当前offset处尝试拉取32条信息。
6、根据getMessageResult填充responseHeader的nextBeginOffset、minOffset、maxOffset
7、根据主从同步延迟,如果从节点数据包含下一次拉取的偏移量,设置下一次拉取任务的brokerId
8、根据getMessageResult.getStatus()编码转换关系
9、如果commitlog标记可用并且当前节点为主节点,则更新消息消费进度





服务端消息拉取处理完毕,将返回结果到拉取消息调用方。在调用方,需要重点关注PULL_RETRY_IMMEDIATELY、PULL_OFFSET_MOVED、PULL_NOT_FOUND等情况下如何矫正拉取偏移量

3.3消息拉取客户端处理消息

NettyRemotingClient在收到服务端响应结构后会回调PullCallback的onSuccess或onException
PullCallback对象在DefaultMQPushConsumerImpl#pullMessage中创建

消息拉取客户端调用入口(处理服务端返回的response)
MQClientAPIImpl#pullMessageAsync

1、响应结果解码成PullResultExt对象

private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark());}PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);// 响应结果解码成PullResultExt对象return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());}

2、调用pullAPIWrapper.processPullResult将消息字节数组解码成消息列表,填充PullResult的属性msgFoundList,并对消息进行消息过滤(TAG)模式

3、更新PullRequest的下一次拉取偏移量,如果msgFoundList为空,则立即将PullRequest放入到PullMessageService的pullRequestQueue,以便PullMessageService能及时环形并再次执行消息拉取

将拉取到的消息存入ProcessQueue 存入消费端本地缓存
然后将拉取到的消息提交到ConsumeMessageService中供消费者消费 线程池异步处理
如果pullInterval>0,则等待pullInterval好秒后将PullRequest放入到PullMessageService的pullRequestQueue中,该消息队列的下次拉取即将被激活,达到持续消息拉取,实现准实时拉取消息的效果

3.4消息拉取长轮询机制分析

rocketmq并没有真正的实现推模式,而是消费者主动向消息服务器拉取消息,推模式是循环向消息服务端发送消息拉取请求

如果消息为到达消费队列,如果不启用长轮询机制,服务端会等待shortPollingTimeMills时间后挂起再去判断消息是否到达消息队列,如果消息味道大则提示消息拉取客户端PULL_NOT_FOUND消息不存在

如果开启长轮询模式,会每5s轮询检查一次消息是否到达,同时一有新消息到达立马通知挂起线程再次验证新消息是否是自己感兴趣的消息,如果是则从commitlog文件提取消息返回给客户端,否则直到挂起超时,超时时间PUSH模式默认15s

消息拉取时服务端从commitlog未找到消息时的逻辑处理如下
PullMessageProcessor#processRequest

从commitLog没有获取到数据,触发轮询机制,由两个线程共同完成
1、PullRequestHoldService:每隔5s重试一次
2、DefaultMessageStore#ReputMessageService,每处理一次重新拉取,Thread.sleep(1)

3.5PullRequestHoldService

PullRequestHoldService#run

 @Overridepublic void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {// 如果开启长轮询,每5s一次,判断新消息是否到达if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {// 未开启长轮询,则默认等待1s再次尝试this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());}

总结:如果开启了长轮询机制,PullRequestHoldService线程会每隔5s被唤醒去尝试检测是否有新消息的到来,直到超时,如果被挂起,需要等待5s,消息拉取实时性比较差,

为了避免这种情况,引入另外一种机制,当消息到达时唤醒挂起线程,触发一次检查

3.6DefaultMessageStore$ReputMessageService详解

ReputMessageService线程主要是根据commitlog将消息转发到consumeQueue,Index等文件。现关注doReput方法关于长轮询相关实现。

总结

待续

rocketmq源码分析相关推荐

  1. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  2. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  3. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  4. RocketMQ源码分析之request-reply特性

    1.什么是request-reply?   RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返 ...

  5. rocketmq源码分析 -生产者

    概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...

  6. RocketMQ源码分析(十二)之CommitLog同步与异步刷盘

    文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...

  7. 【RocketMQ|源码分析】namesrv启动停止过程都做了什么

    简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...

  8. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  9. mq消费者组_「架构师MQ进阶」RocketMQ源码分析(四)- 源代码包结构分析

    在前面第一篇中已经将源代码下载到本地了,本篇主要是介绍代码中相关模块到作用.036.Rocket-MQ-Source-code-cover.png 一.源码结构 RocketMQ源码组织方式基于Mav ...

  10. RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

    RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache ...

最新文章

  1. 【OpenCV 4开发详解】均值滤波
  2. Ubuntu Linux配置Nginx+MySQL+PHP+phpMyAdmin详细步骤
  3. 通过句柄找到广告弹窗进程
  4. redis(17)--集群
  5. Android 模拟器调试的缺点
  6. YzmCMS轻量级开源CMS v6.2
  7. ffmpeg 封面提取
  8. python现有两个磁盘文件a和b_有两个磁盘文件a和b,各存放一行字母,今要求把这两个文件中的信息合并...
  9. Pandas入门教程(五)
  10. Linux系统安装完成后创建交换空间
  11. 开关电源matlab仿真文件,基于PI控制方式的7A开关电源的MATLAB仿真.doc
  12. 华为价值评价体系的四个基本假设
  13. Python基础-名片管理
  14. Docker升级Wekan
  15. 15个富有创意的单页设计
  16. 圣地亚哥大学计算机科学专业,加州大学圣地亚哥分校计算机科学与工程系
  17. 光电耦合器原理及应用介绍
  18. 《把时间当朋友》 第六章交流 读书笔记
  19. 速轩三维 - 白光/蓝光/拍照式三维扫描仪
  20. 2018年刑侦科推理试题php版,2018年刑侦科目推理试题完整版分享

热门文章

  1. git上clone别人的项目、提交代码以及更新最新代码
  2. 再谈web.config/app.config敏感数据加/解密的二种方法
  3. SSE 加速运算例子详解:乘法、加法、平方、最小值、最大值、与操作
  4. 全国省市车牌二级联动json数据
  5. 什么是独立构件架构风格
  6. 2018北邮网研院机试真题
  7. ACM SIGKDD 2020 Conference会议资料整理
  8. 如何恢复音乐文件数据呢
  9. c语言整型数据类型及存储形式详解
  10. 迁移学习后续——中草药分类(inception-v3)