写在开头:

本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点。

文章内容输出来源:拉勾教育大数据高薪训练营。

一致性保证

水位标记

水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(high watermark)。

LEO和HW

每个分区副本对象都有两个重要的属性:LEO和HW

LEO:即日志末端位移(log end offset),记录了该副本日志中下一条消息的位移值。如果 LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,Leader LEO和 Follower LEO的更新是有区别的。

HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于 HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower副本的HW更新不同

上图中,HW值是7,表示位移是 0~7 的所有消息都已经处于“已提交状态”(committed),而LEO值是14,8~13的消息就是未完全备份(fully replicated)——为什么没有14?LEO指向的是下一条消息到来时的位移。

消费者无法消费分区下Leader副本中位移大于分区HW的消息

Follower副本何时更新LEO

Follower副本不停地向Leader副本所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么Follower副本的LEO是何时更新的呢?首先我必须言明,Kafka有两套Follower副本

LEO:

1. 一套LEO保存在Follower副本所在Broker的副本管理机中;

2. 另一套LEO保存在Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的follower副本的LEO。

Kafka使用前者帮助Follower副本更新其HW值;利用后者帮助Leader副本更新其HW。

1. Follower副本的本地LEO何时更新? Follower副本的LEO值就是日志的LEO值,每当新写入一条消息,LEO值就会被更新。当Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据,从而自动更新LEO值。

2. Leader端Follower的LEO何时更新? Leader端的Follower的LEO更新发生在Leader在处理 Follower FETCH请求时。一旦Leader接收到Follower发送的FETCH请求,它先从Log中读取 相应的数据,给Follower返回数据前,先更新Follower的LEO。

Follower副本何时更新HW

Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。

比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。

即:如果Follower的LEO大于Leader的HW,Follower HW值不会大于Leader的HW值。

Leader副本何时更新LEO

和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值。

Leader副本何时更新HW值

Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性

Leader会尝试去更新分区HW的四种情况:

1. Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。

2. Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。

3. 生产者向Leader副本写消息时:因为写入消息会更新Leader的LEO,有必要检查HW值是否需要更新

4. Leader处理Follower FETCH请求时:首先从Log读取数据,之后尝试更新分区HW值

结论:

当Kafka broker都正常工作时,分区HW值的更新时机有两个:

1. Leader处理PRODUCE请求时

2. Leader处理FETCH请求时。

Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(包括Leader的LEO),并选择最小的LEO值作为HW值

需要满足的条件,(二选一):

1. 处于ISR中

2. 副本LEO落后于Leader LEO的时长不大于 replica.lag.time.max.ms 参数值(默认是10s)

如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。因为分区HW定义就是ISR中所有副本LEO的最小值

消息重复的场景及解决方案

消息重复和丢失是kafka中很常见的问题,主要发生在以下三个阶段:

1. 生产者阶段

2. broke阶段

3. 消费者阶段

生产者阶段重复场景

生产发送的消息没有收到正确的broke响应,导致生产者重试。

生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。

生产者发送重复解决方案

启动kafka的幂等性

要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1

ack=0,不重试。 可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。

生产者和broke阶段消息丢失场景

ack=0,不重试

生产者发送消息完,不管结果了,如果发送失败也就丢失了。

ack=1,leader crash

生产者发送消息完,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失

unclean.leader.election.enable 配置true

允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。生产者发送异步消息,只等待Lead写入成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失

解决生产者和broke阶段消息丢失

禁用unclean选举,ack=all

ack=all / -1,tries > 1,unclean.leader.election.enable=false

生产者发完消息,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般三个。 不允许unclean Leader选举。

配置:min.insync.replicas > 1

当生产者将 acks 设置为 all (或 -1 )时, min.insync.replicas>1 。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

当一起使用时, min.insync.replicas 和 ack 允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用 all 配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。

失败的offset单独记录

生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理。

消费者数据重复场景及解决方案

数据消费完没有及时提交offset到broker。

消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。

解决方案

取消自动提交

每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。

下游做幂等

一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把 offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。

__consumer_offsets

Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

创建topic “tp_test_01”

kafka-topics.sh --zookeeper node1:2181/myKafka --create --
topic tp_test_01 --partitions 5 --replication-factor 1

使用kafka-console-producer.sh脚本生产消息

[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> messages.txt;
done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic
tp_test_01 < messages.txt 

由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上。 (本例中生产了60条消息)

验证消息生产成功

kafka-run-class.sh  kafka.tools.GetOffsetShell
--broker-list node1:9092 --topic tp_test_01 --time  -1

创建一个console consumer group

kafka-console-consumer.sh
--bootstrap-server linux121:9092 --topic tp_test_01 --from-beginning

获取该consumer group的group id(后面需要根据该id查询它的位移信息)

kafka-consumer-groups.sh --bootstrap-server linux121:9092 --list

查询__consumer_offsets topic所有内容

注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false

kafka-console-consumer.sh --topic __consumer_offsets
--bootstrap-server node1:9092 --formatter
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
--consumer.config config/consumer.properties --from-beginning 

默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

计算指定consumer group在__consumer_offsets topic中分区信息

这时候就用到了group.id :console-consumer-77682

Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:

Math.abs(groupID.hashCode()) % numPartitions 

__consumer_offsets的分区41保存了这个consumer group的位移信息。

获取指定consumer group的位移信息

kafka-simple-consumer-shell.sh --topic __consumer_offsets
--partition 41  --broker-list linux121:9092
--formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

可以看到__consumer_offsets topic的每一日志项的格式都是:

[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

延时队列

两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。

Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。

延迟操作不只是拉取消息时的特有操作,在Kafka中有多种延时操作,比如延时数据删除、延时生产等。

对于延时生产(消息)而言,如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。

假设某个分区有3个副本:leader、follower1和follower2,它们都在分区的ISR集合中。不考虑ISR变动的情况,Kafka在收到客户端的生产请求后,将消息3和消息4写入leader副本的本地日志文件。

由于客户端设置了acks为-1,那么需要等到follower1和follower2两个副本都收到消息3和消息4后才能告知客户端正确地接收了所发送的消息。如果在一定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。

那么这里等待消息3和消息4写入follower1副本和follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的呢?在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。

延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。

就延时生产操作而言,它的外部事件是所要写入消息的某个分区的HW(高水位)发生增长。也就是说,随着follower副本不断地与leader副本进行消息同步,进而促使HW进一步增长,HW每增长一次都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端;如果在超时时间内始终无法完成,则强制执行。

延时拉取操作,是由超时触发或外部事件触发而被执行的。超时触发很好理解,就是等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍复杂了一些,因为拉取请求不单单由follower副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也是不同的。如果是follower副本的延时拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消费者客户端的延时拉取,它的外部事件可以简单地理解为HW的增长。

kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...相关推荐

  1. 聊聊 Kafka:Kafka 消息重复的场景以及最佳实践

    一.前言 上一篇我们讲了 聊聊 Kafka:Kafka 消息丢失的场景以及最佳实践,这一篇我们来说一说 Kafka 消息重复的场景以及最佳实践. 我们下面会从以下两个方面来说一下 Kafka 消息重复 ...

  2. 多线程顺序消费MySQL数据_关于MQ的几件小事(五)如何保证消息按顺序执行

    1.为什么要保证顺序 消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常.举例: 比如通过mysql binlog进行两个数据库的数据 ...

  3. kafka 重复消费场景及解决方案

    1.与消费者有关的重要参数 在讨论重复消费之前,首先介绍一下kafka中几个跟消费有关的配置参数. enable.auto.commit 默认值true,表示消费者会周期性自动提交消费的offset ...

  4. kafka修改分区数_Kafka笔记

    一.kafka基本介绍 1概念:是一个分布式的基于发布/订阅模式的消息队列,应用于大数据实时处理 1.消息队列(topic): 优点:解耦 可恢复性 缓冲 削峰 异步通信 两种模式: 点对点模式:一对 ...

  5. kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息

    本文属于介绍 NWPC 消息平台 系列文章. 本文介绍如何在基于 ecFlow 构建的数值预报业务系统中发送 NWPC 消息平台的 产品事件消息. 介绍 数值预报业务系统产品制作一般分为三个步骤: 监 ...

  6. kafka如何保证不重复消费又不丢失数据_Kafka写入的数据如何保证不丢失?

    我们暂且不考虑写磁盘的具体过程,先大致看看下面的图,这代表了 Kafka 的核心架构原理. Kafka 分布式存储架构 那么现在问题来了,如果每天产生几十 TB 的数据,难道都写一台机器的磁盘上吗?这 ...

  7. kafka 脚本发送_Apache-Flink深度解析-DataStream-Connectors之Kafka

    聊什么 为了满足本系列读者的需求,在完成<Apache Flink 漫谈系列(14) - DataStream Connectors>之前,我先介绍一下Kafka在Apache Flink ...

  8. 2020年PMP笔记归纳第五章项目管理范围

    第五章 项目范围管理 掌握第五章知识点 学习内容: 内容章节 5.1 规划范围管理 5.2 收集需求 5.3 定义范围 5.4 创建WBS 5.5 确认范围 5.6 控制范围 第五章PMBOK概述中的 ...

  9. 【opencv学习笔记】第五篇:访问图像中像素的三种方式、ROI区域图像叠加和图像混合

    1. 访问图像中像素的三种方式 任何图像处理算法,都是从操作每个像素开始的.在OpenCV中,提供了三种访问每个像素的方法. 方法1:指针访问:C操作符[] 方法2:迭代器iterator 方法3:动 ...

最新文章

  1. oracle 查询、创建、删除 数据库用户
  2. gcc编译选项的循环重复查找依赖库等命令
  3. mysql rpm 安装6_linux6.5 RPM方式安装 mysql5.6
  4. 信息学奥赛一本通(1253:抓住那头牛)
  5. DAG最长路问题 hdu-1224
  6. xcode调试打印QString
  7. linux之间远程拷贝文件
  8. python-数字(int)知识整理
  9. 五种百度云盘下载速度慢解决方法
  10. 11开根号不用计算机,数学开根号有什么方法?不用计算器
  11. tomcat启动异常:org.apache.catalina.deploy.WebXml addFilter或者the JDBC Driver has been forcibly unregister
  12. 易观国际邓中元:移动互联网竞合并存
  13. 华为语音网关iad208e(m)华为8口语音网关web界面
  14. 什么是商业智能(BI),就看这篇文章足够了
  15. 蜗窝科技 spin lock (讲的非常不错)
  16. 网络原理——网络层与数据链路层
  17. 微博android升级7.000,华为 Android 7.0 升级计划曝光:G9 青春版 /Nova 也有份
  18. 服务器””上的 MSDTC 不可用。解决办法
  19. 电脑安装linux点歌系统,如何组装单机版电脑点歌系统
  20. VR技术成为国家新基建项目主力军

热门文章

  1. php 自带多进程,php多进程实现
  2. java传递实例_Java方法的参数传递机制实例详解
  3. SpringBoot2 整合 AXIS2 服务端和客户端
  4. SQL算法中的变量使用占位符动态赋值
  5. 怎样把间隔的几个commit整理成1个呢?
  6. Linux环境安装并配置Maven
  7. 软件设计师 - 算法思想
  8. 我好像明白了如何画序列图了
  9. c++检测输入是否为数字_Go64 for Mac(检测应用是否为64位)
  10. python判断阿姆斯特朗数_Python 程序检查阿姆斯特朗数