戳蓝字“CSDN云计算”关注我们哦!

Kafka 对外使用 Topic 的概念,生产者往 Topic 里写消息,消费者从中读消息。为了做到水平扩展,一个 Topic 实际是由多个 Partition 组成的,遇到瓶颈时,可以通过增加 Partition 的数量来进行横向扩容。单个 Parition 内是保证消息有序。

每新写一条消息,Kafka 就是在对应的文件 append 写,所以性能非常高。

Kafka 的总体数据流是这样的:

大概用法就是,Producers 往 Brokers 里面的指定 Topic 中写消息,Consumers 从 Brokers 里面拉取指定 Topic 的消息,然后进行业务处理。图中有两个 Topic,Topic0 有两个 Partition,Topic1 有一个 Partition,三副本备份。
可以看到 Consumer Gourp1 中的 Consumer2 没有分到 Partition 处理,这是有可能出现的,下面会讲到。关于 Broker、Topics、Partitions 的一些元信息用 ZK 来存,监控和路由啥的也都会用到 ZK。
———————————— 生产 ————————————

基本流程是这样的:

创建一条记录,记录中一个要指定对应的 Topic 和 Value,Key 和 Partition 可选。 
先序列化,然后按照 Topic 和 Partition,放进对应的发送队列中。Kafka Produce 都是批量请求,会积攒一批,然后一起发送,不是调 send()就立刻进行网络发包。
如果 Partition 没填,那么情况会是这样的:
  • Key 有填。按照 Key 进行哈希,相同 Key 去一个 Partition。(如果扩展了 Partition 的数量那么就不能保证了)
  • Key 没填。Round-Robin 来选 Partition。
这些要发往同一个 Partition 的请求按照配置,攒一波,然后由一个单独的线程一次性发过去。
  • API

有 High Level API,替我们把很多事情都干了,Offset,路由啥都替我们干了,用起来很简单。还有 Simple API,Offset 啥的都是要我们自己记录。(注:消息消费的时候,首先要知道去哪消费,这就是路由,消费完之后,要记录消费单哪,就是 Offset)
  • Partition

当存在多副本的情况下,会尽量把多个副本,分配到不同的 Broker 上。Kafka 会为 Partition 选出一个 Leader,之后所有该 Partition 的请求,实际操作的都是 Leader,然后再同步到其他的 Follower。
当一个 Broker 歇菜后,所有 Leader 在该 Broker 上的 Partition 都会重新选举,选出一个 Leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)
然后这里就涉及两个细节:
  • 怎么分配 Partition
  • 怎么选 Leader
关于 Partition 的分配,还有 Leader 的选举,总得有个执行者。在 Kafka 中,这个执行者就叫 Controller。Kafka 使用 ZK 在 Broker 中选出一个 Controller,用于 Partition 分配和 Leader 选举。

Partition 的分配:

  • 将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序。

  • 将第 i 个 Partition 分配到第(i mod n)个 Broker 上 (这个就是 Leader)。

  • 将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mode n)个 Broker 上。

  • Leader容灾

Controller 会在 ZK 的 /brokers/ids 节点上注册 Watch,一旦有 Broker 宕机,它就能知道。当 Broker 宕机后,Controller 就会给受到影响的 Partition 选出新 Leader。

Controller 从 ZK 的 /brokers/topics/[topic]/partitions/[partition]/state 中,读取对应 Partition 的 ISR(in-sync replica 已同步的副本)列表,选一个出来做 Leader。选出 Leader后,更新ZK,然后发送 LeaderAndISRRequest 给受影响的 Broker,让它们知道改变这事。

为什么这里不是使用 ZK 通知,而是直接给 Broker 发送 RPC 请求,我的理解可能是这样做 ZK 有性能问题吧。如果 ISR 列表是空,那么会根据配置,随便选一个 Replica 做 Leader,或者干脆这个 Partition 就是歇菜;如果 ISR 列表的有机器,但是也歇菜了,那么还可以等 ISR 的机器活过来。

  • 多副本同步

这里的策略,服务端这边的处理是 Follower 从 Leader 批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。生产者生产消息的时候,通过 request.required.acks 参数来设置数据的可靠性。

在 Acks=-1 的时候,如果 ISR 少于 min.insync.replicas 指定的数目,那么就会返回不可用。

这里 ISR 列表中的机器是会变化的,根据配置 replica.lag.time.max.ms,多久没同步,就会从 ISR 列表中剔除。以前还有根据落后多少条消息就踢出 ISR,在 1.0 版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出 ISR 列表。从 ISA 中选出 Leader 后,Follower 会把自己日志中上一个高水位后面的记录去掉,然后去和 Leader 拿新的数据。

因为新的 Leader 选出来后,Follower 上面的数据,可能比新 Leader 多,所以要截取。这里高水位的意思,对于 Partition 和 Leader,就是所有 ISR 中都有的最新一条记录。消费者最多只能读到高水位。

从 Leader 的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR 中的 Broker 都 Fetch 到了,但是 ISR 中的 Broker 只有在下一轮的 Fetch 中才能告诉 Leader。也正是由于这个高水位延迟一轮,在一些情况下,Kafka 会出现丢数据和主备数据不一致的情况,0.11 开始,使用 Leader Epoch 来代替高水位。

思考:当 Acks=-1 时

  • 是 Follwers 都来 Fetch 就返回成功,还是等 Follwers 第二轮 Fetch?

  • Leader 已经写入本地,但是 ISR 中有些机器失败,那么怎么处理呢?

———————————— 消费 ————————————

订阅 Topic 是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个 Partition。换句话来说,就是一个 Partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。

因此,如果消费组内的消费者如果比 Partition 多的话,那么就会有个别消费者一直空闲。

  • API

订阅 Topic 时,可以用正则表达式,如果有新 Topic 匹配上,那能自动订阅上。
  • Offset的保存

一个消费组消费 Partition,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次。这里 ZK 的性能严重影响了消费的速度,而且很容易出现重复消费。在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 总剥离,保存在一个名叫 consumeroffsets topic 的 Topic 中。

写进消息的 Key 由 Groupid、Topic、Partition 组成,Value 是偏移量 Offset。Topic 配置的清理策略是 Compact。总是保留最新的 Key,其余删掉。一般情况下,每个 Key 的 Offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存,第一次就会遍历 Partition 建立缓存,然后查询返回。

确定 Consumer Group 位移信息写入 consumers_offsets 的哪个 Partition,具体计算公式:

__consumers_offsets partition =   Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。

思考:如果正在跑的服务,修改了 offsets.topic.num.partitions,那么 Offset 的保存是不是就乱套了?
  • 分配 Partition—Reblance

生产过程中 Broker 要分配 Partition,消费过程这里,也要分配 Partition 给消费者。类似 Broker 中选了一个 Controller 出来,消费也要从 Broker 中选一个 Coordinator,用于分配 Partition。

下面从顶向下,分别阐述一下:

  • 怎么选 Coordinator

  • 交互流程

  • Reblance 的流程

①选Coordinator:看Offset 保存在那个 Partition;该 Partition Leader 所在的 Broker 就是被选定的 Coordinator。

这里我们可以看到,Consumer Group的Coordinator,和保存Consumer Group Offset 的 Partition Leader是同一台机器。

②交互流程:把Coordinator选出来之后,就是要分配了。整个流程是这样的:

  • Consumer 启动、或者 Coordinator 宕机了,Consumer 会任意请求一个 Broker,发送 ConsumerMetadataRequest 请求。

    Broker 会按照上面说的方法,选出这个 Consumer 对应 Coordinator 的地址。

  • Consumer 发送 Heartbeat 请求给 Coordinator,返回 IllegalGeneration 的话,就说明 Consumer 的信息是旧的了,需要重新加入进来,进行 Reblance。

    返回成功,那么 Consumer 就从上次分配的 Partition 中继续执行。

③Reblance 流程:

  • Consumer 给 Coordinator 发送 JoinGroupRequest 请求。

  • 这时其他 Consumer 发 Heartbeat 请求过来时,Coordinator 会告诉他们,要 Reblance 了。

  • 其他 Consumer 发送 JoinGroupRequest 请求。

  • 所有记录在册的 Consumer 都发了 JoinGroupRequest 请求之后,Coordinator 就会在这里 Consumer 中随便选一个 Leader。

    然后回 JoinGroupRespone,这会告诉 Consumer 你是 Follower 还是 Leader,对于 Leader,还会把 Follower 的信息带给它,让它根据这些信息去分配 Partition。

  • Consumer向Coordinator 发送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 会包含分配的情况。

  • Coordinator 回包,把分配的情况告诉 Consumer,包括 Leader。

当 Partition 或者消费者的数量发生变化时,都得进行 Reblance。

列举一下会 Reblance 的情况:

  • 增加 Partition

  • 增加消费者

  • 消费者主动关闭

  • 消费者宕机了

  • Coordinator 自己也宕机了


———————————— 消息投递语义————————————
Kafka 支持 3 种消息投递语义:
  • At most once:最多一次,消息可能会丢失,但不会重复。
  • At least once:最少一次,消息不会丢失,可能会重复。
  • Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11 中实现,仅限于下游也是 Kafka)
在业务中,常常都是使用 At least once 的模型,如果需要可重入的话,往往是业务自己实现。
1. At least once

先获取数据,再进行业务处理,业务处理成功后 Commit Offset:

  • 生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息。

  • 消费者处理消息,业务处理成功后,更新 Offset 失败,消费者重启的话,会重复消费。

2. At most once

先获取数据,再 Commit Offset,最后进行业务处理:

  • 生产者生产消息异常,不管,生产下一个消息,消息就丢了。

  • 消费者处理消息,先更新 Offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了。

3. Exactly once

思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着 At least once 的原因来搞。

首先想出来的:

  • 生产者重做导致重复写入消息:生产保证幂等性。

  • 消费者重复消费:消灭重复消费,或者业务接口保证幂等性重复消费也没问题。

由于业务接口是否幂等,不是 Kafka 能保证的,所以 Kafka 这里提供的 Exactly once 是有限制的,消费者的下游也必须是 Kafka。所以以下讨论的,没特殊说明,消费者的下游系统都是 Kafka(注:使用 Kafka Conector,它对部分系统做了适配,实现了 Exactly once)。生产者幂等性好做,没啥问题。

解决重复消费有两个方法:

  • 下游系统保证幂等性,重复消费也不会导致多条记录。

  • 把 Commit Offset 和业务处理绑定成一个事务。

本来 Exactly once 实现第 1 点就 OK 了。但是在一些使用场景下,我们的数据源可能是多个 Topic,处理后输出到多个 Topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。既然要做事务,那么干脆把重复消费的问题从根源上解决,把 Commit Offset 和输出到其他 Topic 绑定成一个事务。

4. 生产幂等性
思路是这样的,为每个 Producer 分配一个 Pid,作为该 Producer 的唯一标识。Producer 会为每一个维护一个单调递增的 Seq。类似的,Broker 也会为每个记录下最新的 Seq。
当 req_seq == broker_seq+1 时,Broker 才会接受该消息,因为:
  • 消息的 Seq 比 Broker 的 Seq 大超过时,说明中间有数据还没写入,即乱序了。
  • 消息的 Seq 不比 Broker 的 Seq 小,那么说明该消息已被保存。
5. 事务性/原子性广播
场景是这样的:
  • 先从多个源 Topic 中获取数据。

  • 做业务处理,写到下游的多个目的 Topic。

  • 更新多个源 Topic 的 Offset。

其中第 2、3 点作为一个事务,要么全成功,要么全失败。这里得益于 Offset 实际上是用特殊的 Topic 去保存,这两点都归一为写多个 Topic 的事务性处理。
基本思路是这样的:
  • 引入 Tid(transaction id),和 Pid 不同,这个 ID 是应用程序提供的,用于标识事务,和 Producer 是谁并没关系。

    就是任何 Producer 都可以使用这个 Tid 去做事务,这样进行到一半就死掉的事务,可以由另一个 Producer 去恢复。

  • 同时为了记录事务的状态,类似对 Offset 的处理,引入 Transaction Coordinator 用于记录 Transaction Log。

    在集群中会有多个 Transaction Coordinator,每个 Tid 对应唯一一个 Transaction Coordinator。

    注:Transaction Log 删除策略是 Compact,已完成的事务会标记成 Null,Compact 后不保留。

做事务时,先标记开启事务,写入数据,全部成功就在 Transaction Log 中记录为 Prepare Commit 状态,否则写入 Prepare Abort 的状态。再去给每个相关的 Partition 写入一条 Marker(Commit 或者 Abort)消息,标记这个事务的 Message 可以被读取或已经废弃。成功后在 Transaction Log记录下 Commit/Abort 状态,至此事务结束。

数据流:

  • 首先使用 Tid 请求任意一个 Broker(代码中写的是负载最小的 Broker),找到对应的 Transaction Coordinator。

  • 请求 Transaction Coordinator 获取到对应的 Pid,和 Pid 对应的 Epoch,这个 Epoch 用于防止僵死进程复活导致消息错乱。

    当消息的 Epoch 比当前维护的 Epoch 小时,拒绝掉。Tid 和 Pid 有一一对应的关系,这样对于同一个 Tid 会返回相同的 Pid。

  • Client 先请求 Transaction Coordinator 记录的事务状态,初始状态是 Begin,如果是该事务中第一个到达的,同时会对事务进行计时。

    Client 输出数据到相关的 Partition 中;Client 再请求 Transaction Coordinator 记录 Offset 的事务状态;Client 发送 Offset Commit 到对应 Offset Partition。

  • Client 发送 Commit 请求,Transaction Coordinator 记录 Prepare Commit/Abort,然后发送 Marker 给相关的 Partition。

    全部成功后,记录 Commit/Abort 的状态,最后这个记录不需要等待其他 Replica 的 ACK,因为 Prepare 不丢就能保证最终的正确性了。

这里 Prepare 的状态主要是用于事务恢复,例如给相关的 Partition 发送控制消息,没发完就宕机了,备机起来后,Producer 发送请求获取 Pid 时,会把未完成的事务接着完成。
当 Partition 中写入 Commit 的 Marker 后,相关的消息就可被读取。所以 Kafka 事务在 Prepare Commit 到 Commit 这个时间段内,消息是逐渐可见的,而不是同一时刻可见。
6. 消费事务

前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。消费时,Partition 中会存在一些消息处于未 Commit 状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,Kafka 选择在消费者进程中进行过来,而不是在 Broker 中过滤,主要考虑的还是性能。

Kafka 高性能的一个关键点是 Zero Copy,如果需要在 Broker 中过滤,那么势必需要读取消息内容到内存,就会失去 Zero Copy 的特性。

———————————— 文件组织 ————————————

Kafka 的数据,实际上是以文件的形式存储在文件系统的。Topic 下有 Partition,Partition 下有 Segment,Segment 是实际的一个个文件,Topic 和 Partition 都是抽象概念。

在目录 /partitionid}/ 下,存储着实际的 Log 文件(即 Segment),还有对应的索引文件。每个 Segment 文件大小相等,文件名以这个 Segment 中最小的 Offset 命名,文件扩展名是 .log。Segment 对应的索引的文件名字一样,扩展名是 .index。

有两个 Index 文件:

  • 一个是 Offset Index 用于按 Offset 去查 Message。
  • 一个是 Time Index 用于按照时间去查,其实这里可以优化合到一起,下面只说 Offset Index。
总体的组织是这样的:

为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个 Message 都记录下具体位置,而是每隔一定的字节数,再建立一条索引。

索引包含两部分:

  • BaseOffset:意思是这条索引对应 Segment 文件中的第几条 Message。这样做方便使用数值压缩算法来节省空间。例如 Kafka 使用的是 Varint。

  • Position:在 Segment 中的绝对位置。

查找 Offset 对应的记录时,会先用二分法,找出对应的 Offset 在哪个 Segment 中,然后使用索引,在定位出 Offset 在 Segment 中的大概位置,再遍历查找 Message。

———————————— 常用配置项 ————————————

1. Broker 配置

2. Topic配置
关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。
还有 0.10 之前的版本,时间看的是日志文件的 Mtime,但这个值是不准确的,有可能文件被 Touch 一下,Mtime 就变了。因此从 0.10 版本开始,改为使用该文件最新一条消息的时间来判断。按大小清理这里也要注意,Kafka 在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。如果超过但是没超过一个日志段,那么就不会删除。
福利
扫描添加小编微信,备注“姓名+公司职位”,加入【云计算学习交流群】,和志同道合的朋友们共同打卡学习!

推荐阅读:
  • 架构师必备技能:教你画出一张合格的技术架构图

  • 30 岁程序员生活图鉴,怎样算是活成了理想的模样?

  • 千万不要和女程序员做同事!

  • 阿里云智能 AIoT 首席科学家丁险峰:阿里全面进军 IoT 这一年 | 问底中国 IT 技术演进

  • 只有程序员才能读懂的西游记

  • 通信工程到底要不要转专业?

  • 阿里云智能运维的自动化三剑客

真香,朕在看了!

看完这篇还不会kafka,我跪榴莲!相关推荐

  1. 第六十二期:看完这篇还不了解Nginx,那我就哭了!

    看完这篇还不了解Nginx,那我就哭了! Nginx 同 Apache 一样都是一种 Web 服务器.基于 REST 架构风格,以统一资源描述符(Uniform Resources Identifie ...

  2. 为什么子进程每次执行顺序不一样_看完这篇还不懂Redis的RDB持久化,你来打我...

    推荐观看: Redis缓存穿透的终极解决方案,手写布隆过滤器_哔哩哔哩 (゜-゜)つロ 干杯~-bilibili​www.bilibili.com P8架构师串讲:Redis,zookeeper,ka ...

  3. 看完这篇还不懂Redis的RDB持久化,你们来打我!

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 一.为什么需要持久化 redis里有10gb数据,突然停电或者意外 ...

  4. 图解 | 看完这篇还不懂高并发中的线程与线程池,你来打我!

    来源 | 码农的荒岛求生 头图 | 视觉中国 一切要从CPU说起 你可能会有疑问,讲多线程为什么要从CPU说起呢?原因很简单,在这里没有那些时髦的概念,你可以更加清晰的看清问题的本质. CPU并不知道 ...

  5. 看完这篇还不会Elasticsearch,我跪搓衣板,90%程序员已收藏

    疯狂的肉丝面 2019-07-24 08:01:01 摘自:JaJian 51CTO技术栈 编者说: 这篇可谓ES雄文,从概念到原理再到应用,还囊括了调优.强烈建议收藏. 生活中的数据 搜索引擎是对数 ...

  6. 看完这篇还不清楚Netty的内存管理,那我就哭了!

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"加群"加入公众号专属技术群 说明 在学习Netty的时候,ByteBuf随处可见,但是 ...

  7. netty发送数据_看完这篇还不清楚Netty的内存管理,那我就哭了

    说明 在学习Netty的时候,ByteBuf随处可见,但是如何高效分配ByteBuf还是很复杂的,Netty的池化内存分配这块还是比较难的,很多人学习过,看过但是还是云里雾里的,本篇文章就是主要来讲解 ...

  8. 看完这篇还不懂 MySQL 主从复制,可以回家躺平了~

    我们在平时工作中,使用最多的数据库就是 MySQL 了,随着业务的增加,如果单单靠一台服务器的话,负载过重,就容易造成宕机. 这样我们保存在 MySQL 数据库的数据就会丢失,那么该怎么解决呢? 其实 ...

  9. 看完这篇还不懂高并发中的线程与线程池你来打我

    int len; void* start_point; - }; 接下来就是起名字时刻. 这个数据结构总要有个名字吧,这个结构体用来记录什么信息呢?记录的是程序在被 <一线大厂Java面试题解析 ...

最新文章

  1. 010-你觉得单元测试可行吗
  2. python PyQt5中文教程☞【第二节】PyQt5基本功能(创建窗口、应用程序图标、显示提示语、通过按钮关闭窗口、消息框(关闭窗口确认框)、窗口显示在屏幕中间【居中显示】)
  3. js模块化编程之彻底弄懂CommonJS和AMD/CMD!
  4. leetcode - 统计封闭岛屿的数目
  5. 深入学习 Redis(1):Redis 内存模型
  6. 【限时免费】架构和运维技术高峰论坛 (成都站)
  7. linux 内核代码怎么下载,centos的linux内核源码下载方法
  8. stdio.h头文件被更改怎么办
  9. arduino中利用LiquidCrystal内置的scrool函数实现屏幕文字滚动播放
  10. echar地图使用小总结
  11. Debug查看汉字机内码
  12. 金色经典图案背景新中式PPT模板
  13. 联想微型计算机进入bios,联想如何进入bios界面的方法汇总
  14. Java PCM音频变声
  15. php开启sockets模块,php开启php_sockets扩展
  16. Appium学习:雷电模拟器的使用
  17. Python入门学习笔记第五章——if条件句~~~
  18. 4103 yxc 的日常
  19. 16省8-四平方和(四平方和定理,又称为拉格朗日定理: 每个正整数都可以表示为至多4个正整数的平方和。 如果把0包括进去,就正好可以表示为4个数的平方和。 比如:)
  20. ico图标下载 ico大全_ICO扰乱了您创办和运营公司的方式

热门文章

  1. python类的特殊方法汇总_Python笔记001-类的特殊方法
  2. 天才王垠惊人言论炸翻网友:相对论是假说,爱因斯坦是民科!
  3. 本硕皆数学专业,博士转行生物后,他发表了学校首篇Nature
  4. “阿法狗”之父:关于围棋,人类3000年来犯了一个错
  5. 重返数学史的黄金时代,由数学推动诞生的人工智能,一部人类智慧形成的历史...
  6. 成都理工大学工程技术学院计算机专业收分线,2019年成都理工大学工程技术学院美术类专业录取分数线...
  7. nginx 端口转发
  8. 【转载保存】java 23种设计模式 深入理解
  9. 还是畅通工程(思想+代码)
  10. QAQ的幸运数字 数学