RocketMQ源码解析-Broker部分之Broker启动过程
目录
- broker启动流程
- broker启动可配置参数
- 启动入口`BrokerStartup`
- 1.创建brokerController
- 2.`BrokerController`构造函数
- 3.BrokerController初始化`initialize()`
- 3.1注册消息处理器`registerProcessor`
- 3.2初始化事务消息相关的服务`initialTransaction()`
- 3.3`initialize`总结
- 4.BrokerControler的`start`
broker启动流程
借用一下【秃头爱健身】博主的的图,我觉得画的很好。
broker启动可配置参数
-n : 指定broker 的 namesrvAddr 地址;-h :打印命令;-c :指定配置文件的路径;-p :启动时候日志打印配置信息;-m :启动时候日志打印导入的配置信息。
启动入口BrokerStartup
broker启动的入口是在brokerStartup
,方法是main
public static void main(String[] args) {//创建BrokerControllerstart(createBrokerController(args));}
1.创建brokerController
public static BrokerController createBrokerController(String[] args) {//设置RocketMq的版本号System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//设置broker的netty客户端的发送缓冲大小,默认是128 kbif (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {NettySystemConfig.socketSndbufSize = 131072;}//设置broker的netty客户端的接受缓冲大小,默认是128 kbif (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {NettySystemConfig.socketRcvbufSize = 131072;}try {//PackageConflictDetect.detectFastjson();//命令行选项解析Options options = ServerUtil.buildCommandlineOptions(new Options());//解析命令行为 ‘mqbroker’的参数commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());//如果为空,直接退出if (null == commandLine) {System.exit(-1);}//创建broker,netty的相关配置对象final BrokerConfig brokerConfig = new BrokerConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();final NettyClientConfig nettyClientConfig = new NettyClientConfig();//是否使用TLS (TLS是SSL的升级版本,TLS是SSL的标准化后的产物,有1.0 1.1 1.2三个版本)nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));//设置netty的服务端监听的端口 10911,对外提供消息读写服务的端口nettyServerConfig.setListenPort(10911);//创建消息存储配置final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();//如果broker是slave节点if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {//比默认的40% 还要小 10int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;//设置消息存储配置所能使用的最大内存比例,超过该内存,消息将被置换出内存,messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}//解析命令行参数'-c':指定broker的配置文件路径if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//检查broker配置中的nameServer地址String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}//检查broker的角色switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER://如果是master节点,则设置该节点brokerId=0brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}//是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 trueif (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}//设置消息存储配置的高可用端口,10912messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");//解析命令行参数'-p':启动时候日志打印配置信息if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} //解析命令行参数'-m':启动时候日志打印导入的配置信息else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);//创建BrokerControllerfinal BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// 记住所有的配置以防止丢弃controller.getConfiguration().registerConfig(properties);//初始化BrokerControllerboolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//注册关闭的钩子方法Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();//BrokerController的销毁方法controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}
broker启动会默认启动3个端口
端口 | 说明 |
---|---|
10911 | 接收消息推送的端口 |
10912 | 消息存储配置的高可用端口 |
10909 | 推送消息的VIP端口 |
2.BrokerController
构造函数
构造入参说明
参数 | 类型 | 说明 |
---|---|---|
brokerConfig | BrokerConfig | 封装Broker的基本配置信息 |
nettyServerConfig | NettyServerConfig | 封装了broker作为对外提供消息读写操作的MQ服务器信息 |
nettyClientConfig | NettyClientConfig | 封装了broker作为NameServer的客户端的信息 |
messageStoreConfig | MessageStoreConfig | 封装消息存储Store的配置信息 |
public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {// BrokerStartup中准备的配置信息this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;// Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件this.consumerOffsetManager = new ConsumerOffsetManager(this);//消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.jsonthis.topicConfigManager = new TopicConfigManager(this);//Consumer端使用pull的方式向Broker拉取消息请求的处理类this.pullMessageProcessor = new PullMessageProcessor(this);//Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类this.pullRequestHoldService = new PullRequestHoldService(this);//有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);//消费者管理类,并对消费者id变化进行监听this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);//消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.jsonthis.consumerFilterManager = new ConsumerFilterManager(this);//生产者管理 按照group进行分类this.producerManager = new ProducerManager();//客户端心跳连接处理类this.clientHousekeepingService = new ClientHousekeepingService(this);//Console控制台获取Broker信息使用this.broker2Client = new Broker2Client(this);//订阅关系管理类this.subscriptionGroupManager = new SubscriptionGroupManager(this);//Broker对外访问的APIthis.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);//FilterServer管理类this.filterServerManager = new FilterServerManager(this);//Broker主从同步进度管理类this.slaveSynchronize = new SlaveSynchronize(this);// 各种线程池的阻塞队列// 发送消息线程池队列this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));this.brokerFastFailure = new BrokerFastFailure(this);this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);}
重点对一些核心类进行说明
参数 | 说明 |
---|---|
ConsumerOffsetManager
|
Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件 |
topicConfigManager
|
消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.json |
pullMessageProcessor
|
Consumer端使用pull的方式向Broker拉取消息请求的处理类 |
pullRequestHoldService
|
Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类 |
messageArrivingListener
|
有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法 |
consumerIdsChangeListener
|
消费者id变化监听器 |
consumerManager
|
消费者管理类,并对消费者id变化进行监听 |
consumerFilterManager
|
消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.json |
producerManager
|
生产者管理 按照group进行分类 |
clientHousekeepingService
|
客户端心跳连接处理类 |
broker2Client
|
Console控制台获取Broker信息使用 |
subscriptionGroupManager
|
订阅关系管理类 |
brokerOuterAPI
|
Broker对外访问的API |
filterServerManager
|
FilterServer管理类 |
slaveSynchronize
|
Broker主从同步进度管理类 |
3.BrokerController初始化initialize()
public boolean initialize() throws CloneNotSupportedException {//加载 topic 相关配置,文件地址为 {user.home}/store/config/topics.jsonboolean result = this.topicConfigManager.load();//加载 不同的Consumer消费的进度情况 文件地址为 {user.home}/store/config/consumerOffset.jsonresult = result && this.consumerOffsetManager.load();//加载 订阅关系 文件地址 {user.home}/store/config/subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();//加载 Consumer的过滤信息配置 文件地址 {user.home}/store/config/consumerFilter.jsonresult = result && this.consumerFilterManager.load();//如果加载成功if (result) {try {//创建消息存储类messageStorethis.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);//使用的是DLegerCommitLog,则创建DLedgerRoleChangeHandlerif (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}//broker消息统计类this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}//加载消息的日志文件,包含CommitLog,ConsumeQueue等result = result && this.messageStore.load();//如果加载成功if (result) {//开启服务端this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();//设置10909的服务端口fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);//开启10909的服务端口,这个端口只给生产者使用this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);//处理消息生产者发送的生成消息请求相关的线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//处理消费者发出的消费消息请求相关的线程池this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));//处理回复消息api的线程池this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));//查询线程this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));//省略一些线程池//为客户端注册需要处理API指令事件,以及消息发送和消费的回调方法this.registerProcessor();final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;//每天执行一次,统计昨天put的message和get的messagethis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);// 默认5s执行一次,会把消费这的偏移量存到文件中 ${user.home}/store/config/consumerOffset.json.jsonthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);// 默认10s执行一次,会把消费者的消息过滤的信息持久化到文件 ${user.home}/store/config/consumerFilter.jsonthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);// 每3分钟,当消费者消费太慢,会禁用到消费者组this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//打印当前的Send Queue Size,Pull Queue Size,Query Queue Size,Transaction Queue Sizethis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//每隔一分钟打印一次,dispath的消息偏移量和总的消息偏移量的差值this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {//更新nameServer地址this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {//没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 每隔2分钟从name-server地址服务器拉取最新的配置// 这个是实现name-server动态增减的唯一方法 BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//定时打印master与slave的差距BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}//初始化事务消息相关的服务initialTransaction();//初始化权限管理器initialAcl();//初始化RPC调用的钩子initialRpcHooks();}return result;}
3.1注册消息处理器registerProcessor
rocketmq中有许多线程执行器,包括sendMessageExecutor(发送消息),pullMessageExecutor(拉取消息),queryMessageExecutor(查询消息),adminBrokerExecutor(默认处理)。这些线程执行器会通过registerProcessor注册到NettyRemotingServer ,每一个RequestCode会有一个对应的执行器,最终会以RequestCode为键放到一个HashMap中,当请求到达nettyServer时会根据RequestCode把请求分发到不同的执行器去处理请求
public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);//省略其他的处理器
最终放到processorTable的map中
@Overridepublic void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {ExecutorService executorThis = executor;if (null == executor) {executorThis = this.publicExecutor;}Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);this.processorTable.put(requestCode, pair);}
相关的RequestCode说明一下:
事件名 | code | 说明 |
---|---|---|
SEND_MESSAGE | 10 | 生产者发送信息 |
SEND_MESSAGE_V2 | 310 | 生产者发送信息 |
SEND_BATCH_MESSAGE | 320 | 批量发送消息 |
CONSUMER_SEND_MSG_BACK | 36 | 消费端消费失败的时候返回的消息 |
PULL_MESSAGE | 11 | 消费者拉取消息 |
SEND_REPLY_MESSAGE | 324 | 消费者回包消息,可以用类似RPC调用 |
3.2初始化事务消息相关的服务initialTransaction()
服务加载方式是Java的SPI方式。
private void initialTransaction() {//加载TransactionalMessageService服务,实现类为TransactionalMessageServiceImplthis.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}//AbstractTransactionalMessageCheckListener对应的服务类为LogTransactionalMessageCheckListener ,其中实现为空实现this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}//设置对应的brokerController到AbstractTransactionalMessageCheckListener中this.transactionalMessageCheckListener.setBrokerController(this);//创建TransactionalMessageCheckService,服务是周期检查事务的服务,this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);}}
3.3initialize
总结
initialize
方法相应的逻辑相对来说比较多,稍微总结为已下几步:
1.服务器内的相关日志文件的加载,{user.home}/store/config/ 文件目录下的json配置文件(包含topics,consumerOffset,subscriptionGroup,consumerFilter)。
2.如果上述文件加载成功,会启动对应的Broker客户端,然后创建一些线程池,在后面注册 API 指令事件后会监听到API的时候会进行处理
3.注册事件到对应的Broker客户端上,然后会记录对应的API事件和对应线程池封装到一个对象中
4.启动一些 定时任务,这些任务比如记录Broker状态,消费进度持久化等任务
5.初始化一些服务,比如事务相关(周期检查事务),消息权限校验初始化和Rpc调用钩子相关服务。对应的服务加载方式是Java的SPI方式。
4.BrokerControler的start
controller.start();public void start() throws Exception {if (this.messageStore != null) {// 启动消息存储服务DefaultMessageStore,其会对/store/lock文件加锁,// 以确保在broker运行期间只有一个broker实例操作/store目录this.messageStore.start();}if (this.remotingServer != null) {// 启动Netty服务监听10911端口,对外提供服务(消息生产、消费)this.remotingServer.start();}if (this.fastRemotingServer != null) {// 监听10909端口this.fastRemotingServer.start();}if (this.fileWatchService != null) {// fileWatchService与TLS有关,todo tls解析this.fileWatchService.start();}if (this.brokerOuterAPI != null) {// 启动Netty客户端netty,broker使用其向外发送数据,比如:向NameServer上报心跳、topic信息。this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {// 长轮询机制hold住拉取消息请求的服务this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {// 每10s检查一遍非活动的连接服务this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {// 处理HAstartProcessorByHa(messageStoreConfig.getBrokerRole());// 启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());// 向所有的nameserver发送本机所有的主题数据;// 包括主题名、读队列个数、写队列个数、队列权限、是否有序等this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 定时向NameServer注册Broker,最小每10s。BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {// Broker信息统计,这个没有具体的实现;所以暂时不用管this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {// Broker对请求队列中的请求进行快速失败,返回`Broker繁忙、请稍后重试`信息this.brokerFastFailure.start();}
}
对这几个服务进行说明一下:
服务名 | 类型 | 说明 |
---|---|---|
messageStore | DefaultMessageStore | 处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等 |
remotingServer | RemotingServer | Broker的服务端,处理消费者和生产者的请求 |
fastRemotingServer | RemotingServer | 只给消息生产者的服务端 |
fileWatchService | FileWatchService | 启动监控服务连接时用到的SSL连接文件的服务 |
brokerOuterAPI | BrokerOuterAPI | RocketMQ控制台跟Broker交互时候的客户端 |
pullRequestHoldService | PullRequestHoldService | 处理push模式消费,或者延迟消费的服务 |
clientHousekeepingService | ClientHousekeepingService | 心跳连接用的服务 |
filterServerManager | FilterServerManager | 消息过滤的服务 |
transactionalMessageCheckService | TransactionalMessageCheckService | 定期检查和处理事务消息的服务 |
slaveSynchronize | SlaveSynchronize | 主从之间topic,消费偏移等信息同步用的 |
RocketMQ源码解析-Broker部分之Broker启动过程相关推荐
- Tomcat源码解析三:tomcat的启动过程
Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- 《RocketMQ源码分析》NameServer如何处理Broker的连接
<RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- RocketMQ源码解析:Filtersrv
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
- Myth源码解析系列之四- 配置与启动详解
在上一篇中,我们项目所需的整个环境都已搭建完成,下面我们主要介绍项目的相关配置于启动环节 配置详解 注意: 这里事务存储我们这里采用的是 : mysql, 消息中间件选择的是:rocketmq, 其他 ...
- 谷歌BERT预训练源码解析(三):训练过程
目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...
- Spring AOP源码解析-拦截器链的执行过程
一.简介 在前面的两篇文章中,分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在得到了 bean 的代理对象,且通知也以合适的方式插在了目标方 ...
最新文章
- Tomcat关闭后,重新启动,session中保存的对象为什么还存在解决方法
- 基于matlab异步电机 s函数,建立电机状态方程的S 函数和仿真模)基于MATLAB的无刷双馈电机建模与仿真...
- python字符集_PYTHON 中的字符集
- 怎么把视频里的音乐提取出来
- 在winform中使用wpf窗体
- 20175223 MySQL
- 使用Kubernetes和Docker将Spring Boot与MongoDB作为容器部署
- delphi设计模式 多语言开发
- tpc-c值大于46万_超过46万人参加了2018年慕尼黑啤酒节
- 使用Apache Ignite构建C++版本的分布式应用
- linux下加载ISO镜像的方法
- .net 开发工程师 面试题
- [汇总]计算机专业相关证书大全(持续更新...)
- 基于QT实现的图形软件图片编辑器
- flv怎么转换成mp4格式?
- 令人头大的慢查询分析
- NAND FLAASH基础
- 一周快讯:乐视网市值蒸发349亿,钱宝网张小雷被逮
- 基于Hadoop的企业人力资源管理
- 毕设-基于人脸识别的教室点名系统(一)
热门文章
- 哪些实时翻译的软件好用?分享这三款好用的软件
- 电蚊香长时间通电引发的86离线语音智能盒子的设计开发
- 【推荐】javaweb JAVA JSP家政服务管理系统服务网站jsp服务信息管理jsp保姆月嫂招聘系统案例设计与实现源码
- 《Java并发编程实战》【第一部分 基础知识】
- 蓝屏代码: DRIVER_UNLOADED_WITHOUT_CANCELLING_PENDING_OPERATIONS bootsafe64.sys
- IDEA设置 代码提示 快捷键
- 【机器学习】使用Matlab和CNN完成回归任务
- 无法显示页面,因为发生内部服务器错误。
- JZOJ 7066. 【2021.4.24 NOI模拟】ehzeux与圆周(DP)
- 计算机应用的基础包括内容是什么,计算机应用基础试题2