为了提高KafkaController Leader和集群其他broker的通信效率,实现批量发送请求的功能

leaderAndIsrRequestMap:保存了发往指定broker的LeaderAndIsrRequest请求相关的信息

stopReplicaRequestMap: 保存了发往指定broker的StopReplicaRequest请求相关的信息

updateMetadataRequestMap:保存了发往指定broker的UpdateMetadataCacheRequest请求相关的信息

// 将保存相关请求的集合清空
def clear() {
  leaderAndIsrRequestMap.clear()
  stopReplicaRequestMap.clear()
  updateMetadataRequestMap.clear()
}
// 往LeaderAndIsrRequest请求的map添加相关信息
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                     leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                     replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
  val topicPartition = new TopicPartition(topic, partition)

brokerIds.filter(_ >= 0).foreach { brokerId =>
    val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
    result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
  }

addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
                                     Set(TopicAndPartition(topic, partition)))
}
// 往StopReplicaRequest请求的map添加相关信息
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
                                    callback: (AbstractRequestResponse, Int) => Unit = null) {
  brokerIds.filter(b => b >= 0).foreach { brokerId =>
    stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
    val v = stopReplicaRequestMap(brokerId)
    if(callback != null)
      stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
        deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
    else
      stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
        deletePartition)
  }
}

// 往UpdateMetadataRequest请求的map添加相关信息
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                       partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                       callback: AbstractRequestResponse => Unit = null) {
  // 定义回调函数
 
def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
    // 找到controllerContext中保存的该分区的leader
   
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
    leaderIsrAndControllerEpochOptmatch {
      case Some(leaderIsrAndControllerEpoch) =>
        // 获取该分区的AR集合
       
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
        val partitionStateInfo = if (beingDeleted) {
          val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
          PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
        } else {
          PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
        }
        // 向updateMetadataCacaheRequestMap添加数据
       
brokerIds.filter(b => b >= 0).foreach { brokerId =>
          updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
          updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
        }
      case None =>
        info("Leader not yet assigned for partition %s. Skip sendingUpdateMetadataRequest.".format(partition))
    }
  }
  // 如果指定的分区集合为空,则需要全部更新分区
 
val filteredPartitions = {
    // 如果指定的分区集合为空
   
val givenPartitions = if (partitions.isEmpty)
      // 返回获取所有分区的leader信息
      controllerContext
.partitionLeadershipInfo.keySet
    else
      partitions // 直接返回分区
    // 过滤即将被删除的partition
   
if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
      givenPartitions
    else
      givenPartitions-- controller.deleteTopicManager.partitionsToBeDeleted
 
}
  // 将过滤后的分区信息添加到updateMetadataCacheRequestMap集合,等待发送
 
if (filteredPartitions.isEmpty)
    brokerIds.filter(b => b >= 0).foreach { brokerId =>
      updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
    }
  else
    filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))

controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
}

def sendRequestsToBrokers(controllerEpoch: Int) {try {// 首先处理leaderAndIsrRequestMapleaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>partitionStateInfos.foreach { case (topicPartition, state) =>val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,state.leaderIsrAndControllerEpoch, broker,topicPartition.topic, topicPartition.partition))}val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSetval leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {_.getNode(controller.config.interBrokerSecurityProtocol)}val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpochval partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)topicPartition -> partitionState}// 创建LeaderAndIsrRequest对象val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)// 添加到队列等到被发送controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)}// 将集合清空leaderAndIsrRequestMap.clear()// 处理updateMetadataRequestMap信息updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +"to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,broker, p._1)))val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpochval partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)topicPartition -> partitionState}val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Shortelse if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Shortelse 0: Short// 创建UpdateMetadataRequestval updateMetadataRequest =if (version == 0) {val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)}else {val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)}new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)}new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)}// 添加到队列等到被发送controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)}// 将集合清空updateMetadataRequestMap.clear()// 处理stopReplicaRequestMap信息stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSetval stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSetdebug("The stop replica request (delete = true) sent to broker %d is %s".format(broker, stopReplicaWithDelete.mkString(",")))debug("The stop replica request (delete = false) sent to broker %d is %s".format(broker, stopReplicaWithoutDelete.mkString(",")))replicaInfoList.foreach { r =>// 创建StopReplicaRequest对象val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)// 添加到队列等到被发送controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)}}// 将集合清空stopReplicaRequestMap.clear()} catch {case e : Throwable => {if (leaderAndIsrRequestMap.nonEmpty) {error("Haven't been able to send leader and isr requests, current state of " +s"the map is $leaderAndIsrRequestMap. Exception message: $e")}if (updateMetadataRequestMap.nonEmpty) {error("Haven't been able to send metadata update requests, current state of " +s"the map is $updateMetadataRequestMap. Exception message: $e")}if (stopReplicaRequestMap.nonEmpty) {error("Haven't been able to send stop replica requests, current state of " +s"the map is $stopReplicaRequestMap. Exception message: $e")}throw new IllegalStateException(e)}}
}

ControllerBrokerRequestBatch分析相关推荐

  1. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

  2. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  3. 2022-2028年中国自动驾驶系统行业现状调研分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...

  4. 2022-2028年中国阻尼涂料市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...

  5. 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...

  6. 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...

  7. 2022-2028年全球与中国青苔清洗剂市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国青苔清洗剂行业市场行业相关概述.全 ...

  8. 2022-2028年全球与中国氢碘化物市场智研瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国氢碘化物行业市场行业相关概述.全球 ...

  9. 2022-2028年全球与中国人字拖市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国人字拖行业市场行业相关概述.全球与 ...

最新文章

  1. Xshell 配置是vi显示多种颜色
  2. 微型计算机存取速度,微型计算机中,存取速度由快到慢排序:
  3. LIBSVM使用方法
  4. java当中递归打印目录树
  5. mysql mysql_set_charset_SQL注入攻击之 mysql_set_charset [转]
  6. fastclick.js移动端WEB开发,click,touch,tap事件浅析
  7. 360扫地机原理大揭秘,竟还有无人驾驶技术?——浅析家用机器人SLAM方案
  8. 知道央视大楼为什么又叫“痔疮”么,看看你就明白了
  9. [网文摘录]云计算平台管理
  10. [18/12/3]蓝桥杯 练习系统 入门级别 Fibonacci数列求模问题 题解思路
  11. 用Java山寨一款Flash游戏
  12. 分别使用docx4j,jacob将文字与图片插入word中书签位置
  13. 如何锻炼提高自己的逻辑思维?这里给你7个方法!
  14. Win10找不到gpedit.msc|找不到本地组策略编辑器的解决方法
  15. 挖掘长尾关键词的五大思路
  16. 实现登录注册页面详细(Servlet+jsp+java)
  17. 英才计划计算机潜质测评试题,opq(opq管理潜质测评试题)
  18. 干货分享--企业微信社群促活的12种方式
  19. 手动实现string类的方法实现
  20. 计算机辅助教育相关论文,教学计算机辅助论文,关于计算机辅助教学在现代教育改革中的作用相关参考文献资料-免费论文范文...

热门文章

  1. 倒计时小工具_想要工作效率更高?这几款计时工具你一定不能错过!
  2. 5-8经典卷子神经网络结构介绍
  3. Python机器学习:SVM008scikit-learn中的高斯核函数
  4. 用matlab做元胞自动机预测,元胞自动机(Cellular Automata)与城市规划及其MATLAB实现——莆田市城市发展预测...
  5. 怎么把css改成打印,css 打印print
  6. 用php脚本获取服务内容,如何使用PHP脚本仅获取数据库的内容
  7. 机械师开机黑屏自动修复此计算机,机械师F117-V-BISO还原教程
  8. jemalloc mysql5.6_Mysql-5.6安装编译全教程
  9. np读取csv文件_pythonpandas读写csv数据
  10. json vue 对象转数组_vue.js基于v-for实现批量渲染 Json数组对象列表数据示例