LogSegment分析
为了防止Log文件过大,将Log切分成多个日志文件来管理,每一个日志文件对应着一个LogSegment。在LogSegment封装了TimeIndex
,OffsetIndex以及FileMessageSet对象,提供日志文件和索引文件的读写功能和其他功能
一 LogSegment的核心字段
log: 用于操作对应消息日志文件的FileMessageSet对象
index: 用于操作对应offset 索引文件的 OffsetIndex对象
timeIndex: 用于操作对应时间索引文件的TimeIndex对象
baseOffset: 每一个日志文件的第一个消息的offset
indexIntervalBytes: 索引项之间间隔的最小字节数,也就是隔多少字节写一次索引
bytesSinceLastIndexEntry: 自上次添加index entry后在日志文件中累计加入的message set的字节数,用于判断下一次索引添加的时机
rollingBasedTimestamp: 用于基于时间的日志滚动的时间戳
maxTimestampSoFar: 目前为止最大的时间,也就是timeIndex文件最后一个entry的时间戳
offsetOfMaxTimestamp:timeIndex文件最后一个entry的对应的offset
二 重要的方法
2.1 append 根据指定的offset开始添加消息,如果满足添加索引的条件,也会添加offset 和 time 索引。注意这个方法不是线程安全的,在多线程环境下需要保证线程安全
@nonthreadsafe
def append(firstOffset: Long, largestTimestamp: Long, offsetOfLargestTimestamp: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d with largesttimestamp %d at offset %d"
.format(messages.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, offsetOfLargestTimestamp))
// 获取log现在的物理位置
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp= Some(largestTimestamp)
// 调用Log#append添加消息,还是先写入内存的,
log.append(messages)
// 更细内存中最大的时间戳和相对应的offset
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar= largestTimestamp
offsetOfMaxTimestamp= offsetOfLargestTimestamp
}
// 自从上一次添加索引到现在累计的字节数如果大于添加索引的间隔字节数,则添加索引
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(firstOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
// 添加完毕后,把bytesSinceLastIndexEntry置为0,重新累计消息字节数
bytesSinceLastIndexEntry= 0
}
bytesSinceLastIndexEntry+= messages.sizeInBytes
}
}
2.2 read 从log segment第一个offset开始,读取消息,第一个offset应该大于指定的startOffset
startOffset: 指定读取的起始消息的offset
maxOffset: 指定读取结束的offset,可以为空
maxSize: 指定读取最大的字节数
maxPosition: 指定读取最大的物理地址,可选,默认是日志文件大小
@threadsafe def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,minOneMessage: Boolean = false): FetchDataInfo = {if (maxSize < 0)throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))// 获取日志大小val logSize = log.sizeInBytes // this may change, need to save a consistent copy// 找到第一个要读取的消息的物理文件位置,将startOffset准换成物理地址val startOffsetAndSize = translateOffset(startOffset)// 如果起始位置已经在日志的末尾返回nullif (startOffsetAndSize == null)return nullval (startPosition, messageSetSize) = startOffsetAndSizeval offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)// 调整maxsize大小,如果消息太大,大于maxSize,必须从maxSize和messageSetSize取出最小的val adjustedMaxSize =if (minOneMessage) math.max(maxSize, messageSetSize)else maxSizeif (adjustedMaxSize == 0)return FetchDataInfo(offsetMetadata, MessageSet.Empty)// 计算消息长度,判断是否从给定的maxOffset读取val length = maxOffset match {case None =>min((maxPosition - startPosition.position).toInt, adjustedMaxSize)case Some(offset) =>if (offset < startOffset)return FetchDataInfo(offsetMetadata, MessageSet.Empty, firstMessageSetIncomplete = false)// offset转化为实际物理地址,如果没有则把当前日志大小更新为endPositionval mapping = translateOffset(offset, startPosition.position)val endPosition =if (mapping == null)logSizeelsemapping._1.positionmin(min(maxPosition, endPosition) - startPosition.position, adjustedMaxSize).toInt}// 构造FetchDataInfo对象FetchDataInfo(offsetMetadata, log.read(startPosition.position, length),firstMessageSetIncomplete = adjustedMaxSize < messageSetSize) }
2.3 translateOffset 将逻辑offset转换成物理的地址
在读取日志文件之前,需要将startOffset和maxOffset转换成对应的物理地址才能用,举个例子:
假设startOffset = 2021,那么我们来分析整个过程:
# 我们将绝对的offset 2021 转换成offset index文件中使用的相对offset,假设baseOffset=2000, 2021 – 2000 = 21
# 在offset 索引文件中查找21属于哪一个区间的,假设索引文件片段如下:
那么21属于20-30区间,那么就从position=478开始找绝对offset为2021的消息
# 通过FileMessageSet.searchFor遍历查找FileMessageSet得到(30,567)这个位置信息。
为什么是30呢?因为21这消息与其他消息被压缩后在一起你构成了offset=2021这个外层消息,并存入日志文件
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): (OffsetPosition, Int) = {val mapping = index.lookup(offset)log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition)) }
LogSegment分析相关推荐
- 【大数据Hadoop】HDFS-HA模式下Checkpointer机制代码分析
Checkpointer机制 概览 源码解读 相关配置项 源码解析 创建StandbyCheckpointer StandbyCheckpointer分析 CheckpointerThread 干了些 ...
- kafka源码分析之副本管理-ReplicaManager
原文地址:https://blog.csdn.net/u014393917/article/details/52043040 ReplicaManager 说明,此组件用于管理kafka中各parti ...
- 【ceph】mkdir|mksnap流程源码分析|锁状态切换实例
目录 一.mkdir Clientd端的处理 发送请求的流程 发送请求的内容 处理请求的流程 后记 二.mkdir MDS端的处理 MDS对于来自客户端请求的通用处理 Locker::process ...
- 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析
目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...
- 2022-2028年中国自动驾驶系统行业现状调研分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...
- 2022-2028年中国阻尼涂料市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...
- 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...
- 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...
- 2022-2028年全球与中国青苔清洗剂市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国青苔清洗剂行业市场行业相关概述.全 ...
最新文章
- ComplexHeatmap()函数解析
- 关于ActionBar的向下兼容
- 远程桌面上的文件复制到本地
- git把本地代码上传(更新)到github上
- php 判断 in,tinkphp常用判断条件in、notin、between、AND、OR
- Jmeter工具使用-分布式架构和服务器性能监控解决方案
- NVIDIA新旗舰GeForce GTX 780深度评测
- 牛津高阶字典ld2_奶爸1.6G Mdict词库的补充及在Bluedict中使用的心得
- 3DMAX渲染特别占内存怎么办?
- 使用Ballerina CLI工具开发Ballerina项目
- c语言编译defined,#if defined(__GNUC__)的意思是不是如果使用的是GCC编译器?
- python制作简单网页_怎么用python简单的制作一个网页
- 揭秘:寻找水军组织,宝妈兼职月入3000+
- 透过历史迷雾进行深入探究特斯拉线圈是如何工作的?
- 手把手教你解决宏基笔记本wifi开关故障(超详细)
- 企业信用资质等级证书的办理流程
- B站(Bilibili) 视频的下载。
- 深度学习硬件-GPU-显卡
- 理解设计模式——工厂模式
- php褚洪波,怎么学好php - 褚洪波的个人空间 - OSCHINA - 中文开源技术交流社区
热门文章
- java 舍_Java中BigDecimal的8种舍入模式
- 第八篇:稳定性之提升团队潜意识【及时复盘、开关设计】
- Python机器学习:评价分类结果003实现混淆矩阵,精准率和召回率
- python如何下载pandas、时间延长_大pandas,python – 如何在时间表中选择具体时间
- java 展现层框架_spring快速入门例子教程:06展现层
- ArcGIS GeoDataBase GeoDataset dataset
- git代码库迁移保留commit历史_svn 迁移到 git 仓库并保留 commit 历史记录
- 镜像浏览器_yiming Win10PE 系统维护光盘镜像
- SpringCloud Gateway 服务网关,过滤器
- IFIX 需要权限打开某个画面