目录

  • broker启动流程
  • broker启动可配置参数
  • 启动入口`BrokerStartup`
    • 1.创建brokerController
    • 2.`BrokerController`构造函数
    • 3.BrokerController初始化`initialize()`
      • 3.1注册消息处理器`registerProcessor`
      • 3.2初始化事务消息相关的服务`initialTransaction()`
      • 3.3`initialize`总结
    • 4.BrokerControler的`start`

broker启动流程

借用一下【秃头爱健身】博主的的图,我觉得画的很好。

broker启动可配置参数

 -n : 指定broker 的 namesrvAddr 地址;-h :打印命令;-c :指定配置文件的路径;-p :启动时候日志打印配置信息;-m :启动时候日志打印导入的配置信息。

启动入口BrokerStartup

broker启动的入口是在brokerStartup,方法是main

    public static void main(String[] args) {//创建BrokerControllerstart(createBrokerController(args));}

1.创建brokerController

public static BrokerController createBrokerController(String[] args) {//设置RocketMq的版本号System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//设置broker的netty客户端的发送缓冲大小,默认是128 kbif (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {NettySystemConfig.socketSndbufSize = 131072;}//设置broker的netty客户端的接受缓冲大小,默认是128 kbif (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {NettySystemConfig.socketRcvbufSize = 131072;}try {//PackageConflictDetect.detectFastjson();//命令行选项解析Options options = ServerUtil.buildCommandlineOptions(new Options());//解析命令行为 ‘mqbroker’的参数commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());//如果为空,直接退出if (null == commandLine) {System.exit(-1);}//创建broker,netty的相关配置对象final BrokerConfig brokerConfig = new BrokerConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();final NettyClientConfig nettyClientConfig = new NettyClientConfig();//是否使用TLS (TLS是SSL的升级版本,TLS是SSL的标准化后的产物,有1.0 1.1 1.2三个版本)nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));//设置netty的服务端监听的端口 10911,对外提供消息读写服务的端口nettyServerConfig.setListenPort(10911);//创建消息存储配置final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();//如果broker是slave节点if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {//比默认的40% 还要小 10int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;//设置消息存储配置所能使用的最大内存比例,超过该内存,消息将被置换出内存,messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}//解析命令行参数'-c':指定broker的配置文件路径if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//检查broker配置中的nameServer地址String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}//检查broker的角色switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER://如果是master节点,则设置该节点brokerId=0brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}//是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 trueif (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}//设置消息存储配置的高可用端口,10912messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");//解析命令行参数'-p':启动时候日志打印配置信息if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} //解析命令行参数'-m':启动时候日志打印导入的配置信息else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);//创建BrokerControllerfinal BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// 记住所有的配置以防止丢弃controller.getConfiguration().registerConfig(properties);//初始化BrokerControllerboolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//注册关闭的钩子方法Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();//BrokerController的销毁方法controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}

broker启动会默认启动3个端口

端口 说明
10911 接收消息推送的端口
10912 消息存储配置的高可用端口
10909 推送消息的VIP端口

2.BrokerController构造函数

构造入参说明

参数 类型 说明
brokerConfig BrokerConfig 封装Broker的基本配置信息
nettyServerConfig NettyServerConfig 封装了broker作为对外提供消息读写操作的MQ服务器信息
nettyClientConfig NettyClientConfig 封装了broker作为NameServer的客户端的信息
messageStoreConfig MessageStoreConfig 封装消息存储Store的配置信息
public  BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {// BrokerStartup中准备的配置信息this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;// Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件this.consumerOffsetManager = new ConsumerOffsetManager(this);//消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系,  会读取store/config/topics.jsonthis.topicConfigManager = new TopicConfigManager(this);//Consumer端使用pull的方式向Broker拉取消息请求的处理类this.pullMessageProcessor = new PullMessageProcessor(this);//Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类this.pullRequestHoldService = new PullRequestHoldService(this);//有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);//消费者管理类,并对消费者id变化进行监听this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);//消费者过滤类,按照Topic进行分类,  会读取store/config/consumerFilter.jsonthis.consumerFilterManager = new ConsumerFilterManager(this);//生产者管理 按照group进行分类this.producerManager = new ProducerManager();//客户端心跳连接处理类this.clientHousekeepingService = new ClientHousekeepingService(this);//Console控制台获取Broker信息使用this.broker2Client = new Broker2Client(this);//订阅关系管理类this.subscriptionGroupManager = new SubscriptionGroupManager(this);//Broker对外访问的APIthis.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);//FilterServer管理类this.filterServerManager = new FilterServerManager(this);//Broker主从同步进度管理类this.slaveSynchronize = new SlaveSynchronize(this);// 各种线程池的阻塞队列// 发送消息线程池队列this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));this.brokerFastFailure = new BrokerFastFailure(this);this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);}

重点对一些核心类进行说明

参数 说明
ConsumerOffsetManager Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件
topicConfigManager 消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.json
pullMessageProcessor Consumer端使用pull的方式向Broker拉取消息请求的处理类
pullRequestHoldService Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类
messageArrivingListener 有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法
consumerIdsChangeListener 消费者id变化监听器
consumerManager 消费者管理类,并对消费者id变化进行监听
consumerFilterManager 消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.json
producerManager 生产者管理 按照group进行分类
clientHousekeepingService 客户端心跳连接处理类
broker2Client Console控制台获取Broker信息使用
subscriptionGroupManager 订阅关系管理类
brokerOuterAPI Broker对外访问的API
filterServerManager FilterServer管理类
slaveSynchronize Broker主从同步进度管理类

3.BrokerController初始化initialize()

public boolean initialize() throws CloneNotSupportedException {//加载 topic 相关配置,文件地址为 {user.home}/store/config/topics.jsonboolean result = this.topicConfigManager.load();//加载 不同的Consumer消费的进度情况  文件地址为 {user.home}/store/config/consumerOffset.jsonresult = result && this.consumerOffsetManager.load();//加载 订阅关系  文件地址  {user.home}/store/config/subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();//加载 Consumer的过滤信息配置  文件地址  {user.home}/store/config/consumerFilter.jsonresult = result && this.consumerFilterManager.load();//如果加载成功if (result) {try {//创建消息存储类messageStorethis.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);//使用的是DLegerCommitLog,则创建DLedgerRoleChangeHandlerif (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}//broker消息统计类this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}//加载消息的日志文件,包含CommitLog,ConsumeQueue等result = result && this.messageStore.load();//如果加载成功if (result) {//开启服务端this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();//设置10909的服务端口fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);//开启10909的服务端口,这个端口只给生产者使用this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);//处理消息生产者发送的生成消息请求相关的线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//处理消费者发出的消费消息请求相关的线程池this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));//处理回复消息api的线程池this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));//查询线程this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));//省略一些线程池//为客户端注册需要处理API指令事件,以及消息发送和消费的回调方法this.registerProcessor();final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;//每天执行一次,统计昨天put的message和get的messagethis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);// 默认5s执行一次,会把消费这的偏移量存到文件中  ${user.home}/store/config/consumerOffset.json.jsonthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);// 默认10s执行一次,会把消费者的消息过滤的信息持久化到文件 ${user.home}/store/config/consumerFilter.jsonthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);// 每3分钟,当消费者消费太慢,会禁用到消费者组this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//打印当前的Send Queue Size,Pull Queue Size,Query Queue Size,Transaction Queue Sizethis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//每隔一分钟打印一次,dispath的消息偏移量和总的消息偏移量的差值this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {//更新nameServer地址this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {//没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 每隔2分钟从name-server地址服务器拉取最新的配置// 这个是实现name-server动态增减的唯一方法   BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//定时打印master与slave的差距BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}//初始化事务消息相关的服务initialTransaction();//初始化权限管理器initialAcl();//初始化RPC调用的钩子initialRpcHooks();}return result;}

3.1注册消息处理器registerProcessor

rocketmq中有许多线程执行器,包括sendMessageExecutor(发送消息),pullMessageExecutor(拉取消息),queryMessageExecutor(查询消息),adminBrokerExecutor(默认处理)。这些线程执行器会通过registerProcessor注册到NettyRemotingServer ,每一个RequestCode会有一个对应的执行器,最终会以RequestCode为键放到一个HashMap中,当请求到达nettyServer时会根据RequestCode把请求分发到不同的执行器去处理请求

 public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);//省略其他的处理器

最终放到processorTable的map中

    @Overridepublic void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {ExecutorService executorThis = executor;if (null == executor) {executorThis = this.publicExecutor;}Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);this.processorTable.put(requestCode, pair);}

相关的RequestCode说明一下:

事件名 code 说明
SEND_MESSAGE 10 生产者发送信息
SEND_MESSAGE_V2 310 生产者发送信息
SEND_BATCH_MESSAGE 320 批量发送消息
CONSUMER_SEND_MSG_BACK 36 消费端消费失败的时候返回的消息
PULL_MESSAGE 11 消费者拉取消息
SEND_REPLY_MESSAGE 324 消费者回包消息,可以用类似RPC调用

3.2初始化事务消息相关的服务initialTransaction()

服务加载方式是Java的SPI方式。

 private void initialTransaction() {//加载TransactionalMessageService服务,实现类为TransactionalMessageServiceImplthis.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}//AbstractTransactionalMessageCheckListener对应的服务类为LogTransactionalMessageCheckListener ,其中实现为空实现this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}//设置对应的brokerController到AbstractTransactionalMessageCheckListener中this.transactionalMessageCheckListener.setBrokerController(this);//创建TransactionalMessageCheckService,服务是周期检查事务的服务,this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);}}

3.3initialize总结

initialize方法相应的逻辑相对来说比较多,稍微总结为已下几步:

1.服务器内的相关日志文件的加载,{user.home}/store/config/ 文件目录下的json配置文件(包含topics,consumerOffset,subscriptionGroup,consumerFilter)。

2.如果上述文件加载成功,会启动对应的Broker客户端,然后创建一些线程池,在后面注册 API 指令事件后会监听到API的时候会进行处理
3.注册事件到对应的Broker客户端上,然后会记录对应的API事件和对应线程池封装到一个对象中
4.启动一些 定时任务,这些任务比如记录Broker状态,消费进度持久化等任务
5.初始化一些服务,比如事务相关(周期检查事务),消息权限校验初始化和Rpc调用钩子相关服务。对应的服务加载方式是Java的SPI方式。

4.BrokerControler的start

 controller.start();public void start() throws Exception {if (this.messageStore != null) {// 启动消息存储服务DefaultMessageStore,其会对/store/lock文件加锁,// 以确保在broker运行期间只有一个broker实例操作/store目录this.messageStore.start();}if (this.remotingServer != null) {// 启动Netty服务监听10911端口,对外提供服务(消息生产、消费)this.remotingServer.start();}if (this.fastRemotingServer != null) {// 监听10909端口this.fastRemotingServer.start();}if (this.fileWatchService != null) {// fileWatchService与TLS有关,todo tls解析this.fileWatchService.start();}if (this.brokerOuterAPI != null) {// 启动Netty客户端netty,broker使用其向外发送数据,比如:向NameServer上报心跳、topic信息。this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {// 长轮询机制hold住拉取消息请求的服务this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {// 每10s检查一遍非活动的连接服务this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {// 处理HAstartProcessorByHa(messageStoreConfig.getBrokerRole());// 启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());// 向所有的nameserver发送本机所有的主题数据;// 包括主题名、读队列个数、写队列个数、队列权限、是否有序等this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 定时向NameServer注册Broker,最小每10s。BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {// Broker信息统计,这个没有具体的实现;所以暂时不用管this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {// Broker对请求队列中的请求进行快速失败,返回`Broker繁忙、请稍后重试`信息this.brokerFastFailure.start();}
}

对这几个服务进行说明一下:

服务名 类型 说明
messageStore DefaultMessageStore 处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等
remotingServer RemotingServer Broker的服务端,处理消费者和生产者的请求
fastRemotingServer RemotingServer 只给消息生产者的服务端
fileWatchService FileWatchService 启动监控服务连接时用到的SSL连接文件的服务
brokerOuterAPI BrokerOuterAPI RocketMQ控制台跟Broker交互时候的客户端
pullRequestHoldService PullRequestHoldService 处理push模式消费,或者延迟消费的服务
clientHousekeepingService ClientHousekeepingService 心跳连接用的服务
filterServerManager FilterServerManager 消息过滤的服务
transactionalMessageCheckService TransactionalMessageCheckService 定期检查和处理事务消息的服务
slaveSynchronize SlaveSynchronize 主从之间topic,消费偏移等信息同步用的

RocketMQ源码解析-Broker部分之Broker启动过程相关推荐

  1. Tomcat源码解析三:tomcat的启动过程

    Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context ...

  2. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  3. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  4. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  5. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  6. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  7. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  8. Myth源码解析系列之四- 配置与启动详解

    在上一篇中,我们项目所需的整个环境都已搭建完成,下面我们主要介绍项目的相关配置于启动环节 配置详解 注意: 这里事务存储我们这里采用的是 : mysql, 消息中间件选择的是:rocketmq, 其他 ...

  9. 谷歌BERT预训练源码解析(三):训练过程

    目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...

  10. Spring AOP源码解析-拦截器链的执行过程

    一.简介 在前面的两篇文章中,分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在得到了 bean 的代理对象,且通知也以合适的方式插在了目标方 ...

最新文章

  1. Tomcat关闭后,重新启动,session中保存的对象为什么还存在解决方法
  2. 基于matlab异步电机 s函数,建立电机状态方程的S 函数和仿真模)基于MATLAB的无刷双馈电机建模与仿真...
  3. python字符集_PYTHON 中的字符集
  4. 怎么把视频里的音乐提取出来
  5. 在winform中使用wpf窗体
  6. 20175223 MySQL
  7. 使用Kubernetes和Docker将Spring Boot与MongoDB作为容器部署
  8. delphi设计模式 多语言开发
  9. tpc-c值大于46万_超过46万人参加了2018年慕尼黑啤酒节
  10. 使用Apache Ignite构建C++版本的分布式应用
  11. linux下加载ISO镜像的方法
  12. .net 开发工程师 面试题
  13. [汇总]计算机专业相关证书大全(持续更新...)
  14. 基于QT实现的图形软件图片编辑器
  15. flv怎么转换成mp4格式?
  16. 令人头大的慢查询分析
  17. NAND FLAASH基础
  18. 一周快讯:乐视网市值蒸发349亿,钱宝网张小雷被逮
  19. 基于Hadoop的企业人力资源管理
  20. 毕设-基于人脸识别的教室点名系统(一)

热门文章

  1. 哪些实时翻译的软件好用?分享这三款好用的软件
  2. 电蚊香长时间通电引发的86离线语音智能盒子的设计开发
  3. 【推荐】javaweb JAVA JSP家政服务管理系统服务网站jsp服务信息管理jsp保姆月嫂招聘系统案例设计与实现源码
  4. 《Java并发编程实战》【第一部分 基础知识】
  5. 蓝屏代码: DRIVER_UNLOADED_WITHOUT_CANCELLING_PENDING_OPERATIONS bootsafe64.sys
  6. IDEA设置 代码提示 快捷键
  7. 【机器学习】使用Matlab和CNN完成回归任务
  8. 无法显示页面,因为发生内部服务器错误。
  9. JZOJ 7066. 【2021.4.24 NOI模拟】ehzeux与圆周(DP)
  10. 计算机应用的基础包括内容是什么,计算机应用基础试题2