本文基于RocketMQ 4.7.1版本

本文将从整体上分析broker如何启动。

文章目录

  • 一、broker启动
    • 1、createBrokerController()
    • 2、start()

一、broker启动

broker的启动类是BrokerStartup,其main方法如下:

 public static void main(String[] args) {start(createBrokerController(args));}

createBrokerController()用于创建BrokerController控制器,start()用于启动BrokerController。
BrokerController是broker启动的核心类,它会创建一些处理器和管理器,并且持有四大配置对象。它创建的管理器有生产者管理器、消费者管理器、broker状态管理器、主题配置管理器等,这些管理器在以后使用到的时候在介绍。
下面分别来看这两个方法的逻辑。

1、createBrokerController()

下图是createBrokerController方法的处理流程:

下面是对上图内容的一些解析。
broker对外提供服务的端口号不是可配置的,是代码中写死的,必须是10911。
因为broker的配置非常多,所以rocketmq将这些配置做了一下归类,分散到四个不同的配置类中:BrokerConfig属于broker的基本配置,NettyServerConfig和NettyClientConfig用于设置网络相关的配置,比如对外发布服务的端口号就是在NettyServerConfig中设置,MessageStoreConfig用于设置存储相关的配置,比如消息存储路径、日志路径。
如果当前的broker是master,那么其broker id必须是0。
rocketmq在目录/store/config目录下维护了几个非常重要的json文件:

  1. 主题配置文件是在个人目录下的/store/config目录中,文件名为topics.json,该文件中保存主题名、读队列个数、写队列个数、是否只读。文件解析完后,将数据存储到主题配置管理器TopicConfigManager的topicConfigTable属性中。
  2. 消费者位移文件(consumerOffset.json)记录主题在每个消费组下每个读队列的消费位移,比如:

    上图显示了消费组consumer-A消费了主题topicTest11的四个读队列的消息,value里面显示了四个读队列的位移。该文件由ConsumerOffsetManager处理,解析后的数据保存在该对象的offsetTable属性中。
  3. 订阅组文件(subscriptionGroup.json),也叫作消费组文件,里面保存了消费组的配置信息,比如消费组名、消费组是否可以消费消息、重试队列个数、最大重试次数等。该文件由SubscriptionGroupManager处理,解析后数据保存在该对象的subscriptionGroupTable属性中。
  4. 消费者过滤器配置文件(ConsumerFilterManager),记录了每个主题下消费组配置的消息过滤条件,解析后数据保存在ConsumerFilterManager的filterDataByTopic属性中。

消息存储器插件可以在配置文件里面由参数messageStorePlugIn设置,值使用全限定类名,多个插件之间使用“,”分隔,插件类需要继承抽闲类AbstractPluginMessageStore,该类实现了MessageStore接口,DefaultMessageStore也实现了MessageStore接口,当存储消息时,rocketmq将多个插件按照配置依次调用,最后调用DefaultMessageStore将消息存储到文件,因此插件提供了一种在消息存储前后修改数据的功能。
上图中的数据恢复并不是系统宕机后的恢复,这里的恢复是指读取文件来恢复内存数据,使内存数据尽可能恢复到停机前的现场,比如在停机前内存会记录消息文件的下一个写入位置,重启后这个数据就会丢失,那么需要重新读取消息文件,找到消息最后的写入位置,在读取期间,还会处理消息索引,处理读队列的数据。

2、start()

本方法主要是调用BrokerController.start()方法来启动控制器,BrokerController.start()又会启动一系列的组件,下面来看一下BrokerController.start()的代码:

 public void start() throws Exception {if (this.messageStore != null) {//启动DefaultMessageStore,该类会对/store/lock文件加锁,//确保在broker运行期间只有一个broker实例操作/store目录this.messageStore.start();}//启动Netty监听10911端口,可以对外提供服务if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {//监听10909端口,作用未知this.fastRemotingServer.start();}if (this.fileWatchService != null) {//fileWatchService与TLS有关,本文暂不对TLS解析this.fileWatchService.start();}if (this.brokerOuterAPI != null) {//启动客户端Netty,broke使用该对象对外发送数据,比如向nameserver注册主题信息this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {//作用未知this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {//作用未知this.clientHousekeepingService.start();}if (this.filterServerManager != null) {//作用未知this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {//处理HAstartProcessorByHa(messageStoreConfig.getBrokerRole());//启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());//向所有的nameserver发送本机所有的主题数据,//包括主题名、读队列个数、写队列个数、队列权限、是否有序等this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {//定时任务,每过一段时间向nameserver注册一次主题信息@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();//空实现}//启动定时任务,用于对收到的请求做流控,//如果发现broker收到的请求在指定的时间内无法处理完成,默认是5s,//那么会向请求方返回错误信息,告知broker正忙,请稍后重试//涉及到的请求有:生产者发送的消息、消费者拉去消息的请求、心跳请求、事务结束请求if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

BrokerController.start()方法主要是启动一些组件,其主要作用是:

  1. 监听10911端口,接收生产者和消费者请求;
  2. 向nameserver发送本机所有的主题数据,并启动定时任务。

RocketMQ-broker启动流程详解相关推荐

  1. U-Boot启动流程详解

    参考:U-Boot顶层目录链接脚本文件(u-boot.lds)介绍 作者:一只青木呀 发布时间: 2020-10-23 13:52:23 网址:https://blog.csdn.net/weixin ...

  2. 【Autosar 启动流程详解】

    Autosar 启动流程详解 1. vLinkGen_Template.lsl 2. BrsHwStartup.c 3.BrsMainStartup.c 4.BrsMain.c 链接文件: 1. vL ...

  3. 【正点原子Linux连载】第三十二章 U-Boot启动流程详解 -摘自【正点原子】I.MX6U嵌入式Linux驱动开发指南V1.0

    1)实验平台:正点原子阿尔法Linux开发板 2)平台购买地址:https://item.taobao.com/item.htm?id=603672744434 2)全套实验源码+手册+视频下载地址: ...

  4. golang程序启动流程详解

    golang程序启动流程详解 环境 go1.16.5 linux/amd64 用例 package mainimport "fmt"func main() {fmt.Println ...

  5. android zygote启动流程,Android zygote启动流程详解

    对zygote的理解 在Android系统中,zygote是一个native进程,是所有应用进程的父进程.而zygote则是Linux系统用户空间的第一个进程--init进程,通过fork的方式创建并 ...

  6. 【线上沙龙直播报名】App 启动流程详解及其优化

    点击上方"公众号"可以订阅哦 [美团点评技术沙龙Online]是美团点评技术团队推出的线上分享课程,每月2-3期,采用目前最火热的线上直播形式,邀请美团点评技术专家,面向互联网技术 ...

  7. Springboot启动流程详解

    SpringMVC请求流程详解 SpringMVC框架是一个基于请求驱动的Web框架,并且使用了'前端控制器'模型来进行设计,再根据'请求映射规则'分发给相应的页面控制器进行处理. (一)整体流程 每 ...

  8. Android App启动流程详解

    前言:在之前的文章中已经写了apk的打包流程.安装流程,今天就是梳理一下apk系列的最后的流程--app启动流程.经过今天的梳理以后咱们就可以对apk包是怎么编译生成的.apk是怎么被安装到安卓手机的 ...

  9. S5PV210 Uboot开发与移植03:Uboot启动流程详解

    目录 1. start.S解析 1.1 uboot入口分析 1.2 头文件包含 1.2.1 config.h 1.2.2 version.h 1.2.3 asm/proc/domain.h 1.2.4 ...

最新文章

  1. myeclipse使用maven整合ssh配置
  2. 性能调优常见问题与方案
  3. Linux下无法进入windows的NTFS分区并挂载错误的问题的解决方法
  4. leetcode 在排序数组中查找元素的第一个和最后一个位置
  5. c语言无效参数视为严重错误,C语言编译错误:错误:‘-’参数类型无效(有‘int’)...
  6. 2017/National _C_C++_B/2/磁砖样式
  7. NLP工程师必学技能,自然语言处理进阶手册
  8. java instanceof 继承_继承_instanceOf的使用
  9. xshell搭建宝塔没有远程命令密码框框弹出来_服务器安装宝塔控制面板+wordpress搭建个人网站...
  10. 实验一(高见老师收)
  11. react vs 2017_我在React Europe 2017上学到了什么
  12. 动态时间规整算法_如何使用动态时间规整算法进行语音识别
  13. 廖雪峰Python教程学习笔记
  14. 各种常见数据传输线端口(插头)的分类
  15. 电脑PHP动画制作画板,Canvas在线画图—简单制作一个画板
  16. 吴伯凡-认知方法论-矩阵式认知与苏格拉底
  17. eclipes代码提示及防空格自动补全
  18. 一文概览神经网络优化算法
  19. B站入局直播电商,流量的终点就是带货?
  20. Mask R-CNN 原理解析

热门文章

  1. 深度学习基础--SOFTMAX回归(单层神经网络)
  2. 118 以太坊 ethereum hardhat :编译 artifacts
  3. vue——数字加逗号分隔
  4. django框架全解
  5. 从苏宁电器到卡巴斯基(第二部)第30篇:我当高校教师的这几年 VI
  6. 百度智能云发布全新云智一体3.0架构,自研技术贯穿各层级
  7. HMM隐马尔科夫模型(附维特比代码)
  8. jks与keystore区别
  9. Jacob操作Word文档插入表格并表格中插入图片案例
  10. 电脑技巧 之 Discord翻译插件(PC端)(保姆级教程)