The KafkaController monitors the running state of the whole Kafka cluster through ZooKeeper and responds to administrator commands. It does this by registering listeners on specific znodes, watching those znodes for data changes or child-node changes.
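As a quick illustration of what "registering a listener on a znode" means here, below is a minimal sketch using the I0Itec ZkClient API that this Kafka version builds on; the ZooKeeper address, the paths, and the handler bodies are illustrative, not the controller's actual registration code:

import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, ZkClient}

// Sketch only: attach watchers the way the controller's listeners do.
val zkClient = new ZkClient("localhost:2181", 30000, 30000)

// Child watcher, e.g. what TopicChangeListener registers on "/brokers/topics".
zkClient.subscribeChildChanges("/brokers/topics", new IZkChildListener {
  override def handleChildChange(parentPath: String, children: java.util.List[String]): Unit =
    println(s"children of $parentPath changed: $children")
})

// Data watcher, e.g. what PartitionModificationsListener registers on a topic znode.
zkClient.subscribeDataChanges("/brokers/topics/my-topic", new IZkDataListener {
  override def handleDataChange(dataPath: String, data: Object): Unit =
    println(s"data at $dataPath changed")
  override def handleDataDeleted(dataPath: String): Unit =
    println(s"$dataPath deleted")
})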

1. TopicChangeListener

def handleChildChange(parentPath: String, children: java.util.List[String]) {
  inLock(controllerContext.controllerLock) {
    // Only react if the controller's state machines have started
    if (hasStarted.get) {
      try {
        // Fetch the current children of the '/brokers/topics' znode
        val currentChildren = {
          import JavaConversions._
          debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
          (children: Buffer[String]).toSet
        }
        // New topics: present in ZooKeeper but not yet in ControllerContext
        val newTopics = currentChildren -- controllerContext.allTopics
        // Deleted topics: still cached in ControllerContext but gone from ZooKeeper
        val deletedTopics = controllerContext.allTopics -- currentChildren
        // Refresh the cached topic list
        controllerContext.allTopics = currentChildren
        // Read partition and AR (assigned replica) information for the new topics
        val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
        // Drop the deleted topics from the cached replica assignment ...
        controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
          !deletedTopics.contains(p._1.topic))
        // ... and merge in the assignment of the new topics
        controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
        info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
          deletedTopics, addedPartitionReplicaAssignment))
        // If there are new topics, run the new-topic callback
        if (newTopics.nonEmpty)
          controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
      } catch {
        case e: Throwable => error("Error while handling new topic", e)
      }
    }
  }
}
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
  info("New topic creation callback for %s".format(newPartitions.mkString(",")))
  // Register a PartitionModificationsListener for every newly created topic
  topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  onNewPartitionCreation(newPartitions)
}

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  // Move all the newly added partitions to the NewPartition state
  partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
  // Move all replicas of the new partitions to the NewReplica state
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  // Move the new partitions to the OnlinePartition state
  partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  // Move all replicas of the new partitions to the OnlineReplica state
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}

2. DeleteTopicsListener

A topic cannot be deleted if any of the following three conditions holds (a condensed sketch of this check follows the list):

1. Any partition of the topic is undergoing replica reassignment.

2. Any partition of the topic is undergoing preferred-replica election.

3. Any replica of any partition of the topic resides on a broker that is down.
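A hedged sketch of that three-way check, written against plain collections (TopicAndPartition is the case class used throughout this code; the function itself is hypothetical and only condenses the conditions above):

// Sketch only: true if the topic is currently ineligible for deletion.
def deletionIneligible(topic: String,
                       partitionsBeingReassigned: Set[TopicAndPartition],
                       partitionsUndergoingPreferredReplicaElection: Set[TopicAndPartition],
                       partitionReplicaAssignment: Map[TopicAndPartition, Seq[Int]],
                       liveBrokerIds: Set[Int]): Boolean = {
  // Condition 1: a partition of this topic is being reassigned
  val reassignmentInProgress = partitionsBeingReassigned.exists(_.topic == topic)
  // Condition 2: a partition of this topic is undergoing preferred-replica election
  val preferredElectionInProgress = partitionsUndergoingPreferredReplicaElection.exists(_.topic == topic)
  // Condition 3: some replica of this topic lives on a broker that is down
  val replicaOnDeadBroker = partitionReplicaAssignment
    .filter { case (tp, _) => tp.topic == topic }
    .exists { case (_, replicas) => replicas.exists(r => !liveBrokerIds.contains(r)) }
  reassignmentInProgress || preferredElectionInProgress || replicaOnDeadBroker
}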

def handleChildChange(parentPath: String, children: java.util.List[String]) {
  inLock(controllerContext.controllerLock) {
    // Collect the topics that have been requested for deletion
    var topicsToBeDeleted = {
      import JavaConversions._
      (children: Buffer[String]).toSet
    }
    debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
    // Find requested topics that do not exist in controllerContext.allTopics
    val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
    // For non-existent topics, just remove their deletion znodes from ZooKeeper
    if (nonExistentTopics.nonEmpty) {
      warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
      nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
    }
    // Filter the non-existent topics out of the deletion set
    topicsToBeDeleted --= nonExistentTopics
    if (topicsToBeDeleted.nonEmpty) {
      info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
      // Check whether any topic is currently in a state that forbids deletion
      topicsToBeDeleted.foreach { topic =>
        // Is a preferred-replica election in progress for this topic?
        val preferredReplicaElectionInProgress =
          controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
        // Is a partition reassignment in progress for this topic?
        val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
        // If either holds, mark the topic as ineligible for deletion
        if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
          controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
      }
      // Hand the topics over to the deletion queue
      controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  }
}

3. PartitionModificationsListener

As we saw in the topic deletion flow, PartitionModificationsListener is registered and unregistered along the way: when a topic is created, a PartitionModificationsListener is registered for it, and after the topic is successfully deleted its PartitionModificationsListener is removed. The listener watches the data of "/brokers/topics/[topic_name]", which in effect means watching for changes to the topic's partitions.

def handleDataChange(dataPath: String, data: Object) {
  inLock(controllerContext.controllerLock) {
    try {
      info(s"Partition modification triggered $data for path $dataPath")
      // Read the topic's partition-to-replica assignment from ZooKeeper
      val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
      // Keep only the partitions not yet in ControllerContext, i.e. the newly added ones
      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
        !controllerContext.partitionReplicaAssignment.contains(p._1))
      if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
              .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
      else {
        // There are new partitions to handle
        if (partitionsToBeAdded.nonEmpty) {
          info("New partitions to be added %s".format(partitionsToBeAdded))
          // Record the new partitions in ControllerContext
          controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
          // Drive the new partitions and their replicas through the state machines
          // so they come online and start serving
          controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
        }
      }
    } catch {
      case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e)
    }
  }
}

The onNewPartitionCreation callback invoked here is the same one shown in section 1.

4. BrokerChangeListener

BrokerChangeListener watches the children of the "/brokers/ids" znode; it is responsible for handling broker startup and broker failure.

def handleChildChange(parentPath: String, currentBrokerList: java.util.List[String]) {
  info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
  inLock(controllerContext.controllerLock) {
    if (hasStarted.get) {
      ControllerStats.leaderElectionTimer.time {
        try {
          // Fetch the broker metadata from ZooKeeper
          val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
          // Current broker id set
          val curBrokerIds = curBrokers.map(_.id)
          val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
          // Broker ids that are new
          val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
          // Broker ids that have gone away (dead brokers)
          val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
          val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
          controllerContext.liveBrokers = curBrokers
          val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
          val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
          val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
          info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
            .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
          // Open network connections from the controller to the new brokers
          newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
          // Close the controller's connections to the failed brokers
          deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
          // Handle broker startup
          if (newBrokerIds.nonEmpty)
            controller.onBrokerStartup(newBrokerIdsSorted)
          // Handle broker failure
          if (deadBrokerIds.nonEmpty)
            controller.onBrokerFailure(deadBrokerIdsSorted)
        } catch {
          case e: Throwable => error("Error while handling broker changes", e)
        }
      }
    }
  }
}

def onBrokerStartup(newBrokers: Seq[Int]) {
  info("New broker startup callback for %s".format(newBrokers.mkString(",")))
  val newBrokersSet = newBrokers.toSet
  // Send an UpdateMetadataRequest to all live or shutting-down brokers
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // Collect all replicas hosted on the new brokers
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  // Move the replicas on the new brokers to the OnlineReplica state
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // Check whether any in-flight reassignment involves the new brokers; if so, resume it
  val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
    case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
  }
  partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
  // If the new brokers host replicas of topics queued for deletion, wake up the DeleteTopicsThread
  val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
      "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
      deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
    deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
}

def onBrokerFailure(deadBrokers: Seq[Int]) {
  info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
  // Remove cleanly shutting-down brokers from the dead-broker set
  val deadBrokersThatWereShuttingDown =
    deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
  val deadBrokersSet = deadBrokers.toSet
  // Partitions whose leader replica was on a failed broker: move them to OfflinePartition
  val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
    deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
      !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
  // Try to bring the OfflinePartition partitions back to OnlinePartition (elect new leaders)
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // Replicas hosted on the failed brokers: move them to OfflineReplica
  var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
  val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  // Handle dead replicas
  replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
  // If the failed brokers held replicas of topics queued for deletion, fail those replica
  // deletions (ReplicaDeletionIneligible) and mark the topics as ineligible for deletion
  val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
  }
  // If no leader moved, send an UpdateMetadataRequest so brokers refresh their live-broker list
  if (partitionsWithoutLeader.isEmpty) {
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  }
}

5. IsrChangeNotificationListener

Recall that follower replicas continuously fetch messages from the leader replica: a follower that has caught up with the leader is added to the ISR, and a follower that has fallen too far behind is removed from the ISR.

When the ISR changes, the leader replica not only records the new ISR in ZooKeeper but also calls ReplicaManager.recordIsrChange to add the partition to the isrChangeSet collection; the isr-change-propagation periodic task then writes the accumulated entries under the "/isr-change-notification" path in ZooKeeper.
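A minimal sketch of that propagation step, assuming the ZkUtils.createSequentialPersistentPath helper and the ZkUtils.IsrChangeNotificationPath constant that appear elsewhere in this code; the JSON layout and the "isr_change_" node prefix are written from memory and should be treated as illustrative:

import scala.collection.mutable

// Sketch only: what the isr-change-propagation task conceptually does.
val isrChangeSet = mutable.Set.empty[TopicAndPartition]

def maybePropagateIsrChanges(): Unit = isrChangeSet.synchronized {
  if (isrChangeSet.nonEmpty) {
    // Serialize the accumulated partitions, e.g. as
    // {"version":1,"partitions":[{"topic":"t","partition":0}]}
    val json = "{\"version\":1,\"partitions\":[" +
      isrChangeSet.map(tp => s"""{"topic":"${tp.topic}","partition":${tp.partition}}""").mkString(",") +
      "]}"
    // Create a sequential child under /isr-change-notification, which is
    // what fires IsrChangeNotificationListener on the controller
    zkUtils.createSequentialPersistentPath(ZkUtils.IsrChangeNotificationPath + "/isr_change_", json)
    isrChangeSet.clear()
  }
}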

IsrChangeNotificationListener watches for child changes under this path:

override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
  import scala.collection.JavaConverters._
  inLock(controller.controllerContext.controllerLock) {
    debug("[IsrChangeNotificationListener] Fired!!!")
    // Children of the '/isr-change-notification' path
    val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
    try {
      // Extract the affected partitions from the notification znodes
      val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet
      if (topicAndPartitions.nonEmpty) {
        // Refresh the leader/ISR cache in ControllerContext (partitionLeadershipInfo)
        controller.updateLeaderAndIsrCache(topicAndPartitions)
        // Then send UpdateMetadataRequests to the live brokers
        processUpdateNotifications(topicAndPartitions)
      }
    } finally {
      // Delete the processed notifications under '/isr-change-notification'
      childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
        ZkUtils.IsrChangeNotificationPath + "/" + x))
    }
  }
}

6. PreferredReplicaElectionListener

PreferredReplicaElectionListener watches the "/admin/preferred_replica_election" znode. When the preferred-replica-election command requests an election for certain partitions, those partitions are written to this znode, which triggers the listener. The goal of preferred-replica election is to make each partition's preferred replica the leader again, so that leadership is spread more evenly across the cluster.
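For reference, the data written to that znode is a small JSON document. Here is a hedged sketch of producing it; the "version"/"partitions" layout follows what the election command writes in this Kafka version, but treat the hand-rolled serialization as illustrative:

// Sketch only: the shape of the /admin/preferred_replica_election payload, e.g.
// {"version":1,"partitions":[{"topic":"orders","partition":0},{"topic":"orders","partition":1}]}
def preferredReplicaElectionData(partitions: Set[TopicAndPartition]): String =
  "{\"version\":1,\"partitions\":[" +
    partitions.map(tp => s"""{"topic":"${tp.topic}","partition":${tp.partition}}""").mkString(",") +
    "]}"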

def handleDataChange(dataPath: String, data: Object) {
  debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
          .format(dataPath, data.toString))
  inLock(controllerContext.controllerLock) {
    // Parse the partitions that should undergo preferred-replica election
    val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
    if (controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
      info("These partitions are already undergoing preferred replica election: %s"
        .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
    // Drop the partitions for which an election is already in progress
    val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
    // Drop the partitions whose topics are queued for deletion
    val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    if (partitionsForTopicsToBeDeleted.nonEmpty) {
      error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
        .format(partitionsForTopicsToBeDeleted))
    }
    // Run preferred-replica election for the remaining partitions
    controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
  }
}

def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
  info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
  try {
    // Record the partitions that are undergoing preferred-replica election
    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
    // Mark the affected topics as ineligible for deletion while the election runs
    deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
    // Transition the partitions to OnlinePartition; besides electing the new leader this
    // updates ZooKeeper and sends LeaderAndIsrRequests and UpdateMetadataRequests
    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
  } catch {
    case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
  } finally {
    // Clean up partitionsUndergoingPreferredReplicaElection and the related ZooKeeper data
    removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
    deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
  }
}

7. Listeners for replica reassignment

PartitionsReassignedListener watches the ZooKeeper path "/admin/reassign_partitions". When the ReassignPartitions command requests new replica assignments for certain partitions, those partitions are written to this znode, which triggers PartitionsReassignedListener.

The replica reassignment procedure works as follows (a short sketch of the OAR/RAR set algebra used in steps 5 and 6 follows this list):

Step 1: Read the partition reassignment data from the "/admin/reassign_partitions" znode.

Step 2: Filter out the partitions that are already being reassigned.

Step 3: Check whether each partition's topic is queued for deletion. If it is, call the controller's removePartitionFromReassignedPartitions, which:

1. Unregisters the ReassignedPartitionsIsrChangeListener registered for the partition.
2. Removes the partition's data from the "/admin/reassign_partitions" znode.
3. Removes the partition's data from the partitionsBeingReassigned collection.

Step 4: Otherwise, create a ReassignedPartitionsContext and call initiateReassignReplicasForTopicPartition to prepare the reassignment:

1. Fetch the old AR replica set and the requested new replica set.
2. Compare the new and old AR; if they are identical, throw an exception.
3. Check that every broker in the new AR is alive; if not, throw an exception.
4. Register a ReassignedPartitionsIsrChangeListener for the partition.
5. Add the partition to the partitionsBeingReassigned collection and mark its topic as ineligible for deletion.
6. Call onPartitionReassignment to start the replica reassignment.

Step 5: Check whether all replicas in the new AR are already in the ISR. If not, do the following:

1. Update the partition's AR in ControllerContext and ZooKeeper to the union of the new and old AR.
2. Send a LeaderAndIsrRequest to every replica in that union, incrementing the leader_epoch value recorded in ZooKeeper.
3. Transition the replicas in the difference (new AR - old AR), i.e. the newly added replicas, to NewReplica; this step sends them LeaderAndIsrRequests so they become follower replicas, and also sends UpdateMetadataRequests.

Step 6: Once all replicas in the new AR have entered the ISR, do the following:

1. Transition all replicas in the new AR to the OnlineReplica state.
2. Update the AR recorded in ControllerContext to the new AR.
3. If the current leader replica is in the new AR, increment the leader_epoch value in ZooKeeper and ControllerContext and send LeaderAndIsrRequests and UpdateMetadataRequests.
4. If the current leader replica is not in the new AR, or is not alive, transition the partition to OnlinePartition; the point is to use ReassignedPartitionLeaderSelector to elect a new leader so that a replica in the new AR becomes leader, after which LeaderAndIsrRequests and UpdateMetadataRequests are sent.
5. Transition the replicas in (old AR - new AR) to OfflineReplica; this step sends StopReplicaRequests, removes those replicas from the ISR, and sends LeaderAndIsrRequests and UpdateMetadataRequests.
6. Then transition the replicas in (old AR - new AR) to ReplicaDeletionStarted; this step sends StopReplicaRequests, and once deletion completes the replicas are transitioned to the ReplicaDeletionSuccessful state.
7. Update the AR information in ZooKeeper.
8. Remove this partition's data from "/admin/reassign_partitions".
9. Send an UpdateMetadataRequest to all brokers.
10. Try to clear the topic's ineligible-for-deletion mark and wake up the DeleteTopicsThread.
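A small sketch of the OAR/RAR set algebra behind steps 5 and 6 (oar and rar are illustrative stand-ins for the old and requested replica lists):

// Sketch only: the set arithmetic that drives the reassignment steps.
val oar = Seq(1, 2, 3)       // old assigned replicas (OAR)
val rar = Seq(3, 4, 5)       // reassigned (target) replicas (RAR)

val unionAr = (rar ++ oar).distinct           // step 5.1: AR written during the transition
val newReplicas = rar.toSet -- oar.toSet      // step 5.3: replicas to create as followers
val replicasToRetire = oar.toSet -- rar.toSet // steps 6.5/6.6: replicas to stop and delete

// unionAr == Seq(3, 4, 5, 1, 2), newReplicas == Set(4, 5), replicasToRetire == Set(1, 2)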

def handleDataChange(dataPath: String, data: Object) {
  debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
    .format(dataPath, data))
  // Read the partitions to be reassigned from ZooKeeper
  val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
  // Skip the partitions that are already being reassigned
  val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
    partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
  }
  partitionsToBeReassigned.foreach { partitionToBeReassigned =>
    inLock(controllerContext.controllerLock) {
      // Check whether the partition's topic is queued for deletion
      if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
        error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
          .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
        controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
      } else {
        // Kick off the reassignment for this partition
        val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
        controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
      }
    }
  }
}

def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
  if (controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
    // Unregister the ReassignedPartitionsIsrChangeListener registered for this partition
    zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
  }
  // Read the partitions still pending reassignment
  val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
  // Remove this partition from that set
  val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
  // Write the updated pending-reassignment data back to ZooKeeper
  zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
  // Also remove the partition from ControllerContext.partitionsBeingReassigned
  controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
}

def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                              reassignedPartitionContext: ReassignedPartitionsContext) {
  // The target replica set for the partition
  val newReplicas = reassignedPartitionContext.newReplicas
  val topic = topicAndPartition.topic
  val partition = topicAndPartition.partition
  // Keep only the target replicas whose brokers are alive
  val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
  try {
    val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
    assignedReplicasOpt match {
      case Some(assignedReplicas) =>
        // If the target replicas equal the current AR, there is nothing to do
        if (assignedReplicas == newReplicas) {
          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
            " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
        } else {
          info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
          // Register an ISR-change listener for the partition
          watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
          controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
          // Mark the topic as ineligible for deletion while it is being reassigned
          deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
          // Perform the actual partition reassignment
          onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
        }
      case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
        .format(topicAndPartition))
    }
  } catch {
    case e: Throwable =>
      error("Error completing reassignment of partition %s".format(topicAndPartition), e)
      removePartitionFromReassignedPartitions(topicAndPartition)
  }
}

def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
    info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned not yet caught up with the leader")
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
    val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
    // 1. Update AR in ZK with OAR + RAR.
    updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
    // 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
    updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
      newAndOldReplicas.toSeq)
    // 3. Replicas in RAR - OAR -> NewReplica
    startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned to catch up with the leader")
  } else {
    // 4. Wait until all replicas in RAR are in sync with the leader.
    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
    // 5. Replicas in RAR -> OnlineReplica
    reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
        replica)), OnlineReplica)
    }
    // 6. Set AR to RAR in memory.
    // 7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
    //    a new AR (using RAR) and the same ISR to every broker in RAR
    moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
    // 8. Replicas in OAR - RAR -> Offline (force those replicas out of the ISR)
    // 9. Replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
    stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
    // 10. Update AR in ZK with RAR.
    updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    // 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
    removePartitionFromReassignedPartitions(topicAndPartition)
    info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    // 12. After electing the leader, the replica and ISR information changes, so resend the
    //     update metadata request to every broker
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
    // Signal the delete-topic thread if reassignment for some partitions of topics being deleted just completed
    deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}
