LogManager

LogManager manages all the logs on a broker (under a single log directory); each partition of a topic corresponds to one Log (one log subdirectory).
On startup, loadLogs loads the Log object for every partition, after which LogManager exposes management interfaces such as createLog, getLog, and deleteLog.
It also starts background threads for work such as cleanup, flushing, and checkpoint generation, sketched below.
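A hedged illustration of that startup wiring (task names and periods here are invented, not the exact 0.8 identifiers): the pattern is simply periodic jobs handed to a scheduler.

    import java.util.concurrent.{Executors, TimeUnit}

    // Illustrative sketch only: periodic cleanup / flush / checkpoint jobs,
    // the way LogManager schedules its background work on startup.
    object LogManagerSketch {
      private val scheduler = Executors.newScheduledThreadPool(1)

      def startup(): Unit = {
        schedule("log-retention", () => println("cleanup: delete expired segments"), periodMs = 60000L)
        schedule("log-flusher", () => println("flush: fsync dirty logs"), periodMs = 3000L)
        schedule("recovery-checkpoint", () => println("checkpoint: persist recovery points"), periodMs = 10000L)
      }

      private def schedule(name: String, task: () => Unit, periodMs: Long): Unit =
        scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = task() },
                                      periodMs, periodMs, TimeUnit.MILLISECONDS)
    }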

Log

Log is essentially a wrapper around a sequence of LogSegments; it provides loadSegments, append (which writes to the active segment), and read (which first has to locate the right segment, as sketched below).
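Kafka's Log keeps its segments in a skip-list map keyed by base offset, so locating the segment for a read is a floor lookup: the entry with the greatest base offset less than or equal to the requested offset. A minimal self-contained sketch of that idea:

    import java.util.concurrent.ConcurrentSkipListMap

    // Sketch: the segment that may contain `offset` is the one with the
    // greatest base offset <= offset, found with floorEntry.
    class SegmentMapSketch[S] {
      private val segments = new ConcurrentSkipListMap[java.lang.Long, S]()

      def add(baseOffset: Long, segment: S): Unit = segments.put(baseOffset, segment)

      def segmentFor(offset: Long): Option[S] = {
        val entry = segments.floorEntry(offset)
        if (entry == null) None else Some(entry.getValue)
      }
    }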

LogSegment

A Segment is a logical concept: to keep any single log file from growing too large, a log is split into many LogSegments.
A Segment in turn has two parts, the MessageSet file and the index file, named [base_offset].log and [base_offset].index respectively.
base_offset is the starting offset of the Segment, larger than every offset in the preceding segment.

A Segment provides the read and write interfaces for its MessageSet.
Write: the index file is only updated at intervals. To keep the index small, a new index entry is added only once more than indexIntervalBytes of data have been written since the last entry (see the sketch below).
Read: the caller passes a logical offset, which must first be translated into a physical position before the data can be read from the file; how that translation works is covered below.

The index file can also be rebuilt from the MessageSet file at any time (which is how it is recovered after a crash).
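A minimal sketch of the write-side bookkeeping (the index and log are stubbed out here; in the real LogSegment.append the counter works the same way):

    // Sketch: add an index entry only after more than indexIntervalBytes of
    // message data have been appended since the last entry -- this is what
    // keeps the index sparse and small.
    class SparseIndexSketch(indexIntervalBytes: Int) {
      private var bytesSinceLastIndexEntry = 0
      private var logSizeInBytes = 0
      private var entries = Vector.empty[(Long, Int)]  // (logical offset, physical position)

      def append(offset: Long, messageBytes: Int): Unit = {
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
          entries = entries :+ ((offset, logSizeInBytes))  // index the start of this batch
          bytesSinceLastIndexEntry = 0
        }
        logSizeInBytes += messageBytes                     // "write" the message data
        bytesSinceLastIndexEntry += messageBytes
      }
    }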

FileMessageSet

The file in a Segment that actually stores the log messages, read and written through a FileChannel.

    /**
     * An on-disk message set. An optional start and end position can be applied to the message set
     * which will allow slicing a subset of the file.
     * @param file The file name for the underlying log data
     * @param channel the underlying file channel used
     * @param start A lower bound on the absolute position in the file from which the message set begins
     * @param end The upper bound on the absolute position in the file at which the message set ends
     * @param isSlice Should the start and end parameters be used for slicing?
     */
    @nonthreadsafe
    class FileMessageSet private[kafka](@volatile var file: File,
                                        private[log] val channel: FileChannel,
                                        private[log] val start: Int,
                                        private[log] val end: Int,
                                        isSlice: Boolean) extends MessageSet with Logging {...}
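The start/end/isSlice parameters exist so that a read can hand back a view instead of copying bytes. Sketched here as a method of the class above (simplified; the real read method differs in details):

    // Sketch: a read returns another FileMessageSet sharing the same channel,
    // with [start, end) narrowed to the requested window -- no data is copied.
    def read(position: Int, size: Int): FileMessageSet =
      new FileMessageSet(file, channel,
                         start = this.start + position,
                         end = math.min(this.start + position + size, this.end),
                         isSlice = true)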

OffsetIndex

The Segment's index file. This was added in 0.8; before that, a message was identified directly by its physical offset.
The new version switched to logical offsets, making physical positions transparent to the user, which requires an index to map logical offsets to physical positions.
For efficiency the index is best kept in memory, but given its potential size it is accessed through a MappedByteBuffer (see: Java RandomAccessFile usage).
According to the class comment:
the index is sparse, i.e. not every message in the log is guaranteed an entry in it;
the index is made up of 8-byte entries, a 4-byte logical offset plus a 4-byte physical position;
the stored logical offset is relative to the base offset, which is what makes 4 bytes enough.

    /**
     * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
     * that is it may not hold an entry for all messages in the log.
     *
     * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
     *
     * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant
     * to locate the offset/location pair for the greatest offset less than or equal to the target offset.
     *
     * Index files can be opened in two ways: either as an empty, mutable index that allows appends or
     * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
     * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
     *
     * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
     *
     * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the
     * message with that offset. The offset stored is relative to the base offset of the index file. So, for example,
     * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use
     * only 4 bytes for the offset.
     *
     * The frequency of entries is up to the user of this class.
     *
     * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
     * storage format.
     */
    class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
      private val lock = new ReentrantLock  // operations on the index file are done under this lock

      /* initialize the memory mapping for this index */
      private var mmap: MappedByteBuffer =  // a MappedByteBuffer is used so even a large index file can be handled
        {
          val newlyCreated = file.createNewFile()
          val raf = new RandomAccessFile(file, "rw")
          val len = raf.length()
          val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
          idx  // yield the mapped buffer (pre-allocation and position handling are elided in this excerpt)
        }

      // read one entry's content -- relative offset and physical position -- from the buffer by byte offset
      /* return the nth offset relative to the base offset */
      private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
      /* return the nth physical position */
      private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

      // binary-search for targetOffset, or the closest offset below it
      /**
       * Find the largest offset less than or equal to the given targetOffset
       * and return a pair holding this offset and its corresponding physical file position.
       *
       * @param targetOffset The offset to look up.
       *
       * @return The offset found and the corresponding file position for this offset.
       * If the target offset is smaller than the least entry in the index (or the index is empty),
       * the pair (baseOffset, 0) is returned.
       */
      def lookup(targetOffset: Long): OffsetPosition = {...}

      /**
       * Get the nth offset mapping from the index
       * @param n The entry number in the index
       * @return The offset/position pair at that entry
       */
      def entry(n: Int): OffsetPosition = {
        maybeLock(lock) {
          if(n >= entries)
            throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
          val idx = mmap.duplicate
          OffsetPosition(relativeOffset(idx, n), physical(idx, n))
        }
      }

      /**
       * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
       */
      def append(offset: Long, position: Int) {
        inLock(lock) {
          require(!isFull, "Attempt to append to a full index (size = " + size + ").")
          if (size.get == 0 || offset > lastOffset) {
            debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
            this.mmap.putInt((offset - baseOffset).toInt)
            this.mmap.putInt(position)
            this.size.incrementAndGet()
            this.lastOffset = offset
            require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
          } else {
            throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
              .format(offset, entries, lastOffset, file.getAbsolutePath))
          }
        }
      }
    }
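lookup is elided above; it is a binary search over the mmap for the greatest entry whose offset is less than or equal to the target. A simplified sketch, written against the relativeOffset/physical helpers shown above (the real code factors the search into a helper, indexSlotFor):

    // Sketch of lookup: binary-search the mapped entries for the largest
    // relative offset <= (targetOffset - baseOffset).
    def lookup(targetOffset: Long): OffsetPosition = {
      val idx = mmap.duplicate                 // private copy so the buffer position is untouched
      val relOffset = (targetOffset - baseOffset).toInt
      if (entries == 0 || relativeOffset(idx, 0) > relOffset)
        return OffsetPosition(baseOffset, 0)   // target precedes the first entry
      var lo = 0
      var hi = entries - 1
      while (lo < hi) {
        val mid = lo + (hi - lo + 1) / 2       // round up so the loop terminates
        if (relativeOffset(idx, mid) <= relOffset) lo = mid else hi = mid - 1
      }
      OffsetPosition(baseOffset + relativeOffset(idx, lo), physical(idx, lo))
    }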

So how exactly is a logical offset translated into a physical position?

0.8 introduced logical offsets, so a mapping between logical offsets and physical positions is needed.
The simple approach would be a hash map caching the position of every offset; the problem is that this consumes a lot of space.
Kafka's approach is a segmented, sparse index instead: a binary search over the index finds the starting position of the range the offset falls in, and the file is then scanned from there to find the exact position, a design that trades time for space.

1. LogSegment.translateOffset
First, an approximate physical position is found from the index file.
As noted above, for efficiency the index does not hold an entry for every offset, only one per interval, so the index rarely yields the exact physical position, but it does yield the closest preceding one.
If the index granularity is too coarse, the caller can pass a startingFilePosition from which to begin the search.
The exact physical position then has to be found by continuing the search in the MessageSet file.
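Put together, translateOffset is essentially this two-step composition (a sketch close to the 0.8 source):

    // Step 1: coarse position from the sparse index.
    // Step 2: exact position by scanning the message set from there.
    // startingFilePosition lets the caller skip ahead if it already knows a
    // closer lower bound than the index entry.
    private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
      val mapping = index.lookup(offset)  // greatest indexed offset <= offset
      log.searchFor(offset, math.max(mapping.position, startingFilePosition))
    }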

2. FileMessageSet.searchFor
Within the message set, each message consists of an overhead (the message's offset plus its size) followed by the message itself.
searchFor starts from startingPosition, walks the messages one by one, reading the offset out of each overhead and comparing it, until the target offset is found.
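A sketch of that scan, written as a method of FileMessageSet and assuming the 0.8 on-disk layout of an 8-byte offset plus a 4-byte size in front of every message:

    import java.nio.ByteBuffer

    // Sketch of searchFor: from startingPosition, read each message's 12-byte
    // overhead (8-byte offset + 4-byte size) and skip over the body, until a
    // message with offset >= targetOffset is reached.
    def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
      val LogOverhead = 12                                // 8 (offset) + 4 (size)
      val buffer = ByteBuffer.allocate(LogOverhead)
      var position = startingPosition
      val size = sizeInBytes()
      while (position + LogOverhead < size) {
        buffer.rewind()
        channel.read(buffer, position)                    // read only the overhead
        buffer.rewind()
        val offset = buffer.getLong()
        if (offset >= targetOffset)
          return OffsetPosition(offset, position)
        val messageSize = buffer.getInt()
        position += LogOverhead + messageSize             // jump over the message body
      }
      null                                                // target not found before end of file
    }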

This article is excerpted from 博客园 (cnblogs); original publication date: 2014-02-18.
