前文

我们在项目中使用到了kafka,但是后面发生的一系列事让我更加深刻去了解kafka,在前一段时间线上kafka一直出现一个问题消息堆积一直不消费, 重启服务后开始消费?
生产环境kafka在消费一段时间后,停止消费,服务重启后又继续开始消费,但是隔一段时间又会重复出现这个问题。后面在查阅大量资料后,总结出来这几种可能及解决方法,希望可以帮到有相同情况的朋友。

第一种:发生重平衡

这种情况应该是最多的也是最有可能的,网上查询也大多说的这个原因,完全符合我上面描述的情况,其实也很简单,无非就是发生了Rebalance。消费组的Rebalance就是对Topic分区的重新分配。

正常情况下消费组内加入新的消费者或老的消费者退出都会导致Rebalance,这种情况是无法避免的。但是某些特殊情况下,消费者会被误认为异常从而被踢出消费组,此时可能会导致消费异常,需要重点关注。

消费者被误认为异常从而被踢出消费组的场景如下:

  • 1未能及时发送心跳请求。

消费者以设置的heartbeat.interval.ms为间隔向broker发送心跳请求,如果broker在session.timeout.ms时间内没有收到消费者的心跳请求,broker会认为消费者异常,从而将其从消费组中踢出,然后开始新一轮的Rebalance。

  • 2消费者消费时间间隔过长。

消费者每次最多消费max.poll.records条消息,多数情况下客户端都会把一次消费到的数据处理完后才会开始下一次消费,如果单次消费的消息太多导致无法在max.poll.interval.ms时间内处理完或消息处理流程发生了异常(如需要写入后端数据库,后端数据库压力太大,时延增加等)导致消费时间增加,在max.poll.interval.ms时间内消费者没有发起下一次消费请求,broker认为消费者不活跃而将其踢出消费组,然后开始新一轮的Rebalance。

解决方法/排查思路:
场景一:未能及时发送心跳请求
解决方法:建议将session.timeout.ms值设置为heartbeat.interval.ms值3倍以上
示例:

#设置心跳线程同步超时时间,group coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。
session.timeout.ms = 180000
#consumer要每 heartbeat.interval.ms 秒给coordinator发一个心跳包,心跳必须设置为低于会话超时,一般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。
heartbeat.interval.ms = 60000

场景二:消费者消费时间间隔过长
排查思路:

  1. 检查单条消息的处理时间是多久,处理max.poll.records条消息会不会超过max.poll.interval.ms时间。
  2. 消息处理流程是否有网络行为,如写数据库、调用后端API等,在发生Rebalance的场景下后端是否正常。

示例:

#一次调用poll()操作时返回的最大记录数,默认值为500, 获取的消息条数越多,需要处理的时间越长。每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。
max.poll.records=50
#消费者两次poll()之间的最大时间间隔。默认5分钟,简单说就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么时长就要相应延长。否则如果时间到了 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。
max.poll.interval.ms=500000

第二种:业务异常导致kafka消费线程停止

在业务中使用@KafkaListener注解来启动消费者线程,有了这个注解,spring-kafka就会帮我们启动消费者线程。如果我们业务异常了this.errorHandler.handle会帮我们处理,errorHandler默认是LoggingErrorHandler类,它里面很简单,就是抛出堆栈信息。

//在KafkaMessageListenerContainer类中private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,@SuppressWarnings("rawtypes") Producer producer) throws Error {try {if (this.acknowledgingMessageListener != null) {this.acknowledgingMessageListener.onMessage(record,this.isAnyManualAck? new ConsumerAcknowledgment(record): null);}else {this.listener.onMessage(record);}ackCurrent(record, producer);}catch (RuntimeException e) {if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {ackCurrent(record, producer);}if (this.errorHandler == null) {throw e;}try {this.errorHandler.handle(e, record);// 省略部分代码}catch (RuntimeException ee) {// 省略部分代码}return null;}

但是注意,它这里只是catch了RuntimeException这种类型的异常,对于其他异常,如Error这种它是不管的,也就是会往前面继续抛,好,我们回到最开始的run方法,就是while循环拉取消息那个地方,最终它会抛到这里去

catch (WakeupException e) {// Ignore, we're stopping
}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);break;
}
catch (Exception e) {if (this.containerProperties.getGenericErrorHandler() != null) {this.containerProperties.getGenericErrorHandler().handle(e, null);}else {this.logger.error("Container exception", e);
}
}

这里会再次catch,保证run方法不退出,线程保持住继续拉取,但是发现没有,如果抛出的是Error,线程就退出了。如果我们业务中有代码抛出了这种Error类型的异常,消费者线程就会异常退出,也就是run方法结束!既然消费线程都退出了,还怎么拉取消息对吧,到此真相大白。

只是心跳线程还在,后续因为kafka会有检测消费者两次拉取间隔时长来判断消费者是否还活着,如果超过最大时长没有拉取(poll)就被踢掉,所以最后心跳线程也结束了,一切都结束了…

等等,这就完了吗?

还有个问题,线程是将Error往外面抛了,理论上JVM会帮我们打印出来堆栈,可是怎么没有看到异常堆栈信息呢? 为了弄清楚这个问题,又得回到上面提到的FutureTask这哥们,如果它run方法里面异常,不管你什么异常,如果往外抛就被捕捉到,并且最终将异常setException,也就是被吞掉了,熟悉JDK线程池的应该都知道,OK,分析到此可以收尾了。

处理方法:

消费者线程停止消费罪魁祸首其实是我们在业务中抛了Error类型的异常导致线程退出,异常被吞掉所以看不到异常堆栈,所以我们在开发业务过程要警惕这种错误异常的抛出,即使是有也要在业务代码中catch它,以免造成这种情况发生。

原文连接

第三种:消费者消费Topic失败,提示没有权限?

在项目中我们的kafka使用的华为mrs kafka对比于普通kafka就是封装了一些权限,对kafka的Ticop、消费等进行权限管理。其他没有任何区别。所以同一个消费组内有多个消费者,为每个消费者授权不同的Topic访问权限,某一消费者消费其中一个Topic时,提示消费失败,报错信息如下:Not authorized to access topics。

2022-01-16 17:42:27.234 ERROR [] [kafkaTopicNormalConsumer]  com.cloudwalk.portal.config.HwMrsKafkaConfig$1[76] - [kafkaTopicNormalConsumer]: Error due to
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [zhfx_warning, zhgl_warning, zhgd_warning, zhst_warning, zhyj_warning]
2022-01-16 17:42:27.192 ERROR [] [kafkaTopicNormalConsumer]  org.apache.kafka.clients.Metadata[299] - [Consumer instanceId=mh_id_c, clientId=consumer-1-mh_id_c, groupId=1] Topic authorization failed for topics [zhfx_warning, zhgl_warning, zhgd_warning, zhst_warning, zhyj_warning]

问题原因:消费组的leader在进行分区分配时,不会考虑某一个消费者的授权和订阅信息,只会根据消费组整体的订阅情况进行分区分配,此种情况下可能会给消费者分配到未授权的Topic,从而导致了上述问题的出现。

例如:消费组中有消费者A、B、C,A订阅并授权Topic 0、Topic 1、Topic 2,B订阅并授权Topic 3、Topic 4、Topic 5,C订阅并授权Topic 6、Topic 7、Topic 8,假设以上Topic都只有一个分区,消费组的leader会根据策略进行分区分配,分配的结果可能变成:A消费Topic 0、Topic 3、Topic 6,B消费Topic 1、Topic 4、Topic 7,C消费Topic 2、Topic 5、Topic 8。此时A对Topic 3和Topic 6是没有授权的,因此会出现“Not authorized to access topics”的报错。

处理方法:

如果业务要求所有消费者在同一个消费组内,即group.id相同,解决方法:为所有消费者授权相同的Topic访问权限。
如果消费者不需要在同一个消费组内,解决方法:修改group.id,让每个消费者单独在一个消费组内。

示例:

#用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group
group.id=mh-g-group

关于与kafka的爱恨交织相关推荐

  1. 手忙脚乱的快乐 谈谈Overcooked让人爱恨交织的多人合作机制

    前言 <Overcooked>作为近年来主机平台上少有的,叫好又叫座的多人合作游戏类型佳作,自诞生以来无数好友.爱人.同学.朋友等亲密关系在这款游戏的见证下从如胶似漆.你侬我侬变得夫妻反目 ...

  2. 细讲逻辑斯蒂回归与朴素贝叶斯、最大熵原理的爱恨交织(长文)

    好早之前就发现逻辑斯蒂回归好像和朴素贝叶斯里面的后验概率公式还有最大似然.信息熵.交叉熵.伯努利分布.回归分析.几率(odds)等等有着千丝万缕CZFZ(错综复杂).PSML(扑朔迷离)的关系.一直感 ...

  3. Windows8.1层出不穷的问题与爱恨交织的心态

    自从Win8.1出来之后,我就跟上大多程序员的步伐,装了用了看看. 起初感觉很漂亮,颠覆了我对windows的印象,虽然保留了win7的经典桌面,但是metro桌面确实很好看,磁贴的概念给人的感觉也是 ...

  4. 该不该放弃嵌入式,单片机这条路?(答主梦人亦冷:我与嵌入式软件开发爱恨交织7年)

    转载自知乎大佬 梦人亦冷的回答,问题:该不该放弃嵌入式,单片机这条路? 原文链接:https://www.zhihu.com/question/370606355/answer/1865920389 ...

  5. Flink与Kafka的爱恨情仇

    FlinkKafkaConsumer 源码剖析 FlinkKafkaConsumer 的继承关系如下图所示. 可以发现几个版本的 FlinkKafkaConsumer 都继承自 FlinkKafkaC ...

  6. 爱恨交织的编程语言 是什么吸引了你

    摘要:每门编程语言都有自身独特的地方,那么为什么有些语言会一直存活在我们周围,而有些语言却逐渐被人淡忘,是什么吸引你? 每名程序员至少知道两门以上的编程语言,有些甚至不是所谓的编程语言(比如Shell ...

  7. 细讲逻辑斯蒂回归与朴素贝叶斯、最大熵原理的爱恨交织(一)

    好早之前就发现逻辑斯蒂回归好像和朴素贝叶斯里面的后验概率公式还有最大似然.信息熵.交叉熵.伯努利分布.回归分析.几率(odds)等等有着千丝万缕CZFZ(错综复杂).PSML(扑朔迷离)的关系.一直感 ...

  8. 细讲逻辑斯蒂回归与朴素贝叶斯、最大熵原理的爱恨交织(五)

    第五节:分类器中的天真小弟 -- 朴素贝叶斯 朴素贝叶斯文本分类模型 考虑如下文本分类模型:P(yi,di)P(y_i, d_i)P(yi​,di​) 表示一篇文章以及它的 label 的联合概率.d ...

  9. c/c++进阶之爱恨交织的临时对象: 二、天使与魔鬼

    c/c++语言最让人称道的便是性能了,在大气科学.地球物理等等需要高性能计算方面c/c++语言都是不二之选.甚至在分布式领域,由于ssd固态硬盘和万兆网络的兴起,当IO不再成为分布式系统的瓶颈,CPU ...

最新文章

  1. 关于养花---感叹一把
  2. 11月11日:一个人的情人节
  3. hadoop学习——Hadoop核心组件
  4. 有关ElasticSearch的基本概念
  5. 案例精解:insert逻辑读暴增至20万,只因Oracle Recyclebin过大
  6. pythonredis实例_Python读写Redis数据库操作示例
  7. ie浏览器在线使用_关于登录深圳市住房公积金管理中心网站在线办理平台的温馨提示...
  8. docker部署kafka,k8s(helm)部署kafka
  9. Java笔记(1):final关键字
  10. ValueError: This model has not yet been built. Build the model first by calling `build()` or calling
  11. KST1G SD卡脚本提取JPG
  12. 【Python网络蜘蛛 · 1】:网络蜘蛛的基本介绍
  13. 家长进课堂 计算机ppt,家长进课堂之中华传统美德 成品ppt 三井小学一10班出品.ppt...
  14. android zip winrar,WinRAR Zip Unzip Archive
  15. 三年建模师告诉你3DMAX有没有前途
  16. NoteBook / 期货及衍生品基础(3)
  17. 推荐一款免费开源的GIF动图软件(录制,编辑,压缩)
  18. 如何通俗易懂地讲解牛顿迭代法?
  19. 1小时学会HTML5基础
  20. (转载)视频采集学习笔记

热门文章

  1. 使用CAD看图如何打印图纸的部分内容?
  2. 那年,我在亚马逊被骂成狗
  3. 花两万学的python,总结了一点初学者的小技巧,免费送给大家
  4. win10微信公众号视频打不开的解决办法
  5. 1.20 不定式 比较级
  6. MIND——Modality independent neighbourhood descriptor 模态无关邻域描述符
  7. COM与DLL的区别
  8. 清橙1485 Catch The Penguins 抓企鹅
  9. 拓展实践:系统函数的调用
  10. 淘宝逛逛,一个0成本适合新手的副业项目