节日快乐~

今天是属于广大程序员的节日,祝自己快乐hhhhhh

随着业务量的急速膨胀和又一年双11的到来,我们会对现有的Kafka集群进行扩容,以应对更大的流量和业务尖峰。当然,扩容之后的新Kafka Broker默认是不会有任何Topic和Partition的,需要手动利用分区重分配命令kafka-reassign-partitions将现有的Partition/Replica平衡到新的Broker上去。那么Kafka具体是如何执行重分配流程的呢?本文就来简单解读一下。

生成、提交重分配方案

我们知道,使用kafka-reassign-partitions命令分为三步,一是根据指定的Topic生成JSON格式的重分配方案(--generate),二是将生成的方案提交执行(--execute),三是观察重分配的进度(--verify),它们分别对应kafka.admin.ReassignPartitionsCommand类中的generateAssignment()、executeAssignment()和verifyAssignment()方法。

generateAssignment()方法会调用AdminUtils#assignReplicasToBrokers()方法生成Replica分配方案。源码就不再读了,其原则简述如下:

  • 将Replica尽量均匀地分配到各个Broker上去;
  • 一个Partition的所有Replica必须位于不同的Broker上;
  • 如果Broker有机架感知(rack aware)的信息,将Partition的Replica尽量分配到不同的机架。

executeReassignment()方法调用了reassignPartitions()方法,其源码如下。

def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {maybeThrottle(throttle)try {val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }if (validPartitions.isEmpty) falseelse {if (proposedReplicaAssignment.nonEmpty) {val adminClient = adminClientOpt.getOrElse(throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))val alterReplicaDirResult = adminClient.alterReplicaLogDirs(proposedReplicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt))alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {try {future.get()throw new AdminCommandFailedException(s"Partition ${replica.topic()}-${replica.partition()} already exists on broker ${replica.brokerId()}." +s" Reassign replica to another log directory on the same broker is currently not supported.")} catch {case t: ExecutionException =>t.getCause match {case e: ReplicaNotAvailableException => // It is OK if the replica is not availablecase e: Throwable => throw e}}}}}val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)true}} catch {// ......}
}

在进行必要的Partition校验之后,创建ZK持久节点/admin/reassign_partitions,并将JSON格式的重分配方案写进去。如果该节点存在,就表示已经在进行重分配,不能再启动新的重分配流程(相关的判断在executeReassignment()方法中)。

监听并处理重分配事件

在之前讲解Kafka Controller时,笔者提到Controller会注册多个ZK监听器,将监听到的事件投递到内部的事件队列,并由事件处理线程负责处理。监听ZK中/admin/reassign_partitions节点的监听器为PartitionReassignmentListener,并产生PartitionReassignment事件,处理逻辑如下。

case object PartitionReassignment extends ControllerEvent {def state = ControllerState.PartitionReassignmentoverride def process(): Unit = {if (!isActive) returnval partitionReassignment = zkUtils.getPartitionsBeingReassigned()val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))partitionsToBeReassigned.foreach { case (partition, context) =>if(topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {error(s"Skipping reassignment of partition $partition since it is currently being deleted")removePartitionFromReassignedPartitions(partition)} else {initiateReassignReplicasForTopicPartition(partition, context)}}}
}

该方法先取得需要重分配的Partition列表,然后从中剔除掉那些已经被标记为删除的Topic所属的Partition,再调用initiateReassignReplicasForTopicPartition()方法:

def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,reassignedPartitionContext: ReassignedPartitionsContext) {val newReplicas = reassignedPartitionContext.newReplicasval topic = topicAndPartition.topicval partition = topicAndPartition.partitiontry {val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)assignedReplicasOpt match {case Some(assignedReplicas) =>if (assignedReplicas == newReplicas) {throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))} else {info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))// first register ISR change listenerwatchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)// mark topic ineligible for deletion for the partitions being reassignedtopicDeletionManager.markTopicIneligibleForDeletion(Set(topic))onPartitionReassignment(topicAndPartition, reassignedPartitionContext)}case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist".format(topicAndPartition))}} catch {case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)// remove the partition from the admin path to unblock the admin clientremovePartitionFromReassignedPartitions(topicAndPartition)}
}

该方法的执行逻辑如下:

  1. 判断Partition的原有Replica是否与即将重分配的新Replica相同,如果相同则抛出异常;
  2. 注册即将被重分配的Partition的ISR变化监听器;
  3. 把即将被重分配的Partition/Replica记录在Controller上下文中的partitionsBeingReassigned集合中;
  4. 把即将被重分配的Topic标记为不可删除;
  5. 调用onPartitionReassignment()方法真正触发重分配流程。

执行重分配流程

onPartitionReassignment()方法的代码如下。

def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {val reassignedReplicas = reassignedPartitionContext.newReplicasif (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +"reassigned not yet caught up with the leader")val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSetval newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet//1. Update AR in ZK with OAR + RAR.updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)//2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),newAndOldReplicas.toSeq)//3. replicas in RAR - OAR -> NewReplicastartNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +"reassigned to catch up with the leader")} else {//4. Wait until all replicas in RAR are in sync with the leader.val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet//5. replicas in RAR -> OnlineReplicareassignedReplicas.foreach { replica =>replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,replica)), OnlineReplica)}//6. Set AR to RAR in memory.//7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and//   a new AR (using RAR) and same isr to every broker in RARmoveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)//8. replicas in OAR - RAR -> Offline (force those replicas out of isr)//9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)//10. Update AR in ZK with RAR.updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)//11. Update the /admin/reassign_partitions path in ZK to remove this partition.removePartitionFromReassignedPartitions(topicAndPartition)info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))controllerContext.partitionsBeingReassigned.remove(topicAndPartition)//12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every brokersendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))// signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completedtopicDeletionManager.resumeDeletionForTopics(Set(topicAndPartition.topic))}
}

官方JavaDoc比较详细,给出了3个方便解释流程的定义,列举如下:

  • RAR(Re-assigned replicas):重分配的Replica集合,记为reassignedReplicas;
  • OAR(Original assigned replicas):重分配之前的原始Replica集合,通过controllerContext.partitionReplicaAssignment()方法取得;
  • AR(Assigned replicas):当前的Replica集合,随着重分配的进行不断变化。

根据上文的代码和注释,我们可以很容易地梳理出重分配的具体流程:

(0) 检查RAR是否都已经在Partition的ISR集合中(即是否已经同步),若否,则计算RAR与OAR的差集,也就是需要被创建或者重分配的Replica集合;

(1) 计算RAR和OAR的并集,即所有Replica的集合,并将ZK中的AR更新;

(2) 增加Partition的Leader纪元值,并向AR中的所有Replica所在的Broker发送LeaderAndIsrRequest;

(3) 更新RAR与OAR的差集中Replica的状态为NewReplica,以触发这些Replica的创建或同步;

(4) 计算OAR和RAR的差集,即重分配过程中需要被下线的Replica集合;

(5) 等待RAR都已经在Partition的ISR集合中,将RAR中Replica的状态设置为OnlineReplica,表示同步完成;

(6) 将迁移现场的AR更新为RAR;

(7) 检查Partition的Leader是否在RAR中,如果没有,则触发新的Leader选举。然后增加Partition的Leader纪元值,发送LeaderAndIsrRequest更新Leader的结果;

(8~9) 将OAR和RAR的差集中的Replica状态设为Offline->NonExistentReplica,这些Replica后续将被删除;

(10) 将ZK中的AR集合更新为RAR;

(11) 一个Partition重分配完成,更新/admin/reassign_partitions节点中的执行计划,删掉完成的Partition;

(12) 发送UpdateMetadataRequest给所有Broker,刷新元数据缓存;

(13) 如果有一个Topic已经重分配完成并且将被删除,就将它从不可删除的Topic集合中移除。

The End

最后一个小问题:Partition重分配往往会涉及大量的数据交换,有可能会影响正常业务的运行,如何避免呢?ReassignPartitionsCommand也提供了throttle功能用于限流,在代码和帮助文档中都可以看到它,就不多讲了。当然,一旦启用了throttle,我们一定要定期进行verify操作,防止因为限流导致重分配的Replica一直追不上Leader的情况发生。

民那晚安晚安。


http://www.taodudu.cc/news/show-5061780.html

相关文章:

  • Hive的分区(partition)-动态分区
  • QQ云控引流 为您定制营销推广方案
  • 睿速QQ营销——网上销售领航者!!
  • QQ营销,你必须知道的技巧
  • 营销QQ咨询服务引入代码
  • 腾讯QQ精准TIPS消息营销介绍
  • 营销QQ、企业QQ添加在线交谈链接
  • 营销qq会话在线聊天代码(也可以匿名)
  • 如何在腾讯营销QQ获取在线咨询代码
  • 营销QQ使用方法
  • rke部署k8s_v1.20.15高可用
  • 基于RKE部署的rancher管理平台迁移
  • RKE方式部署Kubernetes集群
  • 单机单点 Rke2 Single 升级到 高可用 Rke2 HA
  • RKE安装Kubernetes
  • rancher导入rke
  • RKE2部署高可用Rancher v2.7.1
  • RKE2安装k8s
  • RKE vs. RKE2:对比两种 Kubernetes 发行版
  • 使用RKE搭建docker-k8s集群
  • Rancher RKE K8s 集群 etcd 恢复
  • k8s.5-使用RKE构建企业生产级Kubernetes集群
  • Rancher安装k8s: rke高可用集群
  • rancher rke 集群恢复
  • 使用rke安装高可用k8s集群
  • RKE2安装kubernetes(2)
  • 使用rancher rke2配置高可用k8s集群
  • 使用RKE部署高可用Rancher
  • RKE 升级kubernetes 版本
  • RKE部署高可用Kubernetes集群

Kafka Partition重分配流程简析相关推荐

  1. uboot源码分析(1)uboot 命令解析流程简析

    uboot 命令解析流程简析 uboot正常启动后,会调用main_loop(void)函数,进入main_loop()之后,如果在规定的时间(CONFIG_BOOTDELAY)内,没有检查到任何按键 ...

  2. Linux的启动流程简析(以Debian为例)

    Linux的启动流程简析(以Debian为例) 正文: 前面的文章探讨BIOS和主引导记录的作用.那篇文章不涉及操作系统,只与主板的板载程序有关.今天,我想接着往下写,探讨操作系统接管硬件以后发生的事 ...

  3. Android开机启动流程简析

    Android开机启动流程简析 (一) 文章目录 Android开机启动流程简析 (一) 前言 一.开机启动的流程概述 二.Android的启动过程分析 (1).总体流程 init简述 Zygote简 ...

  4. CAS流程简析 服务端校验Ticket

    相关阅读 CAS基础组件 简介 CAS流程简析 服务端处理未携带Service登录请求 CAS流程简析 服务端处理携带Service登录请求 CAS基础组件 客户端过滤器 简介 用户访问客户端的请求若 ...

  5. Python源码学习:启动流程简析

    Python源码分析 本文环境python2.5系列 参考书籍<<Python源码剖析>> Python简介: python主要是动态语言,虽然Python语言也有编译,生成中 ...

  6. AsyncTask 源码流程简析

    参考链接: https://blog.csdn.net/lmj623565791/article/details/38614699 AsyncTask的几个重要函数和参数 AsyncTask是一个抽象 ...

  7. GB28181国标协议通讯流程简析以及NVR注册不上等相关问题点记录

    目录 留给读者 初识GB28181协议 什么是SIP? SIP中的INVITE SIP中的MESSAGE 什么是NVR? GB28181从注册到注销都经历了哪些步骤? 注册 设备信息查询 实时视频.历 ...

  8. android 5.1 壁纸路径,RTFSC – Android5.1 壁纸设置流程简析 – RustFisher

    Android5.1 壁纸设置流程浅析 Ubuntu14.04  Android5.1  Source Insight3 这里只是简单分析一下5.1里是如何设置壁纸的:这个流程和4.4有一些不同.但基 ...

  9. Regulator相关GPIO控制使用流程简析

    转载请注明出处,亲1,注册到平台 举例: extern struct gpio_regulator_platform_data v210_gpio_regs_platform_data; static ...

最新文章

  1. MVC - 17.OA项目
  2. Android平台使用PocketSphinx做离线语音识别,小范围语音99%识别率
  3. 2021年春季学期-信号与系统-第一次作业参考答案-第四题
  4. Vim安装、配置和插件的添加使用(可以以目录的形式打开)
  5. 让div margin属性消失_为什么div里面打一个字之后就会有高度了呢?
  6. 2022年中国餐饮经营参数蓝皮书
  7. 单击选定单元格后输入新内容_2015年计算机一级msoffice考前简答题练习
  8. oracle12c新特点之可插拔数据库(Pluggable Database,PDB)
  9. 计算机网络与多媒体专科测试,上海第二工业大学2021年专科层次依法自主招生生考试职业技能测试考纲...
  10. 恩智浦电磁组智能汽车竞赛视频
  11. 关键字查询 import keyword
  12. 说说我的专业计算机作文,说说我自己作文(精选11篇)
  13. 微信公众号1万粉丝流量主能赚多少钱?
  14. Java将图片转为Base64
  15. 如何让手机 1 秒打开健康码,任何机型!
  16. yolov7 打开深度摄像头 realsences
  17. RabbitMQ 学习笔记
  18. 股票投资 策略(收集)
  19. 马哥2016linux就业班+架构班+运维班全套
  20. carbondata使用笔记

热门文章

  1. 用python实现psnr
  2. p文件转m文件的方法
  3. JVM内存模型——运行时数据区的特点和作用
  4. 为什么选择转行学网安?老男孩培训改变了我的人生!
  5. 测试工程师常用面试题
  6. 显卡设置导致osg中数字显示不全问题的解决方法
  7. visual c++ 下载地址
  8. 《攻守道》:马云的人设与梦想
  9. 【python】python获取当前py文件的文件名
  10. 车检预约小程序如何助机动车检测站营销转型升级?