KafkaController monitors the running state of the entire Kafka cluster through ZooKeeper.
TopicChangeListener is responsible for handling topic creation and deletion; it watches for child-node changes under the /brokers/topics znode.

  /**
   * This is the zookeeper listener that triggers all the state transitions for a partition
   */
  class TopicChangeListener extends IZkChildListener with Logging {
    this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "

    @throws(classOf[Exception])
    def handleChildChange(parentPath: String, children: java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          try {
            val currentChildren = {
              // Read the current set of children of the /brokers/topics znode
              import JavaConversions._
              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
              (children: Buffer[String]).toSet
            }
            // Newly added topics
            val newTopics = currentChildren -- controllerContext.allTopics
            // Deleted topics
            val deletedTopics = controllerContext.allTopics -- currentChildren
            // Update the allTopics set in ControllerContext
            controllerContext.allTopics = currentChildren
            // Update the AR (assigned replicas) map in ControllerContext
            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
            info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
              deletedTopics, addedPartitionReplicaAssignment))
            // Handle newly created topics via onNewTopicCreation()
            if (newTopics.size > 0)
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
          } catch {
            case e: Throwable => error("Error while handling new topic", e)
          }
        }
      }
    }
  }
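For context, this listener is attached to /brokers/topics when the controller starts up. A minimal sketch of that registration, assuming the zkUtils and ZkUtils.BrokerTopicsPath names used elsewhere in this code base (illustrative, not the verbatim source):

  // Illustrative sketch: subscribe TopicChangeListener to child changes of /brokers/topics.
  // ZkUtils.BrokerTopicsPath is "/brokers/topics"; subscribeChildChanges is the I0Itec ZkClient API
  // this code base uses for child watches.
  private def registerTopicChangeListener() = {
    zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
  }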

onNewTopicCreation() also registers a PartitionModificationsListener for every newly added topic, and then calls onNewPartitionCreation() to drive the partition-state and replica-state transitions for the new topics.

  def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
    // subscribe to partition changes
    // Register a PartitionModificationsListener for each new topic
    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    onNewPartitionCreation(newPartitions)
  }

  def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
    // Move all of the specified new partitions to the NewPartition state
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    // Move all replicas of the specified partitions to the NewReplica state
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    // Move all of the specified new partitions to the OnlinePartition state.
    // offlinePartitionSelector is passed in here, but the NewPartition -> OnlinePartition transition does not use it.
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  }
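For reference, registerPartitionChangeListener(), called above for every new topic, essentially hangs a PartitionModificationsListener on the topic's own znode. A rough sketch (the exact body varies between Kafka versions, so treat this as illustrative):

  // Illustrative sketch: watch /brokers/topics/<topic> for data changes so that partitions
  // added to this topic later are picked up by the controller.
  def registerPartitionChangeListener(topic: String) = {
    partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic))
    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
  }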

TopicDeletionManager and DeleteTopicsListener
TopicDeletionManager maintains several collections that track the topics waiting to be deleted and the topics that are currently ineligible for deletion. It starts a DeleteTopicsThread that carries out the actual topic deletion logic.
A topic cannot be deleted while any of the following three conditions holds (all three cases end up calling the markTopicIneligibleForDeletion() helper, sketched below the list):
1. The topic's replicas are being reassigned.
2. Any of its partitions is undergoing a preferred-replica election.
3. A broker hosting any replica of any of its partitions is down.
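A minimal sketch of markTopicIneligibleForDeletion(), based on the 0.10.x TopicDeletionManager and shown here only for orientation:

  // Illustrative sketch: remember which queued topics cannot be deleted right now.
  def markTopicIneligibleForDeletion(topics: Set[String]) {
    if (isDeleteTopicEnabled) {
      // Only topics that are actually queued for deletion are tracked as ineligible.
      val newTopicsToHaltDeletion = topicsToBeDeleted & topics
      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
      if (newTopicsToHaltDeletion.size > 0)
        info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
    }
  }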
TopicDeletionManager fields:

  class TopicDeletionManager(controller: KafkaController,
                             initialTopicsToBeDeleted: Set[String] = Set.empty,
                             initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
    val controllerContext = controller.controllerContext
    // State machine that manages partition states
    val partitionStateMachine = controller.partitionStateMachine
    // State machine that manages replica states
    val replicaStateMachine = controller.replicaStateMachine
    // Topics to be deleted
    val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
    // Partitions to be deleted
    val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
    val deleteLock = new ReentrantLock()
    // Topics that are currently ineligible for deletion
    val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
      (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
    // Condition object used to coordinate with other threads
    val deleteTopicsCond = deleteLock.newCondition()
    // Flag indicating whether a topic deletion has been triggered
    val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
    // The thread that performs topic deletion
    var deleteTopicsThread: DeleteTopicsThread = null
    // Whether topic deletion is enabled (the delete.topic.enable config)
    val isDeleteTopicEnabled = controller.config.deleteTopicEnable
  }

When DeleteTopicsListener fires, it calls enqueueTopicsForDeletion(), which adds the topics to be deleted to the topicsToBeDeleted set and their partitions to the partitionsToBeDeleted set, and then wakes up the DeleteTopicsThread to do the work.

  /**
   * Invoked by the child change listener on /admin/delete_topics to queue up the topics for deletion. The topic gets added
   * to the topicsToBeDeleted list and only gets removed from the list when the topic deletion has completed successfully
   * i.e. all replicas of all partitions of that topic are deleted successfully.
   * @param topics Topics that should be deleted
   */
  def enqueueTopicsForDeletion(topics: Set[String]) {
    if(isDeleteTopicEnabled) {
      // Add the topics to the topicsToBeDeleted set
      topicsToBeDeleted ++= topics
      // Add their partitions to the partitionsToBeDeleted set
      partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
      // Wake up the DeleteTopicsThread
      resumeTopicDeletionThread()
    }
  }
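The wake-up relies on the deleteLock, deleteTopicsCond and deleteTopicStateChanged fields shown earlier. A rough sketch of the two helpers involved, based on the 0.10.x code and shown only to make the hand-off visible:

  // Illustrative sketch: the DeleteTopicsThread blocks here until some other thread
  // signals that deletion state has changed.
  private def awaitTopicDeletionNotification() {
    inLock(deleteLock) {
      while (deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) {
        debug("Waiting for signal to start or continue topic deletion")
        deleteTopicsCond.await()
      }
    }
  }

  // Illustrative sketch: flip the flag and signal the waiting DeleteTopicsThread.
  private def resumeTopicDeletionThread() {
    deleteTopicStateChanged.set(true)
    inLock(deleteLock) {
      deleteTopicsCond.signal()
    }
  }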

DeleteTopicsThread is the thread that actually performs topic deletion. It extends ShutdownableThread, and its entry point is the doWork() method.

  class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
    val zkUtils = controllerContext.zkUtils

    override def doWork() {
      // Wait until the thread is woken up
      awaitTopicDeletionNotification()

      if (!isRunning.get)
        return

      inLock(controllerContext.controllerLock) {
        val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

        if(!topicsQueuedForDeletion.isEmpty)
          info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

        topicsQueuedForDeletion.foreach { topic =>
          // if all replicas are marked as deleted successfully, then topic deletion is done
          // Check whether all replicas are in the ReplicaDeletionSuccessful state
          if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
            // clear up all state for this topic from controller cache and zookeeper
            // Clear all replica and partition state, and remove the topic from ControllerContext and ZooKeeper
            completeDeleteTopic(topic)
            info("Deletion of topic %s successfully completed".format(topic))
          } else {
            if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
              // If any replica is still in ReplicaDeletionStarted, keep waiting
              // ignore since topic deletion is in progress
              val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
              val replicaIds = replicasInDeletionStartedState.map(_.replica)
              val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
              info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
                partitions.mkString(","), topic))
            } else {
              // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
              // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
              // or there is at least one failed replica (which means topic deletion should be retried).
              // If any replica is in ReplicaDeletionIneligible, reset it to OfflineReplica and retry
              if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
                // mark topic for deletion retry
                markTopicForDeletionRetry(topic)
              }
            }
          }
          // Try delete topic if it is eligible for deletion.
          if(isTopicEligibleForDeletion(topic)) {
            info("Deletion of topic %s (re)started".format(topic))
            // topic deletion will be kicked off
            onTopicDeletion(Set(topic))
          } else if(isTopicIneligibleForDeletion(topic)) {
            info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
          }
        }
      }
    }
  }

isTopicEligibleForDeletion() decides whether deletion can be started based on the following three conditions.

  private def isTopicEligibleForDeletion(topic: String): Boolean = {
    topicsToBeDeleted.contains(topic) &&       // the topic has not finished deletion yet
      (!isTopicDeletionInProgress(topic) &&    // deletion has not started yet
       !isTopicIneligibleForDeletion(topic))   // the topic is not marked ineligible for deletion
  }
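The two helper predicates used above amount to a replica-state check and a set-membership check. A sketch, assuming the fields introduced earlier (illustrative, not the verbatim source):

  // Illustrative sketch: deletion is "in progress" once at least one replica of the topic
  // has been moved to ReplicaDeletionStarted.
  def isTopicDeletionInProgress(topic: String): Boolean = {
    controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
  }

  // Illustrative sketch: a topic is ineligible if it is in topicsIneligibleForDeletion
  // (or if topic deletion is disabled altogether).
  def isTopicIneligibleForDeletion(topic: String): Boolean = {
    if (isDeleteTopicEnabled)
      topicsIneligibleForDeletion.contains(topic)
    else
      true
  }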

The core of onTopicDeletion() is sending an UpdateMetadataRequest, with the leader field set to LeaderDuringDelete, to all live brokers. This tells them that the specified topics are being deleted, so they can drop the related entries from their MetadataCache.

  /**
   * This callback is invoked by the DeleteTopics thread with the list of topics to be deleted
   * It invokes the delete partition callback for all partitions of a topic.
   * The updateMetadataRequest is also going to set the leader for the topics being deleted to
   * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
   * removed from their caches.
   */
  private def onTopicDeletion(topics: Set[String]) {
    info("Topic deletion callback for %s".format(topics.mkString(",")))
    // send update metadata so that brokers stop serving data for topics to be deleted
    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
    // Send an UpdateMetadataRequest to the live brokers
    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
    // Group the replica assignment by topic
    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
    // Start deleting the partitions
    topics.foreach { topic =>
      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
    }
  }
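LeaderDuringDelete is a sentinel leader id: a broker that sees it in an UpdateMetadataRequest knows the partition belongs to a topic that is being deleted. In the 0.10.x code base the sentinels live in the LeaderAndIsr companion object, roughly as follows (a sketch showing only the two relevant constants):

  // Illustrative sketch of the sentinel leader ids used by LeaderAndIsr.
  object LeaderAndIsr {
    val NoLeader: Int = -1           // partition currently has no leader
    val LeaderDuringDelete: Int = -2 // partition belongs to a topic that is being deleted
  }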

onTopicDeletion() delegates to onPartitionDeletion(), which in turn calls startReplicaDeletion() to delete the replicas.

  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
    info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
    // Get the AR (assigned replicas) of the partitions to be deleted
    val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
    startReplicaDeletion(replicasPerPartition)
  }

  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
      // All live replicas of the topic
      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
      // Dead (unavailable) replicas of the topic
      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
      // Replicas of the topic whose deletion has already completed
      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      // Replicas whose deletion should be started (or retried)
      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
      // move dead replicas directly to failed state
      // Move the dead replicas to the ReplicaDeletionIneligible state
      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      // Move the replicas to be deleted to the OfflineReplica state
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      // Move the replicas to be deleted to the ReplicaDeletionStarted state
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
      if(deadReplicasForTopic.size > 0) {
        // Mark the topic as ineligible for deletion
        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
        markTopicIneligibleForDeletion(Set(topic))
      }
    }
  }

The deleteTopicStopReplicaCallback callback passes replicas that reported an error to failReplicaDeletion(), and replicas that returned a normal StopReplicaResponse to completeReplicaDeletion().

  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
    val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
    debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
    val responseMap = stopReplicaResponse.responses.asScala
    val partitionsInError =
      if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
    val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
    inLock(controllerContext.controllerLock) {
      // move all the failed replicas to ReplicaDeletionIneligible
      failReplicaDeletion(replicasInError)
      if (replicasInError.size != responseMap.size) {
        // some replicas could have been successfully deleted
        val deletedReplicas = responseMap.keySet -- partitionsInError
        completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
      }
    }
  }

  def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
    if(isDeleteTopicEnabled) {
      val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
      if(replicasThatFailedToDelete.size > 0) {
        val topics = replicasThatFailedToDelete.map(_.topic)
        debug("Deletion failed for replicas %s. Halting deletion for topics %s".format(replicasThatFailedToDelete.mkString(","), topics))
        // Move the failed replicas to the ReplicaDeletionIneligible state
        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
        // Mark the affected topics as ineligible for deletion
        markTopicIneligibleForDeletion(topics)
        // Wake up the DeleteTopicsThread
        resumeTopicDeletionThread()
      }
    }
  }

  private def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
    val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
    debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
    // Move the successfully deleted replicas to the ReplicaDeletionSuccessful state
    controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
    // Wake up the DeleteTopicsThread
    resumeTopicDeletionThread()
  }

Back in doWork(), completeDeleteTopic() is called to finish up topics that have been deleted successfully.

  private def completeDeleteTopic(topic: String) {
    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
    // firing before the new topic listener when a deleted topic gets auto created
    // Unregister the PartitionModificationsListener
    partitionStateMachine.deregisterPartitionChangeListener(topic)
    // Clear the state of all of this topic's replicas from the ReplicaStateMachine
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
    // Move all partitions of this topic to NonExistentPartition
    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
    // move respective partition to OfflinePartition and NonExistentPartition state
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
    // Remove the topic from topicsToBeDeleted and its partitions from partitionsToBeDeleted
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic)
    // Delete all of the topic's data from ZooKeeper
    val zkUtils = controllerContext.zkUtils
    zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
    zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
    zkUtils.zkClient.delete(getDeleteTopicPath(topic))
    controllerContext.removeTopic(topic)
  }
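The final controllerContext.removeTopic(topic) call purges the controller's in-memory caches. A sketch of what it does, based on ControllerContext in the same code base (illustrative):

  // Illustrative sketch: drop every cache entry that refers to the deleted topic.
  def removeTopic(topic: String) = {
    partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
    partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
    allTopics -= topic
  }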

In doWork(), markTopicForDeletionRetry() handles topics whose deletion previously failed: it moves replicas that are in the ReplicaDeletionIneligible state back to the OfflineReplica state.

  private def markTopicForDeletionRetry(topic: String) {
    // reset replica states from ReplicaDeletionIneligible to OfflineReplica
    // Move replicas in the ReplicaDeletionIneligible state back to OfflineReplica
    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
    info("Retrying delete topic for topic %s since replicas %s were not successfully deleted".format(topic, failedReplicas.mkString(",")))
    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
  }

resumeDeletionForTopics() is called when one of the three blocking conditions listed earlier no longer applies; it removes the affected topics from topicsIneligibleForDeletion and wakes up the DeleteTopicsThread.

  def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
    if(isDeleteTopicEnabled) {
      val topicsToResumeDeletion = topics & topicsToBeDeleted
      if(topicsToResumeDeletion.size > 0) {
        topicsIneligibleForDeletion --= topicsToResumeDeletion
        resumeTopicDeletionThread()
      }
    }
  }

That covers TopicDeletionManager; now let's look at DeleteTopicsListener. It watches the child nodes of /admin/delete_topics in ZooKeeper. When TopicCommand adds a topic to be deleted under that path, DeleteTopicsListener fires and hands the topic over to TopicDeletionManager, which carries out the actual deletion.

  def handleChildChange(parentPath: String, children: java.util.List[String]) {
    inLock(controllerContext.controllerLock) {
      var topicsToBeDeleted = {
        // Read the set of topics to be deleted from ZooKeeper
        import JavaConversions._
        (children: Buffer[String]).toSet
      }
      debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
      // Check whether the topics actually exist
      val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
      if(nonExistentTopics.size > 0) {
        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
        // For non-existent topics, simply remove their nodes under /admin/delete_topics
        nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
      }
      // Filter out the non-existent topics
      topicsToBeDeleted --= nonExistentTopics
      if(topicsToBeDeleted.size > 0) {
        info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
        // mark topic ineligible for deletion if other state changes are in progress
        // Check whether the topics to be deleted are currently ineligible for deletion
        topicsToBeDeleted.foreach { topic =>
          // Check whether any partition of the topic is undergoing a preferred-replica election
          val preferredReplicaElectionInProgress =
            controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
          // Check whether any partition of the topic is being reassigned
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
          if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
            // Mark the topic as ineligible for deletion
            controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
        }
        // add topic to deletion list
        // Hand the deletable topics over to TopicDeletionManager
        controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    }
  }
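For completeness: the child nodes under /admin/delete_topics that trigger this listener are created by TopicCommand when a user runs kafka-topics.sh --delete. Roughly (a sketch using the ZkUtils helpers from the same code base):

  // Illustrative sketch: TopicCommand marks a topic for deletion by creating a persistent
  // znode /admin/delete_topics/<topic>; the DeleteTopicsListener above reacts to it.
  zkUtils.createPersistentPath(getDeleteTopicPath(topic))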
