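PartitionStateMachine is the state machine that the Controller Leader uses to maintain partition state. A partition's state is represented by PartitionState, which has four subtypes: NewPartition, OnlinePartition, OfflinePartition and NonExistentPartition.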

1 Partition state transitions

# NonExistentPartition -> NewPartition

Loads the partition's AR (assigned replicas) set from ZooKeeper into partitionReplicaAssignment in ControllerContext.

# NewPartition -> OnlinePartition

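The broker hosting the first live replica becomes the leader, and all live replicas are placed in the ISR. The leader and ISR information is then written to ZooKeeper.

For this partition, a LeaderAndIsr request is sent to every broker that hosts a live replica, and an UpdateMetadata request is sent to every live broker.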
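The selection rule described above can be sketched as a pure function. This is a minimal illustration, not Kafka's code: `InitialLeaderSketch`, its simplified `LeaderAndIsr` case class and `initialLeaderAndIsr` are hypothetical names introduced here, and ZooKeeper writes and broker requests are left out.

```scala
object InitialLeaderSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (hypothetical, for illustration only).
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Picks the first live replica in AR as leader and puts every live replica into the ISR.
  // Returns None when no assigned replica is alive, i.e. the transition cannot proceed.
  def initialLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Option[LeaderAndIsr] = {
    val liveAssignedReplicas = assignedReplicas.filter(liveBrokerIds.contains)
    liveAssignedReplicas.headOption.map(leader => LeaderAndIsr(leader, liveAssignedReplicas.toList))
  }
}
```

For example, with AR = 1, 2, 3 and only brokers 2 and 3 alive, the sketch picks broker 2 as leader with ISR 2, 3.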

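# OnlinePartition/OfflinePartition -> OnlinePartition

Selects a new leader replica and ISR set for the partition and writes the result to ZooKeeper. It then sends a LeaderAndIsrRequest to the replicas that need to change role, instructing them to switch, and sends an UpdateMetadata request to all live brokers to refresh the MetadataCache on each of them.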

# NewPartition/OnlinePartition -> OfflinePartition

This transition only marks the partition's state as OfflinePartition in the KafkaController.

# OfflinePartition -> NonExistentPartition

Only the state is switched; nothing else is done.
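One way to picture these transitions is a lookup from each target state to the states it may be entered from. The sketch below is an illustration only; `PartitionTransitionSketch` and `isValidTransition` are hypothetical names, and the sets mirror the lists passed to assertValidPreviousStates in the handleStateChange code in section 3.2 (which also accepts OfflinePartition as a previous state for OfflinePartition).

```scala
object PartitionTransitionSketch {
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState
  case object NonExistentPartition extends PartitionState

  // For each target state, the set of states a partition may be in beforehand.
  val validPreviousStates: Map[PartitionState, Set[PartitionState]] =
    Map[PartitionState, Set[PartitionState]](
      NewPartition -> Set[PartitionState](NonExistentPartition),
      OnlinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
      OfflinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
      NonExistentPartition -> Set[PartitionState](OfflinePartition)
    )

  // True when moving from `from` to `to` passes the precondition check.
  def isValidTransition(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates(to).contains(from)
}
```

Under this table a partition cannot jump straight from NonExistentPartition to OnlinePartition; it has to pass through NewPartition first.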

2 Core fields

controllerContext: ControllerContext holds the context information of the KafkaController

partitionState: Map[TopicAndPartition, PartitionState] stores the state of each partition

brokerRequestBatch: ControllerBrokerRequestBatch sends requests to the specified brokers in batches

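noOpPartitionLeaderSelector: NoOpLeaderSelector the default leader selector. It does not perform a real election; it simply returns the current leader replica and ISR set, together with the AR set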
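The idea of a selector that elects nothing can be sketched as follows. This is a simplified model rather than Kafka's interface: `LeaderSelectorSketch`, `LeaderSelector` and `NoOpSelector` are hypothetical names, and the AR set is passed in as a parameter here instead of being looked up from ControllerContext.

```scala
object LeaderSelectorSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (hypothetical, for illustration only).
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Simplified selector interface: returns the chosen leader/ISR plus the replicas
  // that should receive a LeaderAndIsr request.
  trait LeaderSelector {
    def selectLeader(current: LeaderAndIsr, assignedReplicas: Seq[Int]): (LeaderAndIsr, Seq[Int])
  }

  // No-op selector: keeps the current leader and ISR unchanged and targets the whole AR set.
  object NoOpSelector extends LeaderSelector {
    def selectLeader(current: LeaderAndIsr, assignedReplicas: Seq[Int]): (LeaderAndIsr, Seq[Int]) =
      (current, assignedReplicas)
  }
}
```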

topicChangeListener: TopicChangeListener a ZooKeeper listener that watches for topic changes

deleteTopicsListener: DeleteTopicsListener a ZooKeeper listener that watches for topic deletions

partitionModificationsListeners: Map[String, PartitionModificationsListener] listeners that watch for partition modifications

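3 Core methods

3.1 The startup method

When PartitionStateMachine starts up, it initializes the state of every partition and then tries to move partitions in the NewPartition or OfflinePartition state to the OnlinePartition state.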

def startup() {
  // Initialize the state of every partition
  initializePartitionState()
  // set started flag
  hasStarted.set(true)
  // Try to move partitions to the online state
  triggerOnlinePartitionStateChange()
}

# Initialize the state of each partition. The initial state is decided from partitionLeadershipInfo in ControllerContext

private def initializePartitionState() {
  // Iterate over the partition-to-replica assignment held by ControllerContext
  for ((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
    // Check whether leader and ISR information exists for the partition
    // (partitionLeadershipInfo reflects the leader-and-ISR path in ZooKeeper).
    // If it exists, the partition is not new; if it does not, this is a new partition
    controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      case Some(currentLeaderIsrAndEpoch) =>
        // Check whether the partition's leader is alive
        if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
          // Leader is alive: initialize as OnlinePartition
          partitionState.put(topicPartition, OnlinePartition)
        else
          // Leader is not alive: initialize as OfflinePartition
          partitionState.put(topicPartition, OfflinePartition)
      case None =>
        // No leader information: the partition is newly created, so its state is NewPartition
        partitionState.put(topicPartition, NewPartition)
    }
  }
}
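The classification performed by initializePartitionState can be reduced to a small pure function, which makes the three outcomes easy to check in isolation. This is a minimal sketch under simplifying assumptions: `InitialStateSketch` and `initialState` are hypothetical names, and the leader is passed as an `Option[Int]` instead of being read from partitionLeadershipInfo.

```scala
object InitialStateSketch {
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // leader is None when no leader/ISR information is known for the partition.
  def initialState(leader: Option[Int], liveBrokerIds: Set[Int]): PartitionState =
    leader match {
      case Some(id) if liveBrokerIds.contains(id) => OnlinePartition  // leader is alive
      case Some(_)                                => OfflinePartition // leader is down
      case None                                   => NewPartition     // never had a leader
    }
}
```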

# Try to move every partition in the NewPartition or OfflinePartition state to the OnlinePartition state

def triggerOnlinePartitionStateChange() {
  try {
    brokerRequestBatch.newBatch()
    // Try to move every partition in the NewPartition or OfflinePartition state to OnlinePartition
    // Iterate over the partition-to-state map
    for ((topicAndPartition, partitionState) <- partitionState
         // Skip partitions whose topic is queued up for deletion
         if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
      // Try to move OfflinePartition and NewPartition partitions to OnlinePartition
      if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition,
          controller.offlinePartitionSelector, (new CallbackBuilder).build)
    }
    // Send the batched requests to the target brokers
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  } catch {
    case e: Throwable => error("Error while moving some partitions to the online state", e)
  }
}

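3.2 The handleStateChange method

This is the core method for switching partition state. It elects a leader with the given leader selection strategy, and before every transition it checks that the partition's previous state is valid.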

private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector, callbacks: Callbacks) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
      "the partition state machine has not started")
      .format(controllerId, controller.epoch, topicAndPartition, targetState))
  // Get the partition's current state; default to NonExistentPartition if none is recorded
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  try {
    targetState match {
      // Target state is NewPartition
      case NewPartition =>
        // Check that the previous state is valid
        assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
        // Update the partition state
        partitionState.put(topicAndPartition, NewPartition)
        // Get the partition's AR set
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, assignedReplicas))
      // Target state is OnlinePartition
      case OnlinePartition =>
        // Check that the previous state is valid
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
        partitionState(topicAndPartition) match {
          // Current state is NewPartition
          case NewPartition =>
            // Initialize the leader and ISR of the new partition
            initializeLeaderAndIsrForPartition(topicAndPartition)
          // Current state is OfflinePartition
          case OfflinePartition =>
            // OfflinePartition -> OnlinePartition transition
            electLeaderForPartition(topic, partition, leaderSelector)
          // Already OnlinePartition, but the leader has to be re-elected for some reason
          case OnlinePartition => // invoked when the leader needs to be re-elected
            // OnlinePartition -> OnlinePartition transition
            electLeaderForPartition(topic, partition, leaderSelector)
          case _ => // should never come here since illegal previous states are checked above
        }
        // Set the partition state to OnlinePartition
        partitionState.put(topicAndPartition, OnlinePartition)
        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
      // Target state is OfflinePartition
      case OfflinePartition =>
        // Check that the previous state is valid
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        // Set the partition state to OfflinePartition
        partitionState.put(topicAndPartition, OfflinePartition)
      // Target state is NonExistentPartition
      case NonExistentPartition =>
        // Check that the previous state is valid
        assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        // Set the partition state to NonExistentPartition
        partitionState.put(topicAndPartition, NonExistentPartition)
    }
  } catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
        .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
  }
}

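3.3 initializeLeaderAndIsrForPartition

When a partition moves from NewPartition to OnlinePartition, this method initializes the partition's leader and ISR list:

# Get the partition's AR set and filter it down to the replicas that are currently alive

# If no replica is alive, the transition fails

# Otherwise create a LeaderIsrAndControllerEpoch object, which wraps the leader, the ISR and the controller epoch

# Convert the LeaderIsrAndControllerEpoch object and save it in ZooKeeper under the path:

/brokers/topics/[topic_name]/partitions/[partition_id]/state

# Update the partition's leader information in partitionLeadershipInfo of ControllerContext

# Wrap the leader replica, the ISR list and the AR set into a LeaderAndIsrRequest and add it to the pending queue, where it waits to be sent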

private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
  // Get the partition's AR set
  val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  // Keep only the AR replicas that are currently alive
  val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
  liveAssignedReplicas.size match {
    // No live replica in AR: throw a state-change-failed exception
    case 0 =>
      // ......
    case _ =>
      debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
      // Use the first live replica in AR as the leader
      val leader = liveAssignedReplicas.head
      // Create the LeaderIsrAndControllerEpoch object
      val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
        controller.epoch)
      debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
      try {
        // Create /brokers/topics/[topic_name]/partitions/[partition_id]/state in ZooKeeper
        // from the leaderIsrAndControllerEpoch information
        zkUtils.createPersistentPath(
          getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
          zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
        // Update the partition's leader information in partitionLeadershipInfo of ControllerContext
        controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
        // Queue a LeaderAndIsr request to be sent to the target brokers
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
          topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
      } catch {
        //......
      }
  }
}

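3.4 electLeaderForPartition

This method is called when a partition moves from OfflinePartition or OnlinePartition to OnlinePartition:

# Elect a new leader replica for the partition with the given selection strategy

# Update the leader and ISR information in ZooKeeper under the corresponding path

# Update the partition's leader information in partitionLeadershipInfo of ControllerContext

# Wrap the leader replica, the ISR list and the AR set into a LeaderAndIsrRequest and add it to the pending queue, where it waits to be sent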

def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  // handle leader election for the partitions whose leader is no longer alive
  stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s"
    .format(controllerId, controller.epoch, topicAndPartition))
  try {
    var zookeeperPathUpdateSucceeded: Boolean = false
    var newLeaderAndIsr: LeaderAndIsr = null
    var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
    while (!zookeeperPathUpdateSucceeded) {
      // Read the partition's current leader replica, ISR set, zkVersion, etc. from ZooKeeper;
      // throw an exception if they do not exist
      val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
      val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
      val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
      // If the controller epoch stored in ZooKeeper is greater than this controller's epoch,
      // another controller has already written the path, so abort with an exception
      if (controllerEpoch > controller.epoch) {
        val failMsg = ("aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " +
          "already written by another controller. This probably means that the current controller %d went through " +
          "a soft failure and another controller was elected with epoch %d.")
          .format(topic, partition, controllerId, controllerEpoch)
        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
        throw new StateChangeFailedException(failMsg)
      }
      // Elect the new leader replica and ISR list with leaderSelector
      val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
      // Save the new LeaderAndIsr information to ZooKeeper
      val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
        leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
      newLeaderAndIsr = leaderAndIsr
      newLeaderAndIsr.zkVersion = newVersion
      zookeeperPathUpdateSucceeded = updateSucceeded
      replicasForThisPartition = replicas
    }
    val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
    // Update the partition's leader information in partitionLeadershipInfo of ControllerContext
    controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
    stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
      .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
    // Get the partition's AR set
    val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
    // Queue a LeaderAndIsrRequest to be sent to the target brokers
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
      newLeaderIsrAndControllerEpoch, replicas)
  } catch {
    // ......
  }
  debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
}
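The while loop in electLeaderForPartition follows a read, select, conditional-write pattern: the write only succeeds if the version read at the start of the iteration is still current, otherwise the loop re-reads and tries again. The sketch below models that pattern with an in-memory store. It is an illustration under stated assumptions: `ConditionalUpdateSketch`, `VersionedStore` and `electWithRetry` are hypothetical names, the store stands in for the ZooKeeper state node, and the epoch check and error handling are omitted.

```scala
object ConditionalUpdateSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr, carrying the version it was read at.
  final case class LeaderAndIsr(leader: Int, isr: List[Int], zkVersion: Int)

  // In-memory stand-in for the ZooKeeper state node (hypothetical, single-threaded).
  final class VersionedStore(initial: LeaderAndIsr) {
    private var current: LeaderAndIsr = initial

    def read(): LeaderAndIsr = current

    // Writes only if expectedVersion matches the stored version.
    // Returns (succeeded, newVersion); newVersion is -1 on failure.
    def conditionalUpdate(leader: Int, isr: List[Int], expectedVersion: Int): (Boolean, Int) =
      if (expectedVersion == current.zkVersion) {
        current = LeaderAndIsr(leader, isr, current.zkVersion + 1)
        (true, current.zkVersion)
      } else (false, -1)
  }

  // Re-reads the state and retries until the conditional write succeeds.
  // `select` plays the role of the leader selector: it maps the current state to (leader, isr).
  def electWithRetry(store: VersionedStore, select: LeaderAndIsr => (Int, List[Int])): LeaderAndIsr = {
    var updated: Option[LeaderAndIsr] = None
    while (updated.isEmpty) {
      val current = store.read()
      val (leader, isr) = select(current)
      val (succeeded, newVersion) = store.conditionalUpdate(leader, isr, current.zkVersion)
      if (succeeded) updated = Some(LeaderAndIsr(leader, isr, newVersion))
    }
    updated.get
  }
}
```

If another writer changes the node between the read and the write, the version no longer matches, the write is rejected, and the next iteration selects a leader again from the fresh state.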
