我在环境中发现代码里面的kafka有所延迟,查看kafka消息发现堆积严重,经过检查发现是kafka消息分区不均匀造成的,消费速度过慢。这里由自己在虚拟机上演示相关问题,给大家提供相应问题的参考思路。
这篇文章有点遗憾并没重现分区不均衡的样例和Warning: Consumer group ‘testGroup1’ is rebalancing. 这里仅将正确的方式展示,等后续重现了在进行补充。

主要有两个要点:

1、一个消费者组只消费一个topic.
2、factory.setConcurrency(concurrency);这里设置监听并发数为 部署单元节点*concurrency=分区数量

1、先在kafka消息中创建对应分区数目的topic(testTopic2,testTopic3)testTopic1由代码创建

./kafka-topics.sh --create --zookeeper 192.168.25.128:2181 --replication-factor 1 --partitions 2 --topic testTopic2

2、添加配置文件application.properties

kafka.test.topic1=testTopic1
kafka.test.topic2=testTopic2
kafka.test.topic3=testTopic3
kafka.broker=192.168.25.128:9092
auto.commit.interval.time=60000
#kafka.test.group=customer-test
kafka.test.group1=testGroup1
kafka.test.group2=testGroup2
kafka.test.group3=testGroup3
kafka.offset=earliest
kafka.auto.commit=falsesession.timeout.time=10000
kafka.concurrency=2

3、创建kafka工厂

package com.yin.customer.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** @author yin* @Date 2019/11/24 15:54* @Method*/
@Configuration
@Component
public class KafkaConfig {@Value("${kafka.broker}")private String broker;@Value("${kafka.auto.commit}")private String autoCommit;// @Value("${kafka.test.group}")//private String testGroup;@Value("${session.timeout.time}")private String sessionOutTime;@Value("${auto.commit.interval.time}")private String autoCommitTime;@Value("${kafka.offset}")private String offset;@Value("${kafka.concurrency}")private Integer concurrency;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//监听设置两个个分区factory.setConcurrency(concurrency);//打开批量拉取数据factory.setBatchListener(true);//这里设置的是心跳时间也是拉的时间,也就说每间隔max.poll.interval.ms我们就调用一次poll,kafka默认是300s,心跳只能在poll的时候发出,如果连续两次poll的时候超过//max.poll.interval.ms 值就会导致rebalance//心跳导致GroupCoordinator以为本地consumer节点挂掉了,引发了partition在consumerGroup里的rebalance。// 当rebalance后,之前该consumer拥有的分区和offset信息就失效了,同时导致不断的报auto offset commit failed。factory.getContainerProperties().setPollTimeout(3000);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String,String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();//kafka的地址propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);//是否自动提交 OffsetpropsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);// enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑//默认5秒钟,一个 Consumer 将会提交它的 Offset 给 KafkapropsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,  5000);//这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。//zookeeper.session.timeout.ms 默认值:6000//ZooKeeper的session的超时时间,如果在这段时间内没有收到ZK的心跳,则会被认为该Kafka server挂掉了。// 如果把这个值设置得过低可能被误认为挂掉,如果设置得过高,如果真的挂了,则需要很长时间才能被server得知。propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionOutTime);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//组与组间的消费者是没有关系的。//topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。//propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);//当创建一个新分组的消费者时,auto.offset.reset值为latest时,// 表示消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。// https://blog.csdn.net/u012129558/article/details/80427016//earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。// latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);//不是指每次都拉50条数据,而是一次最多拉50条数据()propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);return propsMap;}
}

3、展示kafka消费者

@Component
public class KafkaConsumer {private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topics = "${kafka.test.topic1}",groupId = "${kafka.test.group1}",containerFactory = "kafkaListenerContainerFactory")public void listenPartition1(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {logger.info("testTopic1 recevice a message size :{}" , records.size());try {for (ConsumerRecord<?, ?> record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());logger.info("received:{} " , record);if (kafkaMessage.isPresent()) {Object message = record.value();String topic = record.topic();Thread.sleep(300);logger.info("p1 topic is:{} received message={}",topic, message);}}} catch (Exception e) {e.printStackTrace();} finally {ack.acknowledge();}}@KafkaListener(topics = "${kafka.test.topic2}",groupId = "${kafka.test.group2}",containerFactory = "kafkaListenerContainerFactory")public void listenPartition2(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {logger.info("testTopic2 recevice a message size :{}" , records.size());try {for (ConsumerRecord<?, ?> record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());logger.info("received:{} " , record);if (kafkaMessage.isPresent()) {Object message = record.value();String topic = record.topic();Thread.sleep(300);logger.info("p2 topic :{},received message={}",topic, message);}}} catch (Exception e) {e.printStackTrace();} finally {ack.acknowledge();}}@KafkaListener(topics = "${kafka.test.topic3}",groupId = "${kafka.test.group3}",containerFactory = "kafkaListenerContainerFactory")public void listenPartition3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {logger.info("testTopic3 recevice a message size :{}" , records.size());try {for (ConsumerRecord<?, ?> record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());logger.info("received:{} " , record);if (kafkaMessage.isPresent()) {Object message = record.value();String topic = record.topic();logger.info("p3 topic :{},received message={}",topic, message);Thread.sleep(300);}}} catch (Exception e) {e.printStackTrace();} finally {ack.acknowledge();}}}

查看分区消费情况:

kafka消息堆积及分区不均匀的解决方案相关推荐

  1. kafka消息堆积原因解析

    kafka消息堆积,可以调节如下两个参数 max.poll.records 一次调用poll()返回的最大记录数. 默认值500 就是一次最多拉取500条记录 max.poll.interval.ms ...

  2. kafka消息堆积且CPU过高代码优化

    kafka消息堆积且CPU过高代码优化 直接部署已有的代码程序到线上服务器,发现CPU立马升高500%左右,立马停掉服务并看源代码排查问题,翻看代码,发现通过多线程消费 kafka消息,根据对多线程的 ...

  3. 平时只会用Kafka发消息,昨天突然遇到一次Kafka消息堆积生产事故!

    前言 线上kafka消息堆积,所有consumer全部掉线,到底怎么回事? 最近处理了一次线上故障,具体故障表现就是kafka某个topic消息堆积,这个topic的相关consumer全部掉线. 整 ...

  4. 解决kafka 消息堆积问题的排查及调优

    一.背景说明 深夜接到客户紧急电话,反馈腾讯云 kafka 中有大量消息堆积未及时消费.每分钟堆积近 100w 条数据.但是查看 ES 监控,各项指标都远还没到性能瓶颈.后天公司就要搞电商促销活动,到 ...

  5. kafka 消息堆积解决

    一 :背景 线上kafka消费端因日志异常的解决导致消息堆积. 二 : 日志异常解决导致消息堆积 线上kafka消费端日志异常,频繁打印错误日志,服务器磁盘一天就满了,此时其他服务无法正常工作.报错如 ...

  6. kafka怎么查看消息堆积_Kafka集群消息积压问题及处理策略

    阅读原文​mp.weixin.qq.com 通常情况下,企业中会采取轮询或者随机的方式,通过Kafka的producer向Kafka集群生产数据,来尽可能保证Kafk分区之间的数据是均匀分布的. 在分 ...

  7. kafka 消息分发机制、分区和副本机制

    一.消息分发机制 1.1 kafka 消息分发策略 消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由key.value两部分构成,在发送一条消息 时,我们可以指定这个key,那么 ...

  8. kafka 消息队列

    kafka 消息队列 kafka 架构原理 大数据时代来临,如果你还不知道Kafka那就真的out了!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP ...

  9. 消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?...

    大家好,我是 yes. 最近我一直扎在消息队列实现细节之中无法自拔,已经写了 3 篇Kafka源码分析,还剩很多没肝完.之前还存着RocketMQ源码分析还没整理.今儿暂时先跳出来盘一盘大方向上的消息 ...

  10. 如何利用Partitioner将消息路由到分区?

    文章目录 获取Topic的可用分区 不指定分区key 指定分区key 获取Topic的可用分区 发送消息时,有了元数据了,就要把消息路由到分区了.执行doSend方法中的对应方法: //使用Parti ...

最新文章

  1. linux内核单独安装,Linux内核编译与安装
  2. int能表示的数据范围(在VS2017下,int和long都是32位)
  3. 牛客网 暑期ACM多校训练营(第一场)J.Different Integers-区间两侧不同数字的个数-离线树状数组 or 可持久化线段树(主席树)...
  4. 2440启动文件分析
  5. http --- cookie与会话跟踪
  6. JavaFX UI控件教程(十六)之Separator
  7. 购买阿里云ECS服务器忘记终端管理密码或者没有设置
  8. Linux7/Redhat7/Centos7 安装Oracle 12C_配置VNC远程安装数据库_03
  9. Java构造时成员初始化的陷阱
  10. 计算机的组成结构6,计算机组成及结构.6.ppt
  11. c++ 中 define
  12. Linux内核开发_内核模块
  13. 计算机专业显示器英语,电脑显示器词汇 计算机英语词汇
  14. 微信小程序组件之间传值
  15. word里画的流程图怎么全选_怎么用word画流程图
  16. 计算机可移动磁盘无法显示图片,电脑不显示移动硬盘图标?两种解决办法
  17. 特别篇:公主,快放开那只巨龙
  18. java Complex 类
  19. fig-tlo_PHP-FIG,Quo Vadis?
  20. [转]汽车ARM攒机指南

热门文章

  1. 测开之路三十三:Flask实现扎金花游戏
  2. zigbee的各种profile【裁剪】
  3. 慧荣SM3271AD芯片U盘量产
  4. 20组事后诸葛亮会议总结
  5. mybatis 通配符
  6. word文档打不开、损坏了怎么修复
  7. C#模拟点击网页按钮,提交数据有关问题
  8. oracle数据库长连接和短连接,tcp 长连接与短连接
  9. 电子邮件注册帐号大全_电子邮件
  10. 关于模拟信号和数字信号的储存