本文主要讲述一个 topic 的创建过程,从 topic 是如何创建到 topic 真正创建成功的中间详细过程,文章主要内容可以分为以下几个部分:

1,topic 是如何创建的?

  • 命令行创建;
  • Producer 发送数据时,自动创建;

2,topic创建时,replicas 是如何分配的?

  • 指定 replicas 的分配;
  • 自动 replicas 分配;

3,replicas 更新到 zk 后,底层如何创建一个 topic?

  • 创建 Partition 对象及状态更新;
  • 创建 Partition 的 replica 对象及状态更新。

topic 介绍

topic 是 Kafka 中的一个消息队列的标识,也可以认为是消息队列的一个 id,用于区分不同的消息队列,一个 topic 由多个 partition 组成,这些 partition 是通常是分布在不同的多台 Broker 上的,为了保证数据的可靠性,一个 partition 又会设置为多个副本(replica),通常会设置两副本或三副本。如下图所示,这个一个名为『topic』的 topic,它由三个 partition 组成,两副本,假设 Kafka 集群有三台 Broker(replica 0_1 代表 partition 0 的第一个副本)。

在设置副本时,副本数是必须小于集群的 Broker 数的,副本只有设置在不同的机器上才有作用。

topic 如何创建

topic 在创建时有两种方式:

  1. 通过 kafka-topics.sh 创建一个 topic,可以设置相应的副本数让 Server 端自动进行 replica 分配,也可以直接指定手动 replica 的分配。
  2. Server 端如果 auto.create.topics.enable 设置为 true 时,那么当 Producer 向一个不存在的 topic 发送数据时,该 topic 同样会被创建出来,此时,副本数默认是1。

下面看一下这两种方式的底层实现。

kafka-topics.sh 创建 topic

在 Kafka 的安装目录下,通过下面这条命令可以创建一个 partition 为3,replica 为2的 topic(test)

./bin/kafka-topics.sh --create --topic test --zookeeper XXXX --partitions 3 --replication-factor 2

kafka-topics.sh 实际上是调用 kafka.admin.TopicCommand 的方法来创建 topic,其实现如下:

//note: 创建 topic
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {val topic = opts.options.valueOf(opts.topicOpt)val configs = parseTopicConfigsToBeAdded(opts)val ifNotExists = opts.options.has(opts.ifNotExistsOpt)if (Topic.hasCollisionChars(topic))println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")try {if (opts.options.has(opts.replicaAssignmentOpt)) {//note: 指定 replica 的分配,直接向 zk 更新即可val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)} else {//note: 未指定 replica 的分配,调用自动分配算法进行分配CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)val partitions = opts.options.valueOf(opts.partitionsOpt).intValueval replicas = opts.options.valueOf(opts.replicationFactorOpt).intValueval rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabledelse RackAwareMode.EnforcedAdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)}println("Created topic \"%s\".".format(topic))} catch  {case e: TopicExistsException => if (!ifNotExists) throw e}
}

如果指定了 partition 各个 replica 的分布,那么将 partition replicas 的结果验证之后直接更新到 zk 上,验证的 replicas 的代码是在 parseReplicaAssignment 中实现的,如下所示

def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {val partitionList = replicaAssignmentList.split(",")val ret = new mutable.HashMap[Int, List[Int]]()for (i <- 0 until partitionList.size) {val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)val duplicateBrokers = CoreUtils.duplicates(brokerList)if (duplicateBrokers.nonEmpty)//note: 同一个 partition 对应的 replica 是不能相同的throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))ret.put(i, brokerList.toList)if (ret(i).size != ret(0).size)//note: 同一个 topic 的副本数必须相同throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)}ret.toMap
}

如果没有指定 parittion replicas 分配的话,将会调用 AdminUtils.createTopic 方法创建 topic,这个方法首先会检测当前的 Kafka 集群是否机架感知,如果有的话先获取 Broker 的机架信息,接着再使用 Replica 自动分配算法来分配 Partition 的 replica,最后就跟指定 replica 方式一样,将 replicas 的结果更新到 zk 中。

def createTopic(zkUtils: ZkUtils,topic: String,partitions: Int,replicationFactor: Int,topicConfig: Properties = new Properties,rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)//note: 有机架感知的情况下,返回 Broker 与机架之间的信息val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)//note: 获取 partiiton 的 replicas 分配AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)//note: 更新到 zk 上
}

Producer 创建 topic

只有当 Server 端的 auto.create.topics.enable 设置为 true 时,Producer 向一个不存在的 topic 发送数据,该 topic 才会被自动创建。

当 Producer 在向一个 topic 发送 produce 请求前,会先通过发送 Metadata 请求来获取这个 topic 的 metadata。Server 端在处理 Metadata 请求时,如果发现要获取 metadata 的 topic 不存在但 Server 允许 producer 自动创建 topic 的话(如果开启权限时,要求 Producer 需要有相应权限:对 topic 有 Describe 权限,并且对当前集群有 Create 权限),那么 Server 将会自动创建该 topic.

//note: 获取 topic 的 metadata 信息
private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)if (topics.isEmpty || topicResponses.size == topics.size) {topicResponses} else {val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet//note: 集群上暂时不存在的 topic 列表val responsesForNonExistentTopics = nonExistentTopics.map { topic =>if (topic == Topic.GroupMetadataTopicName) {createGroupMetadataTopic()} else if (config.autoCreateTopicsEnable) {//note: auto.create.topics.enable 为 true 时,即允许自动创建 topiccreateTopic(topic, config.numPartitions, config.defaultReplicationFactor)} else {new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false,java.util.Collections.emptyList())}}topicResponses ++ responsesForNonExistentTopics}
}

其中 createTopic 还是调用了 AdminUtils.createTopic 来创建 topic,与命令行创建的底层实现是一样。

private def createTopic(topic: String,numPartitions: Int,replicationFactor: Int,properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {try {//note: 还是调用 AdminUtils 命令创建 topicAdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)info("Auto creation of topic %s with %d partitions and replication factor %d is successful".format(topic, numPartitions, replicationFactor))new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),java.util.Collections.emptyList())} catch {case _: TopicExistsException => // let it go, possibly another broker created this topicnew MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),java.util.Collections.emptyList())case ex: Throwable  => // Catch all to prevent unhandled errorsnew MetadataResponse.TopicMetadata(Errors.forException(ex), topic, Topic.isInternal(topic),java.util.Collections.emptyList())}
}

replica 如何分配

通过前面的内容,可以看到,无论使用哪种方式,最后都是通过 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK() 将 topic 的 Partition replicas 的更新到 zk 上,这中间关键的一点在于:Partition 的 replicas 是如何分配的。在创建时,我们既可以指定相应 replicas 分配,也可以使用默认的算法自动分配。

创建时指定 replicas 分配

在创建 topic 时,可以通过以下形式直接指定 topic 的 replica

./bin/kafka-topics.sh --create --topic test --zookeeper XXXX --replica-assignment 1:2,3:4,5:6

该 topic 有三个 partition,其中,partition 0 的 replica 分布在1和2上,partition 1 的 replica 分布在3和4上,partition 3 的 replica 分布在4和5上。

这样情况下,在创建 topic 时,Server 端会将该 replica 分布直接更新到 zk 上。

replicas 自动分配算法

在创建 topic 时,Server 通过 AdminUtils.assignReplicasToBrokers() 方法来获取该 topic partition 的 replicas 分配。

/*** 副本分配时,有三个原则:* 1. 将副本平均分布在所有的 Broker 上;* 2. partition 的多个副本应该分配在不同的 Broker 上;* 3. 如果所有的 Broker 有机架信息的话, partition 的副本应该分配到不同的机架上。** 为实现上面的目标,在没有机架感知的情况下,应该按照下面两个原则分配 replica:* 1. 从 broker.list 随机选择一个 Broker,使用 round-robin 算法分配每个 partition 的第一个副本;* 2. 对于这个 partition 的其他副本,逐渐增加 Broker.id 来选择 replica 的分配。** @param brokerMetadatas* @param nPartitions* @param replicationFactor* @param fixedStartIndex* @param startPartitionId* @return*/def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],nPartitions: Int,replicationFactor: Int,fixedStartIndex: Int = -1,startPartitionId: Int = -1): Map[Int, Seq[Int]] = {if (nPartitions <= 0) // note: 要增加的 partition 数需要大于0throw new InvalidPartitionsException("number of partitions must be larger than 0")if (replicationFactor <= 0) //note: replicas 应该大于0throw new InvalidReplicationFactorException("replication factor must be larger than 0")if (replicationFactor > brokerMetadatas.size) //note: replicas 超过了 broker 数throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")if (brokerMetadatas.forall(_.rack.isEmpty))//note: 没有开启机架感知assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,startPartitionId)else { //note: 机架感知的情况if (brokerMetadatas.exists(_.rack.isEmpty)) //note: 并不是所有的机架都有机架感知throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,startPartitionId)}}

这里没有开启机架感知模式来介绍 topic partition replicas 的分配情况,其分配算法主要是 assignReplicasToBrokersRackUnaware() 方法中实现。

//note: partition 分配private def assignReplicasToBrokersRackUnaware(nPartitions: Int,replicationFactor: Int,brokerList: Seq[Int],fixedStartIndex: Int,startPartitionId: Int): Map[Int, Seq[Int]] = {val ret = mutable.Map[Int, Seq[Int]]()val brokerArray = brokerList.toArrayval startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) //note: 随机选择一个Brokervar currentPartitionId = math.max(0, startPartitionId) //note: 开始增加的第一个 partitionvar nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)for (_ <- 0 until nPartitions) { //note: 对每个 partition 进行分配if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))nextReplicaShift += 1 //note: 防止 partition 过大时,其中某些 partition 的分配(leader、follower)完全一样val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length //note: partition 的第一个 replicaval replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))for (j <- 0 until replicationFactor - 1) //note: 其他 replica 的分配replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))ret.put(currentPartitionId, replicaBuffer)currentPartitionId += 1}ret}//note: 为 partition 设置完第一个 replica 后,其他 replica 分配的计算private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)//note: 在 secondReplicaShift 的基础上增加一个 replicaIndex(firstReplicaIndex + shift) % nBrokers}

这里举一个栗子,假设一个 Kafka 集群有5个节点,新建的 topic 有10个 partition,并且是三副本,假设最初随机选择的 startIndex 和 nextReplicaShift 节点均为0

  • partition 为0时,那第一副本在 (0+0)%5=0,第二个副本在 (0+(1+(0+0)%4)))%5=1,第三副本在 (0+(1+(0+1)%4)))%5=2;
  • partition 为2时,那第一副本在 (0+2)%5=2,第二个副本在 (2+(1+(0+0)%4)))%5=3,第三副本在 (2+(1+(0+1)%4)))%5=4;
  • partition 为5时,那第一副本在 (0+5)%5=0,第二个副本在 (0+(1+(1+0)%4)))%5=2,第三副本在 (0+(1+(1+1)%4)))%5=3(partition 数是 Broker 数一倍时,nextReplicaShift 值会增加1);
  • partition 为8时,那第一副本在 (0+8)%5=3,第二个副本在 (3+(1+(1+0)%4)))%5=0,第三副本在 (3+(1+(1+1)%4)))%5=1。

分配如下表所示:

replicas 更新到 zk 后触发的操作

这一部分的内容是由 Kafka Controller 来控制的(Kafka Controller 将会在后续文章中讲解),当一个 topic 的 replicas 更新到 zk 上后,监控 zk 这个目录的方法会被触发(TopicChangeListener.doHandleChildChange()方法)

//note: 当 zk 上 topic 节点上有变更时,这个方法就会调用def doHandleChildChange(parentPath: String, children: Seq[String]) {inLock(controllerContext.controllerLock) {if (hasStarted.get) {try {val currentChildren = {debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))children.toSet}val newTopics = currentChildren -- controllerContext.allTopics//note: 新创建的 topic 列表val deletedTopics = controllerContext.allTopics -- currentChildren//note: 已经删除的 topic 列表controllerContext.allTopics = currentChildren//note: 新创建 topic 对应的 partition 列表val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>!deletedTopics.contains(p._1.topic))//note: 把已经删除 partition 过滤掉controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)//note: 将新增的 tp-replicas 更新到缓存中info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,deletedTopics, addedPartitionReplicaAssignment))if (newTopics.nonEmpty)//note: 处理新建的 topiccontroller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)} catch {case e: Throwable => error("Error while handling new topic", e)}}}}

这个方法主要做了以下内容:

  • 获取 zk 的 topic 变更信息,得到新创建的 topic 列表(newTopics)以及被删除的 topic 列表(deletedTopics);
  • 将 deletedTopics 的 replicas 从 controller 的缓存中删除,并将新增 topic 的 replicas 更新到 controller 的缓存中;
  • 调用 KafkaController 的 onNewTopicCreation() 创建 partition 和 replica 对象。

KafkaController 中 onNewTopicCreation() 方法先对这些 topic 注册 PartitionChangeListener,然后再调用 onNewPartitionCreation() 方法创建 partition 和 replicas 的实例对象,topic 创建的主要实现是在 KafkaController onNewPartitionCreation() 这个方法中。

//note: 当 partition state machine 监控到有新 topic 或 partition 时,这个方法将会被调用/*** 1. 注册 partition change listener;* 2. 触发 the new partition callback,也即是 onNewPartitionCreation()* 3. 发送 metadata 请求给所有的 Broker* @param topics* @param newPartitions*/def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {info("New topic creation callback for %s".format(newPartitions.mkString(",")))// subscribe to partition changestopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))onNewPartitionCreation(newPartitions)}//note: topic 变化时,这个方法将会被调用//note: 1. 将新创建的 partition 置为 NewPartition 状态; 2.从 NewPartition 改为 OnlinePartition 状态//note: 1. 将新创建的 Replica 置为 NewReplica 状态; 2.从 NewReplica 改为 OnlineReplica 状态def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {info("New partition creation callback for %s".format(newPartitions.mkString(",")))partitionStateMachine.handleStateChanges(newPartitions, NewPartition)replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)}

在详细介绍这四个方法的调用之前,先简单详述一下 Partition 和 Replica 状态机的变化。

Partition 状态机

关于 Partition 状态的变化可以参考 Kafka 中的这个方法 PartitionStateMachine,状态机的具体转换情况,一个 Partition 对象有四种状态:

  1. NonExistentPartition:这个 partition 不存在;
  2. NewPartition:这个 partition 刚创建,有对应的 replicas,但还没有 leader 和 isr;
  3. OnlinePartition:这个 partition 的 leader 已经选举出来了,处理正常的工作状态;
  4. OfflinePartition:partition 的 leader 挂了。

partition 只有在 OnlinePartition 这个状态时,才是可用状态。

Replica 状态机

关于 Replica 状态的变化可以参考 Kafka 中的这个方法 ReplicaStateMachine,,状态机的具体转换情况,Replica 对象有七种状态,中文解释的比较难以理解,直接上原文对这几种状态的解释。

  1. NewReplica:The controller can create new replicas during partition reassignment. In this state, a replica can only get become follower state change request.
  2. OnlineReplica:Once a replica is started and part of the assigned replicas for its partition, it is in this state. In this state, it can get either become leader or become follower state change requests.
  3. OfflineReplica:If a replica dies, it moves to this state. This happens when the broker hosting the replica is down.
  4. ReplicaDeletionStarted:If replica deletion starts, it is moved to this state.
  5. ReplicaDeletionSuccessful:If replica responds with no error code in response to a delete replica request, it is moved to this state.
  6. ReplicaDeletionIneligible:If replica deletion fails, it is moved to this state.
  7. NonExistentReplica:If a replica is deleted successfully, it is moved to this state.

onNewPartitionCreation() 详解

这个方法有以下四步操作:

  1. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): 创建 Partition 对象,并将其状态置为 NewPartition 状态。
  2. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica):创建 Replica 对象,并将其状态置为 NewReplica 状态;
  3. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector):将 partition 对象从 NewPartition 改为 OnlinePartition 状态;
  4. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica):将 Replica 对象从 NewReplica 改为 OnlineReplica 状态。

partitionStateMachine > NewPartition

这部分的作用是,创建分区对象,并将其状态设置为 NewPartition。

case NewPartition =>//note: 新建一个 partitionassertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)partitionState.put(topicAndPartition, NewPartition) //note: 缓存 partition 的状态val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s".format(controllerId, controller.epoch, topicAndPartition, currState, targetState,assignedReplicas))

replicaStateMachine > NewReplica

这部分是为每个 Partition 创建对应的 replica 对象,并将其状态设置为 NewReplica,参照状态机的变化图更好理解。

case NewReplica =>assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)  //note: 验证// start replica as a follower to the current leader for its partitionval leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)leaderIsrAndControllerEpochOpt match {case Some(leaderIsrAndControllerEpoch) =>if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)//note: 这个状态的 Replica 不能作为 leaderthrow new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica".format(replicaId, topicAndPartition) + "state as it is being requested to become leader")//note: 向所有 replicaId 发送 LeaderAndIsr 请求,这个方法同时也会向所有的 broker 发送 updateMeta 请求brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),topic, partition, leaderIsrAndControllerEpoch,replicaAssignment)case None => // new leader request will be sent to this replica when one gets elected

partitionStateMachine > OnlinePartition

这个方法的主要的作用是将 partition 对象的状态由 NewPartition 设置为 OnlinePartition,从状态机图中可以看到,会有以下两步操作:

  1. 初始化 leader 和 isr,replicas 中的第一个 replica 将作为 leader,所有 replica 作为 isr,并把 leader 和 isr 信息更新到 zk;
  2. 发送 LeaderAndIsr 请求给所有的 replica,发送 UpdateMetadata 给所有 Broker。

具体操作如下:

// post: partition has been assigned replicascase OnlinePartition =>assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)partitionState(topicAndPartition) match {case NewPartition =>// initialize leader and isr path for new partitioninitializeLeaderAndIsrForPartition(topicAndPartition) //note: 为新建的 partition 初始化 leader 和 isrcase OfflinePartition =>electLeaderForPartition(topic, partition, leaderSelector)case OnlinePartition => // invoked when the leader needs to be re-electedelectLeaderForPartition(topic, partition, leaderSelector)case _ => // should never come here since illegal previous states are checked above}

实际的操作是在 initializeLeaderAndIsrForPartition() 方法中完成,这个方法是当 partition 对象的状态由 NewPartition 变为 OnlinePartition 时触发的,用来初始化该 partition 的 leader 和 isr。简单来说,就是选取 Replicas 中的第一个 Replica 作为 leader,所有的 Replica 作为 isr,最后调用 brokerRequestBatch.addLeaderAndIsrRequestForBrokers 向所有 replicaId 发送 LeaderAndIsr 请求以及向所有的 broker 发送 UpdateMetadata 请求(关于 Server 对 LeaderAndIsr 和 UpdateMetadata 请求的处理将会后续文章中讲述)。

//note: 当 partition 状态由 NewPartition 变为 OnlinePartition 时,将触发这一方法,用来初始化 partition 的 leader 和 isrprivate def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))liveAssignedReplicas.size match {case 0 =>val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +"live brokers are [%s]. No assigned replica is alive.").format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)throw new StateChangeFailedException(failMsg)case _ =>debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))// make the first replica in the list of assigned replicas, the leaderval leader = liveAssignedReplicas.head //note: replicas 中的第一个 replica 选做 leaderval leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),controller.epoch)debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))try {zkUtils.createPersistentPath(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))//note: zk 上初始化节点信息// NOTE: the above write can fail only if the current controller lost its zk session and the new controller// took over and initialized this partition. This can happen if the current controller went into a long// GC pausecontrollerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)//note: 向 live 的 Replica 发送  LeaderAndIsr 请求} catch {case _: ZkNodeExistsException =>// read the controller epochval leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,topicAndPartition.partition).getval failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +"exists with value %s and controller epoch %d").format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)throw new StateChangeFailedException(failMsg)}}

replicaStateMachine > OnlineReplica

这一步也就是最后一步,将 Replica 对象的状态由 NewReplica 更新为 OnlineReplica 状态,这些 Replica 才真正可用。

case OnlineReplica =>assertValidPreviousStates(partitionAndReplica,List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)replicaState(partitionAndReplica) match {case NewReplica =>// add this replica to the assigned replicas list for its partition//note: 向 the assigned replicas list 添加这个 replica(正常情况下这些 replicas 已经更新到 list 中了)val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)if(!currentAssignedReplicas.contains(replicaId))controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s".format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,targetState))

一直到这一步,一个 topic 就才算真正被创建完成。

吃透Kafka六:topic 创建过程相关推荐

  1. 95-25-010-命令-topic 创建过程

    文章目录 1.视界 1. topic 介绍 2.topic 如何创建 3. kafka-topics.sh创建 topic 3.1 createTopic 3.1.1 获取broker和机架信息 3. ...

  2. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  3. Kafka解析之topic创建(2)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  4. Kafka解析之topic创建(3)——合法性验证

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  5. kafka集群配置之kafka启动了创建topic报错:Replication factor: 1 larger than avail

    大家好, 我是上白书妖! 知识源于积累,登峰造极源于自律 今天我根据以前所以学的一些文献,笔记等资料整理出一些小知识点,有不当之处,欢迎各位斧正 kafka启动了创建topic报错:Replicati ...

  6. kafka单节点创建 topic 超时

    kafka单节点创建 topic 超时 单节点kafka创建topic时报错:Timed out waiting for a node assignment. Call: createTopics 经 ...

  7. kafka安装及配置过程

    来来先扫一下,一码不扫何以扫天下 简介 Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目.Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量, ...

  8. 大数据面试重点之kafka(六)

    大数据面试重点之kafka(六) Kafka分区分配算法 可回答:Kafka的partition分区策略问过的一些公司:阿里云,小米参考答案: 1.生产者分区分配策略 生产者在将消息发送到某个Topi ...

  9. Kafka中topic的Partition,Kafka为什么这么快,Consumer的负载均衡及consumerGroup的概念(来自学习笔记)

    1.1. Kafka中topic的Partition  在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic ...

  10. kafka专题:kafka的Topic主题、Partition分区、消费组偏移量offset等基本概念详解

    文章目录 1. kafka集群整体架构 2. kafka相关元素的基本概念 2.1 主题Topic和分区Partition 2.2 kafka消息存储在哪里? 2.3 分区副本 2.4 消费组和偏移量 ...

最新文章

  1. 自动化测试框架:几行代码轻松解决Appium环境问题
  2. Spark快速上手-WordCount案例
  3. mask - 使用 * 遮蔽字符串
  4. 怎么将arcgis新建工具条如何保存_ArcGIS中寻找最短路径的方法
  5. vue 无法进入response拦截器_vue拦截器的一次实践
  6. Java IdentityHashMap size()方法与示例
  7. Android studio 插件安装
  8. 7_python基础—while循环应用1-100累加和
  9. 一起来学习.net core程序使用中介者模式:MediatR插件
  10. Deep Learning for 3D Recognition
  11. 连肝7个晚上,总结了66条计算机网络的知识点
  12. 【ECSHOP插件】ECSHOP商品相册批量上传插件
  13. SPFA算法模板(刘汝佳版)--Wormholes POJ - 3259
  14. linux 声音设置,Linux aumix设置音效装置命令详解
  15. 2021年焊工(初级)考试报名及焊工(初级)实操考试视频
  16. STM8 的汇编学习
  17. java 子类属性覆盖_java子类和父类属性重复问题
  18. win7 如何锁定计算机,Win7系统如何锁定计算机
  19. Redis和Mecahe的简介
  20. CentOS6.5 安装CPAN,从而安装perl的各种模块

热门文章

  1. WIN10如果将电脑网络分享给iphone
  2. 解决IP地址冲突的方法
  3. sql-lab(ALL 1-65)
  4. nginx 连接php
  5. 裂变活动的5个关键步骤?
  6. 基于ROS2多机器人编程资料
  7. matlab 模拟光源,基于 Matlab 的激光光斑模拟.pdf
  8. 中国民营500强企业爬取数据展示
  9. Windows系统如何远程桌面连接
  10. Easy Iot实现MQTT实验