PartitioinLeaderSelector分析
PartitionLeaderSelector主要负责分区leader副本的选举。
1 NoOpLeaderSelector
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
warn("I should never have been asked to perform leader election,returning the current LeaderAndIsr and replica assignment.")
(currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
}
2 OfflinePartitionLeaderSelector
从该分区的AR副本集中过滤出可用的ISR列表
如果 ISR至少有一个可用副本,则从ISR列表中选举出一个副本作为leader
如果 ISR中没有存活副本,而且如果AR中没有存活的副本抛出异常
如果ISR中没有存活副本,但是AR副本集中有副本,那么就把AR第一个副本作为leader
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {case Some(assignedReplicas) =>// 从该分区的AR副本集中过滤出可用的副本val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))// 从传递进来的ISR列表过滤出可用的副本val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))val currentLeaderEpoch = currentLeaderAndIsr.leaderEpochval currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersionval newLeaderAndIsr =// 如果 ISR中没有存活副本if (liveBrokersInIsr.isEmpty) {if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {throw new NoReplicaOnlineException(("No broker in ISR for partition " +"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))}debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s".format(topicAndPartition, liveAssignedReplicas.mkString(",")))// 而且如果AR中没有存活的副本if (liveAssignedReplicas.isEmpty) {throw new NoReplicaOnlineException(("No replica for partition " +"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +" Assigned replicas are: [%s]".format(assignedReplicas))} else {// 如果AR中有存活的副本ControllerStats.uncleanLeaderElectionRate.mark()// 从AR的存活的副本中取出第一个副本作为leader,这样的话就是有数据丢失的风险val newLeader = liveAssignedReplicas.headwarn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss.".format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)}} else {// 如果 ISR有存活副本// 从AR存活的副本列表过滤出ISR存活的副本列表val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))// ISR中存活的副本列表第一个作为新的leaderval newLeader = liveReplicasInIsr.headdebug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader.".format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)}info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))(newLeaderAndIsr, liveAssignedReplicas)case None =>throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))} }
3 ReassignedPartitionLeaderSelector
获取正在重新分配的副本,然后从可用ISR列表中选举出一个leader,然后将当前的ISR更新为新ISR,然后将重新分配非副本集合作为接受LeaderAndIsr请求的副本集
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {// 从ControllerContext获取该分区正在重新分配的副本val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicasval currentLeaderEpoch = currentLeaderAndIsr.leaderEpochval currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion// 过滤出当前可用的副本且ISR列表包含该副本val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &¤tLeaderAndIsr.isr.contains(r))val newLeaderOpt = aliveReassignedInSyncReplicas.headOptionnewLeaderOpt match {case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)case None =>reassignedInSyncReplicas.size match {case 0 =>throw new NoReplicaOnlineException("List of reassigned replicas for partition " +" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))case _ =>throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +"%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))}} }
4 PreferredReplicaPartitionLeaderSelector
先取出当前分区分配的所有副本(AR),可能包括不可用,然后取出第一个副本,如果第一个副本就是leader抛出异常,否则如果该副本所在broker是存活的且它在ISR列表中,就把他作为leader,否则抛出异常
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {// 获取该分区的AR副本集val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)// 获取副本集第一个副本val preferredReplica = assignedReplicas.head// 检查该副本是不是leader,如果是leader抛出异常val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leaderif (currentLeader == preferredReplica) {throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s".format(preferredReplica, topicAndPartition))} else {info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +" Trigerring preferred replica leader election")// 在检查该副本是不是可用的且它在ISR列表中if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {(new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,currentLeaderAndIsr.zkVersion + 1), assignedReplicas)} else {throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +"%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))}} }
5 ControlledShutdownLeaderSelector
将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {val currentLeaderEpoch = currentLeaderAndIsr.leaderEpochval currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion// 获取leaderval currentLeader = currentLeaderAndIsr.leader// 获取该分区的AR副本集val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)// 获取可用或者正处于关闭的brokerval liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIdsval liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))// 过滤掉正处于关闭的副本val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))liveAssignedReplicas.find(newIsr.contains) match {case Some(newLeader) =>debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)case None =>throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))} }
PartitioinLeaderSelector分析相关推荐
- 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析
目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...
- 2022-2028年中国自动驾驶系统行业现状调研分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...
- 2022-2028年中国阻尼涂料市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...
- 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...
- 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...
- 2022-2028年全球与中国青苔清洗剂市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国青苔清洗剂行业市场行业相关概述.全 ...
- 2022-2028年全球与中国氢碘化物市场智研瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国氢碘化物行业市场行业相关概述.全球 ...
- 2022-2028年全球与中国人字拖市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国人字拖行业市场行业相关概述.全球与 ...
- 2022-2028年全球与中国乳胶丝市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国乳胶丝行业市场行业相关概述.全球与 ...
最新文章
- 【Qt】QtCreator中配置clang-format
- ***和******
- oracle同时更新多列数据,ORACLE 11G 表联合更新多列
- pytorch简单代码实现deep dream图(即CNN特征可视化 features visualization)
- zookeeper 可视化_大厂,常用,四款,大屏可视化工具
- IE无法打开新窗口与U盘不显示故障的解决
- stm32c语言arctan函数,超高速的反正切算法,纯整数运算
- win10系统更新服务器太慢了,Win10易升更新系统很慢的解决方法
- Sentaurus TCAD 2013安装包下载
- Maxcomputer使用实例
- 学习笔记10-Python图像批量处理(对比度、灰度)-内含代码可实现
- unity tags的坑
- html生日快乐源代码
- 计算机组成原理 汇编语言
- 关于区块链在存证方面的应用
- win10 oracle11g 乱码,小编教你解决win10系统出现汉字乱码的处理办法
- 如何完成一份优秀的前端求职简历?
- commit your changes or stash them before you can merge 解决方法
- HTML制作色带,色带用尼龙带及色带的制作方法
- CSS图片阴影+鼠标移上图片放大、变形
热门文章
- python是动态语言_Python是动态语言:动态添加或删除属性、方法
- 城市天际线 android,都市天际线安卓手机版
- java collection源码_jdk源码阅读Collection实例分析
- ElasticSearch的一些核心概念
- java并发编程之正确地终止一个线程interrupt/interrupted
- ubuntu20有道词典亲测安装记录
- computed用发_Vue中的computed属性和nextTick方法
- java_version干什么的_java类中serialVersionUID的作用及其使用
- Java课程烧CPU吗_java程序员:完了!CPU一味求快出事儿了!
- python spark社区_Spark中文python文档