A Brief Analysis of the Kafka Partition Reassignment Process
Happy holiday~
Today is the day that belongs to programmers everywhere — happy Programmer's Day to myself, haha.
With business volume growing rapidly and another Double 11 on the way, we will be expanding our existing Kafka cluster to handle heavier traffic and business spikes. Of course, newly added Kafka brokers hold no topics or partitions by default, so we need to use the partition reassignment tool kafka-reassign-partitions to rebalance the existing partitions/replicas onto the new brokers. So how exactly does Kafka carry out the reassignment? This article walks through it briefly.
Generating and Submitting a Reassignment Plan
As we know, using the kafka-reassign-partitions command involves three steps: first, generate a JSON-format reassignment plan for the specified topics (--generate); second, submit the generated plan for execution (--execute); third, monitor the progress of the reassignment (--verify). These correspond to the generateAssignment(), executeAssignment() and verifyAssignment() methods of the kafka.admin.ReassignPartitionsCommand class, respectively.
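The three steps above can be sketched as the following invocations (a sketch for older ZooKeeper-based Kafka versions matching the source quoted in this article; the hostnames, file names, and broker ids are placeholders):

```shell
# 1. Generate a candidate plan for the topics listed in topics.json,
#    targeting brokers 0-4 (prints current and proposed assignments)
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --topics-to-move-json-file topics.json --broker-list "0,1,2,3,4" --generate

# 2. Execute the proposed plan (saved from step 1 as plan.json)
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file plan.json --execute

# 3. Check reassignment progress for each partition in the plan
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file plan.json --verify
```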
The generateAssignment() method calls AdminUtils#assignReplicasToBrokers() to produce the replica assignment plan. We won't read through that source here; its principles can be summarized as follows:
- Distribute replicas as evenly as possible across the brokers;
- All replicas of one partition must reside on different brokers;
- If rack-aware information is available for the brokers, spread each partition's replicas across different racks as much as possible.
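The core of the first two principles is a round-robin layout. A minimal sketch of the idea (illustrative only — Kafka's actual AdminUtils implementation also randomizes the starting broker and shift, and handles rack awareness):

```python
# Minimal round-robin replica assignment sketch.
# Not Kafka's actual algorithm: AdminUtils also uses random start
# offsets and rack-aware placement; this only shows the two basic
# principles (even spread, distinct brokers per partition).
def assign_replicas(brokers, num_partitions, replication_factor):
    assert replication_factor <= len(brokers)
    assignment = {}
    for p in range(num_partitions):
        # Each partition starts on the next broker, and its remaining
        # replicas land on the following brokers, so all replicas of
        # one partition are guaranteed to be on distinct brokers.
        assignment[p] = [brokers[(p + r) % len(brokers)]
                         for r in range(replication_factor)]
    return assignment

plan = assign_replicas(brokers=[0, 1, 2, 3], num_partitions=4,
                       replication_factor=2)
# partition 0 -> brokers [0, 1], partition 1 -> [1, 2], ...
```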
The executeAssignment() method in turn calls reassignPartitions(), whose source is shown below.
```scala
def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
  maybeThrottle(throttle)
  try {
    val validPartitions = proposedPartitionAssignment.filter { case (p, _) =>
      validatePartition(zkUtils, p.topic, p.partition)
    }
    if (validPartitions.isEmpty) false
    else {
      if (proposedReplicaAssignment.nonEmpty) {
        val adminClient = adminClientOpt.getOrElse(
          throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
        val alterReplicaDirResult = adminClient.alterReplicaLogDirs(
          proposedReplicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt))
        alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {
          try {
            future.get()
            throw new AdminCommandFailedException(s"Partition ${replica.topic()}-${replica.partition()} already exists on broker ${replica.brokerId()}." +
              s" Reassign replica to another log directory on the same broker is currently not supported.")
          } catch {
            case t: ExecutionException =>
              t.getCause match {
                case e: ReplicaNotAvailableException => // It is OK if the replica is not available
                case e: Throwable => throw e
              }
          }
        }}
      }
      val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
      zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
      true
    }
  } catch {
    // ......
  }
}
```
After the necessary partition validation, it creates the persistent ZooKeeper node /admin/reassign_partitions and writes the JSON-format reassignment plan into it. If that node already exists, a reassignment is already in progress and a new one cannot be started (the relevant check is in the executeAssignment() method).
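The plan written to that node follows the JSON shape below (produced by ZkUtils.formatAsReassignmentJson; the topic name and broker ids here are placeholders):

```python
import json

# Shape of the JSON payload written to /admin/reassign_partitions:
# a version field plus one entry per partition with its target replicas.
plan = {
    "version": 1,
    "partitions": [
        {"topic": "my-topic", "partition": 0, "replicas": [1, 2]},
        {"topic": "my-topic", "partition": 1, "replicas": [2, 3]},
    ],
}
payload = json.dumps(plan)  # this string is stored in the ZK node
```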
Watching For and Handling the Reassignment Event
As mentioned in an earlier article on the Kafka Controller, the controller registers multiple ZooKeeper listeners, delivers the events it observes to an internal event queue, and has an event-handling thread process them. The listener watching the /admin/reassign_partitions node in ZK is PartitionReassignmentListener, which produces a PartitionReassignment event. Its handling logic is shown below.
```scala
case object PartitionReassignment extends ControllerEvent {
  def state = ControllerState.PartitionReassignment

  override def process(): Unit = {
    if (!isActive) return
    val partitionReassignment = zkUtils.getPartitionsBeingReassigned()
    val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
    partitionsToBeReassigned.foreach { case (partition, context) =>
      if (topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
        error(s"Skipping reassignment of partition $partition since it is currently being deleted")
        removePartitionFromReassignedPartitions(partition)
      } else {
        initiateReassignReplicasForTopicPartition(partition, context)
      }
    }
  }
}
```
This method first obtains the list of partitions to be reassigned, filters out the partitions belonging to topics already marked for deletion, and then calls initiateReassignReplicasForTopicPartition():
```scala
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                              reassignedPartitionContext: ReassignedPartitionsContext) {
  val newReplicas = reassignedPartitionContext.newReplicas
  val topic = topicAndPartition.topic
  val partition = topicAndPartition.partition
  try {
    val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
    assignedReplicasOpt match {
      case Some(assignedReplicas) =>
        if (assignedReplicas == newReplicas) {
          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
            " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
        } else {
          info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
          // first register ISR change listener
          watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
          controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
          // mark topic ineligible for deletion for the partitions being reassigned
          topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
          onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
        }
      case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist".format(topicAndPartition))
    }
  } catch {
    case e: Throwable =>
      error("Error completing reassignment of partition %s".format(topicAndPartition), e)
      // remove the partition from the admin path to unblock the admin client
      removePartitionFromReassignedPartitions(topicAndPartition)
  }
}
```
The logic of this method is:
- Check whether the partition's existing replicas are identical to the new replicas to be assigned; if so, throw an exception;
- Register an ISR-change listener for the partition about to be reassigned;
- Record the partition/replicas being reassigned in the partitionsBeingReassigned map of the controller context;
- Mark the topic being reassigned as ineligible for deletion;
- Call onPartitionReassignment() to actually trigger the reassignment.
Executing the Reassignment
The code of onPartitionReassignment() is as follows.
```scala
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
    info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned not yet caught up with the leader")
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
    val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
    //1. Update AR in ZK with OAR + RAR.
    updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
    //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
    updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
      newAndOldReplicas.toSeq)
    //3. replicas in RAR - OAR -> NewReplica
    startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned to catch up with the leader")
  } else {
    //4. Wait until all replicas in RAR are in sync with the leader.
    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
    //5. replicas in RAR -> OnlineReplica
    reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
        replica)), OnlineReplica)
    }
    //6. Set AR to RAR in memory.
    //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
    //   a new AR (using RAR) and same isr to every broker in RAR
    moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
    //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
    //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
    stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
    //10. Update AR in ZK with RAR.
    updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
    removePartitionFromReassignedPartitions(topicAndPartition)
    info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
    // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
    topicDeletionManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}
```
The official doc comment is fairly detailed and introduces three definitions that make the flow easier to describe:
- RAR (Re-assigned replicas): the target replica set of the reassignment, held in reassignedReplicas;
- OAR (Original assigned replicas): the replica set before reassignment, obtained via controllerContext.partitionReplicaAssignment();
- AR (Assigned replicas): the current replica set, which keeps changing as the reassignment proceeds.
Based on the code and comments above, we can easily piece together the concrete reassignment flow:
(0) Check whether all replicas in RAR are already in the partition's ISR (i.e., already in sync); if not, compute RAR − OAR, the set of replicas that need to be created or moved;
(1) Compute RAR ∪ OAR, the set of all replicas involved, and update AR in ZK to it;
(2) Increment the partition's leader epoch and send a LeaderAndIsrRequest to the brokers hosting every replica in AR;
(3) Transition the replicas in RAR − OAR to the NewReplica state, triggering their creation and synchronization;
(4) Compute OAR − RAR, the set of replicas to be decommissioned during the reassignment;
(5) Once all of RAR is in the partition's ISR, transition the replicas in RAR to the OnlineReplica state, indicating synchronization is complete;
(6) Update the in-memory AR to RAR;
(7) Check whether the partition's leader is in RAR; if not, trigger a new leader election. Then increment the partition's leader epoch and send LeaderAndIsrRequests announcing the (possibly new) leader;
(8~9) Transition the replicas in OAR − RAR through Offline to NonExistentReplica; these replicas will subsequently be deleted;
(10) Update AR in ZK to RAR;
(11) This partition's reassignment is now complete; update the plan stored in the /admin/reassign_partitions node by removing the finished partition;
(12) Send an UpdateMetadataRequest to all brokers to refresh their metadata caches;
(13) If a topic has finished reassignment and is pending deletion, remove it from the set of topics ineligible for deletion.
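The steps above boil down to plain set arithmetic over RAR, OAR, and AR, which can be condensed into this sketch (broker ids are illustrative):

```python
# Tracing the set arithmetic of onPartitionReassignment with
# example replica sets (broker ids chosen for illustration).
OAR = {1, 2, 3}   # original assigned replicas
RAR = {3, 4, 5}   # re-assigned (target) replicas

# Phase 1: AR is expanded to OAR + RAR so the new replicas can
# start fetching from the leader (steps 1-3).
AR = OAR | RAR
to_create = RAR - OAR   # replicas transitioned to NewReplica

# Phase 2 (after RAR has caught up with the leader, steps 4-10):
to_retire = OAR - RAR   # replicas forced Offline -> NonExistentReplica
AR = RAR                # AR finally shrinks to exactly RAR
```

Note that broker 3 appears in both OAR and RAR, so it is neither created nor retired; only the difference sets move.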
The End
One last small question: partition reassignment often involves moving large amounts of data, which may affect normal business traffic. How can we avoid that? ReassignPartitionsCommand provides a throttle feature for rate limiting; you can find it in both the code and the help text, so I won't go into detail here. One caveat: once throttling is enabled, be sure to run the verify step periodically, to guard against the throttle leaving the reassigned replicas perpetually unable to catch up with the leader.
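For reference, throttling is enabled via the --throttle flag at execute time (a sketch; hostnames and file names are placeholders, and the rate is in bytes/sec):

```shell
# Execute the plan with replication traffic capped at ~50 MB/s per broker
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file plan.json --execute --throttle 50000000

# Run verify periodically: besides reporting progress, it also clears
# the throttle configs once the reassignment has completed
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file plan.json --verify
```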
Good night, everyone~