1、与消费者有关的重要参数

在讨论重复消费之前,首先介绍一下kafka中几个跟消费有关的配置参数。

  • enable.auto.commit 默认值true,表示消费者会周期性自动提交消费的offset
  • auto.commit.interval.ms 在enable.auto.commit 为true的情况下,自动提交的间隔,默认值5000ms
  • max.poll.records 单次poll,消费者拉取的最大数据条数,默认值500
  • max.poll.interval.ms  默认值5分钟,表示若5分钟之内消费者没有消费完上一次poll的消息,那么consumer会主动发起离开g

roup的请求,造成rebalance。

2、消费者自动提交的时机

配置offset自动提交时,consumer什么时候自动提交呢。实际上kafka消费者调用poll方法的时机及自动提交的时机约束如下:

在auto-commit=true时,当上一次poll方法拉取的消息消费完时会进行下一次poll,在经过auto.commit.interval.ms间隔后,下一次调用poll时会提交所有已消费消息的offset。

为了验证consumer自动提交的时机,配置消费者参数如下:

//自动提交
props.put("enable.auto.commit", "true");//提交间隔30s
props.put("auto.commit.interval.ms", "30000");//单次拉取20条
props.put("max.poll.records", 20);

配置消费者代码如下:配置获取消费者offset代码如下:

@KafkaListener(topics = "test1", groupId = "group1")
public void fromKafka(ConsumerRecord record) throws InterruptedException {
System.out.println(new Date().toString()+"group111 "+record.toString()); Thread.sleep(1000);}//每5s获取一次消费者最新offset
@Scheduled(fixedRate = 5000)
public void schedule() throws TimeoutException { Map<TopicPartition, OffsetAndMetadata> offset1 = lagOf("group1","localhost:9092"); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry:offset1.entrySet()){
System.out.println(new Date().toString() +"consumer group1:topic-"+entry.getKey().topic()+"partition-"+entry.getKey().partition()+" offset"+entry.getValue().offset());
}
}
public static Map<TopicPartition, OffsetAndMetadata> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try {Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
return consumedOffsets;
} catch (Exception e){return Collections.emptyMap();
} } }

对topic test1配置了消费者组group1,单次拉取消息数20条,消费者组group1每条消息耗费1s,记录日志打印结果如下:

Mon Feb 10 11:54:39 CST 2020consumer group1:topic-test2partition-0 offset120
Mon Feb 10 11:54:39 CST 2020group111 ConsumerRecord(topic = test1, partition = 0, offset = 125
...
Mon Feb 10 11:55:19 CST 2020consumer group1:topic-test2partition-0 offset160
Mon Feb 10 11:55:19 CST 2020group111 ConsumerRecord(topic = test1, partition = 0, offset = 165 ...
Mon Feb 10 11:55:59 CST 2020consumer group1:topic-test2partition-0 offset200
Mon Feb 10 11:55:59 CST 2020group111 ConsumerRecord(topic = test1, partition = 0, offset = 205

从日志中可以看出,消费组的offset每40s更新一次,因为每次poll会拉取20条消息,每个消息消费1s,在第一次poll之后,下一次poll因为没有达到auto.commit.interval.ms=30s,所以不会提交offset。第二次poll时,已经经过40s,因此这次poll会提交之前两次消费的消息,offset增加40.

3、消费重复的场景

在enable.auto.commit 默认值true情况下,出现重复消费的场景有以下几种:

3.1 consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。

例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。

解决方案:可在spring中配置异常处理回调类,在发生异常时正确处理未提交的offset

3.2 消费者消费时间过长

max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

配置消费者参数如下:

props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
//单次poll拉取11条消息 props.put("max.poll.records", 11);
//消费代码,单条消息消费30s
@KafkaListener(topics = "test2", groupId = "group22")
public void fromKafka1(ConsumerRecord record) {
System.out.println(new Date().toString() +": group222 "+record.toString());
try {
Thread.sleep(30000);
} catch (InterruptedException e) {e.printStackTrace(); } }

单次拉取11条消息,每条消息耗时30s,11条消息耗时5分钟30秒,由于max.poll.interval.ms  默认值5分钟,所以消费者无法在5分钟内消费完,consumer会离开组,导致rebalance。在消费完11条消息后,consumer会重新连接broker,再次rebalance,因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。

日志如下:

Tue Feb 11 17:29:33 CST 2020: group222 ConsumerRecord(topic = test2, partition = 0, offset = 100, CreateTime = 1581306569687, serialized key size = 3, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = 100, value = abcd)

Tue Feb 11 17:30:03 CST 2020: group222 ConsumerRecord(topic = test2, partition = 0, offset = 101, CreateTime = 1581306569687, serialized key size = 3, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = 101, value = abcd)

......

Tue Feb 11 17:34:33 CST 2020: group222 ConsumerRecord(topic = test2, partition = 0, offset = 110, CreateTime = 1581306569688, serialized key size = 3, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = 110, value = abcd)

2020-02-11 17:35:03.513 WARN 53544 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=group22] Synchronous auto-commit of offsets {test2-0=OffsetAndMetadata{offset=111, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

2020-02-11 17:35:03.513 INFO 53544 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=group22] Revoking previously assigned partitions [test2-0]

2020-02-11 17:35:03.513 INFO 53544 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [test2-0]

2020-02-11 17:35:03.513 INFO 53544 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=group22] (Re-)joining group

2020-02-11 17:35:03.521 INFO 53544 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=group22] Successfully joined group with generation 57

2020-02-11 17:35:03.522 INFO 53544 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=group22] Setting newly assigned partitions [test2-0]

2020-02-11 17:35:03.627 INFO 53544 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test2-0]

Tue Feb 11 17:35:03 CST 2020: group222 ConsumerRecord(topic = test2, partition = 0, offset = 100, CreateTime = 1581306569687, serialized key size = 3, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = 100, value = abcd)

可以看到在消费完第11条消息后,提交offset失败,因为消费时间太长,已经rebalance。之后,重新分配partition后,再次poll依然从之前消费过的消息处开始消费,造成重复消费。

解决方案:

1、提高消费能力,提高单条消息的处理速度;根据实际场景可讲max.poll.interval.ms值设置大一点,避免不必要的rebalance;可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。这种方法可解决因消费时间过长导致的重复消费问题,但无法避免极端情况

2、可生成消息时,可加入唯一标识符如消息id。在消费端,保存最近的1000条消息(可配置)id存入到redis或mysql中,配置max.poll.records的值小于1000,消费的消息时通过前置表去重。

参考文档:

1、https://stackoverflow.com/questions/38230862/need-clarification-about-kafka-auto-commit-and-auto-commit-interval-ms?r=SearchResults

2、https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

3、 Kafka中位移提交那些事儿

kafka 重复消费场景及解决方案相关推荐

  1. kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...

    写在开头: 本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点. 文章内容输出来源:拉勾教育大数据高薪训练营. 一致性 ...

  2. kafka重复消费问题

    开篇提示:kafka重复消费的根本原因就是"数据消费了,但是offset没更新"!而我们要探究一般什么情况下会导致offset没更新? 今天查看Elasticsearch索引的时候 ...

  3. 什么?搞不定Kafka重复消费?

    来自:架构之美 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ????如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序的健壮性,在使用 Kafka 的 ...

  4. kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息

    大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...

  5. 三张表有重复字段_什么?搞不定Kafka重复消费?

    点戳蓝字"架构之美"关注我们哦! 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ?如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序 ...

  6. Rabbitmq消息可靠投递和重复消费等问题解决方案

    消息的可靠性投递 在一些对数据一致性要求较高的业务场景里面,如果消息在发布和消费过程中出现了问题(消息丢失,消息重复消费),就会导致数据不一致,要做到消息的可靠性投递. 在RabbitMq里面提供了很 ...

  7. RocketMQ消息重复消费场景及解决办法

    消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等. 那么在什么情况下会发生RocketMQ的消息重复消费呢? 当系统的调用链路比较长的 ...

  8. 如何避免Kafka的重复消费

    从Producer和Consumer两个角度分析重复消费的问题. Producer端 消息重复场景 Producer的send()方法可能会出现异常,配合生产者参数retries>0,生产者会在 ...

  9. Kafka会不会重复消费

    本文来说下kafka会不会重复消费的问题.在单体架构时代,就存在着接口幂等性的问题,只不过到了分布式.高并发的场景之后,接口幂等性的问题会更加明显. 文章目录 概述 消息重复消费问题 解决方案 方案一 ...

最新文章

  1. tkinter实现文件加密和解密
  2. mfc 弹框只出现一次_只出现一次的数字
  3. vue2.0中引入wangEditor2 步骤与坑
  4. java update多个字段的值_SQL的update多个字段的写法
  5. vue Iframe
  6. php 逗号运算符,基础篇PHP运算符总结宝典
  7. c修改datatable单元格的值_神奇的VBA编程:批量拆分单元格数据
  8. 查看家庭组组计算机用户名密码是什么,windows10系统如何查看家庭组密码
  9. 【转】刨根究底正则表达式(2):文本查找方式的演化历史
  10. Mac - 苹果电脑mac系统释放硬盘空间方法汇总
  11. vue如何判断已经有定时器在执行_中国股市:如何判断当日涨跌?“分时图”已经全部告诉你了...
  12. ListT清除重复某一项
  13. 使用 GreenSock 来制作 SVG 动画
  14. select 默认选中问题
  15. SSD浅层网络_ssd目标检测
  16. python第七天作业
  17. LED屏幕上轮流显示三色条纹、彩虹、四叶草(数组与内存映射的采用)
  18. c语言编写图书检索系统,求C语言编写图书管理系统
  19. hadoop错误:java.io.IOException: There appears to be a gap in the edit log. We expected txid 1
  20. NLP入门之综述阅读-基于深度学习的自然语言处理研究综述

热门文章

  1. CNCF 2020 China Interesting Talks
  2. IE浏览器JS设置样式右转45度,透明
  3. MySQL数据库小白入门0606
  4. 模版的分离编译 解决方案
  5. [ExpOS]开发经验
  6. 猿创征文|ES索引字段映射类型以及ES底层打分逻辑
  7. Geekbench - Cross-Platform Benchmark - 跨平台跑分
  8. 荣耀7.0系统机器最简单激活Xposed框架的步骤
  9. springboot集成flowable简单实例入门
  10. lisp画示坡线,cass绘制的陡坎示坡线在CAD中显示不出来是什么原因