RocketMQ-broker启动流程详解
本文基于RocketMQ 4.7.1版本
本文将从整体上分析broker如何启动。
文章目录
- 一、broker启动
- 1、createBrokerController()
- 2、start()
一、broker启动
broker的启动类是BrokerStartup,其main方法如下:
public static void main(String[] args) {start(createBrokerController(args));}
createBrokerController()用于创建BrokerController控制器,start()用于启动BrokerController。
BrokerController是broker启动的核心类,它会创建一些处理器和管理器,并且持有四大配置对象。它创建的管理器有生产者管理器、消费者管理器、broker状态管理器、主题配置管理器等,这些管理器在以后使用到的时候在介绍。
下面分别来看这两个方法的逻辑。
1、createBrokerController()
下图是createBrokerController方法的处理流程:
下面是对上图内容的一些解析。
broker对外提供服务的端口号不是可配置的,是代码中写死的,必须是10911。
因为broker的配置非常多,所以rocketmq将这些配置做了一下归类,分散到四个不同的配置类中:BrokerConfig属于broker的基本配置,NettyServerConfig和NettyClientConfig用于设置网络相关的配置,比如对外发布服务的端口号就是在NettyServerConfig中设置,MessageStoreConfig用于设置存储相关的配置,比如消息存储路径、日志路径。
如果当前的broker是master,那么其broker id必须是0。
rocketmq在目录/store/config目录下维护了几个非常重要的json文件:
- 主题配置文件是在个人目录下的/store/config目录中,文件名为topics.json,该文件中保存主题名、读队列个数、写队列个数、是否只读。文件解析完后,将数据存储到主题配置管理器TopicConfigManager的topicConfigTable属性中。
- 消费者位移文件(consumerOffset.json)记录主题在每个消费组下每个读队列的消费位移,比如:
上图显示了消费组consumer-A消费了主题topicTest11的四个读队列的消息,value里面显示了四个读队列的位移。该文件由ConsumerOffsetManager处理,解析后的数据保存在该对象的offsetTable属性中。 - 订阅组文件(subscriptionGroup.json),也叫作消费组文件,里面保存了消费组的配置信息,比如消费组名、消费组是否可以消费消息、重试队列个数、最大重试次数等。该文件由SubscriptionGroupManager处理,解析后数据保存在该对象的subscriptionGroupTable属性中。
- 消费者过滤器配置文件(ConsumerFilterManager),记录了每个主题下消费组配置的消息过滤条件,解析后数据保存在ConsumerFilterManager的filterDataByTopic属性中。
消息存储器插件可以在配置文件里面由参数messageStorePlugIn设置,值使用全限定类名,多个插件之间使用“,”分隔,插件类需要继承抽闲类AbstractPluginMessageStore,该类实现了MessageStore接口,DefaultMessageStore也实现了MessageStore接口,当存储消息时,rocketmq将多个插件按照配置依次调用,最后调用DefaultMessageStore将消息存储到文件,因此插件提供了一种在消息存储前后修改数据的功能。
上图中的数据恢复并不是系统宕机后的恢复,这里的恢复是指读取文件来恢复内存数据,使内存数据尽可能恢复到停机前的现场,比如在停机前内存会记录消息文件的下一个写入位置,重启后这个数据就会丢失,那么需要重新读取消息文件,找到消息最后的写入位置,在读取期间,还会处理消息索引,处理读队列的数据。
2、start()
本方法主要是调用BrokerController.start()方法来启动控制器,BrokerController.start()又会启动一系列的组件,下面来看一下BrokerController.start()的代码:
public void start() throws Exception {if (this.messageStore != null) {//启动DefaultMessageStore,该类会对/store/lock文件加锁,//确保在broker运行期间只有一个broker实例操作/store目录this.messageStore.start();}//启动Netty监听10911端口,可以对外提供服务if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {//监听10909端口,作用未知this.fastRemotingServer.start();}if (this.fileWatchService != null) {//fileWatchService与TLS有关,本文暂不对TLS解析this.fileWatchService.start();}if (this.brokerOuterAPI != null) {//启动客户端Netty,broke使用该对象对外发送数据,比如向nameserver注册主题信息this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {//作用未知this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {//作用未知this.clientHousekeepingService.start();}if (this.filterServerManager != null) {//作用未知this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {//处理HAstartProcessorByHa(messageStoreConfig.getBrokerRole());//启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());//向所有的nameserver发送本机所有的主题数据,//包括主题名、读队列个数、写队列个数、队列权限、是否有序等this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {//定时任务,每过一段时间向nameserver注册一次主题信息@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();//空实现}//启动定时任务,用于对收到的请求做流控,//如果发现broker收到的请求在指定的时间内无法处理完成,默认是5s,//那么会向请求方返回错误信息,告知broker正忙,请稍后重试//涉及到的请求有:生产者发送的消息、消费者拉去消息的请求、心跳请求、事务结束请求if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}
BrokerController.start()方法主要是启动一些组件,其主要作用是:
- 监听10911端口,接收生产者和消费者请求;
- 向nameserver发送本机所有的主题数据,并启动定时任务。
RocketMQ-broker启动流程详解相关推荐
- U-Boot启动流程详解
参考:U-Boot顶层目录链接脚本文件(u-boot.lds)介绍 作者:一只青木呀 发布时间: 2020-10-23 13:52:23 网址:https://blog.csdn.net/weixin ...
- 【Autosar 启动流程详解】
Autosar 启动流程详解 1. vLinkGen_Template.lsl 2. BrsHwStartup.c 3.BrsMainStartup.c 4.BrsMain.c 链接文件: 1. vL ...
- 【正点原子Linux连载】第三十二章 U-Boot启动流程详解 -摘自【正点原子】I.MX6U嵌入式Linux驱动开发指南V1.0
1)实验平台:正点原子阿尔法Linux开发板 2)平台购买地址:https://item.taobao.com/item.htm?id=603672744434 2)全套实验源码+手册+视频下载地址: ...
- golang程序启动流程详解
golang程序启动流程详解 环境 go1.16.5 linux/amd64 用例 package mainimport "fmt"func main() {fmt.Println ...
- android zygote启动流程,Android zygote启动流程详解
对zygote的理解 在Android系统中,zygote是一个native进程,是所有应用进程的父进程.而zygote则是Linux系统用户空间的第一个进程--init进程,通过fork的方式创建并 ...
- 【线上沙龙直播报名】App 启动流程详解及其优化
点击上方"公众号"可以订阅哦 [美团点评技术沙龙Online]是美团点评技术团队推出的线上分享课程,每月2-3期,采用目前最火热的线上直播形式,邀请美团点评技术专家,面向互联网技术 ...
- Springboot启动流程详解
SpringMVC请求流程详解 SpringMVC框架是一个基于请求驱动的Web框架,并且使用了'前端控制器'模型来进行设计,再根据'请求映射规则'分发给相应的页面控制器进行处理. (一)整体流程 每 ...
- Android App启动流程详解
前言:在之前的文章中已经写了apk的打包流程.安装流程,今天就是梳理一下apk系列的最后的流程--app启动流程.经过今天的梳理以后咱们就可以对apk包是怎么编译生成的.apk是怎么被安装到安卓手机的 ...
- S5PV210 Uboot开发与移植03:Uboot启动流程详解
目录 1. start.S解析 1.1 uboot入口分析 1.2 头文件包含 1.2.1 config.h 1.2.2 version.h 1.2.3 asm/proc/domain.h 1.2.4 ...
最新文章
- myeclipse使用maven整合ssh配置
- 性能调优常见问题与方案
- Linux下无法进入windows的NTFS分区并挂载错误的问题的解决方法
- leetcode 在排序数组中查找元素的第一个和最后一个位置
- c语言无效参数视为严重错误,C语言编译错误:错误:‘-’参数类型无效(有‘int’)...
- 2017/National _C_C++_B/2/磁砖样式
- NLP工程师必学技能,自然语言处理进阶手册
- java instanceof 继承_继承_instanceOf的使用
- xshell搭建宝塔没有远程命令密码框框弹出来_服务器安装宝塔控制面板+wordpress搭建个人网站...
- 实验一(高见老师收)
- react vs 2017_我在React Europe 2017上学到了什么
- 动态时间规整算法_如何使用动态时间规整算法进行语音识别
- 廖雪峰Python教程学习笔记
- 各种常见数据传输线端口(插头)的分类
- 电脑PHP动画制作画板,Canvas在线画图—简单制作一个画板
- 吴伯凡-认知方法论-矩阵式认知与苏格拉底
- eclipes代码提示及防空格自动补全
- 一文概览神经网络优化算法
- B站入局直播电商,流量的终点就是带货?
- Mask R-CNN 原理解析