上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器。并且其它服务在 NameServer 注册服务信息的时候都是全量注册。如果 RocketMQ 的拓扑图当中有多台 NameServer 的时候,只要有一台存活的话 RocketMQ 集群就可以正常工作。


当 RocketMQ 作为消息队列要向外提供服务的时候,除了需要第一步启动 NameServer 用来保存元数据,还需要启动 RocketMQ 的 Broker。这样才能 Producer 才能发现消息到 Broker,然后 Consumer 才能从 Broker 消费消息。

RocketMQ 的 Broker 启动类为:org.apache.rocketmq.broker.BrokerStartup。下面我们来看一下 Broker 启动做了哪些事情。

1、解析启动命令

其实 BrokerStartup 只是一个启动类,Broker 的启动它其实是委托给 BrokerController 这个类来完成的。它的功能其实是非常强大的,要使用这个类来启动 MQ 的整个服务。Rocket 首先要对这个对象进行初始化,做为一个强大的中间件可配置化也是必须的。所以在初始化 BrokerController ,RocketMQ 会先解析我们的启动命令。还记得我们看源码的时候启动 RocketMQ 的启动命令么,如下所示:

-c /Users/carl/projects/idea/github/apache/rocketmq/config/conf/broker.conf -n localhost:9876

我们来看一下 RocketMQ 的 Broker 启动支持哪些命令:

Borker 启动命令列表

短命令 长命令 描述 解释
p printConfigItem Print all config item 打印所有配置项
c configFile Broker config properties file Broker 的配置文件地址
h help Print help 打印帮助命令
m printImportantConfig Print important config item 打印所有重要的配置项(BrokerConfig类上面所有标注 @ImportantField 注解的属性)
n namesrvAddr Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 命名服务地址列表

其实最重要的还是 Broker 的配置文件地址的解析,它会把配置文件解析成 Properties 对象,也就是 KV 键值对。然后把这些配置项赋值到以下的 Broker 配置对象当中。

  • BrokerConfig:Broker 服务相关的配置项
  • NettyServerConfig:Broker 对外暴露 Socket 服务相关的配置项(使用 Netty)
  • NettyClientConfig:Broker 对外调用 Socket 服务相关的配置项(使用 Netty)
  • MessageStoreConfig:Broker 消息存储相关的配置项

2、初始化 BrokerController

通过上面解析配置文件并把配置项添加到上面4个配置对象当中,然后调用 BrokerController 的构建函数创建它。

BrokerStartup.java

// 创建 BrokerController 对象
final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);

下面我们来看一下在创建 BrokerController 做了哪些事。

    public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;// 消费者 offset 管理器this.consumerOffsetManager = new ConsumerOffsetManager(this);// Broker 的 Topic 管理器this.topicConfigManager = new TopicConfigManager(this);// 处理 Consumer 消息拉取处理器this.pullMessageProcessor = new PullMessageProcessor(this);// Consumer 消息拉取长轮训机制this.pullRequestHoldService = new PullRequestHoldService(this);// 监听收到 Producer 消息(通知 PullRequestHoldService)this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);// 消费者管理器(消费者&消费者组信息)this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);// 消费者消费过滤器this.consumerFilterManager = new ConsumerFilterManager(this);// 生产者管理器(过期生产者剔除等功能)this.producerManager = new ProducerManager();// 心跳检测管理器(Producer/Consumer/FilterServer)this.clientHousekeepingService = new ClientHousekeepingService(this);// broker 与客户端通信类this.broker2Client = new Broker2Client(this);// consumer 配置管理器this.subscriptionGroupManager = new SubscriptionGroupManager(this);// broker 与外部通信 API(与 NameServer)this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);// Consumer 消息过滤器管理器this.filterServerManager = new FilterServerManager(this);// 消息主-从同步处理器this.slaveSynchronize = new SlaveSynchronize(this);this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));this.brokerFastFailure = new BrokerFastFailure(this);this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);}

上面 Borker 的主要功能类已经注释了。关于上面的功能可以结合 RocketMQ 的 github 地址 的特性结合起来理解。在构建了 BrokerController 对象之后,会调用 BrokerController#initialize 方法对它进行初始化。

BrokerController#initialize

    public boolean initialize() throws CloneNotSupportedException {boolean result = this.topicConfigManager.load();result = result && this.consumerOffsetManager.load();result = result && this.subscriptionGroupManager.load();result = result && this.consumerFilterManager.load();if (result) {try {this.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}result = result && this.messageStore.load();if (result) {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));this.registerProcessor();final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}initialTransaction();initialAcl();initialRpcHooks();}return result;}

2.1 加载配置

会从 rocketmq 的 Broker 配置当中加载核心类的配置信息,包括以下核心类:

功能 配置路径 配置类
TopicConfigManager Topic 配置信息 ${storePathRootDir}/config/topics.json BrokerPathConfigHelper
ConsumerOffsetManager 加载 Consumer Offset ${storePathRootDir}/config/consumerOffset.json BrokerPathConfigHelper
SubscriptionGroupManager 加载 Consumer 的消息者配置信息 ${storePathRootDir}/config/subscriptionGroup.json BrokerPathConfigHelper
ConsumerFilterManager 加载 Consumer 的 Topic 过滤信息 ${storePathRootDir}/config/consumerFilter.json BrokerPathConfigHelper
MessageStore 加载 CommitLog文件/ConsumeQueue文件/ Index 文件/ 定时任务 ~ StorePathConfigHelper

2.2 初始化调度器

RocketMQ 采用的是多端口监听,所以这里会初始化两个远程服务处理类(RemotingServer) remotingServer 以及 fastRemotingServer,用于接收其它端的连接。详情可以查看 – RocketMQ 源码解析之 网络通信 Netty 。

  • remotingServer:监听listenPort配置项指定的监听端口,默认10911
  • fastRemotingServer:监听端口值listenPort-2,即默认为10909

在这里为了提高 RocketMQ 处理请求的效率,都是异步进行处理的。它会初始化一些 ExecutorService 任务调度器多线程处理 RocketMQ 里面逻辑:

调试器名称 任务名称 任务处理类
sendMessageExecutor 异步接收 Producer 发送的消息 SendMessageProcessor
pullMessageExecutor 异步处理 Consumer 拉取消息 PullMessageProcessor
replyMessageExecutor 异步处理 Consumer 消息重试 ReplyMessageProcessor
queryMessageExecutor 异步处理消息查询 NettyRequestProcessor
adminBrokerExecutor 异步处理 Admin 消息 AdminBrokerProcessor
clientManageExecutor 异步处理 Client 注销/检查Client配置消息 ClientManageProcessor
heartbeatExecutor 异步处理 Client 心跳处理 ClientManageProcessor
consumerManageExecutor 异步处理 Consumer 消息(获取消费列表/查询&修改消费者 offset) EndTransactionProcessor
endTransactionExecutor 异步处理 MQ 事务消息 EndTransactionProcessor

上面是任务调度器来处理外部请求,还有一个调度器用于处理 Broker 的内部逻辑,那就是 scheduledExecutorService

处理类 调用方法 功能描述 间隔时间
BrokerStats record() 打印昨日消息接收与消费总数 1 天
ConsumerOffsetManager persist() 持久化 Consumer 的 offset 到配置文件 5 秒
ConsumerFilterManager persist() 持久化 Consumer 的过滤规则到配置文件 10 秒
BrokerController protectBroker() Consumer 失败次数到达阈值剔除 Consumer 3 分钟
BrokerController printWaterMark() 打印内部队列阻塞值 1 秒
MessageStore dispatchBehindBytes() 获取消息存储在commit log 但是没有保存在 consume queue 的 byte 数 60 秒
BrokerOuterAPI fetchNameServerAddr() 更新 Broker 中的 NameServer 地址 2 分钟
BrokerController printMasterAndSlaveDiff() 打印 Broker 主-从 commitlog 之间的差异值 60 秒
BrokerController registerBrokerAll() 注册 Broker、Topic 以及 FilterServer 信息到 NameServer 60 秒

2.3 其它初始化

Broker 在进行了上面的初始化之后,还有其它的初始化操作:

  • FileWatchService:文件变更监听服务,这里主要是监听 SSL 配置相关的文件变更,间隔没 500 毫秒。
  • initialTransaction():初始化 Broker 对于事务消息的片
  • initialAcl():初始化 RocketMQ 鉴权相关的处理
  • initialRpcHooks():初始化 RocketMQ 中 RPCHook 的 SPI 接口并处理

2.4 添加钩子函数

最后,还注册了一个钩子函数。当 RocketMQ 服务器关闭的时候调用 BrokerController#shutdown 进行资源释放。

org.apache.rocketmq.broker.BrokerStartup#createBrokerController

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}
}, "ShutdownHook"));

3、启动 BrokerController

一切皆已经准备完成,下面我们来看一下 BrokerController 启动都做了哪些事情吧。

org.apache.rocketmq.broker.BrokerController#start

    public void start() throws Exception {if (this.messageStore != null) {this.messageStore.start();}if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}
  • messageStore.start():启动消息存储,包括启动 Broker 的高可用 HA(主-从)机制;启动把内存当中的消息刷到磁盘当中去任务;把 commitLog 中的消息分发到 consumerQueue 文件中任务;还有一些其实的定时任务如下:
处理类 处理方法 任务描述 间隔时间
DefaultMessageStore cleanFilesPeriodically() 清除过期的 commitLog/ consumerQueue 日志文件 10 秒
DefaultMessageStore checkSelf() 检查 commitLog/ consumerQueue 的 MappedFile 10 分钟
DefaultMessageStore ~ 如果 commitLog 锁时间超过了阈值,持久化它的锁信息 1 秒
CleanCommitLogService isSpaceFull() 检测磁盘空间是否足够 10 秒
  • remotingServer.start():这里会启动 remotingServer 也就是 RemotingServer,使用 Netty 暴露 Socket 服务处理外部的请求调用。
  • fastRemotingServer.start():这里会启动 fastRemotingServer 也就是 RemotingServer,使用 Netty 暴露 Socket 服务处理外部的请求调用。
  • fileWatchService.start():启动 文件监听服务,处理 SSL 文件变更
  • brokerOuterAPI.start():启动 brokerOuterAPI 也就是 RemotingClient,使得 Broker 可以调用其它方
  • pullRequestHoldService.start():启动 pullRequestHoldService 服务用于处理 Consumer 拉取消息
  • clientHousekeepingService.start():启动 clientHousekeepingService 服务用于处理 ProducerConsumerFilterServer 的存活
  • filterServerManager.start():启动 filterServerManager 服务用于定时更新 FilterServer
  • registerBrokerAll():注册 Broker 信息到 NameServer
  • brokerStatsManager.start():启动 Broker 中的指标统计
  • brokerFastFailure.start():启动 Broker 请求列表的过期请求清除任务

现在 Broker 启动已经完成了,可以接收 Producer 发送过来的消息了。下一篇我们就去分析一下 Producer 是如何发送消息的。

参考文章:

  • https://blog.csdn.net/prestigeding/article/details/79357818
  • https://github.com/apache/rocketmq
  • https://www.sohu.com/a/409063049_416621

6、RocketMQ 源码解析之 Broker 启动(上)相关推荐

  1. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  2. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  3. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  4. 基于8.0源码解析:startService 启动过程

    基于8.0源码解析:startService 启动过程 首先看一张startService的图,心里有个大概的预估,跟Activity启动流程比,Service的启动稍微简单点,并且我把Service ...

  5. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  6. RocketMQ源码解析-Broker部分之Broker启动过程

    目录 broker启动流程 broker启动可配置参数 启动入口`BrokerStartup` 1.创建brokerController 2.`BrokerController`构造函数 3.Brok ...

  7. 【RocketMQ|源码分析】namesrv启动停止过程都做了什么

    简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...

  8. 消息中间件RocketMQ源码解析-- --调试环境搭建

    1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...

  9. 【java】spring-boot源码解析之应用启动

    spring boot 项目使用默认配置的思想,极大的简化了 spring 项目的开发.下面的代码就是一个最简单的 spring 项目: @SpringBootApplication public c ...

最新文章

  1. selenium:学习资源
  2. IOS -- base64编码
  3. 干货丨浅析分布式系统(经典长文,值得收藏)
  4. P2P网络“自由”穿越NAT的“秘密”
  5. 动态生成一个继承接口的类
  6. css中的单位换算_CSS单位px、em、rem及它们之间的换算关系
  7. 前端学习(2433):创建页面组件
  8. 使用fastjson工具类json字符串和对象之间的转换
  9. android连接django(乱哄哄的)
  10. oracle实例名,数据库名,服务名等概念差别与联系
  11. 第一章节 ASP.NET Web应用程序基础(一)
  12. 【nodejs学习】0.nodejs学习第一天
  13. uni-app平台判断 | uni app判断h5 小程序 app 等不同平台
  14. 【转存】游戏中常用术语
  15. 杜邦线改成焊线_做杜邦线(假)教程
  16. python- 小猫钓鱼纸牌游戏
  17. Excel 自动计算房贷、月供 (附模板)
  18. c语言除去字符串多余的空格,从一个字符串中去除多余的空格
  19. 怎样计算期货交易盈亏(期货交易盈利怎么算)
  20. 线性代数Python计算:无关向量组的正交化

热门文章

  1. 气压传感器c语言程序,气压芯片参数——以气压传感器芯片DSH553为例
  2. 【Python_笔记】openpyxl中Workbook()和.load_workbook()区别
  3. 高级算法梳理之LightGBM
  4. 简单理解Hadoop(Hadoop是什么、如何工作)
  5. 打算自学一些编程,想兼职程序员打零工,想问问现在哪个程序员兼职平台单子简单,不考察接单人学历?
  6. 使用three.js加载3dmax资源,以及实现场景中的阴影效果
  7. 【文本分类】文本分类流程及算法原理
  8. 十年Java架构师分享
  9. 遥感专业学习神经网络与深度学习过程中的想法
  10. POJ - 1849 Two(树的直径)