kafka源码_Kafka日志段源码解析
1
Kafka 日志结构
kafka 日志在磁盘上的组织架构如下:
Kafka 日志对象由多个日志段对象组成,每个日志段对象在磁盘上创建一组文件,包括:
日志文件(.log)
索引文件(.index)
时间戳索引文件(.timeindex)
已中止(Aborted)事务的索引文件(.txnindex)
当然,如果你没有使用kafka事务,已中止事务的索引文件是不会被创建出来
图中一串数字0是该日志段的起始位置(Base Offset),也就是该日志段中所存的第一条消息的位移值
一个kafka主题有很多分区,每个分区就对应一个log对象,在物理磁盘上对应一个目录。
如:你创建一个双分区的主题 test-topic,那么,kafka在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1 。在服务器端,就是两个Log对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位置不同
2
日志段源码
日志段源码位于 Kafka 的 core 工程下,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala。实际上,所有日志结构部分的源码都在 core 的 kafka.log 包下。
该文件下定义了三个 Scala 对象:
LogFlushStats 结尾有个 Stats,它是做统计用的,主要负责日志落盘进行计时
我们主要关心的是 LogSegment class 和 object。
/** * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in * any previous segment. * * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. * * @param log The file records containing log entries * @param lazyOffsetIndex The offset index * @param lazyTimeIndex The timestamp index * @param txnIndex The transaction index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance */
LogSegment 伴生类注释 说明了每个日志段由两个核心组件构成:日志和索引。这里的索引
这里的索引指索引文件。每个日志段都有一个起始位移值(Base Offset)<= 在这个日志段中每个消息的最小的位移值 并且 > 前面任何日志段中消息的任何位移
3
日志段声明
/* @param log The file records containing log entries * @param lazyOffsetIndex The offset index * @param lazyTimeIndex The timestamp index * @param txnIndex The transaction index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance */@nonthreadsafeclass LogSegment private[log] (val log: FileRecords, val lazyOffsetIndex: LazyIndex[OffsetIndex], val lazyTimeIndex: LazyIndex[TimeIndex], val txnIndex: TransactionIndex, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, val time: Time) extends Logging {..}
FileRecords:保存Kafka消息对象
lazyOffsetIndex:位移索引文件
lazyTimeIndex :时间戳索引文件
txnIndex :已中止事务索引文件
baseOffset:每个日志段保存自己的起始位置(每个LogSegment对象一旦被创建,它的起始位移就固定了,不能再被更改)
indexIntervalBytes:其实就是Broker端参数 log.index.interval.bytes值。它控制日志段对象新增索引项的频率。默认情况下,日志段至少新写入4KB的消息数据才会新增一条索引项
rollJitterMs:是日志段对象新增倒计时的 “扰动值” 。因为目前Broker端日志新增倒计时是全局设置,也就是说,在未来的某个时间可能同时创建多个日志段对象,这将极大地增加物理磁盘I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈。
time:用于统计计时的一个实现类,在 Kafka 源码中普遍出现
4
日志段重要方法4.1append方法
/** * Append the given messages starting with the given offset. Add * an entry to the index if needed. * * It is assumed this method is being called from within a lock. * * @param largestOffset The last offset in the message set * @param largestTimestamp The largest timestamp in the message set. * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. * @param records The log entries to append. * @return the physical position in the file of the appended records * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow */@nonthreadsafedef append(largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { if (records.sizeInBytes > 0) { trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " + s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp") val physicalPosition = log.sizeInBytes() if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp) ensureOffsetInRange(largestOffset) // append the messages val appendedBytes = log.append(records) trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset") // Update the in memory max timestamp and corresponding offset. if (largestTimestamp > maxTimestampSoFar) { maxTimestampSoFar = largestTimestamp offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp } // append an entry to the index (if needed) if (bytesSinceLastIndexEntry > indexIntervalBytes) { offsetIndex.append(largestOffset, physicalPosition) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) bytesSinceLastIndexEntry = 0 } bytesSinceLastIndexEntry += records.sizeInBytes }}
append 方法接收 4 个参数分别表示待写入消息批次中消息的:
最大位移值
最大时间戳
最大时间戳对应消息的位移
要写入的消息集合
append方法的完整执行步骤:
1)调用 log.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。
2)调用 ensureOffsetInRange 方法确保输入参数最大位移值是合法的。
判断是不是合法。标准就是看它与日志段起始位移的差值是否在整数范围内,即 largestOffset - baseOffset 的值是不是介于 [0,Int.MAXVALUE] 之间。在极个别的情况下,这个差值可能会越界,这时,append 方法就会抛出异常,阻止后续的消息写入。一旦你碰到这个问题,你需要做的是升级你的 Kafka 版本,因为这是由已知的 Bug 导致的。
3)append 方法调用 FileRecords 的 append 方法执行真正的写入
4)更新日志段最大时间戳和最大时间戳所属消息的位移值。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。
还记得 Broker 端提供定期删除日志的功能吗?比如我只想保留最近 7 天的日志,没错,当前最大时间戳这个值就是判断的依据;而最大时间戳对应的消息的位移值则用于时间戳索引项。时间戳索引项保存时间戳与消息位移的对应关系。在这步操作中,Kafka 会更新并保存这组对应关系。
5)append 方法的最后一步就是更新索引项和写入的字节数了。当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。
4.2read方法
/** * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. * * @param startOffset A lower bound on the first offset to include in the message set we read * @param maxSize The maximum number of bytes to include in the message set we read * @param maxPosition The maximum position in the log segment that should be exposed for read * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists) * * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset, * or null if the startOffset is larger than the largest offset in this log */@threadsafedef read(startOffset: Long, maxSize: Int, maxPosition: Long = size, minOneMessage: Boolean = false): FetchDataInfo = { if (maxSize < 0) throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log") val startOffsetAndSize = translateOffset(startOffset) // if the start position is already off the end of the log, return null if (startOffsetAndSize == null) return null val startPosition = startOffsetAndSize.position val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition) val adjustedMaxSize = if (minOneMessage) math.max(maxSize, startOffsetAndSize.size) else maxSize // return a log segment but with zero size in the case below if (adjustedMaxSize == 0) return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY) // calculate the length of the message set to read based on whether or not they gave us a maxOffset val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize) FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize), firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)}
read方法的4个参数:
startOffset:要读取的第一条消息的位移
maxSize:能读取的最大字节数
maxPosition :能读到的最大文件位置;
minOneMessage:是否允许在消息体过大时至少返回第一条消息(当这个参数为true时,即使出现消息体字节数超过了 maxSize,read方法依然能返回至少一条消息。引入这个参数主要是为了确保不出现消费饿死的情况)
read方法调用步骤:
1)调用 translateOffset 方法定位要读取的起始文件位(startPosition)。输入参数 startOffset 仅仅是位移值,Kafka 需要根据索引信息找到对应的物理文件位置才能开始读取消息。
2)待确定了读取起始位置,日志段代码需要根据这部分信息以及 maxSize 和 maxPosition 参数共同计算要读取的总字节数。
如:maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。
3)调用 FileRecords 的 slice 方法,从指定位置读取指定大小的消息集合
4.3recover方法
/** * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes * from the end of the log and index. * * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover * the transaction index. * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery. * @return The number of bytes truncated from the log * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow */@nonthreadsafedef recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = { offsetIndex.reset() timeIndex.reset() txnIndex.reset() var validBytes = 0 var lastIndexEntry = 0 maxTimestampSoFar = RecordBatch.NO_TIMESTAMP try { for (batch batch.ensureValid() ensureOffsetInRange(batch.lastOffset) // The max timestamp is exposed at the batch level, so no need to iterate the records if (batch.maxTimestamp > maxTimestampSoFar) { maxTimestampSoFar = batch.maxTimestamp offsetOfMaxTimestampSoFar = batch.lastOffset } // Build offset index if (validBytes - lastIndexEntry > indexIntervalBytes) { offsetIndex.append(batch.lastOffset, validBytes) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) lastIndexEntry = validBytes } validBytes += batch.sizeInBytes() if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { leaderEpochCache.foreach { cache => if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _)) cache.assign(batch.partitionLeaderEpoch, batch.baseOffset) } updateProducerState(producerStateManager, batch) } } } catch { case e@ (_: CorruptRecordException | _: InvalidRecordException) => warn("Found invalid messages in log segment %s at byte offset %d: %s. %s" .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause)) } val truncated = log.sizeInBytes - validBytes if (truncated > 0) debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery") log.truncateTo(validBytes) offsetIndex.trimToValidSize() // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well. timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true) timeIndex.trimToValidSize() truncated}
recover的处理逻辑:
recover调用步骤:
1)代码一次调用索引对象的 reset 方法清空所有的索引文件
2)遍历日志段中的而所有消息集合或消息批次
3)读取到的每个消息集合,日志段必须要保证他们是合法的,主要体现在两方面:
该集合中的消息必须符合Kafka定义的二进制格式
该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数
4)校验完消息集合之后,代码会更新遍历过程中观测到的最大时间戳以及所属消息的位移值。这两个数据用于后续构建索引项。再之后就是不断累加当前已读取的消息字节数,并根据该值有条件地写入索引项。
5)更新事务性Producer的状态以及Leader Epoch缓存
执行完遍历后,Kafka会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段吸入了非法消息,需要执行截断操作,将日志段调整回合法的数值。同时,kafka还必须相应的调整索引文件的大小。把这些都做完之后,日志段恢复的操作也就宣告结束了。
5
总结
Kafka 日志段源码进行了重点的分析,包括日志段的 append 方法、read 方法和 recover 方法。
append 方法:我重点分析了源码是如何写入消息到日志段的。你要重点关注一下写操作过程中更新索引的时机是如何设定的。
read 方法:我重点分析了源码底层读取消息的完整流程。你要关注下 Kafka 计算待读取消息字节数的逻辑,也就是 maxSize、maxPosition 和 startOffset 是如何共同影响 read 方法的。
recover 方法:这个操作会读取日志段文件,然后重建索引文件。再强调一下,这个操作在执行过程中要读取日志段文件。因此,如果你的环境上有很多日志段文件,你又发现 Broker 重启很慢,那你现在就知道了,这是因为 Kafka 在执行 recover 的过程中需要读取大量的磁盘文件导致的
kafka源码_Kafka日志段源码解析相关推荐
- ad09只在一定范围内查找相似对象_kafka日志段中的二分查找
二分查找 Kafka 中直接接触索引或索引文件的场景可能不是很多.索引是一个很神秘的组件,Kafka 官方文档也没有怎么提过它.索引这个组件的源码还有一个亮点,那就是它应用了耳熟能详的二分查找算法来快 ...
- JCL源码使用日志框架源码查看
查看源码:Log接口的4个实现类JDk13JDK14 正常java.util.loggingLog4j 我们集成的log4jSimple JCL自带实现类(1)查看Jdk14Logger证明里面使用的 ...
- graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七)
graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署,查找问题 ...
- producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探
本次内容我们有两个目标:第一个初探Producer发送消息的流程第二个我们学习一下Kafka是如何构造异常体系的一.代码分析Producer核心流程初探 //因为生产中开发使用的是异步的方式发送的消息 ...
- graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五)
graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件 ...
- graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二)
graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylo ...
- graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四)
graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四) 问题背景 graylog+kafka+zookeeper(单机测 ...
- graylog+kafka+zookeeper(单机测试及源码),graylog测试用例及源码(三)
graylog+kafka+zookeeper(单机测试及源码),graylog测试用例及源码(三) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署 ...
- k8s与日志--journalbeat源码解读 1
前言 对于日志系统的重要性不言而喻,参照沪江的一 篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用: 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案 服务诊断:通过对 ...
最新文章
- linuxHacks中记载的一些小技巧
- idea下mapreduce的wordcount
- 广告出价--如何使用PID控制广告投放成本
- Html中value和name属性的作用
- 《走遍中国》珍藏版(十)
- apache karaf_Apache Karaf遇到Apache HBase
- oo0ooo0ooo0oo_OoO的完整形式是什么?
- 服务器批量修改代码,利用Redis实现多服务器批量操作
- 长沙计算机中级职称分数公布,大家所期待的2020年湖南省长沙中级职称评审公示...
- 送书《R语言数据分析和可视化》 | 这个为生信学习和生信作图打造的开源R教程真香!!!...
- 技工学校计算机类论文,技工学校计算机教学论文
- 面试官问我什么是「栈」,我随手画了 10 张图来解释
- php 构造函数参数传值,php 构造函数参数
- linux flash文件系统,需要了解Linux flash文件系统
- 在开放环境的步态识别:一个基准(二)
- 无线中继后要不要关闭dhcp服务器,tplink无线路由器WDS桥接后副路由开启DHCP 好还是关闭好。...
- 设置html字体大小 js,js如何改变文章的字体大小
- 一文详解空洞卷积(Atrous Convolution)
- 使用Faiss来加速计算向量之间的相似度
- 逐梦电竞:雷神“光追”游戏电脑新年首发
热门文章
- funuiTitle-居中问题修改
- Qt connect parent widget 连接父控件的信号槽
- 基于WPF系统框架设计(7)-TextBox/PasswordBox在ViewModel中支持回车命令
- IOS开发学习笔记-----UILabel 详解
- 清理rms客户端信息
- linux设备驱动之PCIE驱动开发
- python线程join方法_Python多线程join()用法
- asp.net怎么实现按条件查询_用这个提取函数王中王,制作数据查询表
- HDR概念 (二十八)
- Android图形之HWC(二十四)