ControllerContext分析
ControllerContext维护了Controller使用到的上下文信息,而且还会缓存zookeeper一些数据
一 核心字段
controllerChannelManager: ControllerChannelManager: 负责和kafka集群内部Server之间建立channel来进行通信
shuttingDownBrokerIds:mutable.Set[Int] 正在关闭的brokers
epoch: Int KafkaController的年代信息,一般在leader变化后修改,比如当前leader挂了,新的contorller leader被选举,那么这个值就会加1,这样就能够判断哪些是旧的controller leader发送的请求
epochZkVersion: Int controller 年代信息的zk的版本号
allTopics: Set[String] 存放集群中所有的topic
partitionReplicaAssignment:mutable.Map[TopicAndPartition, Seq[Int]] 保存每一个partition的AR集合,一个partition的replica列表称为"assigned replicas"
partitionLeadershipInfo:mutable.Map[TopicAndPartition, LeaderIsrAnd
ControllerEpoch] 保存每一个分区的leader副本所在的brokerId, ISR列表以及controller_epoch等信息
partitionsBeingReassigned:mutable.Map[TopicAndPartition, Reassigned
PartitionsContext] 保存了正在重新分配的副本的分区
partitionsUndergoingPreferredReplicaElection:mutable.Set[TopicAndPa
rtition] 保存了正在进行"优先副本"选举的分区
liveBrokersUnderlying: Set[Broker] 保存了当前可用的broker集合
liveBrokerIdsUnderlying: Set[Int] 保存了当前可用的broker ID 集合
二 重要方法
# 根据提供的broker集合更新liveBrokersUnderlying和liveBrokerIdsUnderlying
def liveBrokers_=(brokers:Set[Broker]) {
liveBrokersUnderlying = brokers
liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
}
# 从liveBrokersUnderlying和liveBrokerIdsUnderlying排除shuttingDownBrokerIds里相同的broker,这里全是可用的broker def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id)) def liveBrokerIds = liveBrokerIdsUnderlying -- shuttingDownBrokerIds
# 表示正在关闭或者可用的broker
def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying def liveOrShuttingDownBrokers = liveBrokersUnderlying
# 获取在指定broker上的partitions def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {partitionReplicaAssignment.filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }.map { case(topicAndPartition, replicas) => topicAndPartition }.toSet } # 获取在指定broker上的PartitionAndReplica def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {brokerIds.flatMap { brokerId =>partitionReplicaAssignment.filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }.map { case (topicAndPartition, replicas) =>new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)}}.toSet } # 获取在指定topic上的PartitionAndReplica def replicasForTopic(topic: String): Set[PartitionAndReplica] = {partitionReplicaAssignment.filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.flatMap { case (topicAndPartition, replicas) =>replicas.map { r =>new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)}}.toSet } # 获取在指定topic上的TopicAndPartition def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {partitionReplicaAssignment.filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet } # 获取所有存活的PartitionAndReplica def allLiveReplicas(): Set[PartitionAndReplica] = {replicasOnBrokers(liveBrokerIds) } # 据传入的TopicAndPartition集合转换成PartitionAndReplica集合 def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {partitions.flatMap { p =>val replicas = partitionReplicaAssignment(p)replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))} } # 删除topic,并且更新partitionReplicaAssignment,然后在更新alltopics def removeTopic(topic: String) = {partitionLeadershipInfo = partitionLeadershipInfo.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }allTopics -= topic }
ControllerContext分析相关推荐
- kafka源码分析之一server启动分析
0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...
- asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型的绑定和验证
asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型的绑定和验证 原文:asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型 ...
- 记一次 .NET 某教育系统 异常崩溃分析
一:背景 1. 讲故事 这篇文章起源于 搬砖队大佬 的精彩文章 WinDBg定位asp.net mvc项目异常崩溃源码位置 ,写的非常好,不过美中不足的是通览全文之后,总觉得有那么一点不过瘾,就是没有 ...
- Asp.net web Api源码分析-HttpParameterBinding
接着上文Asp.net web Api源码分析-Filter 我们提到filter的获取和调用,后面通过HttpActionBinding actionBinding = actionDescript ...
- Kube Controller Manager 源码分析
Kube Controller Manager 源码分析 Controller Manager 在k8s 集群中扮演着中心管理的角色,它负责Deployment, StatefulSet, Repli ...
- asp.net mvc源码分析-Action篇 Action的执行
接着上篇 asp.net mvc源码分析-Action篇 DefaultModelBinder 我们已经获取的了Action的参数,有前面的内容我们知道Action的调用时在ControllerAct ...
- asp.net mvc源码分析-Action篇 DefaultModelBinder
接着上篇 asp.net mvc源码分析-Controller篇 ValueProvider 现在我们来看看ModelBindingContext这个对象. ModelBindingContext b ...
- kube-controller-manager源码分析(三)之 Informer机制
本文个人博客地址:https://www.huweihuang.com/kubernetes-notes/code-analysis/kube-controller-manager/sharedInd ...
- Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)
文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...
最新文章
- oracle字段重复新增错误,oracle在已有重复数据的列上创建唯一约束
- nslookup命令反解ip_电脑网络基础知识:ipconfig/all命令及nslookupDns查询命令
- centos 添加中文输入法
- Android_SQLite_升级框架
- OpenCV图像发现轮廓函数findContours()的使用
- yum升级rhel5
- 实现服务器和客户端数据交互,Java Socket有妙招
- THotKey控件 delphi
- Batch Normalization 算法解析
- 软件技术文档编写_如何编写好的软件技术文档
- 各类排序算法思想及计算复杂度
- skyline软件体系及工作流程
- 机器学习-UCI数据集
- SpringMVC+vue实现前后端分离的旅游管理系统
- Flask 推理模型,显存一直增长。
- 利器 | Terminal Shell 改造记录 Windows Terminal + ZSH + Tmux
- 这也敢爬,你离牢饭不远了,爬虫逆向实战案例
- Gartner发布2022年云平台服务技术成熟度曲线,iPaaS、低代码将达到成熟期
- 免费领7天腾讯视频VIP/优酷会员!
- visual studio 2019 + WinDDK 7600.16385.0编写驱动
热门文章
- sublime c 语言 编译,默认情况下,将程序编译为Sublime Text 3中的c 14
- Python机器学习:多项式回归与模型泛化010L1L2和弹性网络
- 大号字代码php,如何用QQ发超大汉字_php
- php 将表情存入数据库,php + mysql 存入表情 【如何轉義emoji表情,讓它可以存入utf8的數據庫】...
- log函数 oracle power_博主营地 | Unity3D 实用技巧 基础数学库函数学习
- qtableview点击行将整行数据传过去_掌握这15个可视化图表,小白也能轻松玩转数据分析...
- 苹果手机怎么编辑word文档_可以一键导入word图文的微信编辑软件有什么?编辑器怎么使用?...
- java 高德地图 车型比价计算_高德地图的高速公路过路费计算功能是如何实现的?有相应开放的API吗?...
- KVM虚拟机添加磁盘空间
- DevEco Studio的下载