开篇提示:kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?

今天查看Elasticsearch索引的时候发现有一个索引莫名的多了20w+的数据,顿时心里一阵惊讶,然后赶紧打开订阅服务的日志(消费者),眼前的一幕让我惊呆了,我的消费服务的控制台一直在不断的刷着消费日志(刚开始我并没有意识到这是重复消费造成的),我还傻傻的以为是因为今天有人在刷单,所以导致日志狂刷,毕竟之前也遇到过有人用自动交易软件疯狂刷单的,所以当时也没在意;等过了几分钟,又去瞅了一眼控制台仍然在疯狂的刷着日志,妈呀!顿时隐隐感觉不对劲,赶紧看了一眼es索引,我滴天一下子多了几万的数据,突然在想是不是程序出问题了(因为头一天晚上发了一个版本),然后就开始死盯这日志看,发现了一个奇葩的问题:tmd怎么日志打印的数据都是重复的呀!这才恍然大悟,不用想了绝逼是kakfa重复消费了,好吧!能有什么办法了,开始疯狂的寻找解决的办法......

既然之前没有问题,那就是我昨天发版所导致的,那么我昨天究竟改了什么配置呢?对照了之前的版本比较了一下,发现这个参数enable-auto-commit被改成了true,即自动提交,理论上在数据并发不大,以及数据处理不耗时的情况下设置自动提交是没有什么问题的,但是我的情况恰恰相反,可能突然会并发很大(毕竟交易流水不好说的),所以可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致re-blance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费(这种很常见);或者关闭kafka时,如果在close之前,调用consumer.unsubscribe()则可能有部分offset没提交,下次重启会重复消费

try {
consumer.unsubscribe();
} catch (Exception e) {
}

try {
consumer.close();
} catch (Exception e) {
}

所以一般情况下我们设置offset自动提交为false!

解决方法:

1.设置

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest 

2.就是修改offset为最新的偏移量呗!我们都知道offset是存在zookeeper中的,所以我就不赘述了!

我的解决方法:

我并没有去修改offset偏移量,毕竟生产环境还是不直接改这个了;

我重新指定了一个消费组(group.id=order_consumer_group),然后指定auto-offset-reset=latest这样我就只需要重启我的服务了,而不需要动kafka和zookeeper了!

#consumer
spring.kafka.consumer.group-id=order_consumer_group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest

注:如果你想要消费者从头开始消费某个topic的全量数据,可以重新指定一个全新的group.id=new_group,然后指定auto-offset-reset=earliest即可

补充:

在kafka0.9.0版本的时候,开始启用了新的consumer config,这个新的consumer config采用bootstrap.servers替代之前版本的zookeeper.connect,主要是要渐渐弱化zk的依赖,把zk依赖隐藏到broker背后。
group coordinator
使用bootstrap.servers替代之前版本的zookeeper.connect,相关的有如下两个改动:

1.在 Server 端增加了 GroupCoordinator 这个角色
2.将 topic 的 offset 信息由之前存储在 zookeeper(/consumers/<group.id>/offsets/<topic>/<partitionId>,zk写操作性能不高) 上改为存储到一个特殊的 topic 中(__consumer_offsets)

从0.8.2版本开始Kafka开始支持将consumer的位移信息保存在Kafka内部的topic中(从0.9.0版本开始默认将offset存储到系统topic中)
Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
rebalance时机
在如下条件下,partition要在consumer中重新分配:

条件1:有新的consumer加入
条件2:旧的consumer挂了
条件3:coordinator挂了,集群选举出新的coordinator
条件4:topic的partition新加
条件5:consumer调用unsubscrible(),取消topic的订阅

__consumer_offsets
Consumer通过发送OffsetCommitRequest请求到指定broker(偏移量管理者)提交偏移量。这个请求中包含一系列分区以及在这些分区中的消费位置(偏移量)。偏移量管理者会追加键值(key-value)形式的消息到一个指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition组成的,而value是偏移量。

参考:https://segmentfault.com/a/1190000011441747

kafka重复消费问题相关推荐

  1. 什么?搞不定Kafka重复消费?

    来自:架构之美 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ????如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序的健壮性,在使用 Kafka 的 ...

  2. kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息

    大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...

  3. 三张表有重复字段_什么?搞不定Kafka重复消费?

    点戳蓝字"架构之美"关注我们哦! 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ?如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序 ...

  4. kafka 重复消费场景及解决方案

    1.与消费者有关的重要参数 在讨论重复消费之前,首先介绍一下kafka中几个跟消费有关的配置参数. enable.auto.commit 默认值true,表示消费者会周期性自动提交消费的offset ...

  5. 【消息队列】kafka是如何保证消息不被重复消费的

    一.kafka自带的消费机制 kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offs ...

  6. kafka rebalance与数据重复消费问题

    问题和现象: 某个程序在消费kafka数据时,总是重复消费相关数据,仿佛在数据消费完毕之后,没有提交相应的偏移量.然而在程序中设置了自动提交:enable.auto.commit为true 检查日志, ...

  7. kafka一直rebalance故障,重复消费

    今天我司线上kafka消息代理出现错误日志,异常rebalance,而且平均间隔2到3分钟就会rebalance一次,分析日志发现比较严重.错误日志如下 08-09 11:01:11 131 pool ...

  8. MQ问题集(kafka主从同步与高可用,MQ重复消费、幂等)

    1.kafka主从同步与高可用 https://1028826685.iteye.com/blog/2354570 http://developer.51cto.com/art/201808/5815 ...

  9. kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...

    写在开头: 本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点. 文章内容输出来源:拉勾教育大数据高薪训练营. 一致性 ...

最新文章

  1. 深度学习:神经网络,softmax + cross entropy,非tensorflow方式
  2. 单链表的C++实现(采用模板类)
  3. 分配的访问权限的展台应用:最佳做法
  4. 人工智能AI实战100讲(一)-机器人语义建图(上)
  5. 让无线网卡同时工作在 AP 和 STA 模式
  6. ssh 免密码登录---问题
  7. 在线图片水平/垂直均等切割工具
  8. Dev-C++配置问题
  9. XSS测试平台源码——免费分享
  10. 6.1行为型模式--模板方法模式
  11. Java设计模式总结
  12. Top 10 tips to prepare your Dynamics AX 2012 Go Live
  13. java int转byte_JAVA中怎么将int数据转换为byte数据?
  14. 远程桌面 vs 虚拟桌面 vs 虚拟机区别
  15. 联合阿里在职测开工程师耗时一个星期写的 【接口测试+自动化接口接口测试详解]
  16. 河南学业水平计算机,河南高中学业水平考试查询系统
  17. 谁在「连接」制造业?
  18. 今日恐慌与贪婪指数为23,恐慌程度有所下降
  19. 图形编程丨图形绘制基础imgui篇—D3D9 HOOK 创建内部Imgui窗口
  20. Bumping制程简介

热门文章

  1. 2020河南工业大学计算机考研科目,你知道2020年河南工业大学硕士研究生考研有哪些复试科目...
  2. 让线程等待10秒_把python程序变成多线程
  3. 阿里员工都在用的知识管理工具,究竟有何特别?
  4. 手动安装android的sdk
  5. leetcode 88 Merge Sorted Array
  6. BZOJ3448 : [Usaco2014 Feb]Auto-complete
  7. JBoss5 启动报错java.lang.IllegalArgumentException: ...
  8. Dev-DXperience12.2版的新产品介绍:DXTREME
  9. Dell XP版本在非Dell机子上的激活问题
  10. 扔掉伟哥!男性壮阳食品荟萃