1.概述

2.源码分析

Broker启动加载动态配置

KafkaServer.startup

启动加载动态配置总流程

2.1 动态配置初始化

config.dynamicConfig.initialize(zkClient)
  1. 构造当前配置文件 currentConfig, 然后从zk中获取节点 /config/brokers/<default>信息,然后更新配置updateDefaultConfig; (动态默认配置覆盖静态配置)

  2. 从节点/config/brokers/{当前BrokerId}获取配置, 如果配置中有ConfigType=PASSWORD的配置(例如ssl.keystore.password)存在,接着判断 是否存在password.encoder.old.secret 配置,(这个配置是用来加解密ConfigType=PASSWORD的旧的秘钥),尝试用旧秘钥解密秘钥; 然后将这些配置重新加密回写入/config/brokers/{当前BrokerId} ; 然后返回配置 (这里主要是动态配置里面有密码类型配置的时候需要做一次解密加密处理)

  3. 将上面得到的配置(password类型修改之后) 更新内存总的配置;优先级 静态配置<动态默认配置<指定动态配置

2.2 注册可变更配置监听器

如果有对应的配置变更了,那么相应的监听器就会收到通知去修改自己相应的配置;

 Mx4jLoader.maybeLoad()/* Add all reconfigurables for config change notification before starting config handlers */config.dynamicConfig.addReconfigurables(this)

DynamicBrokerConfig.addReconfigurables

/*** Add reconfigurables to be notified when a dynamic broker config is updated.** `Reconfigurable` is the public API used by configurable plugins like metrics reporter* and quota callbacks. These are reconfigured before `KafkaConfig` is updated so that* the update can be aborted if `reconfigure()` fails with an exception.** `BrokerReconfigurable` is used for internal reconfigurable classes. These are* reconfigured after `KafkaConfig` is updated so that they can access `KafkaConfig`* directly. They are provided both old and new configs.** 添加要在更新动态代理配置时通知的可重构项。** “可重构”是可配置插件(如metrics reporter和配额回调)使用的公共API。这些在更新“KafkaConfig”之前重新配置,* 以便在“reconfigure()”异常失败时可以中止更新。** ' brokerreconfiguration '用于内部可重构类。在更新“KafkaConfig”之后,这些文件将重新配置,* 以便它们可以直接访问“KafkaConfig”。它们同时提供新旧配置。*/def addReconfigurables(kafkaServer: KafkaServer): Unit = {addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))if (kafkaServer.logManager.cleaner != null)addBrokerReconfigurable(kafkaServer.logManager.cleaner)addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))addBrokerReconfigurable(kafkaServer.socketServer)}

2.3. 动态配置启动监听

// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
  1. 注册节点处理器change-notification-/config/changes = stateChangeHandler

  2. 注册节点处理器/config/changes = zNodeChildChangeHandler

  3. 获取/config/changes 所有子节点看看有哪些变更

  4. 遍历所有节点并截取节点的编号, 判断一下是不是大于上一次执行过变更的节点ID lastExecutedChange(启动的时候是-1)

  5. 上个条件满足的话,则执行通知操作;不同entity执行的操作不一样,具体请看下面每个类型

  6. 更新lastExecutedChange

  7. 清除过期的通知节点, 默认过期时间15 * 60 * 1000(15分钟) 就是删除/config/changes /下面的过期节点

2.4 加载Topic动态配置

  1. 获取节点的data数据, 如果获取到了则执行通知流程notificationHandler.processNotification(d),处理器是ConfigChangedNotificationHandler; 它先解析节点的json数据,根据版本信息不同调用不同的处理方法; 下面是version=2的处理方式;

  2. 根据json数据可以得到 entityTypeentityName; 那么久可以去对应的zk数据里面getData获取数据; 并且将获取到的数据Decode成Properties对象entityConfig;

  3. 将key为下图中的属性 隐藏掉; 替换成value: [hidden]


调用EntityHandler; 这里是TopicConfigHandler.processConfigChanges来进行处理,方法里面再看看流程 ->

从动态配置entityConfig里面获取message.format.version配置消息格式版本号; 如果当前Broker的版本inter.broker.protocol.version 小于message.format.version配置; 则将message.format.version配置 排除掉

调用TopicConfigHandler.updateLogConfig 来更新指定Topic的所有TopicPartition的配置,其实是将TP正在加载或初始化的状态标记为没有完成初始化,这将会在后续过程中促成TP重新加载并初始化

将动态配置和并覆盖Server的默认配置为新的 newConfig, 然后根据Topic获取对应的Logs对象; 遍历Logs去更新newConfig;并尝试执行 initializeLeaderEpochCache; (需要注意的是:这里的动态配置不是支持所有的配置参数,请看【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)的附件部分)

当然特殊配置如leader.replication.throttled.replicas,follower.replication.throttled.replicas这两个限流相关;解析配置之后,然后通过quotaManager.markThrottled/quotaManager.removeThrottle更新/移除对应的限流分区集合

如果动态配置了unclean.leader.election.enable=true(允许非同步副本选主 );那么就会执行TopicUncleanLeaderElectionEnable方法来让它改变选举策略(前提是当前Broker是Controller角色)

2.5 加载Broker动态配置

BrokerConfigHandler.processConfigChanges

假设我们配置了默认配置; zk里面的节点是<default>

sh bin/kafka-configs.sh –bootstrap-server xxxxx:9090 –alter –entity-type brokers --entity-default –add-config log.segment.bytes=88888888


从zk节点/config/changes 里面获取变更节点的json数据.然后去对应的 /config/{entityType}/{entituName}获取对应的数据

如果是<default>节点,说明有配置动态默认配置; 则按照 静态配置<动态默认配置<动态指定配置 的顺序重新加载覆盖一下;

如果 新旧配置有变更(有可能执行了一次命令但是参数并没有变化的情况,修改了个寂寞)的情况下 才会做更新的; 并且 通知到所有的 BrokerReconfigurable;

如果是指定BrokerId, 则除了上面2重新加载覆盖之外, 相关限流 配置leader.replication.throttled.rate、follower.replication.throttled.rate、replica.alter.log.dirs.io.max.bytes.per.second 都会被更新一下quotaManagers.leader/leader/alterLogDirs.updateQuota ;如果这些配置没有配置的话,则用 Long.MaxValue(相当于是不限流)来更新

2.6 查询动态配置 流程 --describe

  1. 简单检验
  2. 根据类型查询entities ; type是topics就获取所有topic; type是broker|broker-loggers则查询所有Broker节点
  3. 遍历entities获取配置 ;做些简单校验;然后想Broker发起describeConfigs请求; 节点策略是LeastLoadedNodeProvider
    节点调用方法 KafkaApis.handleDescribeConfigsRequest

    1. 未经授权配置不查询
    2. 经过授权的配置开始查询 ;
    3. 当查询的是topics时, 去zk节点/confgi/类型/类型名 ,获取到动态配置数据之后, 然后将其覆盖本地跟Log相关的静态配置, 完事之后组装一下返回;(1.数据为空过滤2.敏感数据设置value=null; ConfigType=PASSWORD和不知道类型是啥的都是敏感数据 3. 组装所有的同义配置(静态默认配置、本地静态、默认动态配置、指定动态配置、等等多个配置))
      返回的数据类型如下

xxxx


如果有broker|broker-loggers节点, 则在 获取到数据之后 然后指定nodeId节点发起 describeBrokerConfigs请求

如果查询的是brokers


如果查询的是 broker-loggers

2.7 新增/修改/删除/动态配置 的流程

1. 发起请求

  1. 查询当前的类型配置; 这里的查询 跟上面的--describe流程是一样的
  2. 相关校验;如果有delete-config配置, 需要校验一下当前配置有没有;如果没有抛出异常;
  3. 计算出需要变更的配置之后, 发起请求incrementalAlterConfigs;如果请求类型是 brokers/broker-loggers 则发起请求的接收方是 指定的Broker 节点; 否则就是LeastLoadedNodeProvider (当前负载最少的节点)
  1. incrementalAlterConfigs 增量修改配置

KafkaApis.handleIncrementalAlterConfigsRequest

  1. 通过请求参数解析 配置 configs
  2. 过滤一下未授权的配置
  3. 如果配置中有重复的项则抛出异常

Topic配置

  1. 获取节点 /config/topics/{topicName} 中的配置数据;
  2. 然后根据请求参数的属性 ,组装好变更后的配置是什么样的 configs
  3. 简单校验一下, 并且支持自定义校验,如果有 alter.config.policy.class.name= 配置(默认null)的话,则会实例化指定的类(需要继承 AlterConfigPolicy类);并调用他的 validate方法来校验;
  4. 调用写入zk配置的接口, 将动态配置重新写入(SetDataRequest)到接口 /config/topics/{topicName}中;
  5. 创建并写入配置变更记录顺序节点 /config/changes/config_change_序列号 中; 这个节点主要是让Broker们来监听这个节点的来了解到哪个配置有变更的;

其他的类型都一样
省略

2.8 Broker监听/config/changes的变更

在 1. Broker启动加载动态配置 中我们了解到有对节点/config/change注册一个子节点变更的监听处理器


那么对动态配置做出修改之后, 这个节点就会新增一条数据,那么所有的Broker都会收到这个通知;

所以我们就要来看一看收到通知之后又做了哪些事情

这个流程是又回到了上面的 1. 2 加载Topics/Brokers动态配置 的流程中了;

3.源码总结

原理部分讲解比较详细的可以看 : Kafka动态配置实现原理解析 - 李志涛 - 博客园



如果我想在我的项目中获取kafka的所有配置该怎么办?
启动的时候加载一次所有Broker的配置
监听节点/config/change节点的变化
是否可以直接在zk中写入动态配置?
不可以,因为Broker是监听 /config/changes/里面的Broker节点,来实时得知有数据变更;

为什么不直接监听 /config/下面的配置?
没有必要,这样监听的数据数据太多了,而且 你不知道具体是改了哪个配置,所以每次都要全部更新一遍,无缘无故的加重负担了, 用/config/change 节点来得知哪个类型的数据变更, 只变更这个相关数据就可以了

M.扩展

Kafka中的动态配置源码分析

【kafka】Kafka中的动态配置源码分析相关推荐

  1. suricata中DPDK收发包源码分析2

    <suricata中DPDK收发包源码分析1>中分析了整体的DPDK收发包框架代码,今天我们继续来深入了解一下一些细节方面的问题. 目录 Q1:收发包线程模式在代码中是怎样确定的? Q2: ...

  2. Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)

    一.综述 HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode.DataNode.DFSClient等众多角色的分工与合作. 首先上一段代码,客户端是如何写文件的: ...

  3. MapReduce中Client提交Job源码分析

    回顾 在进行submit源码分析之前,先来回顾一下WordCount案例(点击查看WordCount案例).仔细回想一下曾经Client都干了点啥?获取对象-->一通set-->job.w ...

  4. isomorphic-style-loader在前后端渲染样式同构中的应用与源码分析

    前言 在笔者的上一篇文章(基于react的前后端渲染实例讲解)中,对react下前后端渲染的流程进行了一个介绍.对前后端渲染的相关概念和原理不太了解的可以移步那篇文章.同时留下了一个引子:如何优雅地实 ...

  5. java中jcl,spring-jcl 日志源码分析

    1.spring-jcl介绍 JCL全称:Jakarta Commons Logging spring-jcl 采用了设计模式中的"适配器模式",它对外提供统一的接口,然后在适配类 ...

  6. java中Mark接口_JVM源码分析之Java对象头实现

    原标题:JVM源码分析之Java对象头实现 原创申明:本文由公众号[猿灯塔]原创,转载请说明出处标注 "365篇原创计划"第十一篇. 今天呢!灯塔君跟大家讲: JVM源码分析之Ja ...

  7. ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析 线程利用(转)...

    前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理.而这篇文章,我们将接着前面的介绍,从源码实现上对ThreadPoolExecuto ...

  8. Hhadoop-2.7.0中HDFS写文件源码分析

    转载自:http://blog.csdn.net/lipeng_bigdata/article/details/53738376 一.综述 HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及 ...

  9. Nacos的动态配置源码解析

    文章目录 1. 如何使用 2. 原理详解 2.1 采用延迟线程池定时执行"监听"文件是否有修改 2.2 通过长轮询的方式获得修改过的文件及其内容 2.3 拿到配置后通过applic ...

最新文章

  1. Ubuntu恢复默认的字体
  2. BeeHive模块注册
  3. Php在线字体woff转svg,在线字体格式转换ttf/otf/eot/woff/woff2格式工具
  4. 2019牛客暑期多校训练营(第八场)
  5. 记某CMS漏洞getshell
  6. 如何使用Photoshop制作真实的尺子
  7. 判断字符串出栈合法性
  8. 计算机如何用vb文本加密,信息加密与隐藏工具的设计与实现VB231
  9. 刚学计算机先学什么好,计算机语言入门先学什么?
  10. FFmepg 多线程解码历程
  11. 记录hadoop HDFS与hadoop YARN无法访问问题
  12. DataGuard常用命令及DG主备库开关顺序
  13. .NET Core Web API:您需要了解的最少知识(第2部分,共2部分)
  14. 李飞飞重返祖国执掌Google AI中国团队:不忘初心,中国已觉醒
  15. 802.11的CSMA/CA机制
  16. Echarts 地图绘制
  17. 如何使用Wondershare PDFelement制作PDF文件
  18. java中面向对象6_Java面向对象
  19. html 超链接下载文件问题 如何修改文件名称
  20. 用当下的力量共创未来——2017(第十五届)中国互联网经济论坛盛典

热门文章

  1. iPhone 14 Pro渲染图曝光 网友:冲这个外观就可以买了
  2. 刚刚,“国民”APP微信崩了!官方致歉:已经逐步恢复
  3. 苹果MacBook Air 2022款也将有刘海屏设计
  4. iPad mini 6外形巨变:升级全面屏 还要砍掉Home键
  5. 原价19万的美系插混,2年后落地只要11万~15万,微蓝6 PHEV为啥这么惨
  6. 黄光裕回应与京东、拼多多竞争:谁也灭不了谁 不排除合作的可能性
  7. 交钱赎“人”!B站500万粉UP主被黑客勒索,腾讯都表示无解
  8. 陆正耀为瑞幸数据造假道歉:非常自责 会全力挽回损失
  9. 箭在弦上!雷军暗示:小米10系列有望2月3日开启预热
  10. 蔚来上线三款硬货:更大电池包、全新EC6、改款ES8