消息存储结构

kafka每个topic有多个partition,单个partition内消息有序。Partition在物理存储上由多个segment组成,每个segment内包含两个文件,index文件和log文件。
物理实体 index文件和log文件
逻辑实体 topic > partition > segment

存储结构

1.partition存储
在kafka文件存储中,同一个Topic下有多个不同的partition,每个partition为一个目录,partition命名规则为topic名称+有序序号,第一个partition序号从0开始,序号最大值为partition数量减1。
每个partition(目录)相当于一个大型文件被平均分配到大小(可配置log.segment.bytes)相等的segment数据文件中,但每个segment file消息数量不一定相等,取决于消息大小,方便快速删除。
2.segment存储
Segment file由两个部分组成,分别是index file和data file,一一对应,成对出现,后缀为.index和.log,分别对应索引文件和数据文件。Segment的文件命名第一个从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。

.index文件是索引文件,每行数据包括两个值,第几条消息+该消息在log文件的物理偏移量。.log文件存储消息的实际数据,每行由offset+message组成。具体如下图所示:

message参数说明:

关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic" 表示本次发布Kafka服务程序协议版本号
1 byte “attributes" 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
Kbyte key 可选
value bytes payload 表示实际消息数据。

通过offset查找message

以上图中查找offset=36876的message为例,需要通过以下两个步骤:
第一步查找segment file,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0。第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1。同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index|log

第二步通过segment file查找message,通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

消息生产

基本流程

生产者可与任一broker连接(生产者不会与zookeeper通信),获得topic的partition信息(每个broker都有所有topic信息),找到每个partition的leader所在broker,再与该broker建立连接。发送消息时,通过轮询或者随机选取partition的方式,决定消息被发送到哪一个partition。
kafka的消息发送包括同步和异步两种方式。同步发送可配置acks参数,该参数可配置消息的确认级别。当acks=-1,则要求所有ISR中的replica都确定拿到消息后再返回给生产者成功(leader会先将消息落盘,ISR中的replica拿到后不一定落盘,到内存就算成功);acks=0则直接返回成功(不用leader确认);acks=1,则leader把消息落盘后再返回。异步发送则直接返回发送成功,由后台线程扫描队列长度,达到一定长度或者配置时间再批量发送消息到leader。

基本流程如下:

a. 创建topic时会往zk注册topic的分区信息
b. 生产者从broker获取topic的所有分区
c. 根据一定的负载均衡算法决定将消息发往哪个分区
d. 最终根据分区所在的leader broker将消息发送到broker
e. 当topic分区变化时,生产者会重新从broker获取新的分区信息
Kafka的消息生产者使用Producer.scala,客户端通过producer.type配置可使用sync和async两种模式。客户端调用Producer.send发送消息。
在同步模式下,首先调用DefaultEventHandler.handle方法,序列化消息,序列化方式是默认的Encoder,可自定义实现(producer配置serializer.class),之后在最大重试次数(默认三次)内尝试发送消息,调用dispatchSerializedData方法,在该方法内选择消息的partition。

partition选择

如果消息没有key,且是该客户端对应topic下首条消息,则随机选择一个partition,并缓存对应的partition和topic的关系到sendPartitionPerTopicCache,之后该topic下没有key的消息都将发往该分区。sendPartitionPerTopicCache将在对应的配置时间(topic.metadata.refresh.interval.ms,默认为600000)内clear,防止所有消息都发往同一个partition。
如果消息key不为空,则调用默认的分区方法DefaultPartitioner.partition。key hash之后的值再对分区值取模,得到消息对应的分区。可自行实现Partitioner接口,实现自定义的分区策略(producer新增配置partitioner.class)。

消息如何不丢

消息到达broker后,leader先将该消息落盘。再根据acks参数决定是否返回消息写入成功,如果acks=-1,则需等待ISR中的replica复制消息,全部复制完成后再返回成功,如果等待时间超时,则返回消息发送失败。
如果要严格保证消息不丢失,可给该topic配置两个以上replica,同时生产者的acks设置为-1,每条消息都要求副本确认复制后再返回成功。

消息发送流程图如下:

创建topic——kafka源码探究之一
https://segmentfault.com/a/11...

broker的高可用及高伸缩——kafka源码探究之二
https://segmentfault.com/a/11...

消息生产与消息存储——kafka源码探究之三相关推荐

  1. 刚看完 Kafka 源码,各位随便问!

    Kafka 因其优越的特性广泛用于数据传输.消息中间件的设计.开发和维护等方面,也得到越来越多大厂(阿里.美团.百度.快手等)的青睐,很多 IT 界前辈更是在技术层面不断深挖.最近有位后端三年的朋友在 ...

  2. 10小时,这回一次搞定 Kafka 源码!

    Kafka 因其优越的特性广泛用于日志收集.用户活动跟踪等方面,也得到越来越多企业的青睐,很多 IT 界前辈更是在技术层面不断深挖.目前,如果你还局限在 Kafka 的基本应用,将很难 cover 住 ...

  3. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  4. 最近看Kafka源码,着实被它的客户端缓冲池技术优雅到了

    最近看kafka源码,着实被它的客户端缓冲池技术优雅到了.忍不住要写篇文章赞美一下(哈哈). 注:本文用到的源码来自kafka2.2.2版本. 背景 当我们应用程序调用kafka客户端 produce ...

  5. 使用IntelliJ IDEA搭建kafka源码环境时遇到Output path错误解决办法

    kafka源码环境搭建好之后,需要在IntelliJ IDEA开发工具中以debug方式启动kafka服务器来测试消息的生产和消费. 但是在启动kafka.Kafka类中的main方法(也就是运行 k ...

  6. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  7. 聊聊 Kafka:编译 Kafka 源码并搭建源码环境

    一.前言 老周这里编译 Kafka 的版本是 2.7,为啥采用这个版本来搭建源码的阅读环境呢?因为该版本相对来说比较新.而我为啥不用 2.7 后的版本呢?比如 2.8,这是因为去掉了 ZooKeepe ...

  8. 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇

    大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...

  9. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  10. windows下kafka源码阅读环境搭建

    工具准备:jdk1.8,scala-2.11.11,gradle-3.1,zookeeper-3.4.5,kafka-0.10.0.1-src.tgz, kafka_2.11-0.10.0.1.tgz ...

最新文章

  1. JAVA取质数(素数)算法优化
  2. Spring Environment仅用于初始化代码
  3. python正弦波和等腰三角波_51proteus仿真:生成方波、正弦波、锯齿波和三角波
  4. 如何取到两个日期中的每一天,并且打印出来
  5. i2c传输距离_使用 ToF 传感器进行距离测量和手势识别的基本原理
  6. 电脑有两个显示器用来显示,如何配置呢,请看下文
  7. Todoist Chrome:待办事项列表及任务管理
  8. 双硬盘安装win10和linux双系统,双硬盘安装 Win 10 和 Ubuntu 16.04 双系统
  9. Renascence架构简介
  10. idea破解到2100年
  11. 解读Vue项目文件目录结构,实例化Vue对象,数据和方法
  12. android 定时截图,这款 APP 让我每天都忍不住想发截图!
  13. ECCV 2022 | 适用于分类,检测,分割的生成式知识蒸馏开源
  14. utils.data的使用
  15. 物联网安全的发展现状与展望
  16. c语言编程led数码管,数码管显示函数
  17. python 哈姆雷特_哈姆雷特之 React
  18. GTY‘s gay friends 线段树+前缀和
  19. 网络重置后没有wifi连接解决办法
  20. 智慧工地管理系统解决方案厂商-喜讯科技

热门文章

  1. 语义分割空间上下文关系_多尺度空间注意的语义分割
  2. 计算机电缆静电,ZR-DJFPVP计算机电缆
  3. python统计时间的次数的代码_python脚本实现统计日志文件中的ip访问次数代码分享...
  4. python爬取方式_Python3 实现爬取网站下所有URL方式
  5. python md5加密_python中的md5加密
  6. 吴恩达机器学习笔记七_应用机器学习的建议
  7. 学会写出"图形界面+数据库"的程序要多长时间?
  8. 利用Metaweblog技术的API接口同步到多个博客网站(详细)
  9. boost::bind 与 boost::function 的使用方法例子
  10. loadruner知识点小结