最近遇到一个kafka分区提异常如下:

throwable:org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1151)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1081)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
message:Consumer subtask 80 failed async Kafka commit

从报错信息来看是说,有新的消费者加入发生了rebalance,导致offset提交失败。解决办法是增加max.poll.interval.ms或减少max.poll.records。
然而这次遇到的这个报错跟通常的提交失败绝对不一样,调整这两个参数根本不起作用。
具体什么原因,怎么解决请往下看。

背景

公司有一批spark streaming任务需要迁移到flink。为了保证数据的一致性,所以采取的方式是:flink任务复用了spark streaming任务的groupid,从而在spark任务停掉之后,flink能从kafka记录的offset接着消费。然而理想很丰满,在实际操作时候就报了上面的异常。
flink程序正常消费数据,就是无法提交offset到kafka,虽说flink自己也有Checkpoint来记录offset,但是一旦Checkpoint丢失,那么必定会造成数据的不准确。而且还有后续任务会根据kafka的offset来判断的逻辑。

原因

查阅了一些资料后,找到答案。原因是因为spark streaming消费kafka采用的是subscribe模式,而flink采用的是assign模式。同一个groupid不能接受两种模式的offset提交,具体原因情况这篇文章的分析。

解决方案

通过将旧groupid的offset信息复制到一个新的groupid上,然后flink用新groupid来消费即可。
具体复制使用python-kafka来实现,脚本如下:

from kafka import KafkaAdminClient, KafkaConsumerclient = KafkaAdminClient(bootstrap_servers='broker:9092')
meta = client.list_consumer_group_offsets('groupid_1')
consumer = KafkaConsumer(bootstrap_servers='broker:9092', group_id='groupid_2')
consumer.commit(meta)

org.apache.kafka.clients.consumer.CommitFailedException相关推荐

  1. 【记一次kafka报org.apache.kafka.clients.consumer.CommitFailedException异常处理】

    项目场景: 项目中,使用到了kafka作为消息中间件,项目作为消费端,消费消息并进行业务处理 问题描述 在实际应用的过程中,发现偶尔但是一直存在的,有消费数据报:org.apache.kafka.cl ...

  2. java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord

    java消费kafka数据时报错 ERROR [Executor task launch worker for task 90] - Exception in task 0.0 in stage 54 ...

  3. 【Flink实战系列】Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/

    java.lang.AbstractMethodError: Method flink/stream/deserialization/PoJoDeserializationSchema.deseria ...

  4. java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接: Flink入门程序异常,记录一下跟大家分享. SLF4J: Failed to l ...

  5. spark2+kafka报错:java.lang.NoSuchMethodError:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe

    spark读取kafka数据 // Create DataFrame representing the stream of input lines from kafkaval lines = spar ...

  6. kafka生产者报错:[org.apache.kafka.clients.NetworkClient:600] - Error while fetching metadata with corre

    在测试kafka写入数据的时候一直在报错: ... [org.apache.kafka.clients.NetworkClient:600]   - Error while fetching meta ...

  7. Error:(3, 41) java: 程序包org.apache.kafka.clients.producer不存在 错误提示解决办法

    解决 mvn idea:module

  8. Apache Kafka Consumer 消费者集

    1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...

  9. Spring Apache Kafka教程

    在本SpringApache Kafka课程中,我们将学习如何在Spring Boot项目中开始使用Apache Kafka,并开始生成和使用我们所选主题的消息. 除了一个简单的项目外,我们还将深入探 ...

  10. 2.24. Spring boot with Apache Kafka

    Spring boot 1.5.1 2.24.1. 安装 kafka 一下安装仅仅适合开发环境,生产环境请使用这个脚本安装 https://github.com/oscm/shell/tree/mas ...

最新文章

  1. 田志刚:智慧的员工,个人知识管理
  2. Openstack 中的消息总线 AMQP
  3. java stringbu,Java String和StringBuilder常用方法,
  4. C# 通过pid获取hwnd / 通过pid查找hwnd
  5. 第一次作业-李纯锐201731084433
  6. 4、Linux的文件系统结构(目录树结构)
  7. oracle中怎么算奇数,oracle - 如何在oracle中获取奇数列 - SO中文参考 - www.soinside.com...
  8. 利用Javascrip实现web窗体的打开和关闭后的刷新
  9. 泛微oa主表赋值明细表_Java学习第89天--OA系统
  10. ExtJs4 笔记(12) Ext.toolbar.Toolbar 工具栏、Ext.toolbar.Paging 分页栏、Ext.ux.statusbar.StatusBar 状态栏...
  11. 京东联盟高级API - 京东联盟商品类目查询接口
  12. Keil5下载芯片包并导入教程
  13. ElasticSearch 7 中keyword和integer、long、short存储性能对比实验
  14. angularjs grunt uglify 报错
  15. Scratch3.0创意编程(基础篇):第9课 大鱼吃小鱼
  16. 软考之软件设计师——数据库技术基础
  17. 给 FreeBSD 12.1 安装 GNOME3 图形界面
  18. 华为上交 | GAN 将古典人像变3D,视角可切换
  19. 丁亥年戊申月戊戌日为朋友的工作,占得一震为雷 (六冲)卦。
  20. Hadoop详解(你想知道的这里都有!)

热门文章

  1. 远程桌面 android,Microsoft远程桌面
  2. MATLAB TIFF转Shape、TIFF和Shape的读写
  3. 题十:二叉搜索树与双向链表
  4. android小程序_测试大佬是如何进行百度/微信小程序自动化测试的?
  5. luci编程 openwrt_【玩转开源】BananaPi R2 —— 第四篇 Openwrt Luci 初探
  6. 电脑录像,笔记本电脑录像功能_笔记本电脑 录像
  7. 【海码学院】web前端基础入门JavaScript之JavaScript起源和基础语法学习笔记
  8. V831——识别指定的人脸
  9. 收到大量垃圾短信怎么办?如何屏蔽垃圾短信?
  10. 小学期 BlueSky学长与友人帐