讲真,我今年的双十一有点“背”,负责的Kafka集群出了一些幺蛾子,但正是这些幺蛾子,让我这个双十一过的非常充实,也让我意识到如果不体系化学习Kafka,是无法做到生产集群及时预警,将故障扼杀在摇篮中,因此也下定决心研读kafka的内核。

本文就先来分享一个让我始料未及的故障:Kafka生产环境大面积丢失消息

首先要阐述的是消息丢失并不是因为断电,而且集群的副本数量为3,消息发送端设置的acks=-1(all)。

这样严苛的设置,那为什么还会出现消息丢失呢?请听笔者慢慢道来。

1、故障现象

故障发生时,接到多个项目组反馈说消费组的位点被重置到几天前了,截图如下:

从上面的消费组延迟监控曲线上来看,一瞬间积压数从零直接飙升,初步怀疑是位点被重置了。

那位点为什么会被重置呢?

什么?你这篇文章不是说要讲Kafka为什么会丢消息吗?怎么你又扯说消费组位点被重置呢?标题党!!!

NO、NO、NO,各位看官,绝对不是文不对题,请带着这个疑问,与我共同探究吧。

2、问题分析

遇到问题,莫慌,讲道理,基于MQ的应用,消费端一般都会实现幂等,也就是消息可以重复被处理,并且不会影响业务,故解决的方式就是请项目组先评估一下,先人工将位点设置到出现问题的前30分钟左右,快速止血。

一波操作猛如虎,接下来就得好好分析问题产生的原因。

通过查看当时Kafka服务端的日志(server.log),可以看到如下日志:

上面的日志被修改的“面目全非”,其关键日志如下:

  • Member consumer-1-XX in group consumerGroupName has failed, removing it from the group
  • Preparing to rebalance group XXXX on heartbeat expiration

上面的日志指向性非常明显:由于心跳检测过期,消费组协调器将消费者从消费组中移除,重而触发重平衡。

消费组重平衡:当主题分区数量或消费者数量发生变化后,消费者之间需要对分区进行重新分配,实现消费端端负载均衡。

消息消费者在重平衡期间消费会全部暂停,当消费者重新完成分区的负载均衡后,继续从服务端拉起消息,此时消费端并不知道从哪个位置开始,故需要从服务端查询位点,使得消费者能从上次消费的位点继续消费。

现在出现消费位点被重置到最早位点,可以理解为位点丢失?那为什么会丢失位点呢?

无外乎如下两个原因:

  • 服务端丢失位点,导致客户端无法查询到位点
  • 客户端主动向服务端提交了-1,导致位点丢失

目前我们公司使用的Kafka版本为2.2.x,消费组的位点是存储在一个系统主题(__consumer_offsets)中,无论是服务器级别还是Topic级别,参数unclean.leader.election.enable都是设置为false,表示只有ISR集合中的副本才能参与Leader选举,这样就能严格保证位点消息并不会丢失或回到历史某一个位点。

查看客户端提交位点的API,发现用于封装客户端位点的实体类会对位点进行校验,代码截图如下:

如果传入的位点为-1,直接会抛出异常,故客户端并没有机会向服务端提交-1的位点,那位点为什么会丢失呢?

为了进一步探究,我们不得不将目光投向消费组在初次时是如何获取位点,从源码的角度去分析,从而寻找关键日志,并对日志文件进行对照,尝试得到问题的解。

2.1 客户端位点查找机制

为了探究客户端的位点获取机制,笔者详细阅读了消费者在启动时的流程,具体入口为KafkaConsumer的poll方法,其详细流程图如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FSkeQ4qz-1638798310716)(https://gitee.com/dingwpmz/blogimg/raw/master/20211203/Kafka%20resetOffsetIfNeeded%20%E4%BD%8D%E7%82%B9%E9%87%8D%E7%BD%AE%E8%B0%83%E7%94%A8%E9%93%BE%E8%B7%9F%E8%B8%AA(NEW)].png )

上述的核心要点说明如下:

  • 在消费者(KafkaConsumer)的poll方法消息时会调用updateAssignmentMetadataIfNeeded方法,该方法主要执行消费组初始化、消费组重平衡、获取消费位点等与元数据相关工作。
  • 如果当前消费组订阅的分区(重平衡后分配的分区)都存在位点,则返回true,说明无需更新位点。
  • 如果当前存在分配的分区没有正确的位点(例如一次重平衡后新增加的分区),此时需要向服务端发送查找位点请求,服务端查询__consumer_offsets主题,返回位点信息。
  • 如果查询到位点,输出DEBUG级别日志(Setting offset for partition),输出从服务端查询到的位点;如果未查询到位点,同样会输出DEBUG级别日志(Found no committed offset for partition)。
  • 如果没有查询到位点,则需要根据消费组配置的位点重置策略,其具体配置参数:auto.offset.reset,其可选值:
    • latest 最新位点
    • earliest 最早位点
    • none 不重置位点
  • 如果重置位点选择的是none,则会抛出NoOffsetForPartitionException异常。
  • 如果重置位点选择的是latest、earliest,则消费者将从查询到的位点开始消费,并输出DEBUG级别日志(Resetting offset for partition XX to offset XXXX.)

非常遗憾,消费者的位点查找机制,Kafka客户端打印的过程日志是DEBUG级别,这在生产环境基本是不会输出的,给我排查问题(找到足够的证据)带来了不便。

这里不得不吐槽一下Kafka输出日志的策略:位点的变更是一个非常关键的状态变更,而且输出这些日志的频率不会很大,日志级别应该使用INFO,而不是DEBUG。

Kafka的日志是Debug,故当时是无法找到证据进行辅助说明,只能排查出为什么会因为心跳超时而触发重平衡。

温馨提示:关于心跳为什么会超时,从而触发重平衡原因,将会在后续的故障分析相关的文章中详细阐述。

找到重平衡触发原因后,在测试环境进行压测并加以重现,同时将客户端日志级别设置为debug,从而查找证据,功夫不负有心人,完美的找到了上文中提到的三条日志:

  • Setting offset for partition
    第一次查询时找到了位点,并且不为-1,也不是最早位点。

  • Found no committed offset for partition
    后面反复进行重平衡,反复查询日志,竟然后面无法正确查询到位点,而是返回没有找到位点(返回-1)。

  • Resetting offset for partition XX to offset XXXX.
    根据重置策略进行了位点重置。

从上面的日志分析,也可以明确地出结论,服务端是有存储消费组的位点的,不然不会出现第一条日志,成功找到了一个有效的位点,只是在后续重平衡过程中,多次需要查询位点时,反而返回了-1,那服务端在什么情况下返回-1呢

Broker服务端处理心跳包的入口是kafkaApis的handleOffsetFetchRequest方法,找到获取位点的关键代码,如下所示:

从上面来看,服务端返回INVALID_OFFSET = -1L的情况如下:

  • 消费组元信息管理器中的缓存(内存)中并不存在该消费组,将返回-1,那又在什么情况下服务端会没有正在使用的消费组元信息呢?

    • __consumer_offsets主题的分区发生Leader选举,当前Broker中拥有的分区变更为follower后,与该分区对应的消费组的元信息将被移除。为什么会这样呢?
      这里背后的原因是Kafka中的消费组在Broker端需要选举出一个组协调器,用于协调消费组的重平衡,选举算法就是将消费组的名称取hashcode,得到的值与 consumer_offsets主题的分区数取模得到一个分区数,然后该分区的Leader节点所在的Broker为该消费组的组协调器,故分区Leader发生变化,与之关联的消费组的组协调器需要重新选举

    • 删除消费组时将器移出。

  • 消费组的状态为GroupState.Dead
    消费组状态变更为Dead,通常有如下几种情况:

    • 消费组被删除
    • __consumer_offsets分区leader发生变化,触发位点重新加载,要先将消费组状态变更为Dead,然后新的分区Leader所在机器上会加载新的位点,然后引导消费组重平衡。
  • 服务端中并没有存储该消费组的位点信息,说明该消费组还未提交过位点

那上面的情况,对于一个正在运行许久的消费组来说,上述这些情况会发生吗?查找服务端相关日志,可以明确看到大量__consumer_offsets相关分区发生leader选举,容易触发上述第一种情况,这样消费组发起的Offset Fetch请求是有可能返回-1,从而会引导消费组根据重置策略进行位点重置。

查看文章开头部分,消费组设置的重置策略选的是earliest,消费组在一瞬间消费积压从0飙升到几个亿,就能解释的通了。

看到这里,大家是不是会突然“后背发凉”,如果消费组配置的位点重置策略(auto.offset.reset)为latest,是不是很容易引起消息丢失,即一部分消费被跳过而不被消费,示意图说明如下:

本文就说到这里了,关于Kafka集群为什么会出现大量__consumer_offsets进行Leader选举,后续文章会一一展开,敬请持续关注我。

3、感想

讲真,由于Kafka服务端使用的编程语言为scala,笔者并没有尝试去看Kafka的源码,只是详细剖析了Kafka的消息发送、消息消费机制,本以为可以轻松驾驭公司各个项目关于Kafka使用层面的问题,但事实上也是如此,对项目组的咨询我应对起来得心应手,但一旦服务端出现问题,还是会有点茫然,当然我们有一套完备的集群问题出现应急方案,但一旦出现问题,尽管你能快速恢复,但故障一旦发生,损失就无法避免,故我们还是要对自己负责的内容研究透,提前做好巡检、根据体系化的知识提前规避故障的发生。

正例如大部分朋友应该知道kafka在后续版本中的消费位点是存储在系统主题__consumer_offsets中,但又有多少人知道,这个主题的分区一旦出现Leader选举,伴随而来的是一大堆消费组全部发生重平衡,导致消费组停止消费呢?

故笔者将下定决心,好好阅读一下kafka服务端相关源码,成体系化理解Kafka,在工作中更好的驾驭Kafka,《Kafka原理与实战》专栏在路上,有兴趣的朋友可以点击文章前的标签加以关注。

最后,期待您的点赞,您的点赞也是我最大的动力,我们下回见。

文章首发:https://www.codingw.net/posts/6d9026c7.html


一键三连(关注、点赞、留言)是对我最大的鼓励。

各位技术朋友们,我是《RocketMQ技术内幕》一书作者,CSDN2020博客之星TOP2,热衷于中间件领域的技术分享,维护「中间件兴趣圈」公众号,旨在成体系剖析Java主流中间件,构建完备的分布式架构体系,欢迎大家大家关注我,回复「专栏」可获取15个专栏;回复「PDF」可获取海量学习资料,回复「加群」可以拉你入技术交流群,零距离与BAT大厂的大神交流。

双十一期间Kafka以这种方式丢消息让我促不及防相关推荐

  1. 双十一期间Kafka以这种方式丢消息让我猝不及防

    讲真,我今年的双十一有点"背",负责的Kafka集群出了一些幺蛾子,但正是这些幺蛾子,让我这个双十一过的非常充实,也让我意识到如果不体系化学习Kafka,是无法做到生产集群及时预警 ...

  2. Kafka会不会丢消息

    本文来说下Kafka是否会丢消息的问题 文章目录 概述 认识Kafka 维基百科的定义 kafka架构 Kafka到底会不会丢失消息 生产者丢失消息 Kafka Broker丢失消息 消费者丢失消息 ...

  3. 面试官问:Kafka 会不会丢消息?怎么处理的?

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! Kafka存在丢消息的问题,消息丢失会发生在Broker, ...

  4. kafka消息处理失败后如何处理_面试题:Kafka 会不会丢消息?怎么处理的?

    Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种. Broker Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞 ...

  5. 返回的图片 buffer 怎么接收_面试题:Kafka 会不会丢消息?怎么处理的?

    Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种. Broker Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞 ...

  6. 刨根问底,Kafka消息中间件到底会不会丢消息

    大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...

  7. kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息

    大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...

  8. 发现kafka丢消息后的排查

    背景: 最近在用kafka做消息中间件,producer从hive中读取消息发送到kafka,后端storm对消息分类发送到elasticsearch建立索引. 问题: hive表中总共350万数据, ...

  9. rocketmq怎么保证数据不会重复_阿里架构师亲授:Kafka和RocketMQ的消息复制实现的差异点在哪?...

    众所周知,消息队列在收发两端,主要是依靠业务代码,配合请求确认的机制,来保证消息不会丢失的.而在服务端,一般采用持久化和复制的方式来保证不丢消息. 把消息复制到多个节点上,不仅可以解决丢消息的问题,还 ...

  10. 双十一期间电商公司程序员通宵压测都在忙什么?

    有情怀,有干货,有广告,微信搜索[三太子敖丙]关注这个不一样的程序员. 本文 GitHub https://github.com/JavaFamily 已收录,有一线大厂面试完整考点.资料以及我的系列 ...

最新文章

  1. 程序员娶妻子的经典准则
  2. 台式电脑如何截屏_如何选购台式电脑显卡?小白装机通俗易懂的独立显卡知识指南...
  3. java string 转 inputstream_String和inputstream互转【转文】
  4. 哲学上的终极问题:你在追求什么?
  5. JaveScript用二分法与普通遍历(冒泡)
  6. poj 2096 Collecting Bugs 概率dp入门题
  7. mac下载站,这个可以收藏看看
  8. 五方面入手精选数据库审计产品
  9. 【数模】模糊综合评价模型
  10. 2007中国优秀无线站点TOP50
  11. 解决:samba 无法访问,您可能没有权限使用网络资源,请与这台服务器管理员联系 指定的网络名不可用
  12. JAVA:18位身份证号码验证工具类(识别性别和生日、计算年龄)
  13. linux 进程共享内存同步,Linux使用共享内存通信的进程同步退出问题
  14. python turtle库画画
  15. 获取视频fps、总帧数
  16. python用for循环求和1到100_python使用for循环计算0-100的整数的和方法
  17. 现场工程师出手-PCAPHub与云SSH隧道稳妥实现异地LAN IIoT联测
  18. 软件测试之黑盒测试方法介绍及测试用例练习
  19. LDAP目录服务折腾之后的总结
  20. Linux命令--lsof

热门文章

  1. Python:1004 成绩排名
  2. DBA 小记 — 分库分表、主从、读写分离
  3. 二维码墓碑的技术探讨
  4. 2020年欧空局10m土地覆盖数据数据分享
  5. 毕业论文选题方法和论文各部分写作技巧
  6. 2012年九月六号阿里巴巴面试
  7. 谷歌浏览器,添加手机模拟器
  8. nginx报502错误
  9. HTML5 webSQL动态查询应用截图
  10. 传统项目管理和敏捷项目管理的区别是什么?