1

Kafka 日志结构

kafka 日志在磁盘上的组织架构如下:

Kafka 日志对象由多个日志段对象组成,每个日志段对象在磁盘上创建一组文件,包括:

  1. 日志文件(.log)

  2. 索引文件(.index)

  3. 时间戳索引文件(.timeindex)

  4. 已中止(Aborted)事务的索引文件(.txnindex)

当然,如果你没有使用kafka事务,已中止事务的索引文件是不会被创建出来

图中一串数字0是该日志段的起始位置(Base Offset),也就是该日志段中所存的第一条消息的位移值

一个kafka主题有很多分区,每个分区就对应一个log对象,在物理磁盘上对应一个目录。

如:你创建一个双分区的主题 test-topic,那么,kafka在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1 。在服务器端,就是两个Log对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位置不同

2

日志段源码

日志段源码位于 Kafka 的 core 工程下,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala。实际上,所有日志结构部分的源码都在 core 的 kafka.log 包下。

该文件下定义了三个 Scala 对象:

LogFlushStats 结尾有个 Stats,它是做统计用的,主要负责日志落盘进行计时

我们主要关心的是 LogSegment class 和 object。

/** * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in * any previous segment. * * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. * * @param log The file records containing log entries * @param lazyOffsetIndex The offset index * @param lazyTimeIndex The timestamp index * @param txnIndex The transaction index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance */
LogSegment 伴生类注释 说明了每个日志段由两个核心组件构成:日志和索引。这里的索引

这里的索引指索引文件。每个日志段都有一个起始位移值(Base Offset)<= 在这个日志段中每个消息的最小的位移值 并且 > 前面任何日志段中消息的任何位移

3

日志段声明

 /* @param log The file records containing log entries * @param lazyOffsetIndex The offset index * @param lazyTimeIndex The timestamp index * @param txnIndex The transaction index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance */@nonthreadsafeclass LogSegment private[log] (val log: FileRecords,                               val lazyOffsetIndex: LazyIndex[OffsetIndex],                               val lazyTimeIndex: LazyIndex[TimeIndex],                               val txnIndex: TransactionIndex,                               val baseOffset: Long,                               val indexIntervalBytes: Int,                               val rollJitterMs: Long,                               val time: Time) extends Logging {..}
  • FileRecords:保存Kafka消息对象

  • lazyOffsetIndex:位移索引文件

  • lazyTimeIndex :时间戳索引文件

  • txnIndex :已中止事务索引文件

  • baseOffset:每个日志段保存自己的起始位置(每个LogSegment对象一旦被创建,它的起始位移就固定了,不能再被更改)

  • indexIntervalBytes:其实就是Broker端参数 log.index.interval.bytes值。它控制日志段对象新增索引项的频率。默认情况下,日志段至少新写入4KB的消息数据才会新增一条索引项

  • rollJitterMs:是日志段对象新增倒计时的 “扰动值” 。因为目前Broker端日志新增倒计时是全局设置,也就是说,在未来的某个时间可能同时创建多个日志段对象,这将极大地增加物理磁盘I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈。

  • time:用于统计计时的一个实现类,在 Kafka 源码中普遍出现

4

日志段重要方法4.1append方法

/** * Append the given messages starting with the given offset. Add * an entry to the index if needed. * * It is assumed this method is being called from within a lock. * * @param largestOffset The last offset in the message set * @param largestTimestamp The largest timestamp in the message set. * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. * @param records The log entries to append. * @return the physical position in the file of the appended records * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow */@nonthreadsafedef append(largestOffset: Long,           largestTimestamp: Long,           shallowOffsetOfMaxTimestamp: Long,           records: MemoryRecords): Unit = {  if (records.sizeInBytes > 0) {    trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +          s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")    val physicalPosition = log.sizeInBytes()    if (physicalPosition == 0)      rollingBasedTimestamp = Some(largestTimestamp)    ensureOffsetInRange(largestOffset)    // append the messages    val appendedBytes = log.append(records)    trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")    // Update the in memory max timestamp and corresponding offset.    if (largestTimestamp > maxTimestampSoFar) {      maxTimestampSoFar = largestTimestamp      offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp    }    // append an entry to the index (if needed)    if (bytesSinceLastIndexEntry > indexIntervalBytes) {      offsetIndex.append(largestOffset, physicalPosition)      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)      bytesSinceLastIndexEntry = 0    }    bytesSinceLastIndexEntry += records.sizeInBytes  }}

append 方法接收 4 个参数分别表示待写入消息批次中消息的:

  1. 最大位移值

  2. 最大时间戳

  3. 最大时间戳对应消息的位移

  4. 要写入的消息集合

append方法的完整执行步骤:

1)调用 log.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。

2)调用 ensureOffsetInRange 方法确保输入参数最大位移值是合法的。

判断是不是合法。标准就是看它与日志段起始位移的差值是否在整数范围内,即 largestOffset - baseOffset 的值是不是介于 [0,Int.MAXVALUE] 之间。在极个别的情况下,这个差值可能会越界,这时,append 方法就会抛出异常,阻止后续的消息写入。一旦你碰到这个问题,你需要做的是升级你的 Kafka 版本,因为这是由已知的 Bug 导致的。

3)append 方法调用 FileRecords 的 append 方法执行真正的写入

4)更新日志段最大时间戳和最大时间戳所属消息的位移值。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。

还记得 Broker 端提供定期删除日志的功能吗?比如我只想保留最近 7 天的日志,没错,当前最大时间戳这个值就是判断的依据;而最大时间戳对应的消息的位移值则用于时间戳索引项。时间戳索引项保存时间戳与消息位移的对应关系。在这步操作中,Kafka 会更新并保存这组对应关系。

5)append 方法的最后一步就是更新索引项和写入的字节数了。当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。

4.2read方法

/** * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. * * @param startOffset A lower bound on the first offset to include in the message set we read * @param maxSize The maximum number of bytes to include in the message set we read * @param maxPosition The maximum position in the log segment that should be exposed for read * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists) * * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset, *         or null if the startOffset is larger than the largest offset in this log */@threadsafedef read(startOffset: Long,         maxSize: Int,         maxPosition: Long = size,         minOneMessage: Boolean = false): FetchDataInfo = {  if (maxSize < 0)    throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")  val startOffsetAndSize = translateOffset(startOffset)  // if the start position is already off the end of the log, return null  if (startOffsetAndSize == null)    return null  val startPosition = startOffsetAndSize.position  val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)  val adjustedMaxSize =    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)    else maxSize  // return a log segment but with zero size in the case below  if (adjustedMaxSize == 0)    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)  // calculate the length of the message set to read based on whether or not they gave us a maxOffset  val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)  FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)}

read方法的4个参数:

  1. startOffset:要读取的第一条消息的位移

  2. maxSize:能读取的最大字节数

  3. maxPosition :能读到的最大文件位置;

  4. minOneMessage:是否允许在消息体过大时至少返回第一条消息(当这个参数为true时,即使出现消息体字节数超过了 maxSize,read方法依然能返回至少一条消息。引入这个参数主要是为了确保不出现消费饿死的情况)

read方法调用步骤:

1)调用 translateOffset 方法定位要读取的起始文件位(startPosition)。输入参数 startOffset 仅仅是位移值,Kafka 需要根据索引信息找到对应的物理文件位置才能开始读取消息。

2)待确定了读取起始位置,日志段代码需要根据这部分信息以及 maxSize 和 maxPosition 参数共同计算要读取的总字节数。

如:maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。

3)调用 FileRecords 的 slice 方法,从指定位置读取指定大小的消息集合

4.3recover方法

/** * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes * from the end of the log and index. * * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover *                             the transaction index. * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery. * @return The number of bytes truncated from the log * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow */@nonthreadsafedef recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {  offsetIndex.reset()  timeIndex.reset()  txnIndex.reset()  var validBytes = 0  var lastIndexEntry = 0  maxTimestampSoFar = RecordBatch.NO_TIMESTAMP  try {    for (batch       batch.ensureValid()      ensureOffsetInRange(batch.lastOffset)      // The max timestamp is exposed at the batch level, so no need to iterate the records      if (batch.maxTimestamp > maxTimestampSoFar) {        maxTimestampSoFar = batch.maxTimestamp        offsetOfMaxTimestampSoFar = batch.lastOffset      }      // Build offset index      if (validBytes - lastIndexEntry > indexIntervalBytes) {        offsetIndex.append(batch.lastOffset, validBytes)        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)        lastIndexEntry = validBytes      }      validBytes += batch.sizeInBytes()      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {        leaderEpochCache.foreach { cache =>          if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))            cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)        }        updateProducerState(producerStateManager, batch)      }    }  } catch {    case e@ (_: CorruptRecordException | _: InvalidRecordException) =>      warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"        .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))  }  val truncated = log.sizeInBytes - validBytes  if (truncated > 0)    debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")  log.truncateTo(validBytes)  offsetIndex.trimToValidSize()  // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)  timeIndex.trimToValidSize()  truncated}

recover的处理逻辑:

recover调用步骤:

1)代码一次调用索引对象的 reset 方法清空所有的索引文件

2)遍历日志段中的而所有消息集合或消息批次

3)读取到的每个消息集合,日志段必须要保证他们是合法的,主要体现在两方面:

  • 该集合中的消息必须符合Kafka定义的二进制格式

  • 该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数

4)校验完消息集合之后,代码会更新遍历过程中观测到的最大时间戳以及所属消息的位移值。这两个数据用于后续构建索引项。再之后就是不断累加当前已读取的消息字节数,并根据该值有条件地写入索引项。

5)更新事务性Producer的状态以及Leader Epoch缓存

执行完遍历后,Kafka会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段吸入了非法消息,需要执行截断操作,将日志段调整回合法的数值。同时,kafka还必须相应的调整索引文件的大小。把这些都做完之后,日志段恢复的操作也就宣告结束了。

5

总结

Kafka 日志段源码进行了重点的分析,包括日志段的 append 方法、read 方法和 recover 方法。

  1. append 方法:我重点分析了源码是如何写入消息到日志段的。你要重点关注一下写操作过程中更新索引的时机是如何设定的。

  2. read 方法:我重点分析了源码底层读取消息的完整流程。你要关注下 Kafka 计算待读取消息字节数的逻辑,也就是 maxSize、maxPosition 和 startOffset 是如何共同影响 read 方法的。

  3. recover 方法:这个操作会读取日志段文件,然后重建索引文件。再强调一下,这个操作在执行过程中要读取日志段文件。因此,如果你的环境上有很多日志段文件,你又发现 Broker 重启很慢,那你现在就知道了,这是因为 Kafka 在执行 recover 的过程中需要读取大量的磁盘文件导致的

kafka源码_Kafka日志段源码解析相关推荐

  1. ad09只在一定范围内查找相似对象_kafka日志段中的二分查找

    二分查找 Kafka 中直接接触索引或索引文件的场景可能不是很多.索引是一个很神秘的组件,Kafka 官方文档也没有怎么提过它.索引这个组件的源码还有一个亮点,那就是它应用了耳熟能详的二分查找算法来快 ...

  2. JCL源码使用日志框架源码查看

    查看源码:Log接口的4个实现类JDk13JDK14 正常java.util.loggingLog4j 我们集成的log4jSimple JCL自带实现类(1)查看Jdk14Logger证明里面使用的 ...

  3. graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七)

    graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署,查找问题 ...

  4. producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探

    本次内容我们有两个目标:第一个初探Producer发送消息的流程第二个我们学习一下Kafka是如何构造异常体系的一.代码分析Producer核心流程初探 //因为生产中开发使用的是异步的方式发送的消息 ...

  5. graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五)

    graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件 ...

  6. graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二)

    graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylo ...

  7. graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四)

    graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四) 问题背景 graylog+kafka+zookeeper(单机测 ...

  8. graylog+kafka+zookeeper(单机测试及源码),graylog测试用例及源码(三)

    graylog+kafka+zookeeper(单机测试及源码),graylog测试用例及源码(三) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署 ...

  9. k8s与日志--journalbeat源码解读 1

    前言 对于日志系统的重要性不言而喻,参照沪江的一 篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用: 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案 服务诊断:通过对 ...

最新文章

  1. linuxHacks中记载的一些小技巧
  2. idea下mapreduce的wordcount
  3. 广告出价--如何使用PID控制广告投放成本
  4. Html中value和name属性的作用
  5. 《走遍中国》珍藏版(十)
  6. apache karaf_Apache Karaf遇到Apache HBase
  7. oo0ooo0ooo0oo_OoO的完整形式是什么?
  8. 服务器批量修改代码,利用Redis实现多服务器批量操作
  9. 长沙计算机中级职称分数公布,大家所期待的2020年湖南省长沙中级职称评审公示...
  10. 送书《R语言数据分析和可视化》 | 这个为生信学习和生信作图打造的开源R教程真香!!!...
  11. 技工学校计算机类论文,技工学校计算机教学论文
  12. 面试官问我什么是「栈」,我随手画了 10 张图来解释
  13. php 构造函数参数传值,php 构造函数参数
  14. linux flash文件系统,需要了解Linux flash文件系统
  15. 在开放环境的步态识别:一个基准(二)
  16. 无线中继后要不要关闭dhcp服务器,tplink无线路由器WDS桥接后副路由开启DHCP 好还是关闭好。...
  17. 设置html字体大小 js,js如何改变文章的字体大小
  18. 一文详解空洞卷积(Atrous Convolution)
  19. 使用Faiss来加速计算向量之间的相似度
  20. 逐梦电竞:雷神“光追”游戏电脑新年首发

热门文章

  1. funuiTitle-居中问题修改
  2. Qt connect parent widget 连接父控件的信号槽
  3. 基于WPF系统框架设计(7)-TextBox/PasswordBox在ViewModel中支持回车命令
  4. IOS开发学习笔记-----UILabel 详解
  5. 清理rms客户端信息
  6. linux设备驱动之PCIE驱动开发
  7. python线程join方法_Python多线程join()用法
  8. asp.net怎么实现按条件查询_用这个提取函数王中王,制作数据查询表
  9. HDR概念 (二十八)
  10. Android图形之HWC(二十四)