RocketMQ源码学习(六)-Name Server
问题列表:
Name Server 的作用是什么?
Name Server 存储了Broker的什么信息?
Name Server 为Producer的提供些什么信息?
Name Server 为Consuner的提供些什么信息?
Name Server 作用
Name Server在RocketMQ中犹如如它名字一样,是提供Broker发现服务的.
Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署
Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
Name Server 存储了Broker的什么信息?
RouteInfoManager
//主题信息private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broker信息private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//集群信息private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//活跃broker信息private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//过滤器信息private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
我们注意到保存broker的Map有两个,即brokerAddrTable用来保存所有的broker列表和brokerLiveTable用来保存当前活跃的broker列表,而BrokerData用来保存broker的主要新增,而BrokerLiveInfo只用来保存上次更新(心跳)时间,我们可以直接看看RouteInfoManager中扫描非活跃broker的方法:
public void scanNotActiveBroker() {Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}
这个方法由在initialize的定时线程池加载,每十秒执行一次.可以看出,如果两分钟内都没收到一个broker的心跳数据,则直接将其从brokerLiveTable中移除,注意,这还会导致该broker从brokerAddrTable被删除,当然,如果该broker是Master,则它的所有Slave的broker都将被删除。具体细节可以参看RouteInfoManager的onChannelDestroy方法.
Name Server 为Producer的提供些什么信息?
HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private String brokerName; // broker的名称private int readQueueNums; // 读队列数量private int writeQueueNums; // 写队列数量private int perm; // 读写权限private int topicSynFlag; // 同步复制还是异步复制标记
NameServer 维护了key为topic,List<QueueData>的数据为Producer提供服务.返回TopicRouteData信息
RouteInfoManager.pickupTopicRouteData
public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<String>();List<BrokerData> brokerDataList = new LinkedList<BrokerData>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();topicRouteData.setFilterServerTable(filterServerMap);try {try {this.lock.readLock().lockInterruptibly();//根据topic获取QueueData信息List<QueueData> queueDataList = this.topicQueueTable.get(topic);if (queueDataList != null) {topicRouteData.setQueueDatas(queueDataList);foundQueueData = true;Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {QueueData qd = it.next();brokerNameSet.add(qd.getBrokerName());}for (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {//根据broker名称获取其地址信息BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);foundBrokerData = true;for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr, filterServerList);}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}if (log.isDebugEnabled()) {log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);}if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;}
Name Server 为Consuner的提供些什么信息?
Consumer需要哪些信息?
1. Consumer需要的topic的broker信息
2. 每一个consumer group都有哪些consumer,对应的topic是谁
答
1.如上节所述
2.此信息保存在Broker中
总结
Name Server比较简单,如同一个简单的web服务,提供配置信息,只不过CRUD的不是数据库而是json文件.
此次RocketMQ学习就告一段落了,只描述了我比较关心的流程,很多细节没能涉及到,有时间再写吧,如有疑问和错误请在评论中指出,thx!
RocketMQ源码学习(六)-Name Server相关推荐
- RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?
RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...
- RocketMQ源码学习
RocketMQ源码学习 文章目录 RocketMQ源码学习 Producer 是怎么将消息发送至 Broker 的? 同步发送 异步发送 队列选择器 事务消息 原理 Broker 是怎么处理客户端发 ...
- [spring源码学习]六、IOC源码-BeanFactory和factory-bean
https://www.cnblogs.com/jyyzzjl/p/5459335.html 一.代码实例 在我们分析spring的IOC源码的时候,发现除了配置标准的bean,并且通过getBean ...
- RocketMQ源码(十六)之文件清理
文章目录 简介 源码分析 CommitLog清理 ConsumeQueue和Index清理 简介 Broker文件清理主要清理CommitLog.ConsumeQueue.IndexFile Comm ...
- RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码
转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...
- mutations vuex 调用_Vuex源码学习(六)action和mutation如何被调用的(前置准备篇)...
前言 Vuex源码系列不知不觉已经到了第六篇.前置的五篇分别如下: 长篇连载:Vuex源码学习(一)功能梳理 长篇连载:Vuex源码学习(二)脉络梳理 作为一个Web前端,你知道Vuex的instal ...
- action mutation 调用_Vuex源码学习(六)action和mutation如何被调用的(前置准备篇)...
module与moduleCollection你一定要会啊!Vuex源码学习(五)加工后的module 在组件中使用vuex的dispatch和commit的时候,我们只要把action.mutati ...
- RocketMQ 源码分析 —— 集成 Spring Boot
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
- RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic
此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...
最新文章
- 二维码识别器PC版(电脑版)
- Echarts的提示(Tooltip)显示额外内容
- 啊啊啊 草蛋啊 之前努力一天搞出来的时间算法 被一句pk掉 给我砖头
- 方法重写(Java篇)
- POJ 3981(字符串替换)
- mysql修改表的备注信息_修改mysql 数据库的 表的列的备注信息
- mysql面试常问 1: 谈谈MySQL表级锁和行级锁
- Android进阶——深入浅出Handler(一)
- SqlServer2012 File Table文件表
- VS2010/MFC编程入门之二(VS2010应用程序工程中文件的组成结构)
- linux生成密码文本,Linux下用makepasswd和passwordmaker生成密码
- python可以连接sql server_python连接sqlserver数据库之一
- JQuery Easyui/TopJUI 创建多级联动下拉框(纯HTML实现!!!)
- java面经大全,献给每一个努力过,在努力以及将要努力的人。goodlucky。2018-10-15开始更新
- 抛开理论公式,用符合直觉的方式理解四旋翼无人机控制
- python库阿里云镜像大全
- Android 图片 批量上传,移动端图片批量上传问题
- 前端随录(SPA与MPA和PWA)
- 从上海回杭州三年,我的生活发生了翻天覆地的变化
- 航班编程代码c语言,c语言编写航班查询代码.doc