Kafka 异常 : DefaultOffsetCommitCallback.onComplete(ConsumerCoordinator.java:537) -Offset commit failed

异常详情:

ConsumerCoordinator$DefaultOffsetCommitCallback.onComplete(ConsumerCoordinator.java:537) -Offset commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
环境参数,大数据环境 HDP, kafka 版本 2.0.0,spark版本 2.3.2

由于业务逻辑的修改,调大 SparkStreaming的批次间的时间间隔至5分钟。启动服务之后就会报上述的错误。

从报错内容来看是消费者在执行poll操作时超过了这个线程的默认最大的空闲时间,导致消费者组认为该消费者已离开消费者组,所以消费者组执行了再均衡操作,从而导致了sparkStreaming poll 失败。

解决方法:通过调大这个空闲时间的参数(max.poll.interval.ms)来解决这个问题。

但是现实情况确实即便是调大到1小时,这个错误依旧会出现。

继续查看日志,到任务启动阶段我们可以找到下面这部分Kafka消费者日志信息。

2022-03-29 14:09:49,285 INFO org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178) -ConsumerConfig values:metric.reporters = []metadata.max.age.ms = 300000partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]reconnect.backoff.ms = 50sasl.kerberos.ticket.renew.window.factor = 0.8max.partition.fetch.bytes = 1048576bootstrap.servers = [192.168.1.177:6667, 192.168.0.130:6667, 192.168.0.117:6667]ssl.keystore.type = JKSenable.auto.commit = falsesasl.mechanism = GSSAPIinterceptor.classes = nullexclude.internal.topics = truessl.truststore.password = nullclient.id = consumer-1ssl.endpoint.identification.algorithm = nullmax.poll.records = 2147483647check.crcs = truerequest.timeout.ms = 600000heartbeat.interval.ms = 3000auto.commit.interval.ms = 5000receive.buffer.bytes = 65536ssl.truststore.type = JKSssl.truststore.location = nullssl.keystore.password = nullfetch.min.bytes = 1send.buffer.bytes = 131072value.deserializer = class org.apache.kafka.common.serialization.StringDeserializergroup.id = tinyeyePortScanretry.backoff.ms = 100sasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05ssl.trustmanager.algorithm = PKIXssl.key.password = nullfetch.max.wait.ms = 500sasl.kerberos.min.time.before.relogin = 60000connections.max.idle.ms = 480000session.timeout.ms = 300000metrics.num.samples = 2key.deserializer = class org.apache.kafka.common.serialization.StringDeserializerssl.protocol = TLSssl.provider = nullssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]ssl.keystore.location = nullssl.cipher.suites = nullsecurity.protocol = PLAINTEXTssl.keymanager.algorithm = SunX509metrics.sample.window.ms = 30000auto.offset.reset = latest2022-03-29 14:09:49,322 WARN org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:186) -The configuration max.poll.interval.ms = 450000 was supplied but isn't a known config.
2022-03-29 14:09:49,326 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83) -Kafka version : 0.10.0.1
2022-03-29 14:09:49,326 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84) -Kafka commitId : a7a17cdec9eaa6c5

值得注意的是Kafka日志信息显示 没有 max.poll.interval.ms 这个配置项,并且 kafka 的版本为 0.10.0.1。

这说明我们的配置的kafka配置项没有生效。二是我们的kafka实际的集群版本是 2.0.0 与实际的消费者API版本不符。

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId><version>${spark.version}</version>
</dependency>

如果直接在maven项目中引用 spark-streaming-kafka-0-10_ 默认同时会引用 kafka-client 版本是 0.10.0.1的 kafka api,而我们需要配置的 max.poll.interval.ms 在0.10.0.1版本中还不是可配置项,所以才会出现上面的情况。

解决方法

在pom中添加

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version>
</dependency>

配置完成之后, max.poll.interval.ms 参数生效,问题解决。

Kafka 异常 : DefaultOffsetCommitCallback.onComplete(ConsumerCoordinator.java:537) -Offset commit faile相关推荐

  1. kafka自动提交offset失败:Auto offset commit failed

    今天在服务日志中观察数据的消费情况时,发现了一个如下的警告,而且每隔几秒就会出现一次,虽然只是个警告, Auto offset commit failed for group order_group: ...

  2. 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)

    最新Kafka教程(包含kafka部署与基本操作.java连接kafka.spring连接kafka以及使用springboot) 欢迎转载,转载请注明网址:https://blog.csdn.net ...

  3. kafka学习总结(含java生产者、消费者、Topic操作代码)

    kafka(http://kafka.apache.org)是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率.它的优点是能够直接使用磁盘进行存储.线性读写.速度快,避免了数据在JVM内存和系统内 ...

  4. 异常处理器详解 Java多线程异常处理机制 多线程中篇(四)

    在Thread中有异常处理器相关的方法 在ThreadGroup中也有相关的异常处理方法 示例 未检查异常 对于未检查异常,将会直接宕掉,主线程则继续运行,程序会继续运行 在主线程中能不能捕获呢? 我 ...

  5. 【异常】Caused by: java.lang.NoClassDefFoundError: org/aspectj/lang/annotation/Around

    [异常]Caused by: java.lang.NoClassDefFoundError: org/aspectj/lang/annotation/Around 参考文章: (1)[异常]Cause ...

  6. NullPointerException异常的原因及java异常??

    NullPointerException异常的原因及java异常?? 参考文章: (1)NullPointerException异常的原因及java异常?? (2)https://www.cnblog ...

  7. 受检异常 非受检异常_这样设计 Java 异常更优雅,赶紧学

    来源:Lrwinlrwinx.github.io/2016/04/28/如何优雅的设计java异常/ 导语 异常处理是程序开发中必不可少操作之一,但如何正确优雅的对异常进行处理确是一门学问,笔者根据自 ...

  8. 智能小车37:异常在ARM、JAVA、硬件里的实现

    几乎所有编程语言都有异常,可以说有程序就有异常.今天学习Arm的中断(异常)处理,联想到Java的异常,硬件中如何实现等问题,下面给大家分享一下. 一.Arm的中断. 1.触发异常 2.保存现场 3. ...

  9. java 异常对象_在java中的异常处理中的异常对象是什么

    展开全部 Exception类以及他的子类 的一个实例对象 比如32313133353236313431303231363533e58685e5aeb931333264633563 常见异常 1. j ...

最新文章

  1. 高级程序员到底高级在哪里?
  2. Centos 安装 MySql
  3. mac要装anaconda吗_Anaconda安装教程|Windows,Linux ,Mac OS
  4. oracle命令窗口粘贴,Oracle数据库中的Copy命令
  5. webService学习4:客户端调用服务端的代码
  6. 脚手架 mixin (混入)
  7. 实用必备xp框架模块_两款实用工具类软件,是你的日常必备!
  8. splunk 提取字段_splunk 学习笔记之三[使用字段查找对照]
  9. LeetCode动态规划系列教程(下)
  10. 对Python的初认识以及期待
  11. php提取文本数据处理,PHP文件处理—读取文件(一个字符,字串)
  12. 阶段5 3.微服务项目【学成在线】_day01 搭建环境 CMS服务端开发_19-页面查询服务端开发-创建CMS服务工程-CMS工程结构...
  13. linux之 sed命令
  14. Android手机使用笔记本流量上网(基于Android便携式WLAN热点)
  15. AIO-3588MQ 车规级AI主板
  16. Linux权限drwxrwxrwx是什么意思?
  17. POI之excel固定模板导出
  18. 太阳能手机充电器设计
  19. html5之DeviceOrientation 手机重力与方向感应
  20. OpenCV常用函数记载

热门文章

  1. ISP基本框架及算法介绍
  2. IOT跨平台组件设计方案
  3. editplus怎么在键盘输入
  4. Invalid bound statement 无效的绑定 的解决办法!
  5. 斯特封和格莱圈的区别
  6. 谷歌浏览器打开index.html原型页面axure_chrome_V0.6.3
  7. cocos2d-x动画加速与减速
  8. C#利用QQ游戏破解QQ密码
  9. android 9 手机硬件性能,一加9系列系统评测:功能丰富+稳定流畅,或是目前最佳安卓系统...
  10. 题解【[AHOI2008]紧急集合 / 聚会】