Partition负责管理每个副本对应的Replica对象,进行Leader副本的切换,ISR列表的管理以及调用日志存储系统完成写消息等

一Partition核心字段
topic: String 表示topic名字

partitionId:Int 表示分区号

replicaManager: ReplicaManager 管理副本的一个类

localBrokerId:Int 当前的broker的id

logManager:LogManager 日志管理器,包括创建删除等

zkUtils: ZkUtils 操作zookeeper的辅助类

leaderEpoch:Int leader副本的年代信息

leaderReplicaIdOpt:Option[Int] 该partition的leader副本的id

inSyncReplicas: Set[Replica]类型,该集合维护了该分区的ISR列表,ISR是AR的子集

assignedReplicaMap:Pool[Int, Replica] 维护了该分区全部副本的集合AR信息

二 Partition重要方法

2.1 getOrCreateReplica: 获取或者创建副本

主要负责在AR集合(assignedReplicaMap)中查找指定的副本的replica对象;如果查找不到则创建replica对象并添加到AR集合中管理,如果创建的是本地副本,还会创建(恢复)对应的Log并初始化HighWatermark,HighWatermark与Log的recoveryPoint类似,也会需要记录到文件中保存,每一个log目录下都有一个replication-offset-checkpoint会记录。每一个分区的HW,在ReplicaManager启动的时候会读取此文件到highWatermarkCheclpoints这个map集合中,之后会定时更新该文件

def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
  // 从AR列表获取指定副本
 
val replicaOpt = getReplica(replicaId)
  replicaOpt match {
    case Some(replica) => replica
    case None =>
      // 判断是不是当前brokerId,
     
if (isReplicaLocal(replicaId)) {
        // 获取配置信息
       
val config = LogConfig.fromProps(logManager.defaultConfig.originals,
                                         AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
        // 创建本地log
       
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
        // 获取指定日志目录对应的OffsetCheckpoint对象,他负责管理log目录下的replica-offsetcheckpoint
        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
        // 将replication-offset-checkpoint文件中的记录HW转成map
       
val offsetMap = checkpoint.read
        if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
          info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
        // 根据TopicPartition找到对应的HW,再与LEO比较,此值会作为次副本的HW
       
val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
        val localReplica = new Replica(replicaId, this, time, offset, Some(log))
        addReplicaIfNotExists(localReplica)
      } else {// 如果是远程副本,直接创建并添加到AR列表
       
val remoteReplica = new Replica(replicaId, this, time)
        addReplicaIfNotExists(remoteReplica)
      }
      getReplica(replicaId).get // 返回replica
 
}
}

2.2 副本的角色切换

broker会根据KafkaController发出的LeaderAndISRRequest请求控制副本的Leader/Follower角色切换

# makeLeader:

def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {//加锁// 获取需要分配的AR集合val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)// 记录controller epochcontrollerEpoch = partitionStateInfo.controllerEpoch// 创建AR集合里所有副本对应的Replica对象allReplicas.foreach(replica => getOrCreateReplica(replica))// 获取ISR集合val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet// remove assigned replicas that have been removed by the controller// 删除那些已经被KafkaController移除的分配的列表(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))// 更新ISR列表inSyncReplicas = newInSyncReplicasleaderEpoch = partitionStateInfo.leaderEpoch // 更新leader epochzkVersion = partitionStateInfo.zkVersion // 更新zkVersion// 检测leader是否发生变化val isNewLeader =if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {false // 表示leader并没有发生变化} else {leaderReplicaIdOpt = Some(localBrokerId) // 表示leader发生了变化true}val leaderReplica = getReplica().get // 获取本地副本if (isNewLeader) {// 初始化leader的highwatermarkMetadata// 如果leader发生了变化,表示leader副本通过上面的步骤刚刚分配到此broker上,可能是刚启动也可能是follower副本成为leader副本//leaderReplica.convertHWToLocalOffsetMetadata()// 重置远程副本的log end offset 为 -1assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))}// 尝试更新high watermark,后移HW(maybeIncrementLeaderHW(leaderReplica), isNewLeader)}// 如果leader增加了HW,在这里可能有DelayedFetch满足条件,所以这里调用tryCompleteDelayedRequestsif (leaderHWIncremented)tryCompleteDelayedRequests()isNewLeader
}
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {// 获取ISR列表所有的副本的LEOval allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)// 将ISR列表中最小的LEO作为新的HWval newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)val oldHighWatermark = leaderReplica.highWatermark// 比较新旧HighWatermark决定是否更新if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {leaderReplica.highWatermark = newHighWatermarkdebug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))true} else {debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s".format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))false}
}

# makeFollower:

def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {inWriteLock(leaderIsrUpdateLock) {//加锁val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)// 获取leader broker idval newLeaderBrokerId: Int = partitionStateInfo.leader// 获取controller epochcontrollerEpoch = partitionStateInfo.controllerEpoch// 创建对应的Replica对象allReplicas.foreach(r => getOrCreateReplica(r))// 如果controller 已经删除了某些副本,则更新AR集合列表(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))// follower replica上的ISR列表应该为空,因为这个ISR列表是leader来维护的inSyncReplicas = Set.empty[Replica]leaderEpoch = partitionStateInfo.leaderEpoch // 更新leader epochzkVersion = partitionStateInfo.zkVersion // 更新zkVersion// 检测leader是否发生变化if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {false}else {// 如果发生变化更新leaderReplicaIdOptleaderReplicaIdOpt = Some(newLeaderBrokerId)true}}
}

2.3 ISR 集合管理

Partition还需要管理ISR集合,随着follower副本不断与leader副本进行消息同步,follower副本的leo会不断后移,并且最终赶上leader的leo,此时follower副本就有资格进入ISR列表

# ISR的扩张

Partition的maybeExpandIsr方法实现了扩张ISR列表的功能

def maybeExpandIsr(replicaId: Int) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {// 加锁
    // 检查是否是leader副本,因为只有leader副本才会管理ISR
   
leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // 获取当前replica对象
       
val replica = getReplica(replicaId).get
        // 获取当前leader的HighWatermarek
        val leaderHW = leaderReplica.highWatermark
        // 如果follower副本不再ISR集合但是AR集合可以找得到,且当前副本的leo已经追赶上leader的HW
        // 其实就是说在AR列表的副本,但是因为某种原因,不在ISR列表(可能是新加入的,也可能是挂掉之后重新启动的,或者是落后leader超过阀值的)
        // 由于现在已经赶上leader了,判断的依据就是他们的LEO已经大于或者等于leader的HW的值
       
if(!inSyncReplicas.contains(replica) &&
           assignedReplicas.map(_.brokerId).contains(replicaId) &&
                replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
          // 把该副本添加到ISR列表
         
val newInSyncReplicas = inSyncReplicas+ replica
          info("Expanding ISR for partition [%s,%d] from %s to %s"
                       .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
                               newInSyncReplicas.map(_.brokerId).mkString(",")))
          // 并且更新Isr和zookeeper
         
updateIsr(newInSyncReplicas)
          replicaManager.isrExpandRate.mark()
        }
        // 尝试更新HighWatermark
       
maybeIncrementLeaderHW(leaderReplica)

case None => false // nothing to do if no longer leader
   
}
  }

// 尝试执行延迟任务
 
if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

private def updateIsr(newIsr: Set[Replica]) {
  // 根据新的Isr列表创建一个新LeaderAndIsr对象
 
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
  // 在zk中更新最新的ISR记录.
 
val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
    newLeaderAndIsr, controllerEpoch, zkVersion)
  // 更新成功就在ReplicaManager中的isrChangeSet集合中添加副本变化的topic 和 partition id
  // 更新成功就在ReplicaManager中的isrChangeSet集合中添加副本变化的topic 和 partition id
 
if(updateSucceeded) {
    replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
    // 将ISR 更新为最新的ISR
    inSyncReplicas
= newIsr
    zkVersion = newVersion
    trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
  } else {
    info("Cached zkVersion [%d] not equal to that in zookeeper, skip updatingISR".format(zkVersion))
  }
}

# ISR缩减

各个节点通过网络交互可能出现阻塞和延迟,导致ISR集合内部分的follower无法与leader同步。如果此时ProduceRequest的acks设为-1,则会长时间等待。为了避免出现这种情况,Partition会对ISR列表进行缩减,通过maybeShrinkIsr方法实现。在ReplicaManager中使用定时任务周期性的调用maybeShrinkIsr检查ISR中follower和leaderr副本之间的差距,并对ISR集合缩减

def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    // 首先判断是不是当前broker是不是leader,只有leader才可以管理ISR
   
leaderReplicaIfLocal() match {
      // 如果是leader
     
case Some(leaderReplica) =>
        // 获取不同步副本,就是那些和leader差距很多的副本
       
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        // 不同步副本如果存在
       
if(outOfSyncReplicas.nonEmpty) {
          // 从ISR列表中移除没有同步的副本
         
val newInSyncReplicas = inSyncReplicas-- outOfSyncReplicas
          assert(newInSyncReplicas.nonEmpty)
          info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
            inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
          // 在zookeeper和缓存中更新ISR列表
         
updateIsr(newInSyncReplicas)
          // we may needto increment high watermark since ISR could be down to 1
          // 因为ISR中移除了一个副本,那么有可能剩余的副本都在开始同步了,那么我们可能需要增加高水位线了
         
replicaManager.isrShrinkRate.mark()
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }

case None => false // do nothing if no longer leader
   
}
  }

// 尝试进行延迟操作
 
if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  // 获取leader的 LEO
 
val leaderLogEndOffset = leaderReplica.logEndOffset
  // 获取候选的副本(ISR列表中除了leader之外的其他follower)
 
val candidateReplicas = inSyncReplicas- leaderReplica
  // 从候选的副本获取落后leader的副本
 
val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if(laggingReplicas.nonEmpty)
    debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))

laggingReplicas
}

2.4 追加消息

只有leader副本能够处理读写请求,appendMessagesToLeader方法提供了向Leader副本对应的log中追加消息的功能

def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    //  获取leader副本对应的replica对象
   
val leaderReplicaOpt = leaderReplicaIfLocal()
    leaderReplicaOptmatch {
      case Some(leaderReplica) =>
        // 获取Replica的Log对象
       
val log = leaderReplica.log.get
        // 获取最小的ISR副本数
       
val minIsr = log.config.minInSyncReplicas
        // 正在同步的副本数量
       
val inSyncSize = inSyncReplicas.size
        // 如果当前ISR列表的数量小于配置的最小限制,且生产者对消息要求较高可用性ack=-1,则不能追加消息
       
if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Numberof insync replicas for partition [%s,%d] is [%d], below required minimum[%d]"
            .format(topic, partitionId, inSyncSize, minIsr))
        }
        // 调用Log的append方法,添加消息
       
val info = log.append(messages, assignOffsets = true)
        // 尝试执行对应的延迟操作
       
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
        (info, maybeIncrementLeaderHW(leaderReplica))//尝试增加leader的highwatermark

case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
          .format(topic, partitionId, localBrokerId))
    }
  }
  // 在高水位线改变之后,一些延迟操作可能就需要开始了
 
if (leaderHWIncremented)
    tryCompleteDelayedRequests()
  info
}

Partition分析相关推荐

  1. java实现顺序表和链表_Java: 实现顺序表和单链表的快速排序

    快速排序 快速排序原理 快速排序(Quick Sort)的基本思想是,通过一趟排序将待排记录分割成独立的两部分,其中一部分记录的关键字均比另一部分记录的关键字小,则可对这两部分记录继续进行排序,以达到 ...

  2. mysql分区表 缓存_Mysql 分区表-分区操作

    一.查看MySQL是否支持分区 1.MySQL5.6以及之前版本 show variables like '%partition%'; 2.MySQL5.7 show plugins; 二.分区表的分 ...

  3. MySQL优化系列12-MySQL分区表

    备注:测试数据库版本为MySQL 8.0 文章目录 一.分区表简介 二.分区的类型 2.1 range分区 2.2 list分区 2.3 colums分区 2.3.1 RANGE COLUMNS分区 ...

  4. sum(x) over( partition by y ORDER BY z ) 分析

    参考的博文出处:http://www.cnblogs.com/luhe/p/4155612.html,对博文进行了修改新增,修改了错误的地方 之前用过row_number(),rank()等排序与ov ...

  5. Partition函数实现java(含分析)

    Partition函数作用 在数组中选择一个数字,接下来把数组中的数字分为两部分,比选择的数字小的数字移到数组的左边,比选择的数字大的数字移到到数组的右边. Partition还可以用来实现长度为n的 ...

  6. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  7. 算法设计与分析第2章 递归与分治策略

    第2章 递归与分治策略 2.1 递归算法 递归算法:直接或间接地调用自身的算法. 递归函数:用函数自身给出定义的函数.两个要素:边界条件.递归方程 优点:结构清晰,可读性强,而且容易用数学归纳法来证明 ...

  8. 面向Mobile device的CNN模型手工设计与NAS分析总结,MobileNet V1,V2,V3,Efficient,MNasNet以及Efficient network design

    手工方法和NAS的高效网络模型设计总结与分析 这篇文章主要关注对于移动端,资源受限平台的高效神经网络设计(Manually)和搜索(NAS). ​​​​​​高效的CNN设计不只是用在服务器,云端,资源 ...

  9. partprobe源码分析

    partprobe工具 操作系统目录/usr/sbin/partprobe 程序安装包parted-3.1-17.el7.x86_64.rpm 命令用法: partprobe是用来告知操作系统内核 分 ...

最新文章

  1. Spring Cloud(四)服务提供者 Eureka + 服务消费者 Feign
  2. ERP顾问在甲方好还是乙方好?
  3. [Golang] struct Tag说明
  4. css transition改动透明,使用CSS transition和animation改变渐变状态的实现方法
  5. Processor Tracing | 处理器追踪
  6. php中全局变量如何设置,如何在php中声明全局变量?
  7. 人群与网络:关系的平衡
  8. bzoj 1698: [Usaco2007 Feb]Lilypad Pond 荷叶池塘(BFS)
  9. 简述计算机网络的五层协议体系结构,计算机网络五层协议体系结构分别是什么...
  10. 使用Python对csv文件去重
  11. 【软考】系统集成项目管理工程师(八)项目进度管理
  12. Gradle build daemon disappeared unexpectedly (it may have been killed or may hav
  13. 身份证有效验证方法,
  14. Python每日笔记打卡_day2
  15. 大学计算机基础超详细知识点(高手总结),大学计算机基础超详细知识点(高手总结)免费-...
  16. 夏季干燥口腔溃疡频发怎么办
  17. 分子模拟榨干GPU性能的参数建议
  18. 哔哩哔哩老显示服务器中断,哔哩哔哩服务器不能正常播放怎么办_bilibili服务器无法正常播放的解决办法_牛游戏网...
  19. UE 光照可移动与静态在虚拟制片中的差别
  20. 耐世特中国产量里程碑:第200万件齿条式电动助力转向系统下线

热门文章

  1. Python模板设置
  2. js 去重某个键值 数组对象_JS数组去重常见方法分析
  3. 多系统并行服务器,具有分布式并行I/O接口的分布式并行服务器系统的性能研究...
  4. java mysql tomcat my_Java、Tomcat 及 MySQL 环境配置
  5. java 泛化_Java语言class类用法及泛化(详解)
  6. 一个解决方案创建多个项目问题解决方案
  7. 素数筛选法(埃氏筛 欧拉筛)
  8. Python wxpy通过ModBus控制电脑鼠标和键盘
  9. java selenium firefox启动报错大调查
  10. sklearn 使用joblib保存模型,并解决cannot import name joblib from sklearn.externals报错