ControllerBrokerRequestBatch分析
为了提高KafkaController Leader和集群其他broker的通信效率,实现批量发送请求的功能
leaderAndIsrRequestMap:保存了发往指定broker的LeaderAndIsrRequest请求相关的信息
stopReplicaRequestMap: 保存了发往指定broker的StopReplicaRequest请求相关的信息
updateMetadataRequestMap:保存了发往指定broker的UpdateMetadataCacheRequest请求相关的信息
// 将保存相关请求的集合清空
def clear() {
leaderAndIsrRequestMap.clear()
stopReplicaRequestMap.clear()
updateMetadataRequestMap.clear()
}
// 往LeaderAndIsrRequest请求的map添加相关信息
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
val topicPartition = new TopicPartition(topic, partition)
brokerIds.filter(_ >= 0).foreach { brokerId =>
val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
}
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
Set(TopicAndPartition(topic, partition)))
}
// 往StopReplicaRequest请求的map添加相关信息
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
callback: (AbstractRequestResponse, Int) => Unit = null) {
brokerIds.filter(b => b >= 0).foreach { brokerId =>
stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
val v = stopReplicaRequestMap(brokerId)
if(callback != null)
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
else
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition)
}
}
// 往UpdateMetadataRequest请求的map添加相关信息
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
callback: AbstractRequestResponse => Unit = null) {
// 定义回调函数
def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
// 找到controllerContext中保存的该分区的leader
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOptmatch {
case Some(leaderIsrAndControllerEpoch) =>
// 获取该分区的AR集合
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = if (beingDeleted) {
val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
} else {
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
}
// 向updateMetadataCacaheRequestMap添加数据
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
}
case None =>
info("Leader not yet assigned for partition %s. Skip sendingUpdateMetadataRequest.".format(partition))
}
}
// 如果指定的分区集合为空,则需要全部更新分区
val filteredPartitions = {
// 如果指定的分区集合为空
val givenPartitions = if (partitions.isEmpty)
// 返回获取所有分区的leader信息
controllerContext.partitionLeadershipInfo.keySet
else
partitions // 直接返回分区
// 过滤即将被删除的partition
if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
givenPartitions
else
givenPartitions-- controller.deleteTopicManager.partitionsToBeDeleted
}
// 将过滤后的分区信息添加到updateMetadataCacheRequestMap集合,等待发送
if (filteredPartitions.isEmpty)
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
}
else
filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
}
def sendRequestsToBrokers(controllerEpoch: Int) {try {// 首先处理leaderAndIsrRequestMapleaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>partitionStateInfos.foreach { case (topicPartition, state) =>val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,state.leaderIsrAndControllerEpoch, broker,topicPartition.topic, topicPartition.partition))}val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSetval leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {_.getNode(controller.config.interBrokerSecurityProtocol)}val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpochval partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)topicPartition -> partitionState}// 创建LeaderAndIsrRequest对象val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)// 添加到队列等到被发送controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)}// 将集合清空leaderAndIsrRequestMap.clear()// 处理updateMetadataRequestMap信息updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +"to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,broker, p._1)))val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpochval partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)topicPartition -> partitionState}val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Shortelse if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Shortelse 0: Short// 创建UpdateMetadataRequestval updateMetadataRequest =if (version == 0) {val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)}else {val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)}new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)}new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)}// 添加到队列等到被发送controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)}// 将集合清空updateMetadataRequestMap.clear()// 处理stopReplicaRequestMap信息stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSetval stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSetdebug("The stop replica request (delete = true) sent to broker %d is %s".format(broker, stopReplicaWithDelete.mkString(",")))debug("The stop replica request (delete = false) sent to broker %d is %s".format(broker, stopReplicaWithoutDelete.mkString(",")))replicaInfoList.foreach { r =>// 创建StopReplicaRequest对象val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)// 添加到队列等到被发送controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)}}// 将集合清空stopReplicaRequestMap.clear()} catch {case e : Throwable => {if (leaderAndIsrRequestMap.nonEmpty) {error("Haven't been able to send leader and isr requests, current state of " +s"the map is $leaderAndIsrRequestMap. Exception message: $e")}if (updateMetadataRequestMap.nonEmpty) {error("Haven't been able to send metadata update requests, current state of " +s"the map is $updateMetadataRequestMap. Exception message: $e")}if (stopReplicaRequestMap.nonEmpty) {error("Haven't been able to send stop replica requests, current state of " +s"the map is $stopReplicaRequestMap. Exception message: $e")}throw new IllegalStateException(e)}} }
ControllerBrokerRequestBatch分析相关推荐
- Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)
文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...
- 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析
目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...
- 2022-2028年中国自动驾驶系统行业现状调研分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...
- 2022-2028年中国阻尼涂料市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...
- 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...
- 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...
- 2022-2028年全球与中国青苔清洗剂市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国青苔清洗剂行业市场行业相关概述.全 ...
- 2022-2028年全球与中国氢碘化物市场智研瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国氢碘化物行业市场行业相关概述.全球 ...
- 2022-2028年全球与中国人字拖市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国人字拖行业市场行业相关概述.全球与 ...
最新文章
- Xshell 配置是vi显示多种颜色
- 微型计算机存取速度,微型计算机中,存取速度由快到慢排序:
- LIBSVM使用方法
- java当中递归打印目录树
- mysql mysql_set_charset_SQL注入攻击之 mysql_set_charset [转]
- fastclick.js移动端WEB开发,click,touch,tap事件浅析
- 360扫地机原理大揭秘,竟还有无人驾驶技术?——浅析家用机器人SLAM方案
- 知道央视大楼为什么又叫“痔疮”么,看看你就明白了
- [网文摘录]云计算平台管理
- [18/12/3]蓝桥杯 练习系统 入门级别 Fibonacci数列求模问题 题解思路
- 用Java山寨一款Flash游戏
- 分别使用docx4j,jacob将文字与图片插入word中书签位置
- 如何锻炼提高自己的逻辑思维?这里给你7个方法!
- Win10找不到gpedit.msc|找不到本地组策略编辑器的解决方法
- 挖掘长尾关键词的五大思路
- 实现登录注册页面详细(Servlet+jsp+java)
- 英才计划计算机潜质测评试题,opq(opq管理潜质测评试题)
- 干货分享--企业微信社群促活的12种方式
- 手动实现string类的方法实现
- 计算机辅助教育相关论文,教学计算机辅助论文,关于计算机辅助教学在现代教育改革中的作用相关参考文献资料-免费论文范文...
热门文章
- 倒计时小工具_想要工作效率更高?这几款计时工具你一定不能错过!
- 5-8经典卷子神经网络结构介绍
- Python机器学习:SVM008scikit-learn中的高斯核函数
- 用matlab做元胞自动机预测,元胞自动机(Cellular Automata)与城市规划及其MATLAB实现——莆田市城市发展预测...
- 怎么把css改成打印,css 打印print
- 用php脚本获取服务内容,如何使用PHP脚本仅获取数据库的内容
- 机械师开机黑屏自动修复此计算机,机械师F117-V-BISO还原教程
- jemalloc mysql5.6_Mysql-5.6安装编译全教程
- np读取csv文件_pythonpandas读写csv数据
- json vue 对象转数组_vue.js基于v-for实现批量渲染 Json数组对象列表数据示例