报错信息

Commit cannot be completed since the group has already rebalanced and assigned the partitions

如何理解

这里是说提交commit失败, 因为这个组已经重新分配了

产生原因

正常情况下, kafka会有一个配置用于设置一条消息的过期时间, 在规定时间内, 如果消费者提交了消费完成的信息, 那么就可以正常的分配下一条记录给消费者, 并且将当前记录的状态记为"已消费"状态, 对消息队列做一个标识, 避免重复消费

如何解决

kafka中配置的规定返回消息时间, 默认是300s, 也就是5分钟, 但是有一些业务逻辑处理起来比较复杂, 数据量又比较庞大, 那么5分钟是肯定处理不完的, 比如导入一个5G的文件, 然后逐条插入数据库, 这就需要消耗很长时间, 所以需要设置一下kafka的最大间隔时间
在application-dev.yml文件中配置如下

也就是配置

spring:kafka:consumer:properties:max.poll.interval.ms: 86400000

86400000是一天的毫秒数, 我这个业务需求有一天一夜足矣

至此, 问题完美修复!

其它参考方案

  1. 调大max.poll.interval.ms(两次poll方法最大时间间隔),默认时间为300000ms
  2. 调小max.poll.records(一次最多处理的记录数量),默认500
  3. 启动多个线程并行处理数据,但要注意处理完一批消息后才能提交offset,然后进行下次的poll(会用到CountDownLatch)

修改配置参数,调大间隔,调小一次处理的最大任务数量

props.put("max.poll.records", 8);
props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000");

使用多线程并行处理

@Scheduled(fixedRate = 5000)
public void processing()
{//如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。//如果队列中有消息,立即消费消息,每次消费的消息的多少//可以通过max.poll.records配置ConsumerRecords<String, String> records = consumer.poll(3000);if (records.count() == 0){return;}Iterator<ConsumerRecord<String, String>> iterator = records.iterator();CountDownLatch countDownLatch = new CountDownLatch(records.count());ConsumerRecord array[] = new ConsumerRecord[records.count()];int i;for (i = 0; i < records.count(); ++i){array[i] = iterator.next();}for (i = 0; i < records.count(); ++i){final int id = i;if (id < records.count() - 1){new Thread(()-> {disposeOneRecord(array[id],false);countDownLatch.countDown();}).start();}else{new Thread(()-> {disposeOneRecord(array[id],true);countDownLatch.countDown();}).start();}}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}consumer.commitAsync();logger.info(String.format("Successfully processing %d records", records.count()));
}private void disposeOneRecord(ConsumerRecord<String, String> record, boolean saveInRedis)
{String[] split;DCSPoint point;String rowKey, qualifier, value;List<Put> putList = new ArrayList<>();Map<String,Object> tagAndValue = JSONObject.parseObject(record.value()).getInnerMap();for (String tag : tagAndValue.keySet()) {split = tag.split("_");if (split.length != 2){continue;}try {point = DCSPoint.valueOf(split[1].toUpperCase());}catch (IllegalArgumentException e){continue;}if (point.getSection() == Section.UNKNOWN || point.getDataType() != DataType.REAL){continue;}value = tagAndValue.get(tag).toString();if (saveInRedis){RedisConfig.masterRedis.set(tag, value);}rowKey = split[0] + "_" + record.key();qualifier = split[1];putList.add(HBaseDaoUtil.cellPut(rowKey, HBaseConfig.FAMILY,qualifier,value));}hBaseDao.adds(HBaseConfig.TABLE_NAME, putList);
}

Commit cannot be completed since the group has already rebalanced and assign相关推荐

  1. Commit cannot be completed since the group has already rebalanced and assigned the

    问题场景 我们项目有个订单出库的业务,作为kafka消费者去消费商城项目给我们发的消息. 某次突然出现大批订单的出库状态没改的情况,拉日志一看,报了如下异常. org.apache.kafka.cli ...

  2. kafka : CommitFailedException Commit cannot be completed since the group has already rebalanced

    1. 背景 读取本地kafka的数据,写入到远程CDH中KUDU表中. 1.1 项目 项目引入的kafka版本为 <kafka.version>1.0.1-kafka-3.1.1</ ...

  3. kafka自动提交offset失败:Auto offset commit failed

    今天在服务日志中观察数据的消费情况时,发现了一个如下的警告,而且每隔几秒就会出现一次,虽然只是个警告, Auto offset commit failed for group order_group: ...

  4. Kafka 异常 : DefaultOffsetCommitCallback.onComplete(ConsumerCoordinator.java:537) -Offset commit faile

    Kafka 异常 : DefaultOffsetCommitCallback.onComplete(ConsumerCoordinator.java:537) -Offset commit faile ...

  5. aiokafka:Heartbeat failed: local member_id was not recognized; resetting and re-joining group

    具体错误如下: Heartbeat failed: local member_id was not recognized; resetting and re-joining group Heartbe ...

  6. commit 提交失败, 消费者自动死掉

    https://blog.csdn.net/shibuwodai_/article/details/80678717 commit 提交失败, 消费者自动死掉, 报错: org.apache.kafk ...

  7. Flink1.4.0连接Kafka0.10.2时遇到的问题

    Flink1.4.0连接部署在Linux上的Kafka0.10.2时,报如下异常: org.apache.flink.streaming.connectors.kafka.FlinkKafkaCons ...

  8. java 连接kafka超时_java – Kafka KStreams – 处理超时

    我试图使用< KStream> .process()与Time Windows.of("name",30000)批量处理一些KTable值并发送它们.似乎30秒超过了消 ...

  9. kafka rebalance与数据重复消费问题

    问题和现象: 某个程序在消费kafka数据时,总是重复消费相关数据,仿佛在数据消费完毕之后,没有提交相应的偏移量.然而在程序中设置了自动提交:enable.auto.commit为true 检查日志, ...

  10. kafka一直rebalance故障,重复消费

    今天我司线上kafka消息代理出现错误日志,异常rebalance,而且平均间隔2到3分钟就会rebalance一次,分析日志发现比较严重.错误日志如下 08-09 11:01:11 131 pool ...

最新文章

  1. 吴裕雄 python 神经网络——TensorFlow 花瓣分类与迁移学习(1)
  2. iPIN CEO 杨洋:AI 还未被大规模用在工作中,缺的是认知智能
  3. shader 2: vertex, fragment, surf的区别
  4. 赞!这样构建微服务架构,实在是太轻松了!
  5. 基于 DataLakeAnalytics 做跨地域的数据分析
  6. ip网络基础知识及原理_关于网络测试的5个命令
  7. web大作业介绍自己的家乡_襄阳市恒大名都小学2018—2019年度寒假实践作业
  8. ajax 请求二进制流 图片 文件 XMLHttpRequest 请求并处理二进制流数据 之最佳实践
  9. DDD战略篇:架构设计的响应力
  10. Facebook的秘密服务器,竟藏着互联网的军事根源?
  11. background的认识(二)
  12. java SimpleDateFormat类浅析
  13. MongoDB:管道操作
  14. python 进行照片分类_python 照片文件名分类
  15. Ubuntu apt install / update错误前因后果: 连接失败 [IP: 91.189.91.* 80]
  16. 许奔创新社-第21问:如何唤醒创造力?
  17. python 函数报错TypeError: object of type 'int' has no len()
  18. python 召回率_使用sklearn获取精确性和召回率
  19. 安卓7.0核心破解示列
  20. Devops持续化集成

热门文章

  1. 精通css网页布局 pdf,精通CSS网页布局
  2. 利用接口(vue等)调用thinkphp6(tp6)验证码验证结果总是失败的解决方案
  3. 基于Java+Springboot+Vue校园志愿者管理系统设计与实现
  4. 电影TS/TC/SCR/R5/BD/HD/HC版本意思收集(转)
  5. upnp+摄相头捕捉服务器端程序
  6. 三维地图开发三维地图服务器
  7. CodeForces 312B Archer
  8. 谷歌浏览器SwitchyOmega插件下载安装
  9. 服务器共享文件设成禁止删除,服务器共享文件夹权限 禁止删除共享文件方法...
  10. 双向链表(double linked list)