本文依然是以kafka0.8.2.2为例讲解

一,如何删除一个topic

删除一个topic有两个关键点:

1,配置删除参数

delete.topic.enable这个Broker参数配置为True。

2,执行

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

假如不配置删除参数为true的话,topic其实并没有被清除,只是被标记为删除。此时,估计一般人的做法是删除topic在Zookeeper的信息和日志,其实这个操作并不会清除kafkaBroker内存的topic数据。所以,此时最佳的策略是配置删除参数为true然后,重启kafka。

二,重要的类介绍

1,PartitionStateMachine

该类代表分区的状态机。决定者分区的当前状态,和状态转移。四种状态

NonExistentPartition

NewPartition

OnlinePartition

OfflinePartition

2,ReplicaManager

负责管理当前机器的所有副本,处理读写、删除等具体动作。

读写:写获取partition对象,再获取Replica对象,再获取Log对象,采用其管理的Segment对象将数据写入、读出。

3,ReplicaStateMachine

副本的状态机。决定者副本的当前状态和状态之间的转移。一个副本总共可以处于一下几种状态的一种

NewReplica:Crontroller在分区重分配的时候可以创建一个新的副本。只能接受变为follower的请求。前状态可以是NonExistentReplica

OnlineReplica:新启动的分区,能接受变为leader或者follower请求。前状态可以是NewReplica, OnlineReplica or OfflineReplica

OfflineReplica:死亡的副本处于这种状态。前状态可以是NewReplica, OnlineReplica

ReplicaDeletionStarted:分本删除开始的时候处于这种状态,前状态是OfflineReplica

ReplicaDeletionSuccessful:副本删除成功。前状态是ReplicaDeletionStarted

ReplicaDeletionIneligible:删除失败的时候处于这种状态。前状态是ReplicaDeletionStarted

NonExistentReplica:副本成功删除之后处于这种状态,前状态是ReplicaDeletionSuccessful

4,TopicDeletionManager

该类管理着topic删除的状态机

1),TopicCommand通过创建/admin/delete_topics/,来发布topic删除命令。

2),Controller监听/admin/delete_topic子节点变动,开始分别删除topic。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!

3),Controller有个后台线程负责删除Topic

三,源码彻底解析topic的删除过程

此处会分四个部分:

A),客户端执行删除命令作用

B),不配置delete.topic.enable整个流水的源码

C),配置了delete.topic.enable整个流水的源码

D),手动删除zk上topic信息和磁盘数据

1,客户端执行删除命令

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

进入kafka-topics.sh我们会看到

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

进入TopicCommand里面,main方法里面

else if(opts.options.has(opts.deleteOpt))
deleteTopic(zkClient, opts)

实际内容是

val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
println(“Topic %s does not exist”.format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
try {
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
在"/admin/delete_topics"目录下创建了一个topicName的节点。

2,假如不配置delete.topic.enable整个流水是

总共有两处listener会响应:

A),TopicChangeListener

B),DeleteTopicsListener

使用topic的删除命令删除一个topic的话,指挥触发DeleteTopicListener。

var topicsToBeDeleted = {
import JavaConversions._
(children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
info(“Starting topic deletion for topics " + topicsToBeDeleted.mkString(”,"))
// mark topic ineligible for deletion if other state changes are in progress
topicsToBeDeleted.foreach { topic =>
val preferredReplicaElectionInProgress =
controllerContext.partitionsUndergoingPreferredReplicaElection.map(.topic).contains(topic)
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(
.topic).contains(topic)
if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
}
// add topic to deletion list
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
由于都会判断delete.topic.enable是否为true,假如不为true就不会执行,为true就进入执行

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))

controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

3,delete.topic.enable配置为true

此处与步骤2的区别,就是那两个处理函数。

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
markTopicIneligibleForDeletion函数的处理为
if(isDeleteTopicEnabled) {
val newTopicsToHaltDeletion = topicsToBeDeleted & topics
topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
if(newTopicsToHaltDeletion.size > 0)
info(“Halted deletion of topics %s”.format(newTopicsToHaltDeletion.mkString(",")))
}
主要是停止删除topic,假如存储以下三种情况

  • Halt delete topic if -
    1. replicas being down
    1. partition reassignment in progress for some partitions of the topic
    1. preferred replica election in progress for some partitions of the topic

enqueueTopicsForDeletion主要作用是更新删除topic的集合,并激活TopicDeleteThread

def enqueueTopicsForDeletion(topics: Set[String]) {
if(isDeleteTopicEnabled) {
topicsToBeDeleted ++= topics
partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
resumeTopicDeletionThread()
}
}
在删除线程DeleteTopicsThread的doWork方法中

topicsQueuedForDeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// clear up all state for this topic from controller cache and zookeeper
completeDeleteTopic(topic)
info(“Deletion of topic %s successfully completed”.format(topic))
}
进入completeDeleteTopic方法中

// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic,ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
主要作用是解除掉监控分区变动的listener,删除Zookeeper具体节点信息,删除磁盘数据,更新内存数据结构,比如从副本状态机里面移除分区的具体信息。

其实,最终要的是我们的副本磁盘数据是如何删除的。我们重点介绍这个部分。

首次清除的话,在删除线程DeleteTopicsThread的doWork方法中

{
// if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
// TopicDeletionSuccessful. That means, that either given topic haven’t initiated deletion
// or there is at least one failed replica (which means topic deletion should be retried).
if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
// mark topic for deletion retry
markTopicForDeletionRetry(topic)
}

进入markTopicForDeletionRetry
val failedReplicas = controller.replicaStateMachine.replicasInState(topic,ReplicaDeletionIneligible)
info(“Retrying delete topic for topic %s since replicas %s were not successfully deleted”
.format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
在ReplicaStateMachine的handleStateChanges方法中,调用了handleStateChange,处理OfflineReplica
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition,deletePartition = false)
接着在handleStateChanges中
brokerRequestBatch.sendRequestsToBrokers(controller.epoch,controllerContext.correlationId.getAndIncrement)
给副本数据存储节点发送StopReplicaKey副本指令,并开始删除数据
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
debug(“The stop replica request (delete = true) sent to broker %d is %s”
.format(broker, stopReplicaWithDelete.mkString(",")))
debug(“The stop replica request (delete = false) sent to broker %d is %s”
.format(broker, stopReplicaWithoutDelete.mkString(",")))
replicaInfoList.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch,correlationId)
controller.sendRequest(broker, stopReplicaRequest, r.callback)
}
}
stopReplicaRequestMap.clear()
Broker的KafkaApis的Handle方法在接受到指令后
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)

val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
接着是在stopReplicas方法中
{
controllerEpoch = stopReplicaRequest.controllerEpoch
// First stop fetchers for all partitions, then stop the corresponding replicas
replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r =>TopicAndPartition(r.topic, r.partition)))
for(topicAndPartition <- stopReplicaRequest.partitions){
val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition,stopReplicaRequest.deletePartitions)
responseMap.put(topicAndPartition, errorCode)
}
(responseMap, ErrorMapping.NoError)
}
进一步进入stopReplica方法,正式进入日志删除
getPartition(topic, partitionId) match {
case Some(partition) =>
if(deletePartition) {
val removedPartition = allPartitions.remove((topic, partitionId))
if (removedPartition != null)
removedPartition.delete() // this will delete the local log
}
以上就是kafka的整个日志删除流水。

4,手动删除zk上topic信息和磁盘数据

TopicChangeListener会监听处理,但是处理很简单,只是更新了

val deletedTopics = controllerContext.allTopics – currentChildren
controllerContext.allTopics = currentChildren

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,newTopics.toSeq)
controllerContext.partitionReplicaAssignment =controllerContext.partitionReplicaAssignment.filter(p =>
四,总结

Kafka的topic的删除过程,实际上就是基于Zookeeper做了一个订阅发布系统。Zookeeper的客户端创建一个节点/admin/delete_topics/,由kafka Controller监听到事件之后正式触发topic的删除:解除Partition变更监听的listener,清除内存数据结构,删除副本数据,删除topic的相关Zookeeper节点。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!

delete.topic.enable配置该参数为false的情况下执行了topic的删除命令,实际上未做任何动作。我们此时要彻底删除topic建议修改该参数为true,重启kafka,这样topic信息会被彻底删除,已经测试。

一般流行的做法是手动删除Zookeeper的topic相关信息及磁盘数据但是这样的话会造成部分内存数据未清除。至于是否会有隐患,未测试。

源码解析kafka删除topic相关推荐

  1. 【kafka】Kafka 源码解析:Group 协调管理机制

    1.概述 转载:Kafka 源码解析:Group 协调管理机制 在 Kafka 的设计中,消费者一般都有一个 group 的概念(当然,也存在不属于任何 group 的消费者),将多个消费者组织成一个 ...

  2. kafka源码_十年资深架构师多年工作经验结晶:Kafka源码解析与实战

    前言 本篇系统介绍Kafka的实现原理和应用方法,并介绍Kafka的运维工具.客户端编程方法和第三方集成方式,深入浅出.图文并茂.分析透彻. 本篇将从初学者的角度出发,循序渐进地讲解Kafka内部的实 ...

  3. Kafka核心源码解析 - KafkaController源码解析

    在进入源码解析之前,我先来介绍一些KafkaController在Kafka集群中的作用. (1)负责监听zookeeper上所有的元数据变更请求: (2)负责topic的partition迁移任务分 ...

  4. Kafka生产者和消费者分区策略部分源码解析

    之前我在看其他的博客时,发现对于kafka consumer的RoundRobin的缺点分析中,有两种观点,一种认为缺点在于如果消费者组中消费者消费的主题不同,或者消费者线程数不同,那么会造成消费者消 ...

  5. hashmap删除指定key_Java集合之HashMap源码解析(JDK8)

    哈希表(hash table)也叫散列表,是一种非常重要的数据结构,应用场景非常丰富,许多缓存技术(比如memcached)的核心其实就是在内存中维护一张大的哈希表,而HashMap的实现原理也常常出 ...

  6. kafka源码_Kafka日志段源码解析

    1 Kafka 日志结构 kafka 日志在磁盘上的组织架构如下: Kafka 日志对象由多个日志段对象组成,每个日志段对象在磁盘上创建一组文件,包括: 日志文件(.log) 索引文件(.index) ...

  7. 多线程与高并发(八):ThreadPoolExecutor源码解析, SingleThreadPool,CachedPool,FixedThreadPool,ForkJoinPoll 等

    线程池 今天我们来看看JDK给我们提供的默认的线程池的实现. ThreadPoolExecutor:我们通常所说的线程池.多个线程共享同一个任务队列. SingleThreadPool CachedP ...

  8. loraserver 源码解析 (六) lora-app-server

    目录 下载源码 升级 npm 安装一些必要的依赖库 pq_trgm extension run 调用 handleDataDownPayloads 开启一个Goroutine  G1 run再调用 s ...

  9. hashmap与concurrenthashmap源码解析

    hashmap源码解析转载:http://www.cnblogs.com/ITtangtang/p/3948406.html 一.HashMap概述 HashMap基于哈希表的 Map 接口的实现.此 ...

  10. Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新)

    Flink 学习 https://github.com/zhisheng17/flink-learning 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧 ...

最新文章

  1. 业界 | Facebook F8开发者大会首日:扎克伯格走心演讲,VR硬件发售
  2. 基于Salmon的转录组定量流程
  3. Azkaban通过API动态传递参数
  4. Effective STL中文版pdf
  5. FFmpeg入门测试
  6. QT:QByteArray和QByteArray、char *(转)
  7. PyTorch学习—9.模型容器与AlexNet构建
  8. Eclipse,新建web项目后 出现jax-ws webservice
  9. 这样的科幻不该被埋没,吐血推荐!
  10. 微信小程序picker选择器(下拉框)以及传值问题
  11. ​Spring Cloud:统一异常处理
  12. 知乎上演的“变形计“,资本市场会打几分?
  13. 哪个快递能寄液晶显示器啊?
  14. MySQL优化系列3-Linux查看CPU、内存、磁盘、网络信息
  15. 多彩的产品之年——产品经理一席谈
  16. U-boot中怎么添加配置菜单选项
  17. python输入二维数组_Python输入二维数组方法
  18. 苹果笔记本能不能用python_苹
  19. org.apache.shiro.session.ExpiredSessionException: Session with id异常排查
  20. 手写一个vue中英文翻译组件

热门文章

  1. python网站攻击-利用Python进行Web渗透测试(十):密码攻击
  2. 百度和谷歌到底有什么区别?看完终于明白了!
  3. redis设计秒杀活动图解
  4. 微信活码系统源码/微信群二维码/活码生成网站系统/生成微信活码
  5. php加载COM组件失败原因及其解决方法
  6. 浅谈Spring事件监听
  7. 软件测试的术语SRS,HLD,LLD,BD,FD,DD意义
  8. html+圆角梯形,用CSS圆角梯形
  9. 【应届生必看】技术岗面试应答有哪些话术和技巧?
  10. 海龟交易法的“道”和“术”