Kafka高级特性解析


文章目录

  • Kafka高级特性解析
  • 2.5 物理存储
    • 2.5.1 日志存储概述
    • 2.5.2 日志存储
      • 2.5.2.1 索引
        • 2.5.2.1.1 偏移量
        • 2.5.2.1.2 时间戳
      • 2.5.2.2 清理
        • 2.5.2.2.1 日志删除
        • 2.5.2.2.2 日志压缩策略
    • 2.5.3 磁盘存储
      • 2.5.3.1 零拷贝
      • 2.5.3.2 页缓存
      • 2.5.3.3 顺序写入
  • 2.6 稳定性
    • 2.6.1 事务
      • 2.6.1.1 幂等性
      • 2.6.1.2 事务操作
    • 2.6.2 控制器
      • 2.6.2.1 broker选举
    • 2.6.3 可靠性保证
      • 2.6.3.1 失效副本
      • 2.6.3.2 副本复制
    • 2.6.4 一致性保证
      • **一、概念**
      • 二、Follower副本何时更新LEO
      • 三、Follower副本何时更新HW
      • 四、Leader副本何时更新LEO
      • 五、Leader副本何时更新HW值
      • 六、HW和LEO正常更新案例
      • 七、HW和LEO异常案例
      • 八、Leader Epoch使用
        • Kafka解决方案
        • 规避数据丢失
        • 规避数据不一致
    • 2.6.5 消息重复的场景及解决方案
      • 2.6.5.1 生产者阶段重复场景
        • 2.6.5.1.1 根本原因
        • 2.6.5.1.2 重试过程
        • 2.6.5.1.3 可恢复异常说明
        • 2.6.5.1.4 记录顺序问题
      • 2.6.5.2 生产者发送重复解决方案
        • 2.6.5.2.1 启动kafka的幂等性
        • 2.6.5.2.2 ack=0,不重试。
      • 2.6.5.3 生产者和broke阶段消息丢失场景
        • 2.6.5.3.1 ack=0,不重试
        • 2.6.5.3.2 ack=1,leader crash
        • 2.6.5.3.3 unclean.leader.election.enable 配置true
      • 2.6.5.4 解决生产者和broke阶段消息丢失
        • 2.6.5.4.1 禁用unclean选举,ack=all
        • 2.6.5.4.2 配置:min.insync.replicas > 1
        • 2.6.5.4.3 失败的offset单独记录
      • 2.6.5.5 消费者数据重复场景及解决方案
        • 2.6.5.5.1 根本原因
        • 2.6.5.5.2 场景
      • 2.6.5.6 解决方案
        • 2.6.5.6.1 取消自动提交
        • 2.6.5.6.2 下游做幂等
    • 2.6.6 __consumer_offsets
  • 2.7 延时队列
  • 2.8 重试队列
    • 自定义实现步骤
    • 代码实现

2.5 物理存储

2.5.1 日志存储概述

Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
每个主题又可以分为一个或多个分区。
每个分区各自存在一个记录消息数据的日志文件。

图中,创建了一个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在一个[Topic-Parition] 命名的消息日志文件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比如: .index、.timestamp、.log、.snapshot 等。

其中,文件名一致的文件集合就称为 LogSement

LogSegment

  1. 分区日志文件中包含很多的 LogSegment
  2. Kafka 日志追加是顺序写入的
  3. LogSegment 可以减小日志文件的大小
  4. 进行日志删除的时候和数据查找的时候可以快速定位。
  5. ActiveLogSegment 是活跃的日志分段,拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限。

日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。
类别作用

每个 LogSegment 都有一个基准偏移量,表示当前 LogSegment 中第一条消息的 offset。
偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。
如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是121(偏移量从 0 开始)。

日志与索引文件

配置项默认值说明
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。

时间戳索引文件则根据时间戳查找对应的偏移量。

Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,并不保证每一个消息在索引文件中都有对应的索引项。

每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。

通过修改 log.index.interval.bytes 的值,改变索引项的密度。

切分文件
当满足如下几个条件中的其中之一,就会触发文件的切分:

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。
    log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能转变为相对偏移量。

为什么是 Integer.MAX_VALUE ?
1024 * 1024 * 1024=1073741824
在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。

相对偏移量和物理地址。
相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
4 个字节刚好对应 Integer.MAX_VALUE ,如果大于Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。

索引文件切分过程
索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值
当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。
这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。

2.5.2 日志存储

2.5.2.1 索引

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。

文件:
查看一个topic分区目录下的内容,发现有log、index和timeindex三个文件:

  1. log文件名是以文件中第一条message的offset来命名的,实际offset长度是64位,但是这里只使用了20位,应付生产是足够的。
  2. 一组index+log+timeindex文件的名字是一样的,并且log文件默认写满1G后,会进行logrolling形成一个新的组合来记录消息,这个是通过broker端 log.segment.bytes =1073741824指定的。
  3. index和timeindex在刚使用时会分配10M的大小,当进行 log rolling 后,它会修剪为实际的大小。

    1、创建主题:
[root@linux121 ~]# kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic tp_demo_05 --partitions 1 --replication-factor 1 --config segment.bytes=104857600

2、创建消息文件:

[root@linux121 ~]# for i in `seq 10000000`; do echo "hello lagou $i" >> nmm.txt; done


3、将文本消息生产到主题中:

[root@linux121 ~]# kafka-console-producer.sh --broker-list linux121:9092 --topic tp_demo_05 <nmm.txt

4、查看存储文件:

[root@linux121 ~]# cd /var/lagou/kafka/kafka-logs/cd tp_demo_05-0
[root@linux121 ~]# ll


如果想查看这些文件,可以使用kafka提供的shell来完成,几个关键信息如下:
(1)offset是逐渐增加的整数,每个offset对应一个消息的偏移量。
(2)position:消息批字节数,用于计算物理地址。
(3)CreateTime:时间戳。
(4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。
(5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1-
GZIP、2-snappy、3-lz4。 (6)crc:对所有字段进行校验后的crc值。

[root@linux121 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments - -files 00000000000000000000.log --print-data-log | head


关于消息偏移量:
一、消息存储

  1. 消息内容保存在log日志文件中。
  2. 消息封装为Record,追加到log日志文件末尾,采用的是顺序写模式。
  3. 一个topic的不同分区,可认为是queue,顺序写入接收到的消息。

    消费者有offset。下图中,消费者A消费的offset是9,消费者B消费的offset是11,不同的消费者offset是交给一个内部公共topic来记录的。

    (3)时间戳索引文件,它的作用是可以让用户查询某个时间段内的消息,它一条数据的结构是时间戳(8byte)+相对offset(4byte),如果要使用这个索引文件,首先需要通过时间范围,找到对应的相对offset,然后再去对应的index文件找到position信息,然后才能遍历log文件,它也是需要使用上面说的index文件的。
    但是由于producer生产消息可以指定消息的时间戳,这可能将导致消息的时间戳不一定有先后顺序,因此尽量不要生产消息时指定时间戳。

2.5.2.1.1 偏移量

  1. 位置索引保存在index文件中
  2. log日志默认每写入4K(log.index.interval.bytes设定的),会写入一条索引信息到index文件中,因此索引文件是稀疏索引,它不会为每条日志都建立索引信息。
  3. log文件中的日志,是顺序写入的,由message+实际offset+position组成
  4. 索引文件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相对第一个消息的相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计算回实际的offset,这对用户是透明的。

稀疏索引,索引密度不高,但是offset有序,二分查找的时间复杂度为O(lgN),如果从头遍历时间复杂度是O(N)。
示意图如下:

偏移量索引由相对偏移量和物理地址组成。

可以通过如下命令解析 .index 文件

kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log | head

注意:offset 与 position 没有直接关系,因为会删除数据和清理日志。

[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000003925423.log --print-data-log | head


在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息元数据中存在若干的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是CreateTime 则无法保证顺序。
注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的。因为数据的写入是各自追加。

思考:如何查看偏移量为23的消息?
Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

2.5.2.1.2 时间戳

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。

通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件

时间戳索引索引格式:前八个字节表示时间戳,后四个字节表示偏移量


思考:查找时间戳为 1557554753430 开始的消息?

  1. 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最大时间戳largestTimeStamp逐一对比,直到找到不小于1557554753430所对应的日志分段。日志分段中的largestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间。
  2. 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。
  3. 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。
    注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的,因为数据的写入是各自追加。

2.5.2.2 清理

Kafka 提供两种日志清理策略:
日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除
日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。
Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值: delete ,还可以选择compact 。
主题级别的配置项是 cleanup.policy 。

2.5.2.2.1 日志删除

基于时间
日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设
定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天, log.retention.ms 优先级最高。

Kafka 依据日志分段中最大的时间戳进行定位。

首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?
因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。

删除过程

  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
  2. 这些日志分段所有文件添加 上 .delete 后缀。
  3. 交由一个以 “delete-file” 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置

如果活跃的日志分段中也存在需要删除的数据时?
Kafka 会先切分出一个新的日志分段作为活跃日志分段,该日志分段不删除,删除原来的日志分段。
先腾出地方,再删除。

基于日志大小
日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.segment.bytes 进行设定。
删除过程

  1. 计算需要被删除的日志总大小 (当前日志文件大小(所有分段)减去retention值)。
  2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
  3. 执行删除。

基于偏移量
根据日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段。
注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。

删除过程

  1. 从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为21,小于logStartOffset,将日志分段1加入到删除队列中
  2. 日志分段 2 的下一个日志分段的起始偏移量为35,小于 logStartOffset,将 日志分段 2 加入到删除队列中
  3. 日志分段 3 的下一个日志分段的起始偏移量为57,小于logStartOffset,将日志分段3加入删除集合中
  4. 日志分段4的下一个日志分段的其实偏移量为71,大于logStartOffset,则不进行删除。删除过程

2.5.2.2.2 日志压缩策略

  1. 概念
    日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的
    保留。
    对于具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除。
  2. 应用场景
    日志压缩特性,就实时计算来说,可以在异常容灾方面有很好的应用途径。比如,我们在Spark、Flink中做实时计算时,需要长期在内存里面维护一些数据,这些数据可能是通过聚合了一天或者一周的
    日志得到的,这些数据一旦由于异常因素(内存、网络、磁盘等)崩溃了,从头开始计算需要很长的时间。一个比较有效可行的方式就是定时将内存里的数据备份到外部存储介质中,当崩溃出现时,再从外
    部存储介质中恢复并继续计算。

使用日志压缩来替代这些外部存储有哪些优势及好处呢?这里为大家
列举并总结了几点:

  • Kafka即是数据源又是存储工具,可以简化技术栈,降低维护成本
  • 使用外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使用这些Key将数据取回,实现起来有一定的工程难度和复杂度。使用Kafka的日志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读回到内存就可以了
  • Kafka对于磁盘的读写做了大量的优化工作,比如磁盘顺序读写。相对于外部存储介质没有索引查询等工作量的负担,可以实现高性能。同时,Kafka的日志压缩机制可以充分利用廉价的磁盘,不用依赖昂贵的内存来处理,在性能相似的情况下,实现非常高的性价比(这个观点仅仅针对于异常处理和容灾的场景来说)

3 日志压缩方式的实现细节
主题的 cleanup.policy 需要设置为compact。
Kafka的后台线程会定时将Topic遍历两次:

  1. 记录每个key的hash值最后一次出现的偏移量
  2. 第二次检查每个offset对应的Key是否在后面的日志中出现过,如果出现了就删除对应的日志。

日志压缩允许删除,除最后一个key之外,删除先前出现的所有该key对应的记录。在一段时间后从日志中清理,以释放空间。

注意:日志压缩与key有关,确保每个消息的key不为null。

压缩是在Kafka后台通过定时重新打开Segment来完成的,Segment的压缩细节如下图所示:

日志压缩可以确保:

  1. 任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使用Topic的min.compaction.lag.ms属性来保证消息在被压缩之前必须经过的最短时间。也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的max.compaction.lag.ms属性来保证从收到消息到消息符合压缩条件之间的最大延时
  • 消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已
  • 消息的偏移量永远不会改变,它是日志中位置的永久标识符
  • 从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使用者在比Topic的log.cleaner.delete.retention.ms短的时间内到达日志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24小时。

默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志清理器,这里为大家总结了以下几点:
1) log.cleanup.policy 设置为 compact ,Broker的配置,影响集群中所有的Topic。
2) log.cleaner.min.compaction.lag.ms ,用于防止对更新超过最小消息进行压缩,如果没有设置,除最后一个Segment之外,所有Segment都有资格进行压缩

  • log.cleaner.max.compaction.lag.ms ,用于防止低生产速率的日志在无限制的时间内不压缩。

Kafka的日志压缩原理并不复杂,就是定时把所有的日志读取两遍,写一遍,而CPU的速度超过磁盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就是可以接受的。

2.5.3 磁盘存储

2.5.3.1 零拷贝

kafka高性能,是多方面协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“无所不用其极”的高效利用磁盘/操作系统特性。
零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。
nginx的高性能也有零拷贝的身影。

传统IO
比如:读取文件,socket发送
传统方式实现:先读取、再发送,实际经过1~4四次copy。

buffer = File.read
Socket.send(buffer)

1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy到application应用程序的buffer;
3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将socket buffer的数据,copy到网络协议栈,由网卡进行网络传输。

实际IO读写,需要进行IO中断,需要CPU响应中断(内核态到用户态转换),尽管引入**DMA(DirectMemory Access,直接存储器访问)**来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。
实际上并不需要第二个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。

kafka的两个过程:
1、网络数据持久化到磁盘 (Producer 到 Broker)
2、磁盘文件通过网络发送(Broker 到 Consumer)

数据落盘通常都是非实时的,Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。

磁盘文件通过网络发送(Broker 到 Consumer)

磁盘数据通过**DMA(Direct Memory Access,直接存储器访问)**拷贝到内核态 Buffer

直接通过 DMA 拷贝到 NIC Buffer(socket buffer),无需 CPU 拷贝。

除了减少数据拷贝外,整个读文件 ==> 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能。

Java NIO对sendfile的支持就是FileChannel.transferTo()/transferFrom()。

fileChannel.transferTo( position, count, socketChannel);

把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是sendfile。消费者从broker读取数据,就是由此实现。

具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 通过Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法实现零拷贝。

注: transferTo 和 transferFrom 并不保证一定能使用零拷贝,需要操作系统支持。
Linux 2.4+ 内核通过 sendfile 系统调用,提供了零拷贝。

2.5.3.2 页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。

具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

Kafka接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用mmap内存文件映射。

Memory Mapped Files

简称mmap,简单描述其作用就是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。

它的工作原理是直接利用操作系统的Page来实现磁盘文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。

mmap也有一个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。

Kafka提供了一个参数 producer.type 来控制是不是主动flush;

如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);

写入mmap之后立即返回Producer不调用flush叫异步(async)。

Java NIO对文件映射的支持
Java NIO,提供了一个MappedByteBuffer 类可以用来实现内存映射。
MappedByteBuffer只能通过调用FileChannel的map()取得,再没有其他方式。
FileChannel.map()是抽象方法,具体实现是在FileChannelImpl.map()可自行查看JDK源码,其map0()方法就是调用了Linux内核的mmap的API。


使用 MappedByteBuffer类要注意的是

  • mmap的文件映射,在full gc时才会进行释放。当close时,需要手动清除内存映射文件,可以反射调用sun.misc.Cleaner方法。

当一个进程准备读取磁盘上的文件内容时:

  1. 操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命 中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;
  2. 如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数
    据返回给进程。

如果一个进程需要将数据写入磁盘:

  1. 操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的
    页,最后将数据写入对应的页。
  2. 被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持
    数据的一致性。

对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式, 否则页缓存很难
被禁止。

当使用页缓存的时候,即使Kafka服务重启, 页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会
比进程内维护更加安全有效。

Kafka中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。
消息先被写入页缓存,由操作系统负责刷盘任务。

2.5.3.3 顺序写入

操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存) 和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。

Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消 息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka 使用磁盘作为存储
介质,也能承载非常大的吞吐量。

mmap和sendfile:

  1. Linux内核提供、实现零拷贝的API;
  2. sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
  3. mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
  4. RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

Kafka速度快是因为:

  1. partition顺序读写,充分利用磁盘特性,这是基础;
  2. Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
  3. Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。

2.6 稳定性

2.6.1 事务

一、事务场景

  1. 如producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见。
  2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,
    这就形成了一个典型的分布式事务。
  3. kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费偏移量也不能提交。
  4. producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之
    前未完成的事务 。
  5. 在一个原子操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引入的场景,最后一种没用。
    1) 只有Producer生产消息;
    2)消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的consume-transform-produce 模式
    3) 只有consumer消费消息,这种操作其实没有什么意义,跟使用手动提交效果一样,而且也不是事务属性引入的目的,所以一般不会使用这种情况

二、几个关键概念和推导

  1. 因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举上类似。
  2. 事务管理中事务日志是必不可少的,kafka使用一个内部topic来保存事务日志,这个设计和之前使用内部topic保存偏移量的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态。
    __transaction_state
  3. 因为事务存在commit和abort两种操作,而客户端又有read committed和readuncommitted两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control
    Message。
  4. producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联,这个就是TransactionalId,一个producer挂了,另一个有相同
    TransactionalId的producer能够接着处理这个事务未完成的状态。kafka目前没有引入全局序,所以也没有transaction id,这个TransactionalId是用户提前配置的。
  5. TransactionalId能关联producer,也需要避免两个使用相同TransactionalId的producer同时存在,所以引入了producer epoch来保证对应一个TransactionalId只有一个活跃的producer

三、事务语义
2.1. 多分区原子写入
事务能够保证Kafka topic下每个分区的原子写入。事务中所有的消息都将被成功写入或者丢弃。

首先,我们来考虑一下原子 读取-处理-写入 周期是什么意思。简而言之,这意味着如果某个应用程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进行了一些处理(如B = F(A))之后
将消息B写入topic tp1,则只有当消息A和B被认为被成功地消费并一起发布,或者完全不发布时,整个读取过程写入操作是原子的。

现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写入一个名为
offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。

由于offset commit只是对Kafkatopic的另一次写入,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子 读取-处理-写入 循环:提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分,所以整个步骤都是原子的。

2.2. 粉碎“僵尸实例”
我们通过为每个事务Producer分配一个称为transactional.id的唯一标识符来解决僵尸实例的问题。在进程重新启动时能够识别相同的Producer实例。

API要求事务性Producer的第一个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候,Kafka broker用给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了一个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。

一旦epoch被触发,任何具有相同的transactional.id和旧的epoch的生产者被视为僵尸,Kafka拒绝来自这些生产者的后续事务性写入。

简而言之:Kafka可以保证Consumer最终只能消费非事务性消息或已提交事务性消息。它将保留来自未完成事务的消息,并过滤掉已中止事务的消息。

2.3 事务消息定义
生产者可以显式地发起事务会话,在这些会话中发送(事务)消息,并提交或中止事务。有如下要求:

  1. 原子性:消费者的应用程序不应暴露于未提交事务的消息中。
  2. 持久性:Broker不能丢失任何已提交的事务。
  3. 排序:事务消费者应在每个分区中以原始顺序查看事务消息。
  4. 交织:每个分区都应该能够接收来自事务性生产者和非事务生产者的消息
  5. 事务中不应有重复的消息。

如果允许事务性和非事务性消息的交织,则非事务性和事务性消息的相对顺序将基于附加(对于非事务性消息)和最终提交(对于事务性消息)的相对顺序。

在上图中,分区p0和p1接收事务X1和X2的消息,以及非事务性消息。时间线是消息到达Broker的时间。由于首先提交了X2,所以每个分区都将在X1之前公开来自X2的消息。由于非事务性消息在X1和X2的提交之前到达,因此这些消息将在来自任一事务的消息之前公开。

四、事务配置
1、创建消费者代码,需要:

  • 将配置中的自动提交属性(auto.commit)进行关闭
  • 而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
  • 设置isolation.level:READ_COMMITTED或READ_UNCOMMITTED

2、创建生成者,代码如下,需要:

  • 配置transactional.id属性
  • 配置enable.idempotence属性

五、事务概览
生产者将表示事务开始/结束/中止状态的事务控制消息发送给使用多阶段协议管理事务的高可用事务协调器。生产者将事务控制记录(开始/结束/中止)发送到事务协调器,并将事务的消息直接发送到目标数据分区。消费者需要了解事务并缓冲每个待处理的事务,直到它们到达其相应的结束(提交/中止)记录为止。

  • 事务组
  • 事务组中的生产者
  • 事务组的事务协调器
  • Leader brokers(事务数据所在分区的Broker)
  • 事务的消费者

六、事务组
事务组用于映射到特定的事务协调器(基于日志分区数字的哈希)。该组中的生产者需要配置为该组事务生产者。由于来自这些生产者的所有事务都通过此协调器进行,因此我们可以在这些事务生产者之间实现严格的有序

七、生产者ID和事务组状态
事务生产者需要两个新参数:生产者ID和生产组。

需要将生产者的输入状态与上一个已提交的事务相关联。这使事务生产者能够重试事务(通过为该事务重新创建输入状态;在我们的用例中通常是偏移量的向量)。

可以使用消费者偏移量管理机制来管理这些状态。消费者偏移量管理器将每个键(consumergroup-topic-partition )与该分区的最后一个检查点偏移量和元数据相关联。在事务生产者中,我们保存消费者的偏移量,该偏移量与事务的提交点关联。此偏移提交记录(在__consumer_offsets 主题中)应作为事务的一部分写入。即,存储消费组偏移量的__consumer_offsets 主题分区将需要参与事务。因此,假定生产者在事务中间失败(事务协调器随后到期);当生产者恢复时,它可以发出偏移量获取请求,以恢复与最后提交的事务相关联的输入偏移量,并从该点恢复事务处理。

为了支持此功能,我们需要对偏移量管理器和压缩的 __consumer_offsets 主题进行一些增强。

首先,压缩的主题现在还将包含事务控制记录。我们将需要为这些控制记录提出剔除策略。

其次,偏移量管理器需要具有事务意识;特别是,如果组与待处理的事务相关联,则偏移量提
取请求应返回错误。

八、事务协调器

需要确保无论是什么样的保留策略(日志分区的删除还是压缩),都不能删除包含事务HW的日志分段。

九、事务流程

初始阶段 (图中步骤A)

  1. Producer:计算哪个Broker作为事务协调器。
  2. Producer:向事务协调器发送BeginTransaction(producerId, generation, partitions… )请
    求,当然也可以发送另一个包含事务过期时间的。如果生产者需要将消费者状态作为事务的一部分提交事务,则需要在BeginTransaction中包含对应的 __consumer_offsets 主题分区信息。
  3. Broker:生成事务ID
  4. Coordinator:向事务协调主题追加BEGIN(TxId, producerId, generation, partitions…)消息,然后发送响应给生产者。
  5. Producer:读取响应(包含了事务ID:TxId) 6. Coordinator (and followers):在内存更新当前事务的待确认事务状态和数据分区信息。

发送阶段
(图中步骤2)
Producer:发送事务消息给主题Leader分区所在的Broker。每个消息需要包含TxId和TxCtl字段。
TxCtl仅用于标记事务的最终状态(提交还是中止)。生产者请求也封装了生产者ID,但是不追加到日志中。

结束阶段 (生产者准备提交事务)
(图中步骤3、4、5。)

十、事务的中止
当事务生产者发送业务消息的时候如果发生异常,可以中止该事务。如果事务提交超时,事务协调器也会中止当前事务。

十一、基本事务流程的失败

十二、主题的压缩
压缩主题在压缩过程中会丢弃具有相同键的早期记录。如果这些记录是事务的一部分,这合法吗?
这可能有点怪异,但可能不会太有害,因为在主题中使用压缩策略的理由是保留关键数据的最新更新。

如果该应用程序正在(例如)更新某些表,并且事务中的消息对应于不同的键,则这种情况可能导致数据库视图不一致。

十三、事务相关配置
1、Broker configs

2、Producer configs

3、Consumer configs

2.6.1.1 幂等性

Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:

生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:

上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(比如网络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。

幂等性
保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。
比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证最终结果一定是一致的。

幂等性实现
添加唯一ID,类似于数据库的主键,用于唯一标记一个消息。
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。

  • ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。


同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:

当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

客户端在生成Producer时,会实例化如下代码:

// 实例化一个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender类中,在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码如下:

private void maybeWaitForPid() {if (transactionState == null)return;while (!transactionState.hasPid()) {try {Node node = awaitLeastLoadedNodeReady(requestTimeout);if (node != null) {ClientResponse response = sendAndAwaitInitPidRequest(node);if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();// 设置pid和epochtransactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());} else {log.error("Received an unexpected response type for an InitPidRequest from { }." +"We will back off and try again.", node);}} else {log.debug("Could not find an available broker to send InitPidRequest to." +"We will back off and try again.");}} catch (Exception e) {log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);}log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);time.sleep(retryBackoffMs);metadata.requestUpdate();}
}

2.6.1.2 事务操作

在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况。情况如下:

  • 只有Producer生产消息,这种场景需要事务的介入;
  • 消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
  • 只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,而且这种场景不是事务的引入目的。
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();// 开启事务
void beginTransaction() throws ProducerFencedException;// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throwsProducerFencedException;// 提交事务
void commitTransaction() throws ProducerFencedException;// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

案例1:单个Producer,使用事务保证消息的仅一次发送:

package com.lagou.kafka.demo.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;public class MyTransactionalProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 提供生产者client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");// 设置事务IDconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id_1");// 需要ISR全体确认消息configs.put(ProducerConfig.ACKS_CONFIG, "all");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);// 初始化事务producer.initTransactions();try {// 开启事务producer.beginTransaction();// 发送事务消息producer.send(new ProducerRecord<>("tp_tx_01", "txkey1", "tx_msg_4"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey2", "tx_msg_5"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey3", "tx_msg_6"));// 人为制造异常int i = 1 / 0;// 提交事务producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 事务回滚producer.abortTransaction();} finally {// 关闭生产者producer.close();}}
}

正常运行时, 一次全都显示,如果有异常则会回滚, 不发送消息

案例2:在 消费-转换-生产 模式,使用事务保证仅一次发送。

package com.lagou.kafka.demo;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;public class MyTransactional {public static KafkaProducer<String, String> getProducer() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 设置client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");// 设置事务id, 必须设置configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");// 需要所有的ISR副本确认configs.put(ProducerConfig.ACKS_CONFIG, "all");// 启用幂等性, 通过pid和sequenceconfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);return producer;}public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置消费组IDconfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");// 不启用消费者偏移量的自动确认,也不要手动确认configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");// 如果不存在这个偏移量, 就自动读最早的configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 只读取已提交的消息
//        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);return consumer;}public static void main(String[] args) {// 设置消费组idString consumerGroupId = "consumer_grp_id_101";KafkaProducer<String, String> producer = getProducer();KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);// 事务的初始化producer.initTransactions();//订阅主题consumer.subscribe(Collections.singleton("tp_tx_01"));final ConsumerRecords<String, String> records = consumer.poll(1_000);// 开启事务producer.beginTransaction();try {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (ConsumerRecord<String, String> record : records) {System.out.println(record);producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));offsets.put(new TopicPartition(record.topic(), record.partition()),// 下面要 +1,  偏移量表示下一条要消费的消息new OffsetAndMetadata(record.offset() + 1));}// 将该消息的偏移量提交作为事务的一部分,随事务一起提交和回滚(不手动或自动提交消费偏移量)producer.sendOffsetsToTransaction(offsets, consumerGroupId);//            int i = 1 / 0;// 提交事务producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 回滚事务producer.abortTransaction();} finally {// 关闭资源producer.close();consumer.close();}}
}

有异常时, 运行之后消费者不会消费消息, 同时, 再次运行时, 还可以从之前的偏移量继续消费消息

2.6.2 控制器

Kafka集群包含若干个broker,broker.id指定broker的编号,编号不要重复。
Kafka集群上创建的主题,包含若干个分区。
每个分区包含若干个副本,副本因子包括了Follower副本和Leader副本。
副本又分为ISR(同步副本分区)和OSR(非同步副本分区)。

控制器就是一个broker。
控制器除了一般broker的功能,还负责Leader分区的选举。

2.6.2.1 broker选举

集群里第一个启动的broker在Zookeeper中创建临时节点 /controller 。
其他broker在该控制器节点创建Zookeeper watch对象,使用Zookeeper的监听机制接收该节点的变更。
即:Kafka通过Zookeeper的分布式锁特性选举集群控制器。
下图中,节点 /myKafka/controller 是一个zookeeper临时节点,其中 “brokerid”:0 ,表示当前控制器是broker.id为0的broker。

每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们,以防止“脑裂”

比如当一个Leader副本分区所在的broker宕机,需要选举新的Leader副本分区,有可能两个具有不同纪元数字的控制器都选举了新的Leader副本分区,如果选举出来的Leader副本分区不一样,听谁的?脑裂了。有了纪元数字,直接使用纪元数字最新的控制器结果。


集群控制器负责监听 ids 节点,一旦节点子节点发送变化,集群控制器得到通知。


结论:

  1. Kafka 使用 Zookeeper 的分布式锁选举控制器,并在节点加入集群或退出集群时通知控制器。
  2. 控制器负责在节点加入或离开集群时进行分区Leader选举。
  3. 控制器使用epoch 来避免“脑裂”。“脑裂”是指两个节点同时认为自己是当前的控制器。

2.6.3 可靠性保证

概念

  1. 创建Topic的时候可以指定 --replication-factor 3 ,表示分区的副本数,不要超过broker的数量。
  2. Leader是负责读写的节点,而其他副本则是Follower。Producer只把消息发送到Leader,Follower定期地到Leader上Pull数据。
  3. ISR是Leader负责维护的与其保持同步的Replica列表,即当前活跃的副本列表。如果一个Follow落后太多,Leader会将它从ISR中移除。落后太多意思是该Follow复制的消息Follow长
    时间没有向Leader发送fetch请求(参数:replica.lag.time.max.ms 默认值:10000)。 4. 为了保证可靠性,可以设置 acks=all 。Follower收到消息后,会像Leader发送ACK。一旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer发送ACK。

副本的分配:
当某个topic的 --replication-factor 为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配,为了更好的负载均衡。

副本分配的三个目标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
  2. 其余副本通过增加偏移进行分配。

2.6.3.1 失效副本

失效副本的判定
replica.lag.time.max.ms 默认大小为10000。

失效副本的分区个数是用于衡量Kafka性能指标的重要部分。Kafka本身提供了一个相关的指标,即UnderReplicatedPartitions,这个可以通过JMX访问:

kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions 1

取值范围是大于等于0的整数。注意:如果Kafka集群正在做分区迁移(kafka-reassign-partitions.sh)的时候,这个值也会大于0。

2.6.3.2 副本复制

日志复制算法(log replication algorithm)必须提供的基本保证是,如果它告诉客户端消息已被提交,而当前Leader出现故障,新选出的Leader也必须具有该消息。在出现故障时,Kafka会从挂掉Leader的ISR里面选择一个Follower作为这个分区新的Leader。

每个分区的 leader 会维护一个in-sync replica(同步副本列表,又称 ISR)。当Producer向broker发送消息,消息先写入到对应Leader分区,然后复制到这个分区的所有副本中。ACKS=ALL时,只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交。


当副本落后于 leader 分区时,这个副本被认为是不同步或滞后的。在 Kafka中,副本的滞后于Leader是根据 replica.lag.time.max.ms 来衡量。

如何确认某个副本处于滞后状态
通过 replica.lag.time.max.ms 来检测卡住副本(Stuck replica)在所有情况下都能很好地工作。它跟踪 follower 副本没有向 leader 发送获取请求的时间,通过这个可以推断 follower 是否正常。

另一方面,使用消息数量检测不同步慢副本(Slow replica)的模型只有在为单个主题或具有同类流量模式的多个主题设置这些参数时才能很好地工作,但我们发现它不能扩展到生产集群中所有主题。

2.6.4 一致性保证

一、概念

1. 水位标记
水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(high watermark)

2. 副本角色
Kafka分区使用多个副本(replica)提供高可用。

3. LEO和HW
每个分区副本对象都有两个重要的属性:LEO和HW。

  • LEO:即日志末端位移(log end offset),记录了该副本日志中下一条消息的位移值。如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,Leader LEOFollower LEO的更新是有区别的。
  • HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower副本的HW更新不同。

二、Follower副本何时更新LEO

三、Follower副本何时更新HW

Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。

比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。

即:如果Follower的LEO大于Leader的HW,Follower HW值不会大于Leader的HW值。

四、Leader副本何时更新LEO

和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值。

五、Leader副本何时更新HW值

Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性 。

Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。
当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(包括Leader的LEO),并选择最小的LEO值作为HW值
需要满足的条件,(二选一):

  1. 处于ISR中
  2. 副本LEO落后于Leader LEO的时长不大于 replica.lag.time.max.ms 参数值(默认是10s)

如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。因为分区HW定义就是ISR中所有副本LEO的最小值。

六、HW和LEO正常更新案例

我们假设有一个topic,单分区,副本因子是2,即一个Leader副本和一个Follower副本。我们看下当producer发送一条消息时,broker端的副本到底会发生什么事情以及分区HW是如何被更新的。

1. 初始状态

2. Follower发送FETCH请求在Leader处理完PRODUCE请求之后
producer给该topic分区发送了一条消息
此时的状态如下图所示:


PRODUCE请求处理完成后各值如下,Leader端的HW值依然是0,而LEO是1,Remote LEO也是0。

假设此时follower发送了FETCH请求,则状态变更如下:


而Follower副本接收到FETCH Response后依次执行下列操作:

  1. 写入本地Log,同时更新Follower自己管理的 LEO为1
  2. 更新Follower HW:比较本地LEO和 FETCH Response 中的当前Leader HW值,取较小者,Follower HW = 0

此时,第一轮FETCH RPC结束,我们会发现虽然Leader和Follower都已经在Log中保存了这条消息,但分区HW值尚未被更新,仍为0。

Follower第二轮FETCH
分区HW是在第二轮FETCH RPC中被更新的,如下图所示:


同样地,Follower副本接收到FETCH response后依次执行下列操作:

  1. 写入本地Log,当然没东西可写,Follower LEO也不会变化,依然是1。
  2. 更新Follower HW:比较本地LEO和当前LeaderHW取小者。由于都是1,故更新follower HW
    = 1 。

    此时消息已经成功地被复制到Leader和Follower的Log中且分区HW是1,表明消费者能够消费offset = 0的消息。

七、HW和LEO异常案例

Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成。
但这种设计是有问题的,可能引起的问题包括:

  1. 备份数据丢失
  2. 备份数据不一致

数据丢失
使用HW值来确定备份进度时其值的更新是在下一轮RPC中完成的。如果Follower副本在标记上方的的第一步与第二步之间发生崩溃,那么就有可能造成数据的丢失。

上图中有两个副本:A和B。开始状态是A是Leader。
假设生产者 min.insync.replicas 为1,那么当生产者发送两条消息给A后,A写入Log,此时Kafka会通知生产者这两条消息写入成功。

但是在broker端,Leader和Follower的Log虽都写入了2条消息且分区HW已经被更新到2,但Follower HW尚未被更新还是1,也就是上面标记的第二步尚未执行,表中最后一条未执行。

倘若此时副本B所在的broker宕机,那么重启后B会自动把LEO调整到之前的HW值1,故副本B会做日志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1。此时follower副本底层log中就只有一条消息,即offset = 0的消息!

B重启之后需要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的Leader,而当A重启回来后也会执行日志截断,将HW调整回1。这样,offset=1的消息就从两个副本的log中被删除,也就是说这条已经被生产者认为发送成功的数据丢失。

丢失数据的前提是 min.insync.replicas=1 时,一旦消息被写入Leader端Log即被认为是committed 。延迟一轮 FETCH RPC 更新HW值的设计使follower HW值是异步延迟更新,若在这个过程中Leader发生变更,那么成为新Leader的Follower的HW值就有可能是过期的,导致生产者本是成功提交的消息被删除。

Leader和Follower数据离散
除了可能造成的数据丢失以外,该设计还会造成Leader的Log和Follower的Log数据不一致。

如Leader端记录序列:m1,m2,m3,m4,m5,…;Follower端序列可能是m1,m3,m4,m5,…。
看图:

八、Leader Epoch使用

Kafka解决方案

造成上述两个问题的根本原因在于

  1. HW值被用于衡量副本备份的成功与否。
  2. 在出现失败重启时作为日志截断的依据。

但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发生的任何崩溃都可能导致HW值的过期。

Kafka从0.11引入了 leader epoch 来取代HW值。Leader端使用内存保存Leader的epoch信息,即使出现上面的两个场景也能规避这些问题。

所谓Leader epoch实际上是一对值:<epoch, offset>:

  1. epoch表示Leader的版本号,从0开始,Leader变更过1次,epoch+1
  2. offset对应于该epoch版本的Leader写入第一条消息的offset。因此假设有两对值:
<0, 0>
<1, 120>

则表示第一个Leader从位移0开始写入消息;共写了120条[0, 119];而第二个Leader版本号是1,从位移120处开始写入消息。

  1. Leader broker中会保存这样的一个缓存,并定期地写入到一个 checkpoint 文件中。
  2. 当Leader写Log时它会尝试更新整个缓存:如果这个Leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。
  3. 每次副本变为Leader时会查询这部分缓存,获取出对应Leader版本的位移,则不会发生数据不一致和丢失的情况。

规避数据丢失

规避数据不一致


依靠Leader epoch的信息可以有效地规避数据不一致的问题。
对于使用 unclean.leader.election.enable = true 设置的群集,该方案不能保证消息的一致性

2.6.5 消息重复的场景及解决方案

消息重复和丢失是kafka中很常见的问题,主要发生在以下三个阶段:

  1. 生产者阶段
  2. broke阶段
  3. 消费者阶段

2.6.5.1 生产者阶段重复场景

2.6.5.1.1 根本原因

生产发送的消息没有收到正确的broke响应,导致生产者重试。

生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。

2.6.5.1.2 重试过程


说明:

  1. new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;
  2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
  3. 后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
  4. 如果发送成功,那么返回成功;
  5. 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;

2.6.5.1.3 可恢复异常说明

异常是RetriableException类型或者TransactionManager允许重试;RetriableException类继承关系如下:

2.6.5.1.4 记录顺序问题

如果设置 max.in.flight.requests.per.connection 大于1(默认5,单个连接上发送的未确认请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第一个batch处理失败并重试,但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费。

设置 max.in.flight.requests.per.connection 为1,可能会影响吞吐量,可以解决单个生产者发送顺序问题。如果多个生产者,生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消费。

2.6.5.2 生产者发送重复解决方案

2.6.5.2.1 启动kafka的幂等性

要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1 。

2.6.5.2.2 ack=0,不重试。

可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。

2.6.5.3 生产者和broke阶段消息丢失场景

2.6.5.3.1 ack=0,不重试

生产者发送消息完,不管结果了,如果发送失败也就丢失了。

2.6.5.3.2 ack=1,leader crash

生产者发送消息完,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失。

2.6.5.3.3 unclean.leader.election.enable 配置true

允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。生产者发送异步消息,只等待Lead写入成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失。

2.6.5.4 解决生产者和broke阶段消息丢失

2.6.5.4.1 禁用unclean选举,ack=all

ack=all / -1,tries > 1,unclean.leader.election.enable : false

生产者发完消息,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般三个。

不允许unclean Leader选举。

2.6.5.4.2 配置:min.insync.replicas > 1

当生产者将 acks 设置为 all (或 -1 )时, min.insync.replicas>1 。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

当一起使用时, min.insync.replicas 和 ack 允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用 all 配置发送。将确保如果大多数副本没有
收到写操作,则生产者将引发异常。

2.6.5.4.3 失败的offset单独记录

生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理。

2.6.5.5 消费者数据重复场景及解决方案

2.6.5.5.1 根本原因

数据消费完没有及时提交offset到broker。

2.6.5.5.2 场景

消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。

2.6.5.6 解决方案

2.6.5.6.1 取消自动提交

每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。

2.6.5.6.2 下游做幂等

一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。

2.6.6 __consumer_offsets

Zookeeper不适合大批量的频繁写入操作。
Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

  1. 创建topic “tp_test_01”
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1
  1. 使用kafka-console-producer.sh脚本生产消息
[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt

由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上。 (本例中生产了100条消息)

  1. 验证消息生产成功
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1
#显示结果
tp_test_01:2:20
tp_test_01:4:20
tp_test_01:1:20
tp_test_01:3:20
tp_test_01:0:20
[root@node1 ~]#

结果输出表明100条消息全部生产成功!

  1. 创建一个console consumer group
[root@node1 ~]#kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning
  1. 获取该consumer group的group id(后面需要根据该id查询它的位移信息)
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

输出: console-consumer-49366 (记住这个id!)

  1. 查询__consumer_offsets topic所有内容
    注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false
[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /opt/lagou/kafka/config/consumer.properties --from-beginning

默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

  1. 计算指定consumer group在__consumer_offsets topic中分区信息
    这时候就用到了第5步获取的group.id(本例中是console-consumer-49366)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions


对应的分区=Math.abs(“console-consumer-49366”.hashCode()) % 50 = 19,即__consumer_offsets的分区19保存了这个consumer group的位移信息。

  1. 获取指定consumer group的位移信息
[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

下面是输出结果:

上图可见,该consumer group果然保存在分区11上,且位移信息都是对的(这里的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这里的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志项的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]。

2.7 延时队列

两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。

Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待
拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。

延迟操作不只是拉取消息时的特有操作,在Kafka中有多种延时操作,比如延时数据删除、延时生产等。

对于延时生产(消息)而言,如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。

假设某个分区有3个副本:leader、follower1和follower2,它们都在分区的ISR集合中。不考虑ISR变动的情况,Kafka在收到客户端的生产请求后,将消息3和消息4写入leader副本的本地日志文件。

由于客户端设置了acks为-1,那么需要等到follower1和follower2两个副本都收到消息3和消息4后才能告知客户端正确地接收了所发送的消息。如果在一定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。

那么这里等待消息3和消息4写入follower1副本和follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的呢?在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生
产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。

延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时
操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。

就延时生产操作而言,它的外部事件是所要写入消息的某个分区的HW(高水位)发生增长。也就是说,随着follower副本不断地与leader副本进行消息同步,进而促使HW进一步增长,HW每增长一次
都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端;如果在超时时间内始终无法完成,则强制执行。

延时拉取操作,是由超时触发或外部事件触发而被执行的。超时触发很好理解,就是等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍复杂了一些,因为拉取请求不单单由follower副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也是不同的。如果是follower副本的延时拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消费者客户端的延
时拉取,它的外部事件可以简单地理解为HW的增长。

时间轮实现延时队列。
TimeWheel。size,每个单元格的时间,每个单元格都代表一个时间,size*每个单元格的时间就是一个周期。

2.8 重试队列

kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能。

自定义实现步骤

创建新的kafka主题作为重试队列:

  1. 创建一个topic作为重试topic,用于接收等待重试的消息。
  2. 普通topic消费者设置待重试消息的下一个重试topic。
  3. 从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序
  4. 定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic
  5. 同一个消息重试次数过多则不再重试

代码实现

  1. 新建springboot项目
    版本:2.2.8
    dependencies选择如下三个:

    pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.lagou.kafka.demo</groupId><artifactId>demo-retryqueue</artifactId><version>0.0.1-SNAPSHOT</version><name>demo-retryqueue</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
  1. 添加application.properties
# bootstrap.servers
spring.kafka.bootstrap-servers=node1:9092
# key序列化器
spring.kafka.producer.key-
serializer=org.apache.kafka.common.serialization.StringSerializer
# value序列化器
spring.kafka.producer.value-
serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费组id:group.id
spring.kafka.consumer.group-id=retryGroup
# key反序列化器
spring.kafka.consumer.key-
deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value反序列化器
spring.kafka.consumer.value-
deserializer=org.apache.kafka.common.serialization.StringDeserializer
# redis数据库编号
spring.redis.database=0
# redis主机地址
spring.redis.host=node1
# redis端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=1000
# Kafka主题名称
spring.kafka.topics.test=tp_demo_retry_01
# 重试队列
spring.kafka.topics.retry=tp_demo_retry_02
  1. RetryqueueApplication.java
package com.lagou.kafka.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RetryqueueApplication {public static void main(String[] args) {SpringApplication.run(RetryqueueApplication.class, args);}}
  1. AppConfig.java
package com.lagou.kafka.demo.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;// 配置redis template
@Configuration
public class AppConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();// 配置连接工厂template.setConnectionFactory(factory);return template;}}
  1. KafkaController.java
package com.lagou.kafka.demo.controller;import com.lagou.kafka.demo.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutionException;@RestController
public class RetryController {@Autowiredprivate KafkaService kafkaService;@Value("${spring.kafka.topics.test}")private String topic;@RequestMapping("/send/{message}")public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {ProducerRecord<String, String> record = new ProducerRecord<>(topic,message);// 向业务主题发送消息String result = kafkaService.sendMessage(record);return result;}}
  1. KafkaService.java
package com.lagou.kafka.demo.service;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.ExecutionException;@Service
public class KafkaService {private Logger log = LoggerFactory.getLogger(KafkaService.class);// 标红不管@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {SendResult<String, String> result = this.kafkaTemplate.send(record).get();RecordMetadata metadata = result.getRecordMetadata();String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();log.info("发送消息成功:" + returnResult);return returnResult;}}
  1. ConsumerListener.java
package com.lagou.kafka.demo.listener;import com.lagou.kafka.demo.service.RetryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListener {private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);@Autowiredprivate RetryService kafkaRetryService;private static int index = 0;// 拉取下面主题的消息@KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")public void consume(ConsumerRecord<String, String> record) {try {// 业务处理log.info("消费的消息:" + record);index++;if (index % 2 == 0) {throw new Exception("该重发了");}} catch (Exception e) {log.error(e.getMessage());// 消息重试,实际上先将消息放到redis, 再从redis放到消息队列中kafkaRetryService.consumerLater(record);}}}
  1. KafkaRetryService.java
package com.lagou.kafka.demo.service;import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;@Service
public class RetryService {private static final Logger log = LoggerFactory.getLogger(RetryService.class);/*** 消息消费失败后下一次消费的延迟时间(秒)* 第一次重试延迟10秒;第   二次延迟30秒,第三次延迟1分钟...*/private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};/*** 重试topic*/@Value("${spring.kafka.topics.retry}")private String retryTopic;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void consumerLater(ConsumerRecord<String, String> record){// 获取消息的已重试次数int retryTimes = getRetryTimes(record);Date nextConsumerTime = getNextConsumerTime(retryTimes);// 如果达到重试次数,则不再重试if(nextConsumerTime == null) {return;}// 组织消息RetryRecord retryRecord = new RetryRecord();retryRecord.setNextTime(nextConsumerTime.getTime());retryRecord.setTopic(record.topic());retryRecord.setRetryTimes(retryTimes);retryRecord.setKey(record.key());retryRecord.setValue(record.value());// 转换为字符串String value = JSON.toJSONString(retryRecord);// 发送到重试队列kafkaTemplate.send(retryTopic, null, value);}/*** 获取消息的已重试次数*/private int getRetryTimes(ConsumerRecord record){int retryTimes = -1;for(Header header : record.headers()){if(RetryRecord.KEY_RETRY_TIMES.equals(header.key())){ByteBuffer buffer = ByteBuffer.wrap(header.value());retryTimes = buffer.getInt();}}retryTimes++;return retryTimes;}/*** 获取待重试消息的下一次消费时间*/private Date getNextConsumerTime(int retryTimes){// 重试次数超过上限,不再重试if(RETRY_INTERVAL_SECONDS.length < retryTimes) {return null;}Calendar calendar = Calendar.getInstance();calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);return calendar.getTime();}
}
  1. RetryListener.java
package com.lagou.kafka.demo.listener;import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.Set;
import java.util.UUID;@Component
// 下面注解表示开启调度
@EnableScheduling
public class RetryListener {private Logger log = LoggerFactory.getLogger(RetryListener.class);private static final String RETRY_KEY_ZSET = "_retry_key";      // 时间private static final String RETRY_VALUE_MAP = "_retry_value";   // 消息@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//@Value("${spring.kafka.topics.test}")private String bizTopic;@KafkaListener(topics = "${spring.kafka.topics.retry}")     // 只要有消息就取出来
//    public void consume(List<ConsumerRecord<String, String>> list) {//        for(ConsumerRecord<String, String> record : list){public void consume(ConsumerRecord<String, String> record) {System.out.println("需要重试的消息:" + record);RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);/*** 防止待重试消息太多撑爆redis,可以将待重试消息按下一次重试时间分开存储放到不同介质* 例如下一次重试时间在半小时以后的消息储存到mysql,并定时从mysql读取即将重试的消息储储存到redis*/// 通过redis的zset进行时间排序String key = UUID.randomUUID().toString();redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());}
//    }/*** 定时任务从redis读取到达重试时间的消息,发送到对应的topic*/
//    @Scheduled(cron="2 * * * * *")@Scheduled(fixedDelay = 2000)public void retryFromRedis() {log.warn("retryFromRedis----begin");long currentTime = System.currentTimeMillis();// 根据时间倒序获取Set<ZSetOperations.TypedTuple<Object>> typedTuples =redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);// 移除取出的消息redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);for(ZSetOperations.TypedTuple<Object> tuple : typedTuples){String key = tuple.getValue().toString();String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);ProducerRecord record = retryRecord.parse();ProducerRecord recordReal = new ProducerRecord(bizTopic,record.partition(),record.timestamp(),record.key(),record.value(),record.headers());kafkaTemplate.send(recordReal);}// todo 发生异常将发送失败的消息重新发送到redis}
}
  1. RetryRecord.java
package com.lagou.kafka.demo.entity;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;public class RetryRecord {public static final String KEY_RETRY_TIMES = "retryTimes";private String key;private String value;private Integer retryTimes;private String topic;private Long nextTime;public RetryRecord() {}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public String getValue() {return value;}public void setValue(String value) {this.value = value;}public Integer getRetryTimes() {return retryTimes;}public void setRetryTimes(Integer retryTimes) {this.retryTimes = retryTimes;}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}public Long getNextTime() {return nextTime;}public void setNextTime(Long nextTime) {this.nextTime = nextTime;}public ProducerRecord parse() {     // 解析成ProducerRecordInteger partition = null;Long timestamp = System.currentTimeMillis();List<Header> headers = new ArrayList<>();ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);retryTimesBuffer.putInt(retryTimes);retryTimesBuffer.flip();headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers);return sendRecord;}
}

4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列)相关推荐

  1. 分布式-MQ-07 kafka高级特性及常见问题调优

    kafka高级特性及常见问题调优 一.kafka高级特性 1.1 producer发布消息机制 1.2 Controller及partition选举机制 1.2.1 Controller选举机制(zk ...

  2. 8 Kafka高级特性-稳定性

    8 稳定性 8.1 事务 一.事务场景 1. 如producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 . 2. producer可能会给多个topic,多个pa ...

  3. Zabbix之2023 Zabbix6.4最新高级特性、优缺点及其实现原理总结

    目录 Zabbix高级特性1. 自动发现 Zabbix高级特性2. 分布式监控 Zabbix高级特性3. 高级报警 Zabbix高级特性4. 可视化 Zabbix高级特性5. API Zabbix高级 ...

  4. python切片迭代_Python高级特性 切片 迭代解析

    切片:方便截取list.tuple.字符串部分索引的内容 正序切片 语法:dlist = doList[0:3]表示,从索引0开始取,直到索引3为止,但不包括索引3.即索引0,1,2,正好是3个元素 ...

  5. 【Hadoop】HDFS操作、数据上传与下载原理解析、高级特性及底层原理

    HDFS操作.数据上传与下载原理解析.高级特性及底层原理 1 HDFS操作 1.1 Web Console网页工具 1.2 命令行 1.2.1 普通的操作命令 1.2.2 管理员命令 1.3 Java ...

  6. 发布订阅的消息系统 Kafka的深度解析

    发布&订阅的消息系统 Kafka的深度解析 2015-01-27 10:25 Jason Guo Jason Guo的博客 字号: T | T 一个典型的kafka集群中包含若干produce ...

  7. 【Java书笔记】:《深入理解Java虚拟机:JVM高级特性与最佳实践(第3版)》第2部分-自动内存管理,第3部分-虚拟机执行子系统,第5部分-高效并发

    作者:周志明 整理者GitHub:https://github.com/starjuly/UnderstandingTheJVM 第2部分-自动内存管理 第2章 Java内存区域与内存溢出异常 2.2 ...

  8. RabbitMQ 高级特性(吐血猝死整理篇)

    文章目录 RabbitMQ 高级特性 消息可靠性投递(可靠性发送) 事务机制 代码实现 发送方确认机制 为什么比事务性能好 示例代码 测试一下QPS 持久化存储 TTL 队列 死信队列(DLX) 延迟 ...

  9. 数字式声纳设计原理 pdf_阿里P7大牛,深入剖析JVM底层设计原理+高级特性pdf,附46页ppt...

    前言 JVM是Java Virtual Machine(Java虚拟机)的缩写,JVM是一种用于计算设备的规范,它是一个虚构出来的计算机,是通过在实际的计算机上仿真模拟各种计算机功能来实现的. 引入J ...

最新文章

  1. Kali Linux更新后无法启动解决了
  2. 乔布斯最伟大的贡献是什么
  3. 洛谷P3391文艺平衡树(Splay)
  4. ASP.NET MVC导出excel(数据量大,非常耗时的,异步导出)
  5. `>>`(有符号右移) 和 `>>>`(无符号右移)的区别
  6. c语言考试题及答案 大一,大一C语言期末考试试题
  7. iOS开发- 蓝牙后台接收数据(BLE4.0)
  8. php 事件调度,MySQL的事件调度器使用介绍
  9. vue amp; nuxt 博客网站
  10. class matplotlib.figure.Figure
  11. 05-01 docker 介绍
  12. maven编译报程序包不存在_宝马730i空调不制冷,报冷却剂压缩机当前存在故障
  13. python列表数据写入txt文件_Python将列表数据写入文件(txt, csv,excel)
  14. ❤️Python Django网站开发 2021年最新版教程 合集❤️
  15. 虚拟服务器vdi重删,VDI桌面虚拟化简介
  16. python之测试类
  17. 虚拟机安装centos7
  18. 【mysql】查询中英文名称拼接处理
  19. 【跨境电商】5个免费极简主义WordPress主题(二)
  20. 管理系统类毕设(二)---学生管理系统说明

热门文章

  1. QT的OpenGL进行模型的3D展示
  2. Linux怎么同步另一台设备的时间
  3. 生物冰箱智能锁有哪些功能
  4. pyqt5菜鸟教程_PyQt5系列教程(61):PyQt5与数据库互联的小例子1
  5. C语言简易程序设计————11、打印楼梯与笑脸
  6. 项目环境搭建,数据库,以及Swagger2介绍(二)
  7. 解救小哈(深度优先,广度优先)
  8. 网络信息安全领域中常见的几个概念
  9. 云蹦迪云广场舞软件开源源码
  10. C++——计算x的n次幂