OffsetIndex和TimeIndex分析
为了提高查找消息的性能,为每一个日志文件添加2个索引索引文件:OffsetIndex 和 TimeIndex,她俩分别对应着磁盘上两个索引文件,与FileMessageSet共同构成一个LogSegment对象。
OffsetIndex索引文件的格式: 每一个索引项为8字节,其中相对offset占用4字节,消息的物理地址(position)占用4个字节
这样就实现了相对offset与物理地址的映射,相对offset表示消息相对于baseOffSet的偏移量,例如分段后的一个日志文件的baseOffset是32450,它的文件名就是32450.log,那么offset为32455的消息在相对offset就是32455-32450 = 5。
另外OffsetIndex是稀疏索引,也就是说不会存储所有的消息的相对offset和position
OffsetIndexhe核心字段:
file: 指向磁盘上的索引文件
baseOffset: 对应日志文件第一个消息的offset
mmap: 用来操作索引文件的MappedByteBuffer
lock: ReentrantLock对象,在mmap进行操作的时候需要加锁保护
_entries:当前索引文件索引项的个数
_maxEntries: 当前索引文件中最多能够保存索引项个数
_lastOffset: 保存最后一个索引项的offset
/**
* 因为会有多个handler线程并发写入索引文件,所以这些字段使用@volatile,保证线程之间的可见性
*/
@volatile
protected var mmap: MappedByteBuffer= {
// 如果索引文件不存在则创建返回true,否则存在返回false
val newlyCreated = _file.createNewFile()
val raf = new RandomAccessFile(_file, "rw")
try {
/* 如有必要进行预分配空间 */
if(newlyCreated) {
//maxIndexSize值大小对索引文件分配大小,分配结果是小于maxIndexSize *8
if(maxIndexSize < entrySize)
throw new IllegalArgumentException("Invalidmax index size: " + maxIndexSize)
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
}
/* 内存映射 */
val len = raf.length()
val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
/* 蒋新创建的索引文件的position设置为0,从头开始写文件 */
if(newlyCreated)
idx.position(0)
else
// 对于原来就存在的索引文件,则将position移动到索引项的结束位置,防止数据覆盖
idx.position(roundDownToExactMultiple(idx.limit, entrySize))
idx
} finally {
CoreUtils.swallow(raf.close())
}
}
// 添加指定offset/location对到索引中 def append(offset: Long, position: Int) {inLock(lock) {require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")if (_entries == 0 || offset > _lastOffset) {debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))mmap.putInt((offset - baseOffset).toInt)mmap.putInt(position)_entries += 1_lastOffset = offsetrequire(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")} else {throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s.".format(offset, entries, _lastOffset, file.getAbsolutePath))}} }
查找索引:
// 查找那些大于或者等于给定的offset的offset和position def lookup(targetOffset: Long): OffsetPosition = {maybeLock(lock) {val idx = mmap.duplicate // 创建一个副本val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)if(slot == -1)OffsetPosition(baseOffset, 0)elseparseEntry(idx, slot).asInstanceOf[OffsetPosition]} }
// 查找那些大于或者等于指定offset存储在哪一个slot上的 protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {// check if the index is emptyif(_entries == 0)return -1// check if the target offset is smaller than the least offsetif(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)return -1// 二分查找算法var lo = 0var hi = _entries - 1while(lo < hi) {val mid = ceil(hi/2.0 + lo/2.0).toIntval found = parseEntry(idx, mid)val compareResult = compareIndexEntry(found, target, searchEntity)if(compareResult > 0)hi = mid - 1else if(compareResult < 0)lo = midelsereturn mid}lo }
TimeIndex索引文件格式:它是映射时间戳和相对offset, 时间戳和相对offset作为entry,供占用12字节,时间戳占用8字节,相对offset占用4字节,这个索引也是稀疏索引,没有保存全部的消息的entry
TimeIndex查找添加算法和OffsetIndex差不多,不再赘述
OffsetIndex和TimeIndex分析相关推荐
- LogSegment分析
为了防止Log文件过大,将Log切分成多个日志文件来管理,每一个日志文件对应着一个LogSegment.在LogSegment封装了TimeIndex ,OffsetIndex以及FileMessag ...
- LogManager分析
在一个broker上的log都是通过LogManger来管理的,LogManager主要负责日志管理,包括日志创建,日志获取,日志清理,所有的读写操作都要委托的那个日志实例 一 核心字段 logDir ...
- kafka2.2源码分析之Log日志存储
概述 Log由一系列LogSegment组成,每个LogSegment都有一个base offset,表示该段中的第一条消息. 新的LogSegment会根据Log的配置策略来创建.配置策略控制了Lo ...
- kafka消息存储与partition副本原理(二)
消息的存储原理: 消息的文件存储机制: 前面我们知道了一个 topic 的多个 partition 在物理磁盘上的保存路径,那么我们再来分析日志的存储方式.通过 ll /tmp/kafka-logs/ ...
- kafka源码_Kafka日志段源码解析
1 Kafka 日志结构 kafka 日志在磁盘上的组织架构如下: Kafka 日志对象由多个日志段对象组成,每个日志段对象在磁盘上创建一组文件,包括: 日志文件(.log) 索引文件(.index) ...
- Kafka 核心源码解读【一】--日志模块
文章目录 1 日志段:保存消息文件的对象是怎么实现的? 1.1 Kafka 日志结构概览 1.2 日志段代码解析 1.3 日志段类声明 1.4 append 方法 1.5 read 方法 1.6 re ...
- kafka日志对象(一)—— Log Segment
kafka的高吞吐量和持久性是一大亮点,内部的日志操作是如何呢,研究明白了一定豁然开朗,至少我有种恍然大明白的感觉. kafka的日志结构 Kafka 日志对象由多个日志段对象组成,而每个日志段对象会 ...
- Kafka 的实现原理
1.消息中间件能做什么? 1.1 异步处理 消息中间件主要解决的就是分布式系统之间消息传递的问题,它能够屏蔽各种平台以及协议之间的特性,实现应用程序之间的协同.举个非常简单的例子,就拿一个电商平台的注 ...
- ad09只在一定范围内查找相似对象_kafka日志段中的二分查找
二分查找 Kafka 中直接接触索引或索引文件的场景可能不是很多.索引是一个很神秘的组件,Kafka 官方文档也没有怎么提过它.索引这个组件的源码还有一个亮点,那就是它应用了耳熟能详的二分查找算法来快 ...
最新文章
- 不会做内部分享的程序员不是好程序员
- 解决无法使用pip命令加载Python的扩展库问题
- 提升编程水平的靠谱方法
- Python中如何写控制台进度条的整理
- Python语言学习:复杂函数(yield/@property)使用方法、案例应用之详细攻略
- java void eat_java匿名内部类
- cocos2dX改变锚点位置
- Python精通-Python函数使用
- Rayeager PX2 不能进入烧写模式解决方案
- linux选择内核命令,Lenky个人站点
- android studio im源码,全开源即时通讯(IM)系统 高仿微信源码
- 程序员100套简历模板,全网最全
- Redis 官方推出可视化工具,颜值爆表,功能真心强大!这是不给其他工具活路啊!...
- SQLServer数据库压缩与数据库日志压缩
- 归一化相关系数相关匹配公式
- Cannot connenct to relay host smtp.163.com (php邮件发送失败)
- 【C++】关于日期的计算
- 【开源毕设】一款精美的家校互动APP分享——爱吖校推 [你关注的,我们才推](持续开源更新2)
- vivado安装步骤
- Altium Designer快捷键和规则
热门文章
- 没有workstation_这才是Win10的旗舰版:WorkStation版独享功能测试
- emolg模板PHP7,PHP7下安装Emlog5.3.1的笔记
- MATLAB solve函数计算得到lambertw函数,用vpa转换即可
- c语言怎么算n以内素数,关于求N以内素数的一点小问题(N小于一亿)
- static在内存层面的作用_C++内存管理笔记
- Kafka eagle 安装
- MyBatis 原理
- 计算机文化基础课程实验,山东经贸职业学院计算机文化基础课程实验教学大纲.doc...
- android 获取相机方向,android – 从相机捕捉图像,导致炸毁方向
- Java 算法 判定字符位置