大家好,我是君哥。

今天来分享 RocketMQ 中一个关键的知识点,消费者的启动过程。

多数消息队列中,消费者和 Broker 通信的方式有两种,PUSH 模式和 PULL 模式:

  • PUSH 模式:Broker 主动把消息推送给订阅的消费者;

  • PULL模式:消费者主动从 Broker 拉取消息。

注意,RocketMQ 并没有真正实现 PUSH 模式, RocketMQ 中的 PUSH 模式,本质上也是 PULL 模式,只是消费端封装了轮询过程,相当于开启一个定时线程不停地从 Broker 拉取消息,拉取到消息后唤醒本地业务线程来处理。本文讲解 PULL 模式的启动过程。涉及到到的启动过程如下图:

首先看下面这张图:

图中可以看出,消费者需要注册到 Name Server,拉取消息的时候可以从 Broker 主节点拉取,也可以从 Broker 从节点拉取。

在 RocketMQ 的源码中,拉模式有两个消费者相关的类,其中 DefaultMQPullCons umer 类已经被废弃,官方推荐使用 Defau ltLitePullConsumer 类。下面代码来自官方示例:

public static void main(String[] args) throws Exception {DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);litePullConsumer.subscribe("TopicTest", "*");//启动方法litePullConsumer.start();try {while (running) {//这里可以看到,PULL 模式下消费者需要业务代码主动去拉取消息List<MessageExt> messageExts = litePullConsumer.poll();System.out.printf("%s%n", messageExts);}} finally {litePullConsumer.shutdown();}
}

上面代码中消费者属于消费组 lite_pull _consumer_test,订阅了【TopicTest 】这个 Topic 下的所有 tag。下面一起看一下启动方法。下图是消费者启动过程中类调用关系图,图中心的 pullRequestQueu e 是核心,pull 请求会先发送到这个队列,然后循环地拉取处理。

检查启动配置

消费者启动时首先会检查配置,检查的配置项如下:

  • 消费组名称是否合法。包括校验项包括【非空】、【长度小于等于255】、符合正则表达式【^[%|a-zA-Z0-9_-]+$】、【不等于 “DEFAULT_CO NSUMER”】;

  • 消息模式不能是空,包括集群和广播两种模式;

  • MessageQueue 负载策略不能是空,包括:平均分配策略、循环分配策略、自定义分配策略、按照机房平均分配策略、按照机房就近分配策略、一致性 HASH 策略;

  • 长轮询模式下,消费者连接挂起时间不小于长轮询模式下 Broker 挂起时间,Broker 挂起时间默认 20s,官方不建议修改。

这部分源代码见 DefaultLitePullConsum erImpl#checkConfig。

修改消费者实例名称

如果是集群模式,实例名称改为【进程 ID + “ #” + 系统时间(纳秒 )】,代码如下:

//ClientConfig类
public void changeInstanceNameToPID() {if (this.instanceName.equals("DEFAULT")) {this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();}
}

初始化 MQ 客户端

创建一个 MQClientInstance 实例,然后把消费者注册到 MQClientInstance。

private void initMQClientFactory() throws MQClientException {this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}
}

初始化负载均衡器

对  RebalanceLitePullImpl 实例初始化,给下面的参数赋值:

  • 消费者名称;

  • 消息模型;

  • MessageQueue  负载均衡策略;

  • MQ 客户端,上节中初始化的 MQClientInstance 实例。

负载均衡线程启动后,默认每 20s 做一次负载均衡,见如下代码:

//RebalanceService 类
public void run() {while (!this.isStopped()) {//waitInterval 默认 20s,可以配置this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}
}

初始化 Wrapper

PullAPIWrapper 这个 Wrapper 类是 MQ-ClientInstance 类的 Wrapper 类,类中 pullKernelImpl 方法对  MQClientInstance 类中的 pullMessage 方法进行了装饰,这个装饰类主要增加了下面功能:

  1. 获取 Broker 地址;

  2. 检查 RocketMQ 版本;

  3. 如果 Broker 是从节点,把 sysFlag 标记偏移量的位改为 0,(偏移量 0x1);

  4. 封装请求 header;

  5. 获取 filterServer 地址(如果消费者是通过 filterServer 从 Broker 拉取消息,这里随机获取一个 filterServer  地址)。

代码如下 :

//PullAPIWrapper
public PullResult pullKernelImpl(//省略所有参数
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//1.获取 Broker 地址FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);//省略从 Name sever 更新本地 Broker 缓存逻辑if (findBrokerResult != null) {{//2.检查 RocketMQ 版本if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {//3.把偏移量的位改为 0,(偏移量 0x1)sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}//4.封装请求 headerPullMessageRequestHeader  = new PullMessageRequestHeader();//省略封装 requestHeaderString brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {//5.获取 filterServer 地址brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

初始化 offset 存储器

offset 存储器的 UML 类图如下:

有两个实现类分别对应集群模式和广播模式,本文讨论的集群模式的实现类是 RemoteBrokerOffsetStore。offset 可以存储在本地或者远端服务器。

启动 MQ 客户端

启动 MQ 客户端主要包括如下步骤:

  1. 把 serviceState 改为 START_FAIL ED;

  2. 初始化 Netty channel;

  3. 启动定时任务,包括定时获取 Name Server 地址、从 Name Server 更新 Topic 路由信息、清理过期的 Broker、向 Broker 发送心跳、持久化 offset、定时调整线程池的数量(源码里面这个并没有实现逻辑);

  4. 启动拉取消息的线程,拉取线程的逻辑是从请求队列中不停地取出 pull 请求,然后将请求发送到 Broker 进行拉取消息,代码如下:

//PullMessageService类
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");
}

从下面的代码可以看出,PULL 拉取消息最终使用了 DefaultMQPushConsumer Impl,所以 PULL 模式和 PUSH 模式拉取消息的逻辑是一样的。

private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
}

5.启动 MessageQueue 负载均衡线程;

6.启动生产者线程;

7.把 serviceState 改为 Running。

源码参考 MQClientInstance#start。

启动定时任务

这个定时任务默认每 30s 执行一次,用于监听每个 Topic 下的 MessageQueue 是否发生变化。代码见 startScheduleTask 方法。

启动轨迹消息

轨迹消息主要用于跟踪消息发送、消息消费的轨迹,用于记录详细日志。代码如下:

//AsyncTraceDispatcher 类
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {if (isStarted.compareAndSet(false, true)) {traceProducer.setNamesrvAddr(nameSrvAddr);traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);traceProducer.start();}this.accessChannel = accessChannel;this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);this.worker.setDaemon(true);this.worker.start();this.registerShutDownHook();
}

这里不详细展开了,后面再单独讨论。

总结

本文通过源码分析讲解了 RocketMQ 中 PULL 模式下的消费者启动过程,在生产上使用比较多的还是 PUSH 模式,PULL 模式拉取消息的方法跟 PUSH 模式一样,不同的是 PULL 模式需要应用程序进行拉取动作,可以通过 PULL 模式的学习更容易的理解 PUSH 模式。最后,分析一个 PULL 模式启动过程涉及的 UML 类图:

5 张图带你理解 RocketMQ 消费者启动过程相关推荐

  1. 8张图带你理解Java整个只是网络(转载)

    8张图带你理解Java整个只是网络 一图胜千言,下面图解均来自Program Creek 网站的Java教程,目前它们拥有最多的票选.如果图解没有阐明问题,那么你可以借助它的标题来一窥究竟. 1.字符 ...

  2. linux内核通俗理解,简洁明了!高手带你理解ARM-Linux的启动过程

    原标题:简洁明了!高手带你理解ARM-Linux的启动过程 1. kernel运行的史前时期和内存布局 在arm平台下,zImage.bin压缩镜像是由bootloader加载到物理内存,然后跳到zI ...

  3. 深入理解iOS APP启动过程

    前言 启动时间是衡量应用品质的重要指标. 本文首先会从原理上出发,讲解iOS系统是如何启动App的,然后从main函数之前和main函数之后两个角度去分析如何优化启动时间. 准备知识 Mach-O 哪 ...

  4. 安卓startActivity:彻底理解startActivity的启动过程这一篇就够了

    基于Android 6.0的源码剖析, 分析android Activity启动流程,相关源码: frameworks/base/services/core/java/com/android/serv ...

  5. 36 张图带你理解,计算机网络 6 大基础知识点

    一.计算机网络概述 1.1 计算机网络的分类 按照网络的作用范围:广域网(WAN).城域网(MAN).局域网(LAN): 按照网络使用者:公用网络.专用网络. 1.2 计算机网络的层次结构 TCP/I ...

  6. [PMP]一张图带你了解项目管理49个过程(PMBOK第六版-附实例)

  7. 一张图带你了解项目管理49个过程(PMBOK第六版-附实例)

    转载仅供本人存档及后续研究使用,请尊重原创. 作者:杨波平,圣略咨询首席PMP讲师,美国 AACTP 国际注册培训师.项目管理专业讲师.微软 MCSE.

  8. Activity启动过程剖析

    Activity启动过程剖析 你同样可以在Github上看到这篇文章:https://github.com/onlynight/ActivityStartPrinciple 写在前面 在看这篇文章之前 ...

  9. linux启动和服务管理(6)systemd启动过程

    Linux系统的启动方式有点复杂,而且总是有需要优化的地方.传统的Linux系统启动过程主要由著名的init进程(也被称为SysV init启动系统)处理,而基于init的启动系统被认为有效率不足的问 ...

最新文章

  1. 困扰爱因斯坦的「幽灵般的超距作用」,是如何被贝尔定理证明确实存在的?...
  2. 怎么做95置信区间图_这种动态的OD图怎么做?简单3步快速搞定
  3. Sphinx 2.2.3 安装和配置,英文数字中文搜索
  4. 更改VS.NET 默认SCM Provider的方法
  5. 至强服务器性能排行,英特尔至强处理器排名天梯 至强cpu天梯2020排名
  6. 海归首选“北上广” 薪资期望不太高 元芳你怎么看?
  7. 有道云笔记同步IT笔试面试资源
  8. 【12月原创】RT-thread - 柿饼UI —— 网络流媒体播放器
  9. 2014年IT互联网行业薪酬待遇
  10. EXCEL作曲线图,如何转成高质量的图片
  11. twitter账户受限_如何为您的企业设置Twitter帐户
  12. 微信推文转发服务器,微信文章如何转载?(看好这4个方法,可转发任何内容)...
  13. 低分辨率人脸识别(LRFR)相关文章整理——(待更)
  14. muduo源码分析2——Singleton分析
  15. wincc 激活记录运行系统
  16. 华为 荣耀20 Andorid10 图片保存到相册 图片不刷新问题
  17. oracle计算两个日期相隔月数,计算日期相差
  18. python读取word图片_python 如何提取 word 内的图片
  19. 25、使用Baidu的paddle自动进行验证码的识别、并计算验证码的数值
  20. 对接IronSource广告(视频)

热门文章

  1. java登录ssh没有权限管理_ssh2--login 是Java的权限管理系统,有完整的程序及代码,很全面的概括了 的功能。 Develop 238万源代码下载- www.pudn.com...
  2. 写作分享|向SCI期刊投稿时使用什么邮箱比较好?
  3. 英伟达(NVIDIA)显卡、驱动版本与cuda版本对应关系
  4. vb.net 教程 3-2 窗体编程之窗体 5
  5. 树上距离之和 1060E
  6. 2016 多校4 1002 After a Sleepless Night 树上贪心
  7. 基于STM32F103C8T6(HAL库)的HC-SR501红外人体传感及HC-SR04超声波测距
  8. 基于springdata JPA的dao层接口实现
  9. 工作流:企业OA办公系统的核心
  10. 苹果wifi测试中文软件,苹果测试排除WiFi网络故障工具 WiFi Check 2.1.2 Mac OS X