Kafka启动都会创建KafkaController,然后会向zookeeper注册,第一个注册的节点就是Leader,其余都是follower。当KafkaController出现故障,不能继续管理集群,则那些KafkaController follower开始竞争成为新的Leader

KafkaController的启动过程是在startup方法中完成的:

首先:注册一个SessionExpirationListener会话超时监听器,监听KafkaController和zookeeper的连接状态。连接超时后创建新连接时会触发SessionExpirationListener的handleNewSession方法

然后:启动ZookeeperLeaderElector

def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up")
    // 注册一个会话超时的监听器
   
registerSessionExpirationListener()
    isRunning = true
    // 调用ZookeeperLeaderElector的startup方法
    controllerElector
.startup
    info("Controller startup complete")
  }
}

我们先来分析ZookeeperLeaderElector这个类:

一 核心字段

controllerContext:ControllerContext KafkaController上下问信息

electionPath: String 选举的路径

onBecomingLeader:需要传递一个controller 故障转移函数

onResigningAsLeader:需要传递一个后续做一些清理工作的函数

leaderId:Int 缓存当前controller leader的id

leaderChangeListener:LeaderChangeListener 监听'/controller'节点数据变化,当该节点保存的leaderId发生变化时,会触发LeaderChangeListner进行对应处理

二 重要方法

2.1 startup方法

def startup {inLock(controllerContext.controllerLock) {// 监听'/controller'节点数据的变化controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)// 开始选举elect}
}

2.2 elect 选举

def elect: Boolean = {val timestamp = SystemTime.milliseconds.toStringval electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))// 获取zookeeper当前记录的controller leader idleaderId = getControllerID// 如果已存在,表示该broker已经是leader,返回true,不需要选举if(leaderId != -1) {debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))return amILeader}try {// 创建临时节点,并进行session检查等,如果临时节点已存在抛出异常val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath, electString,controllerContext.zkUtils.zkConnection.getZookeeper, JaasUtils.isZkSecurityEnabled())zkCheckedEphemeral.create()info(brokerId + " successfully elected as leader")// 更新leader id字段leaderId = brokerId// 如果成功,调用故障转移函数onBecomingLeader()} catch {case e: ZkNodeExistsException =>leaderId = getControllerIDif (leaderId != -1)debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))elsewarn("A leader has been elected but just resigned, this will result in another round of election")case e2: Throwable =>error("Error while electing or becoming leader on broker %d".format(brokerId), e2)resign()}amILeader
}

在这里我们知道,它会调用onBecomingLeader也就是KafkaController里的onControllerFailover方法

def onControllerFailover() {if(isRunning) {info("Broker %d starting become controller state transition".format(config.brokerId))// 从zookeeper读取controller的年代信息,从"/controller_epoch"路径获取readControllerEpochFromZookeeper()// 递增ControllerEpoch值并且写入zookeeperincrementControllerEpoch(zkUtils.zkClient)// 注册ReassignedPartitionsListener监听器registerReassignedPartitionsListener()// 注册IsrChangeNotificationListenerregisterIsrChangeNotificationListener()// 注册优先副本选举监听器registerPreferredReplicaElectionListener()// 注册TopicChangeListner和,DeleteChangeListnerpartitionStateMachine.registerListeners()// 注册BrokerChangeListenerreplicaStateMachine.registerListeners()// 实例化ControllerContext,从zookeeper读取topic,partition,replica等信息initializeControllerContext()// 启动replica 状态机,初始化所有的replica状态replicaStateMachine.startup()// 启动Partition状态机,初始化所有的partition状态partitionStateMachine.startup()// 为所有存在的topic注册PartitionModificationListenercontrollerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))// 处理副本重新分配分区maybeTriggerPartitionReassignment()// 处理"优先副本"选举的分区maybeTriggerPreferredReplicaElection()// 向集群中其他broker发送 UpdateMetadataRequest请求sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)// 根据配置决定是否开启分区的自定均衡功能if (config.autoLeaderRebalanceEnable) {info("starting the partition rebalance scheduler")autoRebalanceScheduler.startup()autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)}// 调用TopicDeletionManager,底层启动DeleteTopicThreaddeleteTopicManager.start()}elseinfo("Controller has been shut down, aborting startup/failover")
}

接下来分析一下LeaderChangeListener:

当存在zk的leader信息改变,则调用这个函数,并且在内存记录新的leader

def handleDataChange(dataPath: String, data: Object) {inLock(controllerContext.controllerLock) {val amILeaderBeforeDataChange = amILeader// 记录新的leader的idleaderId = KafkaController.parseControllerId(data.toString)info("New leader is %d".format(leaderId))// 旧的leader需要辞职,即从Controller Leader 变为Followerif (amILeaderBeforeDataChange && !amILeader)onResigningAsLeader()}
}

Controller Leader 变为Follower 需要做一些清理工作:

def onControllerResignation() {debug("Controller resigning, broker id %d".format(config.brokerId))// 撤销zookeeper上的注册的监视器deregisterIsrChangeNotificationListener()deregisterReassignedPartitionsListener()deregisterPreferredReplicaElectionListener()// 关闭TopicDeletionManagerif (deleteTopicManager != null)deleteTopicManager.shutdown()//关闭 leader rebalance定时任务if (config.autoLeaderRebalanceEnable)autoRebalanceScheduler.shutdown()inLock(controllerContext.controllerLock) {// 取消所有的ReassignedPartitionsIsrChangeListnerderegisterReassignedPartitionsIsrChangeListeners()// 关闭partition&replica状态机partitionStateMachine.shutdown()replicaStateMachine.shutdown()// 关闭ControllerChannelManager,断开与其他broker的连接if(controllerContext.controllerChannelManager != null) {controllerContext.controllerChannelManager.shutdown()controllerContext.controllerChannelManager = null}// 重置ControllerContext,切换broker状态controllerContext.epoch=0controllerContext.epochZkVersion=0brokerState.newState(RunningAsBroker)info("Broker %d resigned as the controller".format(config.brokerId))}
}

当"/controller"节点中的数据被删除时会触发handleDataDeleted方法进行处理

def handleDataDeleted(dataPath: String) {inLock(controllerContext.controllerLock) {debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader".format(brokerId, dataPath))if(amILeader)onResigningAsLeader()elect // 重新选举}
}

Kafka初始化和故障转移相关推荐

  1. 06篇 Nacos Client本地缓存及故障转移

    学习不用那么功利,二师兄带你从更高维度轻松阅读源码- 本篇文章我们来通过源码分析一下Nacos的本地缓存及故障转移功能,涉及到核心类为ServiceInfoHolder和FailoverReactor ...

  2. Kafka主题体系架构-复制、故障转移和并行处理

    本文讨论了Kafka主题的体系架构,讨论了如何将分区用于故障转移和并行处理. Kafka主题,日志和分区 Kafka将主题存储在日志中.主题日志分为多个分区.Kafka将日志的分区分布在多个服务器或磁 ...

  3. SQL故障转移集群操作方法

    SQL故障转移集群操作方法 1 给SQL服务器配置IP地址,每台服务器需要两个IP,一个通讯用,一个作为心跳线,修改计算机的名称,关闭服务器的防火墙,开启远程桌面. 2心跳网卡配置 去掉ipv6,并去 ...

  4. 搭建Windows Server 2008故障转移群集

    本文章将详细讲述基于Windows Server 2008 R2的故障转移群集的实现,包括基本知识介绍,故障转移群集环境准备,实施过程,群集的维护等. 故障转移群集可以配置使用多种不同的配置.组成群集 ...

  5. SQL Server 2005故障转移群集

    SQL Server 2005故障转移群集 SQL Server使用最广的高可用性技术叫做故障转移群集.SQL Server故障转移群集是一项基于Windows故障转移群集的一种技术.SQL Serv ...

  6. sql server 2008 故障转移群集

    数据库群集的分类: (1)主动/被动群集(常用模式) 布署简单.比较安全.应用广泛 .资源利用率低 (2)主动/主动群集 没有闲置节点,资源利用率高.安全性差,争抢资源 (3)N+1群集(较好模式) ...

  7. 在Windows Server 2012 R2中搭建SQL Server 2012故障转移集群

    需要说明的是我们搭建的SQL Server故障转移集群(SQL Server Failover Cluster)是可用性集群,而不是负载均衡集群,其目的是为了保证服务的连续性和可用性,而不是为了提高服 ...

  8. MariaDB数据库介绍三、MHA(Master HA)实现主节点故障转移

    一.MHA MHA是开源的MySQL的高可用程序,它为MySQL的主从复制架构提供了主节点故障自动转移的功能,它会监控master节点故障的时候,会提升其中的拥有最新数据的slave节点称为新的mas ...

  9. windows server2008R2故障转移群集

    1 首先规划下自己的环境 都是在VMWARE里进行 服务器AA  Public IP 192.168.1.120 心跳Private IP 10.0.0.10 服务器BB Public IP 192. ...

最新文章

  1. 如何运行ruby代码
  2. leetcode No.83 删除排序链表中的重复元素
  3. 信息学奥赛一本通C++语言——1097:求阶乘的和
  4. 【python教程入门学习】python值得学吗,怎么自学?
  5. 新手学java还是python知乎_编程初学者应该先学C++、Java还是Python?
  6. OpenGL ES渲染管线与着色器
  7. Facebook语音助手Aloha细节曝光,它的logo竟然是一座小火山?
  8. 【iOS】Touch Drag Inside 和 Touch Drag Outside、Touch Drag Enter、Touch Drag Exit的区别
  9. Eclipse启动无响应,停留在Loading workbench状态
  10. 红帽学习笔记[RHCSA] 第一课[Shell、基础知识]
  11. html如何设置hr 标签的线条粗细,html中hr怎么设置粗细
  12. acme申请泛域名证书
  13. 小刘同学的第一百二十二篇博文
  14. npm设置为淘宝镜像地址
  15. w10计算机无法启动不了怎么办,win10开不开机怎么办_win10电脑无法开机的解决步骤...
  16. ppt不能保存我html,powerpoint无法保存怎么解决
  17. 技术负责人如何搞定老板之我所见
  18. python3 安卓_Android QPython3 调用 其他 Android App
  19. 软件测试经典面试题之二
  20. 麦子金服:互联网金融平台分化加剧,行业头部平台值得选择

热门文章

  1. 如何断开mongodb数据库连接_mongodb关闭数据库实例
  2. mysql convert报错_部署mysql版本项目问题记录
  3. java编译大项目很慢_优化MyEclipse编译速度慢的问题、build、project clean 慢
  4. 通过hashtable实现dic
  5. 5-6pooling层
  6. php 下载后文件打不开,解决php下载excel无法打开的问题
  7. 统计各部门的薪水总和_近年来,统计学热过金融火过计算机,这是真的吗?
  8. Java 算法 格雷码
  9. 查看tsfresh提取(时间)序列特征的变量含义
  10. python调用百度智能云API请求(以自然语言处理——词法分析为例)