说明:此代码是跟着《RocketMQ技术内幕》这本书阅读的,借鉴了很多东西,在此感谢丁威大佬和RocketMQ的贡献者们,文章如有问题,欢迎批评指正
RocketMQ版本:4.8.0

2、RocketMQ的注册中心NameServer

2.1 架构设计

  • Producer、Consumer和NameServer各自集群之间是无状态的
  • NameServer是专门为RocketMQ设计的注册中心,比zookeeper设计实现更加简单,在消息发送端提供容错机制来实现高可用

问题:为什么RocketMQ不使用Zookeeper作为注册中心呢?
答:

  1. 根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP,也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。

  2. 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。

  3. 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。

  4. 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

2.2 NameServer启动流程

启动位置:NamesrvStartup.main()

  • 解析开机命令,加载配置文件、端口等信息,填充namesrvConfig和nettyServerConfig属性
  • 初始化NamesrvController
  • 注册两个定时任务
    • NamesrvController.this.routeInfoManager.scanNotActiveBroker();
      用于检测未激活的Broker,延迟5秒钟开始,每次检测间隔不超过10秒钟
    • NamesrvController.this.kvConfigManager.printAllPeriodically();
      用于打印KV配置,10秒一次
  • 注册一个监听器,用于重新加载SslContext
// 编程小技巧:使用到了线程池等需要释放资源的时候,一种优雅的停机方式就是注册一个钩子函数,在jvm关闭之前释放资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}
}));

2.3 NameServer的路由注册、故障剔除

2.3.1 路由元信息

代码位置:RouteInfoManager.java
RouteInfopManager存储的信息如下

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Topic消息队列的路由信息,消息发送时根据路由表进行负载均衡
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker的基本信息,包括集群名称,Broker名称,主备地址等信息
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 集群信息,存储所有的Broker的名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker的状态信息,  NameServer每次接收心跳都会更新这里的信息,包括上次更新时间,数据版本等
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker上的Filter Server列表,用于类模式过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public class QueueData implements Comparable<QueueData> {private String brokerName;    // Broker名称private int readQueueNums;   // 读队列的个数private int writeQueueNums;    // 写队列的个数private int perm;private int topicSynFlag;
}
public class BrokerData implements Comparable<BrokerData> {private String cluster; // 集群名称private String brokerName;   // Broker名称private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;   // Broker对应其地址private final Random random = new Random();
}
class BrokerLiveInfo {// 上次心跳时间,距离现在超过120秒,NameServer会移除此Broker的路由信息同时关闭Socket连接private long lastUpdateTimestamp;  private DataVersion dataVersion;private Channel channel;private String haServerAddr;
}

2.3.2 路由注册

路由注册是通过Broker和NameServer的心跳功能实现的,具体步骤如下:

​ Broker启动时向配置的所有NameServer发送心跳语句,每隔30秒发送一次,NameServer收到心跳之后,新建/更新BrokerLiveInfo对象,特别是BrokerLiveInfo的lastUpdateTimestamp属性,它标识着上次心跳是时间。NameServer每隔10秒钟会扫描brokerLiveTable集合,如果lastUpdateTimestamp时间超过120秒,NameServer将认为此Broker不可用,NameServer会移除此Broker的路由信息同时关闭Socket连接。

​ NameServer收到心跳后返回会话信息,通知Broker的类型是否为master

2.3.2.1 Broker发送心跳包
// BrokerController#start()
// 创建定时任务,延迟10秒,每30秒执行一次
// brokerConfig.getRegisterNameServerPeriod() = 30 * 1000 (milliseconds)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// BrokerController#registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {// ... ...if (/*...*/) {doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}
}
// BrokerController#doRegisterBrokerAll
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,this.filterServerManager.buildNewFilterServerList(),oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isCompressedRegister());// ... ...
}
// BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(final String clusterName, final String brokerAddr,    final String brokerName,    final long brokerId,    final String haServerAddr,  final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean compressed) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {// 设置统一的请求头信息final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);   // Broker的IP地址requestHeader.setBrokerId(brokerId);  // Broker的ID,等于0:Master,大于0: SlaverequestHeader.setBrokerName(brokerName);    // Broker名称requestHeader.setClusterName(clusterName);   // Broker所在集群名称requestHeader.setHaServerAddr(haServerAddr); // Master的地址,第一次请求时为空,Slave向NameServer注册后返回requestHeader.setCompressed(compressed);RegisterBrokerBody requestBody = new RegisterBrokerBody();    // 消息服务器过滤列表requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); // 主题配置,topicConfigWrapper内部封装的是TopicManager中的TopicCofigTable,内部存储的是Broker启动时的默认topic,MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopicEnable = true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVENT、BrokerConfig#brokerClusterName、BrokerConfig#brokerName。Broker中,Topic的,默认存储位置:${Rocket_HOME}/store/config/topic.json中。requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 遍历所有配置的NamerServer地址,使用countDownLatch保证所有线程结束后再继续执行for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {// 分别向nameserver发送心跳包,进行注册RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {// 计数到0时,继续执行countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;
}
// BrokerOuterAPI#registerBroker()
private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);// 如果是单向发送if (oneway) {try {this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}// 不是单向发送RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();result.setMasterAddr(responseHeader.getMasterAddr());  // 注册返回的主Broker,IPresult.setHaServerAddr(responseHeader.getHaServerAddr());  // 主Broker的地址, IP:Portif (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}
}
2.3.2.2 NameServer处理心跳包
// RouteInfoManager.java
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {// 路由注册需要加锁this.lock.writeLock().lockInterruptibly();// 维护 clusterAddrTableSet<String> brokerNames = this.clusterAddrTable.get(clusterName);// 先判断是否存在if (null == brokerNames) {// 不存在需要新建初始化brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}brokerNames.add(brokerName);// 是否第一次注册boolean registerFirst = false;// 维护 brokerAddrTable中的BrokerData信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {// 不存在这个broker的数据,说明是第一次注册,新建并放入brokerAddrTable中registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//从Borker切换到主Broker:首先删除namesrv中的<1,IP:PORT>,然后添加<0,IP:PORT>//同一 IP:PORT 在brokerAddrTable中只能有一条记录Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();// 将 broker地址相同 + brokerId不同 的元素删除,地址相同、id相同就是跟以前一样的数据,不用修改if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst = registerFirst || (null == oldAddr);// 主Brokerif (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {// topic配置产生变化,或者第一次注册if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {// 更新配置this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}// 旧的Broker状态BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),  // 当前时间topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {// 如果NameServer检测出是从Broker,需要获取master的地址,并将Master的地址返回给BrokerString masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;
}

2.3.3 路由删除

Brokers每30秒向NameServer发送心跳包,包括BrokerID,Broker地址,所属的集群名称,关联的FilterServer等

NameServer每10秒检查一次BrokerLiveInfo的lastUpdateTimestamp字段,距离当前时间超过120秒,则认为此Broker不可用,NameServer会移除此Broker的路由信息同时关闭Socket连接,并同时更新路由表:topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

2.3.3.1 触发路由删除的两个触发点
  1. 当NameServer扫描所有Broker时,发现brokerLiveTable中Broker的上次心跳时间距离当前时间超过120秒

  2. Broker在正常关闭的时候,会执行unregisterBroker()

    两种触发方式虽然不同,但是删除路由的步骤是一样的:删除topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable中相关的信息(移除路由信息),关闭Socket连接。

2.3.3.2 从超时120秒阅读源码
// NamesrvController#initialize
public boolean initialize() {// ... ...this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// NameServer初始化时定义的定时任务线程:扫描不可用的BrokerNamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);// ... ...
}
// RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {// 与当前时间比较,大于120秒,关闭ChannelRemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);// 从topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable移除Broker信息this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}
}
// RouteInfoManager#onChannelDestroy
public void onChannelDestroy(String remoteAddr, Channel channel) {String brokerAddrFound = null;if (channel != null) {try {try {// 加读锁this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator();while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();if (entry.getValue().getChannel() == channel) {brokerAddrFound = entry.getKey();break;}}} finally {// 解读锁this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);}if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {// 加写锁,分别从brokerLiveTable、filterServerTable、brokerAddrTable、clusterAddrTable中清除Broker信息this.lock.writeLock().lockInterruptibly();this.brokerLiveTable.remove(brokerAddrFound);this.filterServerTable.remove(brokerAddrFound);String brokerNameFound = null;boolean removeBrokerName = false;Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator();while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {// NameServer底层数据结构要求,主从Broker配置文件名称一致BrokerData brokerData = itBrokerAddrTable.next().getValue();Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();Long brokerId = entry.getKey();String brokerAddr = entry.getValue();// 当最底层的brokerAddr和需要删除的地址相同时,清除掉此地址if (brokerAddr.equals(brokerAddrFound)) {brokerNameFound = brokerData.getBrokerName();it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}// 删除Broker之后不再存在节点信息时,修改属性if (brokerData.getBrokerAddrs().isEmpty()) {removeBrokerName = true;itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}}if (brokerNameFound != null && removeBrokerName) {Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();String clusterName = entry.getKey();Set<String> brokerNames = entry.getValue();boolean removed = brokerNames.remove(brokerNameFound);if (removed) {log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);// 如果移除后,集群内不包含任何Broker,则将该集群从clusterAddrTable中移除if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);it.remove();}break;}}}if (removeBrokerName) {Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =this.topicQueueTable.entrySet().iterator();while (itTopicQueueTable.hasNext()) {Entry<String, List<QueueData>> entry = itTopicQueueTable.next();String topic = entry.getKey();List<QueueData> queueDataList = entry.getValue();Iterator<QueueData> itQueueData = queueDataList.iterator();while (itQueueData.hasNext()) {QueueData queueData = itQueueData.next();// 从topicQueueTable中删除broker相关的QueueDataif (queueData.getBrokerName().equals(brokerNameFound)) {itQueueData.remove();log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData);}}// 队列为空,直接清除if (queueDataList.isEmpty()) {itTopicQueueTable.remove();log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",topic);}}}} finally {// 释放写锁,删除路由完成this.lock.writeLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}
}
  1. 其中topicQueueTable,brokerAddrTable、clusterAddrTable都需要两次遍历才能获取到brokerAddr,而brokerLiveTable、filterServerTable两个的HashMap的主键就是bokerName,所以可以不用遍历直接remove()。
  2. 获取brokerAddrFound的时候使用的可中断读锁,维护路由表的时候使用了可中断的写锁。

2.3.4 路由发现

总体流程:​ 通过RouteInfoManager#pickupTopicRouteData()根据topic查找路由信息,然后判断是否为顺序消息,是的话加载顺序消息配置

// DefaultRequestProcessor
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);// 调用RouteInfoManager#pickupTopicRouteData(),分别填充TopicRouteData中的以下字段// List<QueueData> queueDatas;// List<BrokerData> brokerDatas;// HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTableTopicRouteData topicRouteData =  this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if (topicRouteData != null) {// 判断该主题是否时是顺序消息if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {// 从NameserverKVconfig中获取顺序消息相关配置String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 没有找到返回 ResponseCode.TOPIC_NOT_EXIST:17response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;
}
// RouteInfoManager
public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<String>();List<BrokerData> brokerDataList = new LinkedList<BrokerData>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();topicRouteData.setFilterServerTable(filterServerMap);try {try {// 可中断读锁this.lock.readLock().lockInterruptibly();// QueueData的属性:brokerNameList<QueueData> queueDataList = this.topicQueueTable.get(topic);if (queueDataList != null) {topicRouteData.setQueueDatas(queueDataList);foundQueueData = true;// 收集 brokerName 到set集合Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {QueueData qd = it.next();brokerNameSet.add(qd.getBrokerName());}// 根据 brokerName 到brokerAddrTable中获取BrokerData,从而得到brokerAddr和FilterServer的关系for (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);foundBrokerData = true;for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {// 获取brokerAddr关联的过滤器Filter ServerList<String> filterServerList = this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr, filterServerList);}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;
}

2.4 总结

RocketMQ源码阅读-NameServer篇相关推荐

  1. RocketMQ 源码阅读 ---- 消息消费(普通消息)

    RocketMQ Consumer 消费拉取的消息的方式有两种 1.      Push方式:rocketmq 已经提供了很全面的实现,consumer 通过长轮询拉取消息后回调 MessageLis ...

  2. Golang流媒体实战之六:lal拉流服务源码阅读

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <Golang流媒体实战>系列的链接 体验 ...

  3. redis源码阅读-持久化之RDB

    持久化介绍: redis的持久化有两种方式: rdb :可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot) aof : 记录redis执行的所有写操作命令 根 ...

  4. redis源码阅读-zset

    前段时间给小伙伴分享redis,顺带又把redis撸了一遍了,对其源码,又有了比较深入的了解.(ps: 分享的文章再丰富下再放出来). 数据结构 我们先看下redis 5.0的代码.本次讲解主要是zs ...

  5. redis源码阅读-持久化之aof与aof重写详解

    aof相关配置 aof-rewrite-incremental-fsync yes # aof 开关,默认是关闭的,改为yes表示开启 appendonly no # aof的文件名,默认 appen ...

  6. 深入剖析RocketMQ源码-NameServer

    作者:vivo互联网服务器团队-Ye Wenhao 一.RocketMQ架构简介 1.1 逻辑部署图 (图片来自网络) 1.2 核心组件说明 通过上图可以看到,RocketMQ的核心组件主要包括4个, ...

  7. Mycat源码篇 : 起步,Mycat源码阅读调试环境搭建

    在研究mycat源码之前必须先把环境搭建好.这篇文章的目标就是搭建mycat源码调试环境.环境主要包括: git jdk maven eclipse mysql 这里假设你知道上面的知识点.我们搭建的 ...

  8. Soul网关源码阅读番外篇(一) HTTP参数请求错误

    Soul网关源码阅读番外篇(一) HTTP参数请求错误 共同作者:石立 萧 * 简介     在Soul网关2.2.1版本源码阅读中,遇到了HTTP请求加上参数返回404的错误,此篇文章基于此进行探索 ...

  9. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

最新文章

  1. 在循环中正确找到对应DOM元素的索引
  2. c语言铁道,C语言程序设计(方少卿) 铁道C第8章(修订版).pdf
  3. Github 移动端上架!把世界最大同性交友社区装进口袋
  4. 如何使用PHP自动备份数据库
  5. 秒杀多线程第八篇 经典线程同步 信号量Semaphore
  6. 同时启动多个Tomcat 和 Linux部署多个tomcat
  7. Linux(CentOS6.4)Solr4.8.1中文分词配置(IK分词)
  8. 织梦 php 传值,php获取post参数的几种方式
  9. openstack 排错
  10. 转化率模型之转化数据延迟
  11. HALCON 20.11:深度学习笔记(6)---有监督训练
  12. cenyos7安装 yum不可用_centos7安装fabric
  13. Openwrt Uboot烧写
  14. STM32DMA搬运ADC只搬了一半数据的原因。
  15. react native+typescript创建移动端项目-(慕课网喜马拉雅项目笔记)-(二,导航器navigator)
  16. EEGLAB初步学习(1)
  17. 使用stm32f103rct6控制adf4351实现30M~~4G的正弦波
  18. PPT教程 从入门到实践
  19. 云计算板块-云计算基础介绍
  20. 经典Seq2Seq与注意力Seq2Seq模型结构详解

热门文章

  1. 艾拉物联福建省平台王涛:品牌为刃,共谋智慧空间宏伟蓝图
  2. 4g网络设置dns地址_4G网速越来越慢,通过这三个简单的操作,网速成倍提升
  3. 推荐7个好用的免费图床
  4. 百度地图开发:多边形覆盖物鼠标高亮效果和标签展示
  5. EEPROM的存储大小
  6. WOX——Windows快速搜索神器
  7. 纯手工 99 分钟倒计时定时器
  8. web前端开发与应用——图像运用
  9. PVCBOT【14号A版】机械狗--四足爬行机器人
  10. 清理僵尸粉后微信聊天记录被黑客监视