The previous post, Kafka source code (1), corresponds to / explains sections 3.2 and 3.3 of Kafka Design Analysis (2). I used Kafka 0.8.2.x for years, back when Redis was taking off and Hadoop was in full swing. Four or five years flew by, and I'm finally old enough to read some source code.

I have to say Kafka's code style is much nicer than Spark's. Spark is simply huge; by comparison Kafka is small and beautiful.
Probably for performance reasons, and because of ZooKeeper's mechanics, Kafka mostly uses an asynchronous, callback-driven event model, similar to how epoll handles IO.
Nearly every callback in the source is annotated with when the method gets invoked and what work it does once triggered. That is friendly for development, maintenance, and reading alike; I only wish I'd met it sooner, haha.

Parts 3 and 4 of this post correspond to sections 3.4 (broker failover) and 3.5 (partition leader election) of the previous article.

Contents

  • 1. Origins of the BrokerChangeListener
  • 2. How the Controller handles a new broker
  • 3. How the Controller handles broker failure
  • 4. Partition leader election

1. Origins of the BrokerChangeListener

Once a broker is elected controller, it registers the listeners of the two state machines in the onBecomingLeader (a.k.a. onControllerFailover) callback:

    partitionStateMachine.registerListeners()
    replicaStateMachine.registerListeners()

When the replica state machine's registerListeners() runs, it calls registerBrokerChangeListener(), which registers brokerChangeListener on the "/brokers/ids" path.
Adding or removing children under this node triggers the listener.

// In ReplicaStateMachine.scala
// register ZK listeners of the replica state machine
def registerListeners() {
  // register broker change listener
  registerBrokerChangeListener()
}

private def registerBrokerChangeListener() = {
  zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
}

Listeners are registered via zkClient's subscribeChildChanges method; the callback it triggers (the second argument) is an IZkChildListener:

// from the zkClient documentation
subscribeChildChanges(java.lang.String path, IZkChildListener listener)
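Since a real subscription needs a live ZooKeeper, here is a toy stand-in that only mimics the subscribe-and-fire flow of subscribeChildChanges; the names ChildListener and FakeZk are made up for illustration and are not zkClient's API:

```scala
import scala.collection.mutable

// Toy model of ZooKeeper child-change subscriptions: listeners subscribe to a
// path, and a "child change" on that path invokes every subscriber with the
// new child list, like IZkChildListener.handleChildChange.
trait ChildListener {
  def handleChildChange(parentPath: String, children: Seq[String]): Unit
}

class FakeZk {
  private val listeners = mutable.Map.empty[String, mutable.Buffer[ChildListener]]

  def subscribeChildChanges(path: String, listener: ChildListener): Unit =
    listeners.getOrElseUpdate(path, mutable.Buffer.empty) += listener

  // Simulates brokers (de)registering under `path`.
  def fireChildChange(path: String, children: Seq[String]): Unit =
    listeners.getOrElse(path, mutable.Buffer.empty).foreach(_.handleChildChange(path, children))
}
```

Subscribing a listener on "/brokers/ids" and firing a change then behaves like the real thing: the callback receives the parent path and the current children.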

ControllerZkChildListener implements the IZkChildListener interface:

// ControllerZkListener.scala
trait ControllerZkChildListener extends IZkChildListener with ControllerZkListener {
  @throws[Exception]
  final def handleChildChange(parentPath: String, currentChildren: java.util.List[String]): Unit = {
    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
    if (controller.isActive)
      doHandleChildChange(parentPath, currentChildren.asScala)
  }

  @throws[Exception]
  def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit
}

When the listener fires, doHandleChildChange is executed only if the controller is active.
Since BrokerChangeListener extends ControllerZkChildListener, it is BrokerChangeListener's doHandleChildChange that runs.

Kafka handles every newly added broker and every dead broker in this one callback, as follows:

/**
 * In ReplicaStateMachine.scala
 * This is the zookeeper listener that triggers all the state transitions for a replica
 */
class BrokerChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {

  protected def logName = "BrokerChangeListener"

  // Handles both newly added brokers and dead brokers:
  // the controller runs onBrokerStartup for new brokers and onBrokerFailure for dead ones.
  def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
    info("Broker change listener fired for path %s with children %s"
      .format(parentPath, currentBrokerList.sorted.mkString(",")))
    inLock(controllerContext.controllerLock) {
      // Only runs once the ReplicaStateMachine has started (startup)
      if (hasStarted.get) {
        ControllerStats.leaderElectionTimer.time {
          try {
            // Read the current broker info from the node path, i.e. the state after the change
            val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
            val curBrokerIds = curBrokers.map(_.id)
            val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
            // liveOrShuttingDownBrokerIds is a Set; -- is set difference.
            // https://docs.scala-lang.org/zh-cn/overviews/collections/sets.html
            // Brokers after the change minus the known brokers (live plus shutting down) = newly added brokers;
            val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
            // brokers before the change minus those after it = dead brokers
            val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
            val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
            controllerContext.liveBrokers = curBrokers
            val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
            val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
            val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
            info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
              .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
            // Each newly added broker is added to the controllerChannelManager, which does a series of setup work
            newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
            // Each dead broker is removed from the controllerChannelManager: its requestSendThread is
            // shut down and the broker is cleared from the channel manager's in-memory cache
            deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
            if (newBrokerIds.nonEmpty)
              controller.onBrokerStartup(newBrokerIdsSorted)
            if (deadBrokerIds.nonEmpty)
              controller.onBrokerFailure(deadBrokerIdsSorted)
          } catch {
            case e: Throwable => error("Error while handling broker changes", e)
          }
        }
      }
    }
  }
}

Each newly added broker is added to the controllerChannelManager; the Controller keeps a connection to the new broker and creates and starts a dedicated thread to send it requests.
Then controller.onBrokerStartup() is executed.
Each dead broker is removed from the controllerChannelManager: its requestSendThread "channel" is shut down, and the broker is cleared from the channel manager's in-memory cache.
Then controller.onBrokerFailure() is executed.
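The new/dead broker computation above is plain set arithmetic. A minimal sketch with made-up broker ids:

```scala
// Hypothetical broker-id sets mirroring the variables in doHandleChildChange
// (the ids are invented for illustration).
val liveOrShuttingDownBrokerIds = Set(1, 2, 3) // known before the ZK event
val curBrokerIds = Set(2, 3, 4)                // children of /brokers/ids after the event

// `--` is Scala's set difference:
// present now but not before -> newly started brokers
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
// present before but gone now -> dead brokers
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
```

Here broker 4 comes out as newly added and broker 1 as dead, which is exactly what the listener feeds to onBrokerStartup and onBrokerFailure.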

Incidentally, ReplicaState has seven states in total:

sealed trait ReplicaState { def state: Byte }
case object NewReplica extends ReplicaState { val state: Byte = 1 }
case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4 }
case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5 }
case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6 }
case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }
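A minimal sketch of how such a state machine can validate transitions: a map from target state to its allowed previous states. This is the pattern the real ReplicaStateMachine uses, but the table below is abridged and illustrative, not the exact one in the Kafka source:

```scala
// Abridged replica states (four of the seven) plus a legal-transition table.
sealed trait ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object NonExistentReplica extends ReplicaState

// target state -> set of states it may legally be entered from (illustrative)
val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
  NewReplica     -> Set(NonExistentReplica),
  OnlineReplica  -> Set(NewReplica, OnlineReplica, OfflineReplica),
  OfflineReplica -> Set(NewReplica, OnlineReplica)
)

// A transition from `from` to `to` is legal only if `from` is in the allowed set.
def canTransition(from: ReplicaState, to: ReplicaState): Boolean =
  validPreviousStates.getOrElse(to, Set.empty).contains(from)
```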

That's the big picture; now let's look at the details.

2. How the Controller handles a new broker

It boils down to these two lines:

newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
controller.onBrokerStartup(newBrokerIdsSorted)

controllerChannelManager's addBroker is the following function:

def addBroker(broker: Broker) {
  // be careful here. Maybe the startup() API has already started the request send thread
  brokerLock synchronized {
    if (!brokerStateInfo.contains(broker.id)) {
      addNewBroker(broker)
      startRequestSendThread(broker.id)
    }
  }
}

In addNewBroker(broker), a brokerNode, a Selector, and a NetworkClient are created for the new broker;
finally a RequestSendThread object is created, and the broker's ControllerBrokerStateInfo is stored in brokerStateInfo.
RequestSendThread is a ShutdownableThread.
startRequestSendThread then starts this thread, which sends clientRequest messages at roughly 100ms intervals and handles the responses:

    // key code in startRequestSendThread:
    clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
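The per-broker send loop can be modeled as a queue-draining worker. This is an illustrative sketch (QueueItem, RequestSender, and the "send" step are made up), not Kafka's actual RequestSendThread, which does a blocking network send/receive in each iteration:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Each broker gets its own request queue and worker; the worker drains the
// queue one item at a time, which keeps requests to a broker ordered.
case class QueueItem(apiKey: Int, payload: String)

class RequestSender(queue: LinkedBlockingQueue[QueueItem]) {
  @volatile var isRunning = true
  val sent = scala.collection.mutable.Buffer.empty[QueueItem]

  // One loop iteration: wait up to ~100ms for a request, then "send" it
  // (recorded here instead of going over the network).
  def doWork(): Unit = {
    val item = queue.poll(100, TimeUnit.MILLISECONDS)
    if (item != null) sent += item
  }

  // A ShutdownableThread-style thread would run: while (isRunning) doWork()
}
```

The 100ms poll timeout means a shutdown request is noticed within one iteration even when the queue stays empty.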

3. How the Controller handles broker failure

The Controller executes onBrokerFailure. This function is triggered by the replica state machine's BrokerChangeListener with the list of failed brokers as input. It does the following four things:

  1. Mark partitions whose leaders died as offline
  2. Trigger the OnlinePartition state change for all new/offline partitions
  3. Invoke the OfflineReplica state change on the input list of failed brokers
  4. If no partitions are affected, send an UpdateMetadataRequest to the live or shutting-down brokers.
/**
 * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
 * as input. It does the following -
 * 1. Mark partitions with dead leaders as offline
 * 2. Triggers the OnlinePartition state change for all new/offline partitions
 * 3. (this sentence looks mis-written:) Invokes the OfflineReplica state change on the input list of newly started brokers
 * 4. If no partitions are effected then send UpdateMetadataRequest to live or shutting down brokers
 *
 * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.  This is because
 * the partition state machine will refresh our cache for us when performing leader election for all new/offline
 * partitions coming online.
 */
def onBrokerFailure(deadBrokers: Seq[Int]) {
  info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
  val deadBrokersThatWereShuttingDown =
    deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
  val deadBrokersSet = deadBrokers.toSet
  // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
  val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
    deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
      !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
  // 1. Move every partition whose current leader is among the dead brokers to the OfflinePartition target state
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
  // trigger OnlinePartition state changes for offline or new partitions
  // 2. Immediately re-enter OnlinePartition: every partition in NewPartition or OfflinePartition state
  //    (except those being deleted) is moved to OnlinePartition, using
  //    controller.offlinePartitionSelector to re-elect the partition leader.
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // filter out the replicas that belong to topics that are being deleted
  // 3. From the replicas on dead brokers, filter out (filterNot) the replicas of topics queued
  //    for deletion, leaving the active replicas on dead brokers
  var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
  val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  // 4. Move the active replicas to OfflineReplica: a stop-replica command is sent to them
  //    so they stop fetching from the leader.
  replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
  // check if topic deletion state for the dead replicas needs to be updated
  // 5. Filter out the replicas marked for deletion, fail them in the topic deletion manager,
  //    and trigger the ReplicaDeletionIneligible state
  val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
    // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
    // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
    deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
  }
  // If broker failure did not require leader re-election, inform brokers of failed broker
  // Note that during leader re-election, brokers update their metadata
  if (partitionsWithoutLeader.isEmpty) {
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  }
}
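The partitionsWithoutLeader filter can be illustrated with a toy leadership map. TopicPartition and the sample data are made up, and the topic-deletion exclusion from the real code is omitted:

```scala
// Keep the partitions whose current leader sits on a dead broker.
case class TopicPartition(topic: String, partition: Int)

val leaders: Map[TopicPartition, Int] = Map( // partition -> leader broker id
  TopicPartition("a", 0) -> 1,
  TopicPartition("a", 1) -> 2,
  TopicPartition("b", 0) -> 1
)
val deadBrokersSet = Set(1)

// Both partitions led by broker 1 lose their leader and go offline.
val partitionsWithoutLeader =
  leaders.filter { case (_, leader) => deadBrokersSet(leader) }.keySet
```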

A partition has one of four states:

sealed trait PartitionState { def state: Byte }
case object NewPartition extends PartitionState { val state: Byte = 0 }
case object OnlinePartition extends PartitionState { val state: Byte = 1 }
case object OfflinePartition extends PartitionState { val state: Byte = 2 }
case object NonExistentPartition extends PartitionState { val state: Byte = 3 }

4. Partition leader election

KafkaController defines four selectors in total, all extending PartitionLeaderSelector:

val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)

NoOpLeaderSelector was removed in 0.10.2. The four selectors implement different selectLeader strategies, each picking the partition's leader from the ISR. I'll fill in the details some other time.
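As a rough sketch of the offline-partition strategy, assuming the usual rule "prefer a live ISR replica; fall back to a live assigned replica only if unclean leader election is allowed" (the function name and signature are illustrative, not Kafka's exact API):

```scala
// Pick a leader for a partition whose current leader has died.
def selectLeader(assignedReplicas: Seq[Int],
                 isr: Seq[Int],
                 liveBrokers: Set[Int],
                 uncleanElection: Boolean): Option[Int] = {
  // Replicas that are both in the ISR and on a live broker, in assignment order.
  val liveInSyncReplicas = assignedReplicas.filter(r => isr.contains(r) && liveBrokers(r))
  liveInSyncReplicas.headOption.orElse {
    // No live ISR replica left: electing from outside the ISR may lose data,
    // hence it is gated behind the unclean-election flag.
    if (uncleanElection) assignedReplicas.find(liveBrokers) else None
  }
}
```

With replicas Seq(1, 2, 3), ISR Seq(2, 3), and only broker 3 alive, this picks 3; with no live ISR replica it elects nothing unless unclean election is enabled.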
