前言

我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点
这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现异常时(比如磁盘损坏,文件读写失败,等等异常): 向zk中谢增一个子节点/log_dir_event_notification/log_dir_event_序列号 ;Controller监听到这个节点的变更之后,会向Brokers们发送LeaderAndIsrRequest请求; 然后做一些副本脱机的善后操作

源码分析

这里说的dirLog是 server.properties中配置的log.dir 例如

副本异常处理

首先我们找到有使用这个节点的源码;
kafka启动之初有调用

ReplicaManager.startup()

  def startup(): Unit = {// 省略...//当inter-broker protocol (IBP) < 1.0的时候,如果存在logDir的一些异常则直接让整个Broker启动失败;val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)logDirFailureHandler.start()}private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {override def doWork(): Unit = {//从队列 offlineLogDirQueue 取数据val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()if (haltBrokerOnDirFailure) {fatal(s"Halting broker because dir $newOfflineLogDir is offline")Exit.halt(1)}handleLogDirFailure(newOfflineLogDir)}}
// logDir should be an absolute path// sendZkNotification is needed for unit testdef handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {// 省略...logManager.handleLogDirFailure(dir)if (sendZkNotification)zkClient.propagateLogDirEvent(localBrokerId)warn(s"Stopped serving replicas in dir $dir")}

代码比较长,就直接概况一下好了:
主要是当读取或操作LogDir的时候出现了异常就会执行到这里,有可能是磁盘脱机了,或者文件突然没有读取写入权限等等之类的一些IOException异常;那么 Broker就需要做一些处理;如下

  1. 做个判断inter.broker.protocol.version 协议版本 < 1.0 的时候 时候直接退出;那个时候还不支持单Broker上存在多个logDir;
  2. 副本停止fetche数据
  3. 标记分区下线
  4. 可能移除一些监控信息
  5. 如果当前的log_dir 都脱机(或者异常了), 那么久可以直接shutdown这台机器了
  6. 如果还有其他的log_dir 还有在线的, 那么继续做一些其他的清理操作;
  7. 创建持久序列节点/log_dir_event_notification/log_dir_event_+序列号;数据是 BrokerID;例如:
    /log_dir_event_notification/log_dir_event_0000000003

    {"version":1,"broker":20003,"event":1}
    

PS: log_dir 是可以在一台Broker配置多个路径的 ,用逗号隔开

LogDir发生异常

比如说在 给文件加锁的时候lockLogDirs,磁盘损坏了就抛出异常IOException

  /*** Lock all the given directories*/private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {dirs.flatMap { dir =>try {val lock = new FileLock(new File(dir, LockFile))if (!lock.tryLock())throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +". A Kafka instance in another process or thread is using this directory.")Some(lock)} catch {case e: IOException =>logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)None}}}def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {error(msg, e)if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)offlineLogDirQueue.add(logDir)}

offlineLogDirQueue添加了一个异常队列之后就回到上面的副本异常处理代码了, 上面可是一致在queue.take()

Controller监听zk节点变更

KafkaController.processLogDirEventNotification

  private def processLogDirEventNotification(): Unit = {if (!isActive) returnval sequenceNumbers = zkClient.getAllLogDirEventNotificationstry {val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)//尝试将这台Broker上的所有副本 走一下状态流转 到 OnlineReplicaonBrokerLogDirFailure(brokerIds)} finally {// delete processed childrenzkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)}}

主要将从zk节点 /log_dir_event_notification/log_dir_event_序列号 中获取到的数据的Broker上的所有副本进行一个副本状态流转 ->OnlineReplica ;关于状态机的流转请看 【kafka源码】Controller中的状态机

  1. 给所有broker 发送LeaderAndIsrRequest请求,让brokers们去查询他们的副本的状态,如果副本logDir已经离线则返回KAFKA_STORAGE_ERROR异常;
  2. 完事之后会删除节点

源码总结

Q&A

【kafka源码】/log_dir_event_notification的LogDir脱机事件通知相关推荐

  1. 【kafka】kafka /log_dir_event_notification的LogDir脱机事件通知

    1.概述 我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点 这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现 ...

  2. Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector

    为什么80%的码农都做不了架构师?>>>    ##NioSelector和KafkaSelector有什么区别? 先说结论,KafkaSelector(org.apache.kaf ...

  3. Kafka 源码分析之网络层(二)

    上一篇介绍了概述和网络层模型实现<Kafka 源码分析之网络层(一)>,本编主要介绍在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送. PS:丰富的 ...

  4. Kafka源码阅读-Controller(二)管理brokers

    上一篇kafka源码(一)correspond to/explain Kafka设计解析(二) 中的3.2.3.3.以前一直用kafka 0.8.2.x,那时候redis开始风靡,hadoop方兴未艾 ...

  5. 刚看完 Kafka 源码,各位随便问!

    Kafka 因其优越的特性广泛用于数据传输.消息中间件的设计.开发和维护等方面,也得到越来越多大厂(阿里.美团.百度.快手等)的青睐,很多 IT 界前辈更是在技术层面不断深挖.最近有位后端三年的朋友在 ...

  6. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  7. 10小时,这回一次搞定 Kafka 源码!

    Kafka 因其优越的特性广泛用于日志收集.用户活动跟踪等方面,也得到越来越多企业的青睐,很多 IT 界前辈更是在技术层面不断深挖.目前,如果你还局限在 Kafka 的基本应用,将很难 cover 住 ...

  8. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  9. 最近看Kafka源码,着实被它的客户端缓冲池技术优雅到了

    最近看kafka源码,着实被它的客户端缓冲池技术优雅到了.忍不住要写篇文章赞美一下(哈哈). 注:本文用到的源码来自kafka2.2.2版本. 背景 当我们应用程序调用kafka客户端 produce ...

最新文章

  1. nginx解析php
  2. vue分页tbale小荔枝
  3. 什么是面向对象(OOP)
  4. 对话框Flags的设置值
  5. 跟我一起写 Makefile(十)
  6. chrome恐龙游戏_如何玩没有互联网的Google Chrome恐龙游戏-在线和离线
  7. 处理后台返回文本带空格和换行页面不显示断句的问题
  8. Mybatis JPA-集成方案+代码解析
  9. 【调参】Cyclic Learning Rates和One Cycle Policy-Keras
  10. php数据表创建命令代码,MySQL创建和删除数据表的命令及语法详解
  11. vivo S9无法激活手机了vivoS9e怎么解锁平台刷机教程屏幕锁不记得了可以用这个方法教程重装系统固件软件S9手机如果已忘记密码可以自己学习升级降级USB操作了
  12. 花生壳5.0 for Linux使用教程
  13. JAVA疯狂讲义 第四版 课后习题 第四章 4.5
  14. “万米网格管理法”助力省城城市管理
  15. mike21 matlab tools,MIKE21学习软件
  16. 再见2022,你好2023
  17. 企业公众号运营见效难,如何突围?
  18. SSL 域名证书 安装指引
  19. 齐岳提供AIE分子N-苄基-4-溴-1, 8-蔡酰亚胺,近红外发射的BODIPY-PhOSi和BODIPY-DMA,超分子聚合物PNA-GBP·I2的合成
  20. doesn‘t have a default value /前端控制台network、console

热门文章

  1. 自定义Paging分页
  2. html里按钮始终在底部,详解footer始终位于网页底部的方法介绍
  3. SpringMVC核心知识的梳理(现在都用SpringBoot了,但是SpringMVC还的学的扎实点,饮水思源)
  4. 一套完整的软件开发流程是怎样的?
  5. 2011年系统架构师考试题详解
  6. 新一代Json解析库Moshi使用及原理解析
  7. 国内渗透测试新神器--北极熊扫描器4.0
  8. 关于ads1255/6的基本使用
  9. C语言之简单的字母大小写转换
  10. ElementUI的Table组件在无数据情况下让“暂无数据”文本居中显示