PartitionLeaderSelector主要负责分区leader副本的选举。

1 NoOpLeaderSelector

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election,returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }

2 OfflinePartitionLeaderSelector

从该分区的AR副本集中过滤出可用的ISR列表

如果 ISR至少有一个可用副本,则从ISR列表中选举出一个副本作为leader

如果 ISR中没有存活副本,而且如果AR中没有存活的副本抛出异常

如果ISR中没有存活副本,但是AR副本集中有副本,那么就把AR第一个副本作为leader

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {case Some(assignedReplicas) =>// 从该分区的AR副本集中过滤出可用的副本val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))// 从传递进来的ISR列表过滤出可用的副本val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))val currentLeaderEpoch = currentLeaderAndIsr.leaderEpochval currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersionval newLeaderAndIsr =// 如果 ISR中没有存活副本if (liveBrokersInIsr.isEmpty) {if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {throw new NoReplicaOnlineException(("No broker in ISR for partition " +"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))}debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s".format(topicAndPartition, liveAssignedReplicas.mkString(",")))// 而且如果AR中没有存活的副本if (liveAssignedReplicas.isEmpty) {throw new NoReplicaOnlineException(("No replica for partition " +"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +" Assigned replicas are: [%s]".format(assignedReplicas))} else {// 如果AR中有存活的副本ControllerStats.uncleanLeaderElectionRate.mark()// 从AR的存活的副本中取出第一个副本作为leader,这样的话就是有数据丢失的风险val newLeader = liveAssignedReplicas.headwarn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss.".format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)}} else {// 如果 ISR有存活副本// 从AR存活的副本列表过滤出ISR存活的副本列表val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))// ISR中存活的副本列表第一个作为新的leaderval newLeader = liveReplicasInIsr.headdebug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader.".format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)}info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))(newLeaderAndIsr, liveAssignedReplicas)case None =>throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))}
}

3 ReassignedPartitionLeaderSelector

获取正在重新分配的副本,然后从可用ISR列表中选举出一个leader,然后将当前的ISR更新为新ISR,然后将重新分配非副本集合作为接受LeaderAndIsr请求的副本集

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {// 从ControllerContext获取该分区正在重新分配的副本val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicasval currentLeaderEpoch = currentLeaderAndIsr.leaderEpochval currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion// 过滤出当前可用的副本且ISR列表包含该副本val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&currentLeaderAndIsr.isr.contains(r))val newLeaderOpt = aliveReassignedInSyncReplicas.headOptionnewLeaderOpt match {case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)case None =>reassignedInSyncReplicas.size match {case 0 =>throw new NoReplicaOnlineException("List of reassigned replicas for partition " +" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))case _ =>throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +"%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))}}
}

4 PreferredReplicaPartitionLeaderSelector

先取出当前分区分配的所有副本(AR),可能包括不可用,然后取出第一个副本,如果第一个副本就是leader抛出异常,否则如果该副本所在broker是存活的且它在ISR列表中,就把他作为leader,否则抛出异常

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {// 获取该分区的AR副本集val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)// 获取副本集第一个副本val preferredReplica = assignedReplicas.head// 检查该副本是不是leader,如果是leader抛出异常val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leaderif (currentLeader == preferredReplica) {throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s".format(preferredReplica, topicAndPartition))} else {info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +" Trigerring preferred replica leader election")// 在检查该副本是不是可用的且它在ISR列表中if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {(new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,currentLeaderAndIsr.zkVersion + 1), assignedReplicas)} else {throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +"%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))}}
}

5 ControlledShutdownLeaderSelector

将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {val currentLeaderEpoch = currentLeaderAndIsr.leaderEpochval currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion// 获取leaderval currentLeader = currentLeaderAndIsr.leader// 获取该分区的AR副本集val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)// 获取可用或者正处于关闭的brokerval liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIdsval liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))// 过滤掉正处于关闭的副本val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))liveAssignedReplicas.find(newIsr.contains) match {case Some(newLeader) =>debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)case None =>throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))}
}

PartitioinLeaderSelector分析相关推荐

  1. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  2. 2022-2028年中国自动驾驶系统行业现状调研分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...

  3. 2022-2028年中国阻尼涂料市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...

  4. 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...

  5. 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...

  6. 2022-2028年全球与中国青苔清洗剂市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国青苔清洗剂行业市场行业相关概述.全 ...

  7. 2022-2028年全球与中国氢碘化物市场智研瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国氢碘化物行业市场行业相关概述.全球 ...

  8. 2022-2028年全球与中国人字拖市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国人字拖行业市场行业相关概述.全球与 ...

  9. 2022-2028年全球与中国乳胶丝市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国乳胶丝行业市场行业相关概述.全球与 ...

最新文章

  1. 【Qt】QtCreator中配置clang-format
  2. ***和******
  3. oracle同时更新多列数据,ORACLE 11G 表联合更新多列
  4. pytorch简单代码实现deep dream图(即CNN特征可视化 features visualization)
  5. zookeeper 可视化_大厂,常用,四款,大屏可视化工具
  6. IE无法打开新窗口与U盘不显示故障的解决
  7. stm32c语言arctan函数,超高速的反正切算法,纯整数运算
  8. win10系统更新服务器太慢了,Win10易升更新系统很慢的解决方法
  9. Sentaurus TCAD 2013安装包下载
  10. Maxcomputer使用实例
  11. 学习笔记10-Python图像批量处理(对比度、灰度)-内含代码可实现
  12. unity tags的坑
  13. html生日快乐源代码
  14. 计算机组成原理 汇编语言
  15. 关于区块链在存证方面的应用
  16. win10 oracle11g 乱码,小编教你解决win10系统出现汉字乱码的处理办法
  17. 如何完成一份优秀的前端求职简历?
  18. commit your changes or stash them before you can merge 解决方法
  19. HTML制作色带,色带用尼龙带及色带的制作方法
  20. CSS图片阴影+鼠标移上图片放大、变形

热门文章

  1. python是动态语言_Python是动态语言:动态添加或删除属性、方法
  2. 城市天际线 android,都市天际线安卓手机版
  3. java collection源码_jdk源码阅读Collection实例分析
  4. ElasticSearch的一些核心概念
  5. java并发编程之正确地终止一个线程interrupt/interrupted
  6. ubuntu20有道词典亲测安装记录
  7. computed用发_Vue中的computed属性和nextTick方法
  8. java_version干什么的_java类中serialVersionUID的作用及其使用
  9. Java课程烧CPU吗_java程序员:完了!CPU一味求快出事儿了!
  10. python spark社区_Spark中文python文档