为了提高查找消息的性能,为每一个日志文件添加2个索引索引文件:OffsetIndex 和 TimeIndex,她俩分别对应着磁盘上两个索引文件,与FileMessageSet共同构成一个LogSegment对象。

OffsetIndex索引文件的格式: 每一个索引项为8字节,其中相对offset占用4字节,消息的物理地址(position)占用4个字节

这样就实现了相对offset与物理地址的映射,相对offset表示消息相对于baseOffSet的偏移量,例如分段后的一个日志文件的baseOffset是32450,它的文件名就是32450.log,那么offset为32455的消息在相对offset就是32455-32450 = 5。

另外OffsetIndex是稀疏索引,也就是说不会存储所有的消息的相对offset和position

OffsetIndexhe核心字段:

file: 指向磁盘上的索引文件

baseOffset: 对应日志文件第一个消息的offset
mmap: 用来操作索引文件的MappedByteBuffer

lock: ReentrantLock对象,在mmap进行操作的时候需要加锁保护

_entries:当前索引文件索引项的个数

_maxEntries: 当前索引文件中最多能够保存索引项个数

_lastOffset: 保存最后一个索引项的offset

/**
 * 因为会有多个handler线程并发写入索引文件,所以这些字段使用@volatile,保证线程之间的可见性
 */
@volatile
protected var mmap: MappedByteBuffer= {
  // 如果索引文件不存在则创建返回true,否则存在返回false
 
val newlyCreated = _file.createNewFile()
  val raf = new RandomAccessFile(_file, "rw")
  try {
    /* 如有必要进行预分配空间 */
   
if(newlyCreated) {
      //maxIndexSize值大小对索引文件分配大小,分配结果是小于maxIndexSize *8
     
if(maxIndexSize < entrySize)
        throw new IllegalArgumentException("Invalidmax index size: " + maxIndexSize)
      raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
    }

/* 内存映射 */
   
val len = raf.length()
    val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

/* 蒋新创建的索引文件的position设置为0,从头开始写文件 */
   
if(newlyCreated)
      idx.position(0)
    else
      // 对于原来就存在的索引文件,则将position移动到索引项的结束位置,防止数据覆盖
     
idx.position(roundDownToExactMultiple(idx.limit, entrySize))
    idx
  } finally {
    CoreUtils.swallow(raf.close())
  }
}

// 添加指定offset/location对到索引中
def append(offset: Long, position: Int) {inLock(lock) {require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")if (_entries == 0 || offset > _lastOffset) {debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))mmap.putInt((offset - baseOffset).toInt)mmap.putInt(position)_entries += 1_lastOffset = offsetrequire(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")} else {throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s.".format(offset, entries, _lastOffset, file.getAbsolutePath))}}
}

查找索引:

// 查找那些大于或者等于给定的offset的offset和position
def lookup(targetOffset: Long): OffsetPosition = {maybeLock(lock) {val idx = mmap.duplicate // 创建一个副本val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)if(slot == -1)OffsetPosition(baseOffset, 0)elseparseEntry(idx, slot).asInstanceOf[OffsetPosition]}
}
// 查找那些大于或者等于指定offset存储在哪一个slot上的
protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {// check if the index is emptyif(_entries == 0)return -1// check if the target offset is smaller than the least offsetif(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)return -1// 二分查找算法var lo = 0var hi = _entries - 1while(lo < hi) {val mid = ceil(hi/2.0 + lo/2.0).toIntval found = parseEntry(idx, mid)val compareResult = compareIndexEntry(found, target, searchEntity)if(compareResult > 0)hi = mid - 1else if(compareResult < 0)lo = midelsereturn mid}lo
}

TimeIndex索引文件格式:它是映射时间戳和相对offset, 时间戳和相对offset作为entry,供占用12字节,时间戳占用8字节,相对offset占用4字节,这个索引也是稀疏索引,没有保存全部的消息的entry

TimeIndex查找添加算法和OffsetIndex差不多,不再赘述

OffsetIndex和TimeIndex分析相关推荐

  1. LogSegment分析

    为了防止Log文件过大,将Log切分成多个日志文件来管理,每一个日志文件对应着一个LogSegment.在LogSegment封装了TimeIndex ,OffsetIndex以及FileMessag ...

  2. LogManager分析

    在一个broker上的log都是通过LogManger来管理的,LogManager主要负责日志管理,包括日志创建,日志获取,日志清理,所有的读写操作都要委托的那个日志实例 一 核心字段 logDir ...

  3. kafka2.2源码分析之Log日志存储

    概述 Log由一系列LogSegment组成,每个LogSegment都有一个base offset,表示该段中的第一条消息. 新的LogSegment会根据Log的配置策略来创建.配置策略控制了Lo ...

  4. kafka消息存储与partition副本原理(二)

    消息的存储原理: 消息的文件存储机制: 前面我们知道了一个 topic 的多个 partition 在物理磁盘上的保存路径,那么我们再来分析日志的存储方式.通过 ll /tmp/kafka-logs/ ...

  5. kafka源码_Kafka日志段源码解析

    1 Kafka 日志结构 kafka 日志在磁盘上的组织架构如下: Kafka 日志对象由多个日志段对象组成,每个日志段对象在磁盘上创建一组文件,包括: 日志文件(.log) 索引文件(.index) ...

  6. Kafka 核心源码解读【一】--日志模块

    文章目录 1 日志段:保存消息文件的对象是怎么实现的? 1.1 Kafka 日志结构概览 1.2 日志段代码解析 1.3 日志段类声明 1.4 append 方法 1.5 read 方法 1.6 re ...

  7. kafka日志对象(一)—— Log Segment

    kafka的高吞吐量和持久性是一大亮点,内部的日志操作是如何呢,研究明白了一定豁然开朗,至少我有种恍然大明白的感觉. kafka的日志结构 Kafka 日志对象由多个日志段对象组成,而每个日志段对象会 ...

  8. Kafka 的实现原理

    1.消息中间件能做什么? 1.1 异步处理 消息中间件主要解决的就是分布式系统之间消息传递的问题,它能够屏蔽各种平台以及协议之间的特性,实现应用程序之间的协同.举个非常简单的例子,就拿一个电商平台的注 ...

  9. ad09只在一定范围内查找相似对象_kafka日志段中的二分查找

    二分查找 Kafka 中直接接触索引或索引文件的场景可能不是很多.索引是一个很神秘的组件,Kafka 官方文档也没有怎么提过它.索引这个组件的源码还有一个亮点,那就是它应用了耳熟能详的二分查找算法来快 ...

最新文章

  1. 不会做内部分享的程序员不是好程序员
  2. 解决无法使用pip命令加载Python的扩展库问题
  3. 提升编程水平的靠谱方法
  4. Python中如何写控制台进度条的整理
  5. Python语言学习:复杂函数(yield/@property)使用方法、案例应用之详细攻略
  6. java void eat_java匿名内部类
  7. cocos2dX改变锚点位置
  8. Python精通-Python函数使用
  9. Rayeager PX2 不能进入烧写模式解决方案
  10. linux选择内核命令,Lenky个人站点
  11. android studio im源码,全开源即时通讯(IM)系统 高仿微信源码
  12. 程序员100套简历模板,全网最全
  13. Redis 官方推出可视化工具,颜值爆表,功能真心强大!这是不给其他工具活路啊!...
  14. SQLServer数据库压缩与数据库日志压缩
  15. 归一化相关系数相关匹配公式
  16. Cannot connenct to relay host smtp.163.com (php邮件发送失败)
  17. 【C++】关于日期的计算
  18. 【开源毕设】一款精美的家校互动APP分享——爱吖校推 [你关注的,我们才推](持续开源更新2)
  19. vivado安装步骤
  20. Altium Designer快捷键和规则

热门文章

  1. 没有workstation_这才是Win10的旗舰版:WorkStation版独享功能测试
  2. emolg模板PHP7,PHP7下安装Emlog5.3.1的笔记
  3. MATLAB solve函数计算得到lambertw函数,用vpa转换即可
  4. c语言怎么算n以内素数,关于求N以内素数的一点小问题(N小于一亿)
  5. static在内存层面的作用_C++内存管理笔记
  6. Kafka eagle 安装
  7. MyBatis 原理
  8. 计算机文化基础课程实验,山东经贸职业学院计算机文化基础课程实验教学大纲.doc...
  9. android 获取相机方向,android – 从相机捕捉图像,导致炸毁方向
  10. Java 算法 判定字符位置