6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器。并且其它服务在 NameServer 注册服务信息的时候都是全量注册。如果 RocketMQ 的拓扑图当中有多台 NameServer 的时候,只要有一台存活的话 RocketMQ 集群就可以正常工作。
当 RocketMQ 作为消息队列要向外提供服务的时候,除了需要第一步启动 NameServer 用来保存元数据,还需要启动 RocketMQ 的 Broker。这样才能 Producer 才能发现消息到 Broker,然后 Consumer 才能从 Broker 消费消息。
RocketMQ 的 Broker 启动类为:org.apache.rocketmq.broker.BrokerStartup
。下面我们来看一下 Broker 启动做了哪些事情。
1、解析启动命令
其实 BrokerStartup
只是一个启动类,Broker 的启动它其实是委托给 BrokerController
这个类来完成的。它的功能其实是非常强大的,要使用这个类来启动 MQ 的整个服务。Rocket 首先要对这个对象进行初始化,做为一个强大的中间件可配置化也是必须的。所以在初始化 BrokerController
,RocketMQ 会先解析我们的启动命令。还记得我们看源码的时候启动 RocketMQ 的启动命令么,如下所示:
-c /Users/carl/projects/idea/github/apache/rocketmq/config/conf/broker.conf -n localhost:9876
我们来看一下 RocketMQ 的 Broker 启动支持哪些命令:
Borker 启动命令列表
短命令 | 长命令 | 描述 | 解释 |
---|---|---|---|
p | printConfigItem | Print all config item | 打印所有配置项 |
c | configFile | Broker config properties file | Broker 的配置文件地址 |
h | help | Print help | 打印帮助命令 |
m | printImportantConfig | Print important config item | 打印所有重要的配置项(BrokerConfig类上面所有标注 @ImportantField 注解的属性) |
n | namesrvAddr | Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 | 命名服务地址列表 |
其实最重要的还是 Broker 的配置文件地址的解析,它会把配置文件解析成 Properties 对象,也就是 KV 键值对。然后把这些配置项赋值到以下的 Broker 配置对象当中。
BrokerConfig
:Broker 服务相关的配置项NettyServerConfig
:Broker 对外暴露 Socket 服务相关的配置项(使用 Netty)NettyClientConfig
:Broker 对外调用 Socket 服务相关的配置项(使用 Netty)MessageStoreConfig
:Broker 消息存储相关的配置项
2、初始化 BrokerController
通过上面解析配置文件并把配置项添加到上面4个配置对象当中,然后调用 BrokerController 的构建函数创建它。
BrokerStartup.java
// 创建 BrokerController 对象
final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);
下面我们来看一下在创建 BrokerController
做了哪些事。
public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;// 消费者 offset 管理器this.consumerOffsetManager = new ConsumerOffsetManager(this);// Broker 的 Topic 管理器this.topicConfigManager = new TopicConfigManager(this);// 处理 Consumer 消息拉取处理器this.pullMessageProcessor = new PullMessageProcessor(this);// Consumer 消息拉取长轮训机制this.pullRequestHoldService = new PullRequestHoldService(this);// 监听收到 Producer 消息(通知 PullRequestHoldService)this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);// 消费者管理器(消费者&消费者组信息)this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);// 消费者消费过滤器this.consumerFilterManager = new ConsumerFilterManager(this);// 生产者管理器(过期生产者剔除等功能)this.producerManager = new ProducerManager();// 心跳检测管理器(Producer/Consumer/FilterServer)this.clientHousekeepingService = new ClientHousekeepingService(this);// broker 与客户端通信类this.broker2Client = new Broker2Client(this);// consumer 配置管理器this.subscriptionGroupManager = new SubscriptionGroupManager(this);// broker 与外部通信 API(与 NameServer)this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);// Consumer 消息过滤器管理器this.filterServerManager = new FilterServerManager(this);// 消息主-从同步处理器this.slaveSynchronize = new SlaveSynchronize(this);this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));this.brokerFastFailure = new BrokerFastFailure(this);this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);}
上面 Borker 的主要功能类已经注释了。关于上面的功能可以结合 RocketMQ 的 github 地址 的特性结合起来理解。在构建了 BrokerController 对象之后,会调用 BrokerController#initialize
方法对它进行初始化。
BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {boolean result = this.topicConfigManager.load();result = result && this.consumerOffsetManager.load();result = result && this.subscriptionGroupManager.load();result = result && this.consumerFilterManager.load();if (result) {try {this.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}result = result && this.messageStore.load();if (result) {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));this.registerProcessor();final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}initialTransaction();initialAcl();initialRpcHooks();}return result;}
2.1 加载配置
会从 rocketmq 的 Broker 配置当中加载核心类的配置信息,包括以下核心类:
类 | 功能 | 配置路径 | 配置类 |
---|---|---|---|
TopicConfigManager | Topic 配置信息 | ${storePathRootDir}/config/topics.json | BrokerPathConfigHelper |
ConsumerOffsetManager | 加载 Consumer Offset | ${storePathRootDir}/config/consumerOffset.json | BrokerPathConfigHelper |
SubscriptionGroupManager | 加载 Consumer 的消息者配置信息 | ${storePathRootDir}/config/subscriptionGroup.json | BrokerPathConfigHelper |
ConsumerFilterManager | 加载 Consumer 的 Topic 过滤信息 | ${storePathRootDir}/config/consumerFilter.json | BrokerPathConfigHelper |
MessageStore | 加载 CommitLog文件/ConsumeQueue文件/ Index 文件/ 定时任务 | ~ | StorePathConfigHelper |
2.2 初始化调度器
RocketMQ 采用的是多端口监听,所以这里会初始化两个远程服务处理类(RemotingServer) remotingServer
以及 fastRemotingServer
,用于接收其它端的连接。详情可以查看 – RocketMQ 源码解析之 网络通信 Netty 。
- remotingServer:监听listenPort配置项指定的监听端口,默认10911
- fastRemotingServer:监听端口值listenPort-2,即默认为10909
在这里为了提高 RocketMQ 处理请求的效率,都是异步进行处理的。它会初始化一些 ExecutorService 任务调度器多线程处理 RocketMQ 里面逻辑:
调试器名称 | 任务名称 | 任务处理类 |
---|---|---|
sendMessageExecutor | 异步接收 Producer 发送的消息 | SendMessageProcessor |
pullMessageExecutor | 异步处理 Consumer 拉取消息 | PullMessageProcessor |
replyMessageExecutor | 异步处理 Consumer 消息重试 | ReplyMessageProcessor |
queryMessageExecutor | 异步处理消息查询 | NettyRequestProcessor |
adminBrokerExecutor | 异步处理 Admin 消息 | AdminBrokerProcessor |
clientManageExecutor | 异步处理 Client 注销/检查Client配置消息 | ClientManageProcessor |
heartbeatExecutor | 异步处理 Client 心跳处理 | ClientManageProcessor |
consumerManageExecutor | 异步处理 Consumer 消息(获取消费列表/查询&修改消费者 offset) | EndTransactionProcessor |
endTransactionExecutor | 异步处理 MQ 事务消息 | EndTransactionProcessor |
上面是任务调度器来处理外部请求,还有一个调度器用于处理 Broker 的内部逻辑,那就是 scheduledExecutorService
。
处理类 | 调用方法 | 功能描述 | 间隔时间 |
---|---|---|---|
BrokerStats | record() | 打印昨日消息接收与消费总数 | 1 天 |
ConsumerOffsetManager | persist() | 持久化 Consumer 的 offset 到配置文件 | 5 秒 |
ConsumerFilterManager | persist() | 持久化 Consumer 的过滤规则到配置文件 | 10 秒 |
BrokerController | protectBroker() | Consumer 失败次数到达阈值剔除 Consumer | 3 分钟 |
BrokerController | printWaterMark() | 打印内部队列阻塞值 | 1 秒 |
MessageStore | dispatchBehindBytes() | 获取消息存储在commit log 但是没有保存在 consume queue 的 byte 数 | 60 秒 |
BrokerOuterAPI | fetchNameServerAddr() | 更新 Broker 中的 NameServer 地址 | 2 分钟 |
BrokerController | printMasterAndSlaveDiff() | 打印 Broker 主-从 commitlog 之间的差异值 | 60 秒 |
BrokerController | registerBrokerAll() | 注册 Broker、Topic 以及 FilterServer 信息到 NameServer | 60 秒 |
2.3 其它初始化
Broker 在进行了上面的初始化之后,还有其它的初始化操作:
- FileWatchService:文件变更监听服务,这里主要是监听 SSL 配置相关的文件变更,间隔没 500 毫秒。
- initialTransaction():初始化 Broker 对于事务消息的片
- initialAcl():初始化 RocketMQ 鉴权相关的处理
- initialRpcHooks():初始化 RocketMQ 中 RPCHook 的 SPI 接口并处理
2.4 添加钩子函数
最后,还注册了一个钩子函数。当 RocketMQ 服务器关闭的时候调用 BrokerController#shutdown
进行资源释放。
org.apache.rocketmq.broker.BrokerStartup#createBrokerController
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}
}, "ShutdownHook"));
3、启动 BrokerController
一切皆已经准备完成,下面我们来看一下 BrokerController 启动都做了哪些事情吧。
org.apache.rocketmq.broker.BrokerController#start
public void start() throws Exception {if (this.messageStore != null) {this.messageStore.start();}if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}
messageStore.start()
:启动消息存储,包括启动 Broker 的高可用 HA(主-从)机制;启动把内存当中的消息刷到磁盘当中去任务;把commitLog
中的消息分发到consumerQueue
文件中任务;还有一些其实的定时任务如下:
处理类 | 处理方法 | 任务描述 | 间隔时间 |
---|---|---|---|
DefaultMessageStore | cleanFilesPeriodically() | 清除过期的 commitLog/ consumerQueue 日志文件 | 10 秒 |
DefaultMessageStore | checkSelf() | 检查 commitLog/ consumerQueue 的 MappedFile | 10 分钟 |
DefaultMessageStore | ~ | 如果 commitLog 锁时间超过了阈值,持久化它的锁信息 | 1 秒 |
CleanCommitLogService | isSpaceFull() | 检测磁盘空间是否足够 | 10 秒 |
remotingServer.start()
:这里会启动remotingServer
也就是RemotingServer
,使用 Netty 暴露 Socket 服务处理外部的请求调用。fastRemotingServer.start()
:这里会启动fastRemotingServer
也就是RemotingServer
,使用 Netty 暴露 Socket 服务处理外部的请求调用。fileWatchService.start()
:启动 文件监听服务,处理 SSL 文件变更brokerOuterAPI.start()
:启动brokerOuterAPI
也就是RemotingClient
,使得Broker
可以调用其它方pullRequestHoldService.start()
:启动pullRequestHoldService
服务用于处理Consumer
拉取消息clientHousekeepingService.start()
:启动clientHousekeepingService
服务用于处理Producer
、Consumer
、FilterServer
的存活filterServerManager.start()
:启动filterServerManager
服务用于定时更新FilterServer
。registerBrokerAll()
:注册 Broker 信息到NameServer
。brokerStatsManager.start()
:启动 Broker 中的指标统计brokerFastFailure.start()
:启动 Broker 请求列表的过期请求清除任务
现在 Broker 启动已经完成了,可以接收 Producer 发送过来的消息了。下一篇我们就去分析一下 Producer 是如何发送消息的。
参考文章:
- https://blog.csdn.net/prestigeding/article/details/79357818
- https://github.com/apache/rocketmq
- https://www.sohu.com/a/409063049_416621
6、RocketMQ 源码解析之 Broker 启动(上)相关推荐
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- 基于8.0源码解析:startService 启动过程
基于8.0源码解析:startService 启动过程 首先看一张startService的图,心里有个大概的预估,跟Activity启动流程比,Service的启动稍微简单点,并且我把Service ...
- RocketMQ源码解析:Filtersrv
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- RocketMQ源码解析-Broker部分之Broker启动过程
目录 broker启动流程 broker启动可配置参数 启动入口`BrokerStartup` 1.创建brokerController 2.`BrokerController`构造函数 3.Brok ...
- 【RocketMQ|源码分析】namesrv启动停止过程都做了什么
简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...
- 消息中间件RocketMQ源码解析-- --调试环境搭建
1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...
- 【java】spring-boot源码解析之应用启动
spring boot 项目使用默认配置的思想,极大的简化了 spring 项目的开发.下面的代码就是一个最简单的 spring 项目: @SpringBootApplication public c ...
最新文章
- selenium:学习资源
- IOS -- base64编码
- 干货丨浅析分布式系统(经典长文,值得收藏)
- P2P网络“自由”穿越NAT的“秘密”
- 动态生成一个继承接口的类
- css中的单位换算_CSS单位px、em、rem及它们之间的换算关系
- 前端学习(2433):创建页面组件
- 使用fastjson工具类json字符串和对象之间的转换
- android连接django(乱哄哄的)
- oracle实例名,数据库名,服务名等概念差别与联系
- 第一章节 ASP.NET Web应用程序基础(一)
- 【nodejs学习】0.nodejs学习第一天
- uni-app平台判断 | uni app判断h5 小程序 app 等不同平台
- 【转存】游戏中常用术语
- 杜邦线改成焊线_做杜邦线(假)教程
- python- 小猫钓鱼纸牌游戏
- Excel 自动计算房贷、月供 (附模板)
- c语言除去字符串多余的空格,从一个字符串中去除多余的空格
- 怎样计算期货交易盈亏(期货交易盈利怎么算)
- 线性代数Python计算:无关向量组的正交化
热门文章
- 气压传感器c语言程序,气压芯片参数——以气压传感器芯片DSH553为例
- 【Python_笔记】openpyxl中Workbook()和.load_workbook()区别
- 高级算法梳理之LightGBM
- 简单理解Hadoop(Hadoop是什么、如何工作)
- 打算自学一些编程,想兼职程序员打零工,想问问现在哪个程序员兼职平台单子简单,不考察接单人学历?
- 使用three.js加载3dmax资源,以及实现场景中的阴影效果
- 【文本分类】文本分类流程及算法原理
- 十年Java架构师分享
- 遥感专业学习神经网络与深度学习过程中的想法
- POJ - 1849 Two(树的直径)