Kafka初始化和故障转移
Kafka启动都会创建KafkaController,然后会向zookeeper注册,第一个注册的节点就是Leader,其余都是follower。当KafkaController出现故障,不能继续管理集群,则那些KafkaController follower开始竞争成为新的Leader
KafkaController的启动过程是在startup方法中完成的:
首先:注册一个SessionExpirationListener会话超时监听器,监听KafkaController和zookeeper的连接状态。连接超时后创建新连接时会触发SessionExpirationListener的handleNewSession方法
然后:启动ZookeeperLeaderElector
def startup() = {
inLock(controllerContext.controllerLock) {
info("Controller starting up")
// 注册一个会话超时的监听器
registerSessionExpirationListener()
isRunning = true
// 调用ZookeeperLeaderElector的startup方法
controllerElector.startup
info("Controller startup complete")
}
}
我们先来分析ZookeeperLeaderElector这个类:
一 核心字段
controllerContext:ControllerContext KafkaController上下问信息
electionPath: String 选举的路径
onBecomingLeader:需要传递一个controller 故障转移函数
onResigningAsLeader:需要传递一个后续做一些清理工作的函数
leaderId:Int 缓存当前controller leader的id
leaderChangeListener:LeaderChangeListener 监听'/controller'节点数据变化,当该节点保存的leaderId发生变化时,会触发LeaderChangeListner进行对应处理
二 重要方法
2.1 startup方法
def startup {inLock(controllerContext.controllerLock) {// 监听'/controller'节点数据的变化controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)// 开始选举elect} }
2.2 elect 选举
def elect: Boolean = {val timestamp = SystemTime.milliseconds.toStringval electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))// 获取zookeeper当前记录的controller leader idleaderId = getControllerID// 如果已存在,表示该broker已经是leader,返回true,不需要选举if(leaderId != -1) {debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))return amILeader}try {// 创建临时节点,并进行session检查等,如果临时节点已存在抛出异常val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath, electString,controllerContext.zkUtils.zkConnection.getZookeeper, JaasUtils.isZkSecurityEnabled())zkCheckedEphemeral.create()info(brokerId + " successfully elected as leader")// 更新leader id字段leaderId = brokerId// 如果成功,调用故障转移函数onBecomingLeader()} catch {case e: ZkNodeExistsException =>leaderId = getControllerIDif (leaderId != -1)debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))elsewarn("A leader has been elected but just resigned, this will result in another round of election")case e2: Throwable =>error("Error while electing or becoming leader on broker %d".format(brokerId), e2)resign()}amILeader }
在这里我们知道,它会调用onBecomingLeader也就是KafkaController里的onControllerFailover方法
def onControllerFailover() {if(isRunning) {info("Broker %d starting become controller state transition".format(config.brokerId))// 从zookeeper读取controller的年代信息,从"/controller_epoch"路径获取readControllerEpochFromZookeeper()// 递增ControllerEpoch值并且写入zookeeperincrementControllerEpoch(zkUtils.zkClient)// 注册ReassignedPartitionsListener监听器registerReassignedPartitionsListener()// 注册IsrChangeNotificationListenerregisterIsrChangeNotificationListener()// 注册优先副本选举监听器registerPreferredReplicaElectionListener()// 注册TopicChangeListner和,DeleteChangeListnerpartitionStateMachine.registerListeners()// 注册BrokerChangeListenerreplicaStateMachine.registerListeners()// 实例化ControllerContext,从zookeeper读取topic,partition,replica等信息initializeControllerContext()// 启动replica 状态机,初始化所有的replica状态replicaStateMachine.startup()// 启动Partition状态机,初始化所有的partition状态partitionStateMachine.startup()// 为所有存在的topic注册PartitionModificationListenercontrollerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))// 处理副本重新分配分区maybeTriggerPartitionReassignment()// 处理"优先副本"选举的分区maybeTriggerPreferredReplicaElection()// 向集群中其他broker发送 UpdateMetadataRequest请求sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)// 根据配置决定是否开启分区的自定均衡功能if (config.autoLeaderRebalanceEnable) {info("starting the partition rebalance scheduler")autoRebalanceScheduler.startup()autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)}// 调用TopicDeletionManager,底层启动DeleteTopicThreaddeleteTopicManager.start()}elseinfo("Controller has been shut down, aborting startup/failover") }
接下来分析一下LeaderChangeListener:
当存在zk的leader信息改变,则调用这个函数,并且在内存记录新的leader
def handleDataChange(dataPath: String, data: Object) {inLock(controllerContext.controllerLock) {val amILeaderBeforeDataChange = amILeader// 记录新的leader的idleaderId = KafkaController.parseControllerId(data.toString)info("New leader is %d".format(leaderId))// 旧的leader需要辞职,即从Controller Leader 变为Followerif (amILeaderBeforeDataChange && !amILeader)onResigningAsLeader()} }
Controller Leader 变为Follower 需要做一些清理工作:
def onControllerResignation() {debug("Controller resigning, broker id %d".format(config.brokerId))// 撤销zookeeper上的注册的监视器deregisterIsrChangeNotificationListener()deregisterReassignedPartitionsListener()deregisterPreferredReplicaElectionListener()// 关闭TopicDeletionManagerif (deleteTopicManager != null)deleteTopicManager.shutdown()//关闭 leader rebalance定时任务if (config.autoLeaderRebalanceEnable)autoRebalanceScheduler.shutdown()inLock(controllerContext.controllerLock) {// 取消所有的ReassignedPartitionsIsrChangeListnerderegisterReassignedPartitionsIsrChangeListeners()// 关闭partition&replica状态机partitionStateMachine.shutdown()replicaStateMachine.shutdown()// 关闭ControllerChannelManager,断开与其他broker的连接if(controllerContext.controllerChannelManager != null) {controllerContext.controllerChannelManager.shutdown()controllerContext.controllerChannelManager = null}// 重置ControllerContext,切换broker状态controllerContext.epoch=0controllerContext.epochZkVersion=0brokerState.newState(RunningAsBroker)info("Broker %d resigned as the controller".format(config.brokerId))} }
当"/controller"节点中的数据被删除时会触发handleDataDeleted方法进行处理
def handleDataDeleted(dataPath: String) {inLock(controllerContext.controllerLock) {debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader".format(brokerId, dataPath))if(amILeader)onResigningAsLeader()elect // 重新选举} }
Kafka初始化和故障转移相关推荐
- 06篇 Nacos Client本地缓存及故障转移
学习不用那么功利,二师兄带你从更高维度轻松阅读源码- 本篇文章我们来通过源码分析一下Nacos的本地缓存及故障转移功能,涉及到核心类为ServiceInfoHolder和FailoverReactor ...
- Kafka主题体系架构-复制、故障转移和并行处理
本文讨论了Kafka主题的体系架构,讨论了如何将分区用于故障转移和并行处理. Kafka主题,日志和分区 Kafka将主题存储在日志中.主题日志分为多个分区.Kafka将日志的分区分布在多个服务器或磁 ...
- SQL故障转移集群操作方法
SQL故障转移集群操作方法 1 给SQL服务器配置IP地址,每台服务器需要两个IP,一个通讯用,一个作为心跳线,修改计算机的名称,关闭服务器的防火墙,开启远程桌面. 2心跳网卡配置 去掉ipv6,并去 ...
- 搭建Windows Server 2008故障转移群集
本文章将详细讲述基于Windows Server 2008 R2的故障转移群集的实现,包括基本知识介绍,故障转移群集环境准备,实施过程,群集的维护等. 故障转移群集可以配置使用多种不同的配置.组成群集 ...
- SQL Server 2005故障转移群集
SQL Server 2005故障转移群集 SQL Server使用最广的高可用性技术叫做故障转移群集.SQL Server故障转移群集是一项基于Windows故障转移群集的一种技术.SQL Serv ...
- sql server 2008 故障转移群集
数据库群集的分类: (1)主动/被动群集(常用模式) 布署简单.比较安全.应用广泛 .资源利用率低 (2)主动/主动群集 没有闲置节点,资源利用率高.安全性差,争抢资源 (3)N+1群集(较好模式) ...
- 在Windows Server 2012 R2中搭建SQL Server 2012故障转移集群
需要说明的是我们搭建的SQL Server故障转移集群(SQL Server Failover Cluster)是可用性集群,而不是负载均衡集群,其目的是为了保证服务的连续性和可用性,而不是为了提高服 ...
- MariaDB数据库介绍三、MHA(Master HA)实现主节点故障转移
一.MHA MHA是开源的MySQL的高可用程序,它为MySQL的主从复制架构提供了主节点故障自动转移的功能,它会监控master节点故障的时候,会提升其中的拥有最新数据的slave节点称为新的mas ...
- windows server2008R2故障转移群集
1 首先规划下自己的环境 都是在VMWARE里进行 服务器AA Public IP 192.168.1.120 心跳Private IP 10.0.0.10 服务器BB Public IP 192. ...
最新文章
- 如何运行ruby代码
- leetcode No.83 删除排序链表中的重复元素
- 信息学奥赛一本通C++语言——1097:求阶乘的和
- 【python教程入门学习】python值得学吗,怎么自学?
- 新手学java还是python知乎_编程初学者应该先学C++、Java还是Python?
- OpenGL ES渲染管线与着色器
- Facebook语音助手Aloha细节曝光,它的logo竟然是一座小火山?
- 【iOS】Touch Drag Inside 和 Touch Drag Outside、Touch Drag Enter、Touch Drag Exit的区别
- Eclipse启动无响应,停留在Loading workbench状态
- 红帽学习笔记[RHCSA] 第一课[Shell、基础知识]
- html如何设置hr 标签的线条粗细,html中hr怎么设置粗细
- acme申请泛域名证书
- 小刘同学的第一百二十二篇博文
- npm设置为淘宝镜像地址
- w10计算机无法启动不了怎么办,win10开不开机怎么办_win10电脑无法开机的解决步骤...
- ppt不能保存我html,powerpoint无法保存怎么解决
- 技术负责人如何搞定老板之我所见
- python3 安卓_Android QPython3 调用 其他 Android App
- 软件测试经典面试题之二
- 麦子金服:互联网金融平台分化加剧,行业头部平台值得选择
热门文章
- 如何断开mongodb数据库连接_mongodb关闭数据库实例
- mysql convert报错_部署mysql版本项目问题记录
- java编译大项目很慢_优化MyEclipse编译速度慢的问题、build、project clean 慢
- 通过hashtable实现dic
- 5-6pooling层
- php 下载后文件打不开,解决php下载excel无法打开的问题
- 统计各部门的薪水总和_近年来,统计学热过金融火过计算机,这是真的吗?
- Java 算法 格雷码
- 查看tsfresh提取(时间)序列特征的变量含义
- python调用百度智能云API请求(以自然语言处理——词法分析为例)