golang 将kafka的offset置为最新
需要解决:
当需要用同一个group_id去消费kafka的partition时,如果程序down掉,可能存在已经消费的数据尚未提交的可能,此时会造成重复消费的问题,且在重启这段时间会产生新的数据,重启这段时间的kafka消息不想再消费。
采用方案:
1、创建consumer时将offset设置为最新
import ("github.com/Shopify/sarama"cluster "github.com/bsm/sarama-cluster"
)config := cluster.NewConfig()
config.Version = sarama.V1_1_0_0
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true// init consumer
consumer, err := cluster.NewConsumer(brokerAddrs, groupID, topics, config)
结果:没有达到期望的效果。(使用sarama.OffsetNewest设置offset位置,得到的结果是从上一次消费完之后的位置开始,程序down掉中间产生的数据也被消费了。)
2、创建的consumer重置offset
import ("fmt""github.com/confluentinc/confluent-kafka-go/kafka""os""os/signal""syscall"
)func main() {broker := "kafka.in.netwa.cn:9092" group := "my_test" topics := []string{"MyTopic"} sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker,"group.id": group,"session.timeout.ms": 30000,"go.events.channel.enable": true,"go.application.rebalance.enable": true,"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "latest"}})if err != nil {fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)os.Exit(1)}fmt.Printf("Created Consumer %v\n", c)err = c.SubscribeTopics(topics, nil)if err != nil {fmt.Fprintf(os.Stderr, "设置topic失败: %s\n", err)os.Exit(1)}low, high, err := c.QueryWatermarkOffsets("ImTopic", 0, -1)if err != nil {fmt.Fprintf(os.Stderr, "查询偏移失败: %s\n", err)os.Exit(1)}fmt.Printf("%d---%d: %+v\n", low, high, err)run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsecase ev := <-c.Events():switch e := ev.(type) {case kafka.AssignedPartitions:fmt.Fprintf(os.Stderr, "%% %v\n", e)for i, _ := range e.Partitions {if *e.Partitions[i].Topic == "MyTopic" {e.Partitions[i].Offset = kafka.Offset(high)}}fmt.Fprintf(os.Stderr, "%% %v\n", e)c.Assign(e.Partitions)case kafka.RevokedPartitions:fmt.Fprintf(os.Stderr, "%% %v\n", e)c.Unassign()case *kafka.Message:fmt.Printf("%% Message on %s:\n%s\n",e.TopicPartition, string(e.Value))case kafka.PartitionEOF:fmt.Printf("%% Reached %v\n", e)case kafka.Error:fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)run = false}}}fmt.Printf("Closing consumer\n")c.Close()
}
结论:消费到了kafka最新的消息。(事件kafka.AssignedPartitions发生时,将partition的偏移设置为kafka消息的最新偏移e.Partitions[i].Offset = kafka.Offset(high)。)
————————————————
版权声明:本文为CSDN博主「持成」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u011677067/article/details/81026314
golang 将kafka的offset置为最新相关推荐
- kafka的offset笔记
版本 這個看起來有點多此一舉, 我一開始也是這麼想的. 後來經過測試發現,新版本的kafka已經不再兼容老版本的kafka中的命令了,所以本篇記錄是爲了針對新版本的kafka的相關操作的. 組件 版本 ...
- Kafka auto.offset.reset
要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...
- flink 写kafka_flink消费kafka的offset与checkpoint
生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...
- 【kafka】kafka consumer offset lag获取的三者方式
1.概述 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方 ...
- 【Kafka】kafka Current offset xxx for partition xxx out range
文章目录 1.背景 1.背景 kafka报错 kafka Current offset xxx for partition xxx out range 该问题和以下2个问题有所关系 [Kafka]ka ...
- 一次kafka的offset回退事件及相关知识点
一次kafka的offset回退事件及相关知识点 原文链接:https://blog.csdn.net/lkforce/article/details/83384747
- SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)
如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...
- 聊聊kafka consumer offset lag increase异常
序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常. 查看consumer消费情况 Group Topic Pid Offset logSize Lag O ...
- java 获取kafka lag,聊聊kafka consumer offset lag的监控
序 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JM ...
最新文章
- 目前常用的服务器端网络操作系统有,目前常用的服务器端网络操作系统是()。...
- 【转】css行高line-height的一些深入理解及应用
- 5G NGC — 关键技术 — 网络切片 — 概述
- 奥鹏数据库应用系统设计下列关于php_[南开大学(本部)]《数据库应用系统设计》20春期末考核(参考答案)...
- Jmeter模拟不同带宽进行测试
- 【转】RocketMQ的一些特性(生产者消费者配置参数的含义)
- HTML5网站大观:10个精美的复古风格 HTML5 网站作品
- Android的滑动分析
- java intent 传递集合对象_Android系列之Intent传递对象的几种实例方法
- HDU 12O3 I NEED A OFFER!
- TypeError: Cannot set property ‘styles‘ of undefined
- 客户旅程分析 Customer Journey Mapping
- 微信小程序实现五星评分效果
- 肠道菌群与睡眠的双向桥接
- 官方通报:kissreiko博文因涉嫌诈骗广告 将永久封号
- 【转】Web实现前后端分离,前后端解耦
- R语言绘图中图片的组合(cowplot、patchwork宏包、layout、par()、gridExtra)
- 【转】键盘灯亮无反映解决方法
- IJCAI-18 阿里妈妈广告转化预测
- Ai形状模式与路径查找器