二分查找

Kafka 中直接接触索引或索引文件的场景可能不是很多。索引是一个很神秘的组件,Kafka 官方文档也没有怎么提过它。索引这个组件的源码还有一个亮点,那就是它应用了耳熟能详的二分查找算法来快速定位索引项。而且社区还针对 Kafka 自身的特点对其进行了改良。

1. 索引类图及源文件组织架构

在 Kafka 源码中,跟索引相关的源码文件有 5 个,它们都位于 core 包的 /src/main/scala/kafka/log 路径下。

  • AbstractIndex.scala:它定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作。
  • LazyIndex.scala:它定义了 AbstractIndex 上的一个包装类,实现索引项延迟加载。这个类主要是为了提高性能。
  • OffsetIndex.scala:定义位移索引,保存“< 位移值,文件磁盘物理位置 >”对。
  • TimeIndex.scala:定义时间戳索引,保存“< 时间戳,位移值 >”对。
  • TransactionIndex.scala:定义事务索引,为已中止事务(Aborted Transcation)保存重要的元数据信息。只有启用 Kafka 事务后,这个索引才有可能出现。

这些类的继承关系如下图所示:

其中,OffsetIndex、TimeIndex 和 TransactionIndex 都继承了 AbstractIndex 类,而上层的 LazyIndex 仅仅是包装了一个 AbstractIndex 的实现类,用于延迟加载。

2. AbstractIndex 代码结构

  abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable {    ......    }        

AbstractIndex 定义了 4 个属性字段。由于是一个抽象基类,它的所有子类自动地继承了这 4 个字段。也就是说,Kafka 所有类型的索引对象都定义了这些属性。

  1. 索引文件(file)。每个索引对象在磁盘上都对应了一个索引文件。这个字段是 var 型,说明它是可以被修改的。难道索引对象还能动态更换底层的索引文件吗?自 1.1.0 版本之后,Kafka 允许迁移底层的日志路径,所以,索引文件自然要是可以更换的。
  2. 起始位移值(baseOffset)。索引对象对应日志段对象的起始位移值。举个例子,如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是 00000000000000000123.log,正常情况下,一定还有一组索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。这里的“123”就是这组文件的起始位移值,也就是 baseOffset 值。
  3. 索引文件最大字节数(maxIndexSize)。它控制索引文件的最大长度。Kafka 源码传入该参数的值是 Broker 端参数 segment.index.bytes 的值,即 10MB。这就是在默认情况下,所有 Kafka 索引文件大小都是 10MB 的原因。
  4. 索引文件打开方式(writable)。“True”表示以“读写”方式打开,“False”表示以“只读”方式打开。

AbstractIndex 是抽象的索引对象类。可以说,它是承载索引项的容器,而每个继承它的子类负责定义具体的索引项结构。

比如,OffsetIndex 的索引项是 < 位移值,物理磁盘位置 > 对,TimeIndex 的索引项是 < 时间戳,位移值 > 对。基于这样的设计理念,AbstractIndex 类中定义了一个抽象方法 entrySize 来表示不同索引项的大小,如下所示:

protected def entrySize: Int

子类实现该方法时需要给定自己索引项的大小,对于 OffsetIndex 而言,该值就是 8;对于 TimeIndex 而言,该值是 12。

    // OffsetIndex    override def entrySize = 8    // TimeIndex    override def entrySize = 12

8和12具体什么含义呢?

在 OffsetIndex 中,位移值用 4 个字节来表示,物理磁盘位置也用 4 个字节来表示,所以总共是 8 个字节。位移值不是长整型,应该是 8 个字节才对。上面提到 AbstractIndex 已经保存了 baseOffset 了,这里的位移值,实际上是相对于 baseOffset 的相对位移值,即真实位移值减去 baseOffset 的值,使用相对位移值能够有效地节省磁盘空间。而 Broker 端参数 log.segment.bytes 是整型,这说明,Kafka 中每个日志段文件的大小不会超过 2^32,即 4GB,这就说明同一个日志段文件上的位移值减去 baseOffset 的差值一定在整数范围内。因此,源码只需要 4 个字节保存就行了。

同理,TimeIndex 中的时间戳类型是长整型,占用 8 个字节,位移依然使用相对位移值,占用 4 个字节,因此总共需要 12 个字节。

Kafka 中的索引底层的实现原理是 Java 中的 MappedByteBuffer。使用内存映射文件的主要优势在于,它有很高的 I/O 性能,特别是对于索引这样的小文件来说,由于文件内存被直接映射到一段虚拟内存上,访问内存映射文件的速度要快于普通的读写文件速度。

在 AbstractIndex 中,这个 MappedByteBuffer 就是名为 mmap 的变量。看下源码:

 @volatile      protected var mmap: MappedByteBuffer = {        // 第1步:创建索引文件        val newlyCreated = file.createNewFile()        // 第2步:以writable指定的方式(读写方式或只读方式)打开索引文件        val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")        try {          if(newlyCreated) {            if(maxIndexSize < entrySize) // 预设的索引文件大小不能太小,如果连一个索引项都保存不了,直接抛出异常              throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)            // 第3步:设置索引文件长度,roundDownToExactMultiple计算的是不超过maxIndexSize的最大整数倍entrySize            // 比如maxIndexSize=1234567,entrySize=8,那么调整后的文件长度为1234560            raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))          }                  // 第4步:更新索引长度字段_length          _length = raf.length()          // 第5步:创建MappedByteBuffer对象          val idx = {            if (writable)              raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)            else              raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)          }          /* set the position in the index for the next entry */          // 第6步:如果是新创建的索引文件,将MappedByteBuffer对象的当前位置置成0          // 如果索引文件已存在,将MappedByteBuffer对象的当前位置设置成最后一个索引项所在的位置          if(newlyCreated)            idx.position(0)          else            idx.position(roundDownToExactMultiple(idx.limit(), entrySize))          // 第7步:返回创建的MappedByteBuffer对象          idx        } finally {          CoreUtils.swallow(raf.close(), AbstractIndex) // 关闭打开索引文件句柄        }      }

这些代码最主要的作用就是创建 mmap 对象,AbstractIndex 其他大部分的操作都是和 mmap 相关。

比如:

// 如果我们要计算索引对象中当前有多少个索引项,只需要执行下列计算:protected var _entries: Int = mmap.position() / entrySize  // 如果我们要计算索引文件最多能容纳多少个索引项,只要定义下面的变量就行了:private[this] var _maxEntries: Int = mmap.limit() / entrySize  // 再进一步,有了这两个变量,我们就能够很容易地编写一个方法,来判断当前索引文件是否已经写满:def isFull: Boolean = _entries >= _maxEntries

3. 写入索引项

下面这段代码是 OffsetIndex 的 append 方法,用于向索引文件中写入新索引项。

 def append(offset: Long, position: Int): Unit = {        inLock(lock) {          // 第1步:判断索引文件未写满          require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")          // 第2步:必须满足以下条件之一才允许写入索引项:          // 条件1:当前索引文件为空          // 条件2:要写入的位移大于当前所有已写入的索引项的位移——Kafka规定索引项中的位移值必须是单调增加的          if (_entries == 0 || offset > _lastOffset) {            trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")            mmap.putInt(relativeOffset(offset)) // 第3步A:向mmap中写入相对位移值            mmap.putInt(position) // 第3步B:向mmap中写入物理位置信息            // 第4步:更新其他元数据统计信息,如当前索引项计数器_entries和当前索引项最新位移值_lastOffset            _entries += 1            _lastOffset = offset            // 第5步:执行校验。写入的索引项格式必须符合要求,即索引项个数*单个索引项占用字节数匹配当前文件物理大小,否则说明文件已损坏            require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")          } else {            // 如果第2步中两个条件都不满足,不能执行写入索引项操作,抛出异常            throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +              s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")          }        }      }

4. 查找索引项

索引项的写入逻辑并不复杂,难点在于如何查找索引项。AbstractIndex 定义了抽象方法 parseEntry 用于查找给定的索引项,如下所示:

protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

这里的 “n” 表示要查找给定 ByteBuffer 中保存的第 n 个索引项, IndexEntry 是源码定义的一个接口,里面有两个方法:indexKey 和 indexValue,分别返回不同类型索引的对。

OffsetIndex 实现 parseEntry 的逻辑如下:

    override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {        OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))      }

OffsetPosition 是 IndexEntry 的实现类,Key 就是之前说的位移值,而 Value 就是物理磁盘位置值。所以,这里你能看到代码调用了 relativeOffset(buffer, n) + baseOffset 计算出绝对位移值,之后调用 physical(buffer, n) 计算物理磁盘位置,最后将它们封装到一起作为一个独立的索引项返回。

有了 parseEntry 方法,我们就能够根据给定的 n 来查找索引项了。但是,这里还有个问题需要解决,那就是,我们如何确定要找的索引项在第 n 个槽中呢?也就是如何从一组已排序的数中快速定位符合条件的那个数,二分查找登场。

5. 二分查找算法

到目前为止,从已排序数组中寻找某个数字最快速的算法就是二分查找了,它能做到 O(lgN) 的时间复杂度。Kafka 的索引组件就应用了二分查找算法。

原版的实现代码:

   private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {        // 第1步:如果当前索引为空,直接返回对        if(_entries == 0)          return (-1, -1)                // 第2步:要查找的位移值不能小于当前最小位移值        if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)          return (-1, 0)                // binary search for the entry        // 第3步:执行二分查找算法        var lo = 0        var hi = _entries - 1        while(lo < hi) {          val mid = ceil(hi/2.0 + lo/2.0).toInt          val found = parseEntry(idx, mid)          val compareResult = compareIndexEntry(found, target, searchEntity)          if(compareResult > 0)            hi = mid - 1          else if(compareResult < 0)            lo = mid          else            return (mid, mid)        }                (lo, if (lo == _entries - 1) -1 else lo + 1)

这段代码的核心是,第 3 步的二分查找算法。常刷算法题的朋友,再熟悉不过了。

6. 改进版二分查找算法

大多数操作系统使用页缓存来实现内存映射,而目前几乎所有的操作系统都使用 LRU(Least Recently Used)或类似于 LRU 的机制来管理页缓存。Kafka 写入索引文件的方式是在文件末尾追加写入,而几乎所有的索引查询都集中在索引的尾部。这么来看的话,LRU 机制是非常适合 Kafka 的索引访问场景的。

但,这里有个问题是,当 Kafka 在查询索引的时候,原版的二分查找算法并没有考虑到缓存的问题,因此很可能会导致一些不必要的缺页中断(Page Fault)。此时,Kafka 线程会被阻塞,等待对应的索引项从物理磁盘中读出并放入到页缓存中。

下面举个例子来说明一下这个情况。假设 Kafka 的某个索引占用了操作系统页缓存 13 个页(Page),如果待查找的位移值位于最后一个页上,也就是 Page 12,那么标准的二分查找算法会依次读取页号 0、6、9、11 和 12,具体的流程不过多叙述。

接下来是重点:

通常来说,一个页上保存了成百上千的索引项数据。随着索引文件不断被写入,Page 12 不断地被填充新的索引项。如果此时索引查询方都来自 ISR 副本或 Lag 很小的消费者,那么这些查询大多集中在对 Page 12 的查询,因此,Page 0、6、9、11、12 一定经常性地被源码访问。也就是说,这些页一定保存在页缓存上。

后面当新的索引项填满了 Page 12,页缓存就会申请一个新的 Page 来保存索引项,即 Page 13。现在,最新索引项保存在 Page 13 中。如果要查找最新索引项,原版二分查找算法将会依次访问 Page 0、7、10、12 和 13。此时,问题来了:Page 7 和 10 已经很久没有被访问过了,它们大概率不在页缓存中,因此,一旦索引开始征用 Page 13,就会发生 Page Fault,等待那些冷页数据从磁盘中加载到页缓存。根据资料查询,这种加载过程可能长达 1 秒。显然,这是一个普遍的问题,即每当索引文件占用 Page 数发生变化时,就会强行变更二分查找的搜索路径,从而出现不在页缓存的冷数据必须要加载到页缓存的情形,而这种加载过程是非常耗时的。

基于这个问题,社区提出了改进版的二分查找策略,也就是缓存友好的搜索算法。总体的思路是,代码将所有索引项分成两个部分:热区(Warm Area)和冷区(Cold Area),然后分别在这两个区域内执行二分查找算法,如下图所示:

同样是查询最热的那部分数据,一旦索引占用了更多的 Page,要遍历的 Page 组合就会发生变化。这是导致性能下降的主要原因。这个改进版算法的最大好处在于,查询最热那部分数据所遍历的 Page 永远是固定的,因此大概率在页缓存中,从而避免无意义的 Page Fault。

看到这个设计时,我真的感觉到算法的精妙以及commiter的NB。

看下实际的代码:

    private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {        // 第1步:如果索引为空,直接返回对        if(_entries == 0)          return (-1, -1)            // 封装原版的二分查找算法        def binarySearch(begin: Int, end: Int) : (Int, Int) = {          // binary search for the entry          var lo = begin          var hi = end          while(lo < hi) {            val mid = (lo + hi + 1) >>> 1            val found = parseEntry(idx, mid)            val compareResult = compareIndexEntry(found, target, searchEntity)            if(compareResult > 0)              hi = mid - 1            else if(compareResult < 0)              lo = mid            else              return (mid, mid)          }          (lo, if (lo == _entries - 1) -1 else lo + 1)        }            // 第3步:确认热区首个索引项位于哪个槽。_warmEntries就是所谓的分割线,目前固定为8192字节处        // 如果是OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024个槽        // 如果是TimeIndex,_warmEntries = 8192 / 12 = 682,即第682个槽        val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)        // 第4步:判断target位移值在热区还是冷区        if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {          return binarySearch(firstHotEntry, _entries - 1) // 如果在热区,搜索热区        }            // 第5步:确保target位移值不能小于当前最小位移值        if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)          return (-1, 0)         // 第6步:如果在冷区,搜索冷区        binarySearch(0, firstHotEntry)

最后来张两个算法的总结:

7. 空间与时间的互换

到二分查找还没完,日志段有个参数 indexIntervalBytes, 可以理解为插了多少条消息之后再建一个索引,由此看出kafka的索引其实是稀疏索引,这样可以避免索引文件占用过多的内存,从而可以在内存中保存更多的索引。对应Broker端参数就是 log.index.interval.bytes 值,默认4kb。

实际的通过索引查找消息的过程是通过offset找到索引所在的文件,然后通过二分法找到离目标最近的索引,再顺序遍历消息文件找到目标文件。复杂度为 O(log2n)+O(m), n是索引文件里索引的个数,m为稀疏程度。

这就是时间和空间的互换,数据结构和算法的平衡。

ad09只在一定范围内查找相似对象_kafka日志段中的二分查找相关推荐

  1. SGISTL源码探究-stl_alog.h中的二分查找算法

    前言 在上一小节中我们分析了stl_algo.h中的部分算法.本小节中我们将继续分析其中关于二分查找类的算法,即lower_bound.upper_bound.binary_search.equal_ ...

  2. C语言(CED)查找最接近的元素(分治法/二分查找):在一个非降序列中,查找与给定值最接近的元素。(递归实现)

    (请先看置顶博文)https://blog.csdn.net/GenuineMonster/article/details/104495419 一.题目大意 查找最接近的元素(分治法/二分查找):在一 ...

  3. cuda中的二分查找

    使用背景 通常,在做高性能计算时,我们需要随机的连接某些点.这些点都具有自己的度量值,显然,度量值越大的值随机到的概率就会越大.因此,采用加权值得方法: void getdegreeSum(DG *g ...

  4. python实现二分查找_数据结构和算法:Python实现二分查找(Binary_search)

    在一个列表当中我们可以进行线性查找也可以进行二分查找,即通过不同的方法找到我们想要的数字,线性查找即按照数字从列表里一个一个从左向右查找,找到之后程序停下.而二分查找的效率往往会比线性查找更高. 一. ...

  5. ad09只在一定范围内查找相似对象_23、面向对象编程

    目录: 对象的概念 类与对象 面向对象编程 类的定义与实例化 属性访问 类属性与对象属性 属性查找顺序与绑定方法 小结 视频链接 一 对象的概念 "面向对象"的核心是"对 ...

  6. ad09只在一定范围内查找相似对象_dxp查找相似对象

    (Edit→Find Similar Objects)=Shif+F 查找 相似对象 E+O+S(Edit→Origin→Set)设置坐标原点 E+S+N((Edit → Select → Net) ...

  7. ad09只在一定范围内查找相似对象_重磅!Excel更新了超级查找函数XLOOKUP,可以对VLOOKUP说拜拜了...

    VLOOKUP这个数据查找函数真的是职场必学函数!!! 绝对的,不会VLOOKUP的人会被加班摧残的! 但是面对一些稍复杂的情况 ,VLOOKUP匹配起来却显得比较困难,原因是多数情况需要构造数据 , ...

  8. problem k: 查找某一个数_quot;细节魔鬼quot; 二分查找

    ❝ 二分查找,是一个高效,实用,且易理解的一个查找算法, 通常时间复杂度为O(lgn).局限性在于,待查找对象必须为有序的数组:数据量过小,优势不明显,数据量过大,数组大小受限于内存. 除此之外,二分 ...

  9. C语言中的二分查找法

    ...cpp 二分查找法也称折半查找法,是一种有序的查找方法,时间复杂度为log以2为底n的对数.如果是无序的则要先进行排序操作.基本思想是:把要查找的值和中间元素比较.例如在1 2 3 4 5 6 ...

最新文章

  1. iOS - 内购_类型
  2. 手把手教你用 TensorFlow 实现文本分类(下)
  3. ZYAR20A 亚克力2驱 蓝牙 298寻迹避障机器人 —— 小车黑线循迹红外避障综合实验
  4. mysql存储过程模糊查询_Mysql之存储过程“模糊查询drop表”
  5. 在vscode使用editorconfig的正确姿势
  6. 项目部署到Tomat报错:jar not loaded.See Servlet Spec 2.3, section 9.7.2. Offending
  7. 我们都是孩子。』凄美的爱情青春
  8. dnf自动刷图python_dnf卡盟_【python接口自动化】- openpyxl读取excel数据
  9. QQ 音乐加密音乐文件格式转换并附加专辑图片教程
  10. 一、SQL语句执行过程
  11. 赴日IT派遣,如何避免入坑
  12. Maya创建重力动力模型教程!
  13. excel 多列内容合并为一个单元格
  14. 如何在Jupyter Notebook中编写R程序
  15. 基于随机森林、svm、CNN机器学习的风控欺诈识别模型
  16. 扔掉U盘:两步教你如何使用硬盘装系统(适用gpt磁盘)
  17. MySQL:使用PMM进行性能监控
  18. 使用豆瓣安装python包(以numpy为例)
  19. 计算机学报杂志官网在线出版,计算机学报
  20. html sql网页游戏源码,魔兽世界网页游戏webgame源码(asp)

热门文章

  1. modbus-tcp qt4-socket ---------micro2440 as device
  2. Java设计模式分为创建模式, 结构模式, 行为模式 3种类型
  3. CentOS7Selinux设置
  4. 从 C++ 到 Objective-C 的快速指南 【已翻译100%】
  5. DBus glib 各数据类型接收与发送详解—C语言(3)
  6. 寻找GridView中模板列中的控件
  7. IPC经典入侵,比3389实用(图)
  8. GitHub上个最有意思的项目合集(技术清单系列)
  9. 怎么写脚本_直播脚本怎么写|请收下这份攻略
  10. K8s普通用户配置权限解决User “system:anonymous“ cannot list nodes at the cluster scope