摘要

主要是介绍的kafka的日志存储系统

文件目录布局

回顾之前所学的知识:Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量(offset),具有4个分区的主题的逻辑结构见图1-2。

如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka又引入了日志分段(Logs egment)的概念,将Log切分为多个LogS egment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和Logs egmient也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个Logsegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".txnindex”为后缀的事务索引文件)。图4-1描绘了主题、分区与副本之间的关系,在图5- 1中又补充了Log 和 LogS egment的关系。

向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment 都不能写入数据。为了方便描述,我们将最后一个 LogSegment称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。

注意每个LogSegment中不只包含“.log”".index”“.timeindex”这3种文件,还可能包含“.deleted”“".cleaned”".swap”等临时文件,以及可能的“..snapshot”“.txnindex "“leader-epoch-checkpoint”等文件。

日志格式的演变

消息压缩

常见的压缩算法是数据量越大压缩效果越好,一条消息通常不会太大,这就导致压缩效果并不是太好。而Kafka实现的压缩方式是将多条消息一起进行压缩,这样可以保证较好的压缩效果。在一般情况下,生产者发送的压缩数据在broker 中也是保持压缩状态进行存储的,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息,这样保持了端到端的压缩。

Kafka日志中使用哪种压缩方式是通过参数 compression.type来配置的,默认值为“producer”,表示保留生产者使用的压缩方式。这个参数还可以配置为“gzip""snappy”"Iz4”,分别对应GZIP、SNAPPY、LZ4这3种压缩算法。如果参数compression.type 配置为“uncompressed”,则表示不压缩。

以上都是针对消息未压缩的情况,而当消息压缩时是将整个消息集进行压缩作为内层消息( inner message),内层消息整体作为外层( wrapper message)的value,其结构如图5-5所示。

在讲述v1版本的消息时,我们了解到vl版本比 v0版的消息多了一个timestamp字段。对于压缩的情形,外层消息的timestamp 设置为:

  • ·如果timestamp类型是CreateTime,那么设置的是内层消息中最大的时间戳。·如果timestamp类型是LogAppendTime,那么设置的是Kafka服务器当前的时间戳内层消息的timestamp设置为:
  • 如果外层消息的timestamp类型是CreateTime,那么设置的是生产者创建消息时的时间戳。
  • 如果外层消息的timestamp类型是LogAppendTime,那么所有内层消息的时间戳都会被忽略。

日志索引(offset timestamp)

本章开头就提及了每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由broker端参数log.index.interval.bytes 指定,默认值为4096,即 4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

本章开头也提及日志分段文件达到一定的条件时需要进行切分,那么其对应的索引文件也需要进行切分。日志分段文件切分包含以下几个条件,满足其一即可。

  • (1)当前日志分段文件的大小超过了broker端参数log.segment.bytes配置的值。log.segment. bytes参数的默认值为1073741824,即1GB。
  • (2)当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms或log.roll.hours参数配置的值。如果同时配置了log.roll.ms和log.roll.hours参数,那么log.roll.ms 的优先级高。默认情况下,只配置了log.roll.hours参数,其值为168,即7天。
  • (3)偏移量索引文件或时间戳索引文件的大小达到 broker端参数log.index.size.maxbytes 配置的值。log.index.size.max. bytes 的默认值为10485760,即10MB。
  • (4)追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。对非当前活跃的日志分段而言,其对应的索引文件内容已经固定而不需要再写入索引项,所以会被设定为只读。而对当前活跃的日志分段( activeSegment)而言,索引文件还会追加更多的索引项,所以被设定为可读写。在索引文件切分的时候,Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,索引文件的大小由broker端参数log.index.size.max.bytes配置。Kafka在创建索引文件的时候会为其预分配log.index.size.max.bytes大小的空间,注意这一点与日志分段文件不同,只有当索引文件进行切分的时候,Kafka 才会把该索引文件裁剪到实际的数据大小。也就是说,与当前活跃的日志分段对应的索引文件的大小固定为 log.index.size.max.bytes,而其余日志分段对应的索引文件的大小为实际的占用空间。

日志清理

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个Log,而 Log 又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理策略。

(1)日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。

(2)日志压缩(Log Compaction)﹔针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

Log Compaction执行前后,日志分段中的每条消息的偏移量和写入时的偏移量保持一致。Log Compaction 会生成新的日志分段文件日志分段中每条消息的物理位置会重新按照新文件来组织Log Compaction执行过后的偏移量不再是连续的,不过这并不影响日志的查询。Kafka中的 Log Compaction可以类比于Redis 中的RDB的持久化模式。试想一下,如果个系统使用Kafka来保存状态,那么每次有状态变更都会将其写入Kafka。在某一时刻此系统异常崩溃,进而在恢复时通过读取Kafka中的消息来恢复其应有的状态,那么此系统关心的是它原本的最新状态而不是历史时刻中的每一个状态。如果Kafka 的日志保存策略是日志删除(Log Deletion),那么系统势必要一股脑地读取Kafka 中的所有数据来进行恢复,如果日志保存策略是 Log Compaction,那么可以减少数据的加载量进而加快系统的恢复速度。Log Compaction在某些应用场景下可以简化技术栈,提高系统整体的质量。

磁盘存储

Kafka依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。在我们的印象中,对于各个存储介质的速度认知大体同图5-20所示的相同,层级越高代表速度越快。很显然,磁盘处于一个比较尴尬的位置,这不禁让我们怀疑Kafka采用这种持久化形式能否提供有竞争力的性能。在传统的消息中间件 Rabbi tMQ 中,就使用内存作为默认的存储介质,而磁盘作为备选介质,以此实现高吞吐和低延迟的特性。然而,事实上磁盘可以比我们预想的要快,也可能比我们预想的要慢,这完全取决于我们如何使用。

kafka在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息这种方式属于典型的顺序写盘的操作所以就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。但这并不是让Kafka在性能上具备足够竞争力的唯一因素,我们不妨继续分析。

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘IO 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性能上的差异,现代操作系统越来越“激进地”将内存作为磁盘缓存,甚至会非常乐意将所有可用的内存用作磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也将经由统一的缓存。

当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的TO操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。

Kafka 中大量使用了页缓存,这是 Kafka实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过 log.flush.interval. messages、log.flush.interval.ms等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过笔者并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障而不是由同步刷盘这种严重影响性能的行为来保障。

Linux系统会使用磁盘的一部分作为swap分区,这样可以进行进程的调度:把当前非活跃的进程调入 swap分区,以此把内存空出来让给活跃的进程。对大量使用系统页缓存的Kafka而言,应当尽量避免这种内存的交换,否则会对它各方面的性能产生很大的负面影响。我们可以通过修改vm.swappiness参数(Linux系统参数)来进行调节。vm.swappiness 参数的上限为100,它表示积极地使用swap分区,并把内存上的数据及时地搬运到swap分区中;vm.swappiness 参数的下限为0,表示在任何情况下都不要发生交换 (vm.swappiness=0的含义在不同版本的Linux内核中不太相同,这里采用的是变更后的最新解释),这样一来,当内存耗尽时会根据一定的规则突然中止某些进程。笔者建议将这个参数的值设置为1,这样保留了swap 的机制而又最大限度地限制了它对Kafka 性能的影响。

磁盘I/O流程

参考图5-22,从编程角度而言,一般磁盘IO的场景有以下四种

  • (1)用户调用标准C库进行I/O操作,数据流为:应用程序buffer→C库标准 IObuffer→文件系统页缓存→通过具体文件系统到磁盘。
  • (2)用户调用文件 I/O,数据流为:应用程序buffer→文件系统页缓存→通过具体文件系统到磁盘。
  • (3)用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘。
  • (4)用户使用类似dd 工具,并使用direct参数,绕过系统cache 与文件系统直接写磁盘。发起IO请求的步骤可以表述为如下的内容(以最长链路为例)。

写操作:用户调用fwrite把数据写入C库标准IObuffer 后就返回,即写操作通常是异步操作;数据写入C库标准 IObuffer后,不会立即刷新到磁盘,会将多次小数据量相邻写操作先缓存起来合并,最终调用write函数一次性写入(或者将大块数据分解多次write调用)页缓存;数据到达页缓存后也不会立即刷新到磁盘,内核有pdflush线程在不停地检测脏页,判断是否要写回到磁盘,如果是则发起磁盘IO请求。

读操作:用户调用fread到C库标准IObuffer 中读取数据,如果成功则返回,否则继续;到页缓存中读取数据,如果成功则返回,否则继续;发起I/O 请求,读取数据后缓存 buffer 和C库标准IObuffer并返回。可以看出,读操作是同步请求。

I/O请求处理:通用块层根据I/O请求构造一个或多个bio结构并提交给调度层;调度器将 bio 结构进行排序和合并组织成队列且确保读写操作尽可能理想:将一个或多个进程的读操作合并到一起读,将一个或多个进程的写操作合并到一起写,尽可能变随机为顺序(因为随机读写比顺序读写要慢),读必须优先满足,而写也不能等太久。

针对不同的应用场景,I/O 调度策略也会影响I/O的读写性能,目前Linux系统中的IO调度策略有4种,分别为NOOP、CFQ、DEADLINE和ANTICIPATORY,默认为CFQ。

1.NOOP:NOOP 算法的全写为No Operation。该算法实现了最简单的FIFO队列,所有I/O请求大致,按照先来后到的顺序进行操作。之所以说“大致”,原因是NOOP在 FIFO 的基础上还做了相邻IO请求的合并,并不是完全按照先进先出的规则满足IO请求。

2.CFQ:CFQ算法的全写为Completely Fair Queuing。该算法的特点是按照IO请求的地址进行排序,而不是按照先来后到的顺序进行响应。CFQ是默认的磁盘调度算法,对于通用服务器来说是最好的选择。它试图均匀地分布对IO带宽的访问。CFQ为每个进程单独创建一个队列来管理该进程所产生的请求,也就是说,每个进程一个队列,各队列之间的调度使用时间片进行调度,以此来保证每个进程都能被很好地分配到I/O带宽。I/O调度器每次执行一个进程的4次请求。在传统的SAS 盘上,磁盘寻道花去了绝大多数的I/O响应时间。CFQ 的出发点是对I/O地址进行排序,以尽量少的磁盘旋转次数来满足尽可能多的IO请求。在CFQ算法下,SAS盘的吞吐量大大提高了。相比于NOOP 的缺点是,先来的IO请求并不一定能被满足,可能会出现“饿死”的情况。

3.DEADLINE:DEADLINE在CFQ的基础上,解决了IO请求“饿死”的极端情况。除了CFQ本身具有的I/O排序队列,DEADLINE 额外分别为读IO和写I/O提供了FIFO队列。读FIFO队列的最大等待时间为500ms,写FIFO队列的最大等待时间为5s。FIFO队列内的IO 请求优先级要比CFQ队列中的高,而读FIFO队列的优先级又比写FIFO队列的优先级高。优先级可以表示如下:

4.ANTICIPATORY:CFQ和 DEADLINE考虑的焦点在于满足零散IO请求上。对于连续的I/O请求,比如顺序读,并没有做优化。为了满足随机IO 和顺序I/O混合的场景,Linux还支持ANTICIPATORY调度算法。ANTICIPATORY在 DEADLINE的基础上,为每个读IO都设置了6ms 的等待时间窗口。如果在6ms内OS收到了相邻位置的读IO请求,就可以立即满足。ANTICIPATORY算法通过增加等待时间来获得更高的性能,假设一个块设备只有一个物理查找磁头(例如一个单独的SATA硬盘),将多个随机的小写入流合并成一个大写入流(相当于将随机读写变顺序读写),通过这个原理来使用读取/写入的延时换取最大的读取/写入吞吐量。适用于大多数环境,特别是读取/写入较多的环境。

不同的磁盘调度算法(以及相应的I/O优化手段)对Kafka这类依赖磁盘运转的应用的影响很大,建议根据不同的业务需求来测试并选择合适的磁盘调度算法。从文件系统层面分析,Kafka操作的都是普通文件,并没有依赖于特定的文件系统,但是依然推荐使用EXT4或XFS。尤其是对XFS而言,它通常有更好的性能,这种性能的提升主要影响的是Kafka的写入性能。

零拷贝

除了消息顺序追加、页缓存等技术,Kafka还使用零拷贝(Zero-Copy)技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换对Linux操作系统而言,零拷贝技术依赖于底层的l sendfile()方法实现。对应于Java 语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。

单纯从概念上理解“零拷贝”比较抽象,这里简单地介绍一下它。考虑这样一种常用的情形:你需要将静态内容(类似图片、文件)展示给用户。这个情形就意味着需要先将静态内容从磁盘中复制出来放到一个内存 buf中,热后将这个 buf通过套接字(Socket)传输给用户,进而用户获得静态内容。这看起来再正常不过了,但实际上这是很低效的流程,我们把上面的这种情形抽象成下面的过程:

首先调用read()将静态内容(这里假设为文件A)读取到tmp_buf,然后调用write()将tmp_buf写入Socket,如图5-23所示。
在这个过程中,文件A经历了4次复制的过程:

  • (1)调用read()时,文件A中的内容被复制到了内核模式下的Read Buffer中。
  • (2)CPU控制将内核模式数据复制到用户模式下。
  • (3)调用write()时,将用户模式下的内容复制到内核模式下的Socket Buffer 中。
  • (4)将内核模式下的Socket Buffer的数据复制到网卡设备中传送。

从上面的过程可以看出,数据平白无故地从内核模式到用户模式“走了一圈”,浪费了2次复制过程:第一次是从内核模式复制到用户模式;第二次是从用户模式再复制回内核模式,即上面4次过程中的第﹖步和第3步。而且在上面的过程中,内核和用户模式的上下文的切换也是4次。

如果采用了零拷贝技术那么应用程序可以直接请求内核把磁盘中的数据传输给Socket,如图5-24所示。

零拷贝技术通过DMA(Direct Memory Access)技术将文件内容复制到内核模式下的ReadBuffer中。不过没有数据被复制到Socket Buffer,相反只有包含数据的位置和长度的信息的件描述符被加到Socket Buffer 中。DMA引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了⒉次复制就从磁盘中传送出去了,并且上下文切换也变成了2次零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。

Kafka——Kafka的日志存储(5)相关推荐

  1. CC00060.kafka——|Hadoopkafka.V45|——|kafka.v45|日志存储概述|

    一.日志存储概述 ### --- 日志存储概述~~~ Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响. ~~~ 每个主题又可以分为一个或多个分区. ~~~ 每个分区各自存在 ...

  2. Kafka和RocketMQ底层存储之那些你不知道的事

    大家好,我是yes. 我们都知道 RocketMQ 和 Kafka 消息都是存在磁盘中的,那为什么消息存磁盘读写还可以这么快?有没有做了什么优化?都是存磁盘它们两者的实现之间有什么区别么?各自有什么优 ...

  3. Kafka和RocketMQ底层存储:零拷贝技术

    零拷本相关 <[转]零拷贝的实现原理> <[转]零拷贝的实现原理> <搞懂Linux零拷贝,DMA> <通过零拷贝进行有效的数据传输(java.c)> ...

  4. Kafka 和 RocketMQ 底层存储之那些你不知道的事

    来源 | yes的练级攻略 头图 | CSDN付费下载自图虫 大家好,我是yes. 我们都知道 RocketMQ 和 Kafka 消息都是存在磁盘中的,那为什么消息存磁盘读写还可以这么快?有没有做了什 ...

  5. Kafka+Log4j实现日志集中管理

    引言 前段时间写的<Spring+Log4j+ActiveMQ实现远程记录日志--实战+分析>得到了许多同学的认可,在认可的同时,也有同学提出可以使用Kafka来集中管理日志,于是今天就来 ...

  6. kafka数据和日志目录迁移教程

    简介 Kafka在运行的过程中,存储在磁盘上的数据会逐渐扩大,甚至会撑爆系统盘,在线上环境我们通常会把kafka的数据存储目录和日志存储目录迁移到磁盘中,或者扩容kafka的存储磁盘.本文将一站式解决 ...

  7. Kafka + ELK实现日志采集

    Kafka是一个高吞吐量的分布式发布订阅消息系统,它的应用场景很多,如日志采集.消息系统.运营指标等.在日志采集的场景中,我们项目的重要服务可能会通过集群进行部署,每个服务有它自己的日志记录产生,这些 ...

  8. scribe、chukwa、kafka、flume日志系统对比

    为什么80%的码农都做不了架构师?>>>    1. 背景介绍许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而 ...

  9. 如何关闭kafka的控制台日志

    本文来说下如何关闭kafka的控制台日志 文章目录 问题描述 如何解决 问题描述 在kafka和springboot整合的时候,控制台会打印出kafka相关的日志,如何关闭这些日志呢 查看日志信息,发 ...

  10. Kafka的offset自定义存储实现

    一.什么是Offset 在kafka中,每一条消息都有一个与之对应的序列号,这个序列号就是offset,表示消息的偏移量. 特点: 偏移量从0开始递增 topic中的每个分区维护自己的一个offset ...

最新文章

  1. 解决:error: Cannot find libmysqlclient_r under /usr/local/mysql.
  2. java canonicalize_java.io.IOException:java.io.WinNTFileSystem.canonicalize0处的无效参数
  3. Java 100(三)
  4. Unity内实现Android APK版本更新
  5. linux安装mongodb(设置非root用户和开机启动)
  6. 王栋: 要做好推荐,只有技术是不够的
  7. ReactNative调研结果
  8. 错误代码1833 Cannot change column used in a foreign
  9. 闪迪内存卡软件测试,闪迪存储卡怎么样
  10. Linux 音频系统简析
  11. 关于:WindowsOffice 产品语言包
  12. Three.js - 光源使用详解3(环境光 HemisphereLight、镜头光晕 LensFlare)
  13. Python——青蛙旅行项目
  14. 小程序开发教程,适合小白哦
  15. windows2016安装AD域
  16. .Net 垃圾回收机制原理(二)
  17. MemoryError: Unable to allocate array with shape (61721, 16000) and data typ
  18. iphone4s改装 linux,iPhone4S降级教程(支持iOS5.1.1)可实现完美越狱
  19. 反射 Reflect Class 基础 API MD
  20. 股份有限公司的章程包括哪些内容?

热门文章

  1. java身份认证_WEB应用中的基本身份验证和表单身份验证(中文)
  2. 【解锁】Pandoc——Pandoc安装、使用、快速上手
  3. A*算法最合理的数据结构
  4. 搭建IP代理池伪装IP地址
  5. CH3 HTML基础 1
  6. magento 为用户注册增加一个字段
  7. 《视觉slam十四讲》第3讲课后习题
  8. v36.05 鸿蒙内核源码分析(工作模式) | 程序界的韦小宝是谁 | 百篇博客分析HarmonyOS源码
  9. 计算机的磁盘管理在哪,磁盘管理器在哪
  10. 最新的计算机主板,最新主流电脑主板天梯图2020