TopicDeletionManager主要用于管理删除topic的状态机。

一 核心字段

controllerContext: ControllerContext 用于维护KafkaController中上下文信息

partitionStateMachine: PartitionStateMachinepartition状态机

replicaStateMachine: ReplicaStateMahcine replica 状态机

topicsToBeDeleted:Set[String] 记录将要被删除topic集合

partitionsToBeDeleted:Set[TopicAndPartition] 记录将要被删除partition集合

topicsIneligibleForDeletion:Set[String] 用于记录不可删除的集合

deleteTopicStateChanged:AtomicBoolean 用于标记topic删除操作是否开始

deleteTopicsThread:DeleteTopicsThread 用于删除topic的后台线程

isDeleteTopicEnabled:Boolean  是否允许删除topic

二 重要方法

2.1 start方法

defstart() {
  // 判断是否支持删除topic
 
if (isDeleteTopicEnabled) {
    // 创建DeleteTopicsThread线程对象
    deleteTopicsThread
= new DeleteTopicsThread()
    // 如果已经存在将要被删除的topic
   
if (topicsToBeDeleted.nonEmpty)
      // 标记删除操作开始
      deleteTopicStateChanged
.set(true)
    // 启动线程
    deleteTopicsThread
.start()
  }
}

2.2 DeleteTopicsThread的doWork方法使我们进行删除操作的入口方法

第一:获取待删除的topic的分区集合,发送UpdateMetadataCache

Request发送给所有的broker

private def onTopicDeletion(topics: Set[String]) {info("Topic deletion callback for %s".format(topics.mkString(",")))// 获取topic分区val partitions = topics.flatMap(controllerContext.partitionsForTopic)// 发送UpdateMetadataRequest请求controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)// 获取所有AR并且按照topic分区val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)topics.foreach { topic =>// 传入指定topic的AR副本集,开始分区的删除操作onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)}
}

第二:调用onPartitionDeletion方法开始对指定分区进行删除

将不可用副本转化为ReplicaDeletionIneligible状态;

将可用副本转化为OfflineReplica状态,而且还会发送StopReplica

Request请求到带删除的副本,同时还会向broker发送跟新元数据的请求,将副本从ISR列表移除;

将可用副本由OfflineReplica转化为ReplicationDeletionStarted。

private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))// 获取每一个分区的副本val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)// 开始删除startReplicaDeletion(replicasPerPartition)
}
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {// 将要被删除的topic对应的分区副本按照topic分组replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>// 获取topic中所有可用的副本var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))// 获取不可用的副本val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopicval successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)// 第一次删除的副本集合val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas// 将不可用副本转换为ReplicaDeletionIneligible状态replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)// 将待删除副本转化为OfflineReplicareplicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))// 将待删除副本转化为ReplicaDeletionStartedcontroller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)if(deadReplicasForTopic.nonEmpty) {debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))// 如果有副本挂掉,标记不能删除markTopicIneligibleForDeletion(Set(topic))}}
}

第三 : 调用StopReplicaResponse错误吗正常,则将副本转化为ReplicaDeletionSuccessful状态,否则转化为ReplicaDeletionIneligible状态,并标记该副本所在topic不能被删除,将该topic添加到topics

IneligibleForDeletion队列,最后唤醒DeleteTopicsThread

private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {// 获取StopReplicaResponse响应val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]debug("Delete topic callback invoked for %s".format(stopReplicaResponse))val responseMap = stopReplicaResponse.responses.asScalaval partitionsInError =if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySetelse responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySetval replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))inLock(controllerContext.controllerLock) {// move all the failed replicas to ReplicaDeletionIneligiblefailReplicaDeletion(replicasInError)// 如果响应没有异常if (replicasInError.size != responseMap.size) {val deletedReplicas = responseMap.keySet -- partitionsInErrorcompleteReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))}}
}
def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {if(isDeleteTopicEnabled) {val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))if(replicasThatFailedToDelete.nonEmpty) {val topics = replicasThatFailedToDelete.map(_.topic)debug("Deletion failed for replicas %s. Halting deletion for topics %s".format(replicasThatFailedToDelete.mkString(","), topics))controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)// 将当前线程标记为不可用markTopicIneligibleForDeletion(topics)// 唤醒DeleteTopicsThreadresumeTopicDeletionThread()}}
}
 
private def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))// 成功的将删除的副本转化为ReplicaDeletionSuccessful状态controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)resumeTopicDeletionThread() // 唤醒DeleteTopicThread
}

第四:开始第二次调用doWork方法,如果副本已经处于Replica

DeletionSuccessful状态,调用completeDeleteTopic方法完成topic删除

private def completeDeleteTopic(topic: String) {// 取消partitionModificationListener的监听partitionStateMachine.deregisterPartitionChangeListener(topic)// 获取已经删除topic的副本val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)// 将状态转化为NonExistentReplica对象replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)// 将topic对应的parition 转化为offlinePartitionpartitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)// 紧接着转化为NonExistentPartitionpartitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)// 将该topic从待删除队里移除topicsToBeDeleted -= topicpartitionsToBeDeleted.retain(_.topic != topic)val zkUtils = controllerContext.zkUtilszkUtils.zkClient.deleteRecursive(getTopicPath(topic))zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))zkUtils.zkClient.delete(getDeleteTopicPath(topic))// 从ControllerContext中移除该topiccontrollerContext.removeTopic(topic)
}

TopicDeletionManager分析相关推荐

  1. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

  2. Kafka分区副本重分配源码分析

    Kafka分区副本重分配 文章目录 Kafka分区副本重分配 1.前言 2.分区副本重分配流程图 3.分区副本重分配详细分析 3.1 客户端行为 3.1.1 执行副本重分配脚本 3.1.2 解析并验证 ...

  3. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  4. 2022-2028年中国自动驾驶系统行业现状调研分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...

  5. 2022-2028年中国阻尼涂料市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...

  6. 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...

  7. 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...

  8. 2022-2028年全球与中国青苔清洗剂市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国青苔清洗剂行业市场行业相关概述.全 ...

  9. 2022-2028年全球与中国氢碘化物市场智研瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国氢碘化物行业市场行业相关概述.全球 ...

最新文章

  1. abap判断当前月最后一个工作日_油价正式打响第一枪!今天12月3日,今晚油价迎来大幅暴跌,调价后全国地区油价一览!...
  2. jvm执行引擎全解,java解释器即时编译器,全都讲明白
  3. 在C语言中 下面标识符非法的是,下面哪一项在Java中是非法的标识符? 答案:Youme...
  4. Request/Response【学习笔记03】
  5. 搭载敏捷飞天底座,阿里云专有云敏捷版全面升级
  6. Unity RPG 黑暗之光 问题记录 上 (1-63 地形场景 角色选择 行走 相机跟随、旋转、缩放 任务系统 面板栏 背包系统 状态系统)
  7. HaaS EDU K1 快速搭建Python开发环境
  8. 【Turtlrbot3-burger】从零开始配置Turtlrbot3小车1
  9. StatsD,collected,fluentd和其他守护程序
  10. Proximity sensor---Px318J
  11. wps 写论文时 参考文献的横线怎么消除
  12. 宇视科技实习生笔试面试经历
  13. java正则表达式初探——java.util.regex.Pattern类
  14. Fine-Gray检验、竞争风险模型、列线图绘制
  15. 记录Spring cloud alibaba Nacos学习
  16. .NET网站本机调试通过、发布后EXCEL导入数据库报错问题的解决
  17. android和Mac共享文件,这可能是 Mac 共享文件最详细的教程了
  18. 社群网站广告有效了!竟是普通的陈列广告+小型线上活动
  19. 微信小程序删除文件Page剩余
  20. bzoj1455罗马游戏*

热门文章

  1. 怎么设置苹果手机的小圆点_iPhone屏幕旋转怎么设置?关于苹果手机设置的一些小技巧...
  2. Nacos简介和安装
  3. 层净高怎么算_层高和净高怎么算,标准是多少?
  4. LightGBM常用模板
  5. c c和java最大的区别是什么,“一般”和“一般”之间的区别是什么,类型在C ++和Java?...
  6. java treemap lastkey,java.util.TreeMap.higherKey()方法实例
  7. ENVI二次开发时的注意事项
  8. python读取多个文件夹中的音频文件_Python3.7 读取音频根据文件名生成脚本的代码...
  9. 在线考试 ajax,关于在线考试使用ajax一问?
  10. 鸿蒙系统-手机-JS FA(Feature Ability)调用Java PA(Particle Ability)