ControllerContext维护了Controller使用到的上下文信息,而且还会缓存zookeeper一些数据

一 核心字段

controllerChannelManager: ControllerChannelManager: 负责和kafka集群内部Server之间建立channel来进行通信

shuttingDownBrokerIds:mutable.Set[Int] 正在关闭的brokers

epoch: Int KafkaController的年代信息,一般在leader变化后修改,比如当前leader挂了,新的contorller leader被选举,那么这个值就会加1,这样就能够判断哪些是旧的controller leader发送的请求

epochZkVersion: Int controller 年代信息的zk的版本号

allTopics: Set[String] 存放集群中所有的topic

partitionReplicaAssignment:mutable.Map[TopicAndPartition, Seq[Int]] 保存每一个partition的AR集合,一个partition的replica列表称为"assigned replicas"

partitionLeadershipInfo:mutable.Map[TopicAndPartition, LeaderIsrAnd

ControllerEpoch] 保存每一个分区的leader副本所在的brokerId, ISR列表以及controller_epoch等信息

partitionsBeingReassigned:mutable.Map[TopicAndPartition, Reassigned

PartitionsContext] 保存了正在重新分配的副本的分区

partitionsUndergoingPreferredReplicaElection:mutable.Set[TopicAndPa

rtition] 保存了正在进行"优先副本"选举的分区

liveBrokersUnderlying: Set[Broker] 保存了当前可用的broker集合

liveBrokerIdsUnderlying: Set[Int] 保存了当前可用的broker ID 集合

二 重要方法

# 根据提供的broker集合更新liveBrokersUnderlying和liveBrokerIdsUnderlying
def liveBrokers_=(brokers:Set[Broker]) {
  liveBrokersUnderlying = brokers
  liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
}

# 从liveBrokersUnderlying和liveBrokerIdsUnderlying排除shuttingDownBrokerIds里相同的broker,这里全是可用的broker
def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
def liveBrokerIds = liveBrokerIdsUnderlying -- shuttingDownBrokerIds

# 表示正在关闭或者可用的broker

def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
def liveOrShuttingDownBrokers = liveBrokersUnderlying
# 获取在指定broker上的partitions
def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {partitionReplicaAssignment.filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }.map { case(topicAndPartition, replicas) => topicAndPartition }.toSet
}
# 获取在指定broker上的PartitionAndReplica
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {brokerIds.flatMap { brokerId =>partitionReplicaAssignment.filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }.map { case (topicAndPartition, replicas) =>new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)}}.toSet
}
# 获取在指定topic上的PartitionAndReplica
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {partitionReplicaAssignment.filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.flatMap { case (topicAndPartition, replicas) =>replicas.map { r =>new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)}}.toSet
}
# 获取在指定topic上的TopicAndPartition
def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {partitionReplicaAssignment.filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
}
# 获取所有存活的PartitionAndReplica
def allLiveReplicas(): Set[PartitionAndReplica] = {replicasOnBrokers(liveBrokerIds)
}
# 据传入的TopicAndPartition集合转换成PartitionAndReplica集合
def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {partitions.flatMap { p =>val replicas = partitionReplicaAssignment(p)replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))}
}
# 删除topic,并且更新partitionReplicaAssignment,然后在更新alltopics
def removeTopic(topic: String) = {partitionLeadershipInfo = partitionLeadershipInfo.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }allTopics -= topic
}

ControllerContext分析相关推荐

  1. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  2. asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型的绑定和验证

    asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型的绑定和验证 原文:asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型 ...

  3. 记一次 .NET 某教育系统 异常崩溃分析

    一:背景 1. 讲故事 这篇文章起源于 搬砖队大佬 的精彩文章 WinDBg定位asp.net mvc项目异常崩溃源码位置 ,写的非常好,不过美中不足的是通览全文之后,总觉得有那么一点不过瘾,就是没有 ...

  4. Asp.net web Api源码分析-HttpParameterBinding

    接着上文Asp.net web Api源码分析-Filter 我们提到filter的获取和调用,后面通过HttpActionBinding actionBinding = actionDescript ...

  5. Kube Controller Manager 源码分析

    Kube Controller Manager 源码分析 Controller Manager 在k8s 集群中扮演着中心管理的角色,它负责Deployment, StatefulSet, Repli ...

  6. asp.net mvc源码分析-Action篇 Action的执行

    接着上篇 asp.net mvc源码分析-Action篇 DefaultModelBinder 我们已经获取的了Action的参数,有前面的内容我们知道Action的调用时在ControllerAct ...

  7. asp.net mvc源码分析-Action篇 DefaultModelBinder

    接着上篇 asp.net mvc源码分析-Controller篇 ValueProvider 现在我们来看看ModelBindingContext这个对象. ModelBindingContext b ...

  8. kube-controller-manager源码分析(三)之 Informer机制

    本文个人博客地址:https://www.huweihuang.com/kubernetes-notes/code-analysis/kube-controller-manager/sharedInd ...

  9. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

最新文章

  1. oracle字段重复新增错误,oracle在已有重复数据的列上创建唯一约束
  2. nslookup命令反解ip_电脑网络基础知识:ipconfig/all命令及nslookupDns查询命令
  3. centos 添加中文输入法
  4. Android_SQLite_升级框架
  5. OpenCV图像发现轮廓函数findContours()的使用
  6. yum升级rhel5
  7. 实现服务器和客户端数据交互,Java Socket有妙招
  8. THotKey控件 delphi
  9. Batch Normalization 算法解析
  10. 软件技术文档编写_如何编写好的软件技术文档
  11. 各类排序算法思想及计算复杂度
  12. skyline软件体系及工作流程
  13. 机器学习-UCI数据集
  14. SpringMVC+vue实现前后端分离的旅游管理系统
  15. Flask 推理模型,显存一直增长。
  16. 利器 | Terminal Shell 改造记录 Windows Terminal + ZSH + Tmux
  17. 这也敢爬,你离牢饭不远了,爬虫逆向实战案例
  18. Gartner发布2022年云平台服务技术成熟度曲线,iPaaS、低代码将达到成熟期
  19. 免费领7天腾讯视频VIP/优酷会员!
  20. visual studio 2019 + WinDDK 7600.16385.0编写驱动

热门文章

  1. sublime c 语言 编译,默认情况下,将程序编译为Sublime Text 3中的c 14
  2. Python机器学习:多项式回归与模型泛化010L1L2和弹性网络
  3. 大号字代码php,如何用QQ发超大汉字_php
  4. php 将表情存入数据库,php + mysql 存入表情 【如何轉義emoji表情,讓它可以存入utf8的數據庫】...
  5. log函数 oracle power_博主营地 | Unity3D 实用技巧 基础数学库函数学习
  6. qtableview点击行将整行数据传过去_掌握这15个可视化图表,小白也能轻松玩转数据分析...
  7. 苹果手机怎么编辑word文档_可以一键导入word图文的微信编辑软件有什么?编辑器怎么使用?...
  8. java 高德地图 车型比价计算_高德地图的高速公路过路费计算功能是如何实现的?有相应开放的API吗?...
  9. KVM虚拟机添加磁盘空间
  10. DevEco Studio的下载