需要解决:

当需要用同一个group_id去消费kafka的partition时,如果程序down掉,可能存在已经消费的数据尚未提交的可能,此时会造成重复消费的问题,且在重启这段时间会产生新的数据,重启这段时间的kafka消息不想再消费。

采用方案:

1、创建consumer时将offset设置为最新

import ("github.com/Shopify/sarama"cluster "github.com/bsm/sarama-cluster"
)config := cluster.NewConfig()
config.Version = sarama.V1_1_0_0
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true// init consumer
consumer, err := cluster.NewConsumer(brokerAddrs, groupID, topics, config)

结果:没有达到期望的效果。(使用sarama.OffsetNewest设置offset位置,得到的结果是从上一次消费完之后的位置开始,程序down掉中间产生的数据也被消费了。)

2、创建的consumer重置offset

import ("fmt""github.com/confluentinc/confluent-kafka-go/kafka""os""os/signal""syscall"
)func main() {broker := "kafka.in.netwa.cn:9092" group := "my_test"                 topics := []string{"MyTopic"}      sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers":               broker,"group.id":                        group,"session.timeout.ms":              30000,"go.events.channel.enable":        true,"go.application.rebalance.enable": true,"default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "latest"}})if err != nil {fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)os.Exit(1)}fmt.Printf("Created Consumer %v\n", c)err = c.SubscribeTopics(topics, nil)if err != nil {fmt.Fprintf(os.Stderr, "设置topic失败: %s\n", err)os.Exit(1)}low, high, err := c.QueryWatermarkOffsets("ImTopic", 0, -1)if err != nil {fmt.Fprintf(os.Stderr, "查询偏移失败: %s\n", err)os.Exit(1)}fmt.Printf("%d---%d: %+v\n", low, high, err)run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsecase ev := <-c.Events():switch e := ev.(type) {case kafka.AssignedPartitions:fmt.Fprintf(os.Stderr, "%% %v\n", e)for i, _ := range e.Partitions {if *e.Partitions[i].Topic == "MyTopic" {e.Partitions[i].Offset = kafka.Offset(high)}}fmt.Fprintf(os.Stderr, "%% %v\n", e)c.Assign(e.Partitions)case kafka.RevokedPartitions:fmt.Fprintf(os.Stderr, "%% %v\n", e)c.Unassign()case *kafka.Message:fmt.Printf("%% Message on %s:\n%s\n",e.TopicPartition, string(e.Value))case kafka.PartitionEOF:fmt.Printf("%% Reached %v\n", e)case kafka.Error:fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)run = false}}}fmt.Printf("Closing consumer\n")c.Close()
}

结论:消费到了kafka最新的消息。(事件kafka.AssignedPartitions发生时,将partition的偏移设置为kafka消息的最新偏移e.Partitions[i].Offset = kafka.Offset(high)。)

————————————————
版权声明:本文为CSDN博主「持成」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u011677067/article/details/81026314

golang 将kafka的offset置为最新相关推荐

  1. kafka的offset笔记

    版本 這個看起來有點多此一舉, 我一開始也是這麼想的. 後來經過測試發現,新版本的kafka已經不再兼容老版本的kafka中的命令了,所以本篇記錄是爲了針對新版本的kafka的相關操作的. 組件 版本 ...

  2. Kafka auto.offset.reset

    要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...

  3. flink 写kafka_flink消费kafka的offset与checkpoint

    生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...

  4. 【kafka】kafka consumer offset lag获取的三者方式

    1.概述 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方 ...

  5. 【Kafka】kafka Current offset xxx for partition xxx out range

    文章目录 1.背景 1.背景 kafka报错 kafka Current offset xxx for partition xxx out range 该问题和以下2个问题有所关系 [Kafka]ka ...

  6. 一次kafka的offset回退事件及相关知识点

    一次kafka的offset回退事件及相关知识点 原文链接:https://blog.csdn.net/lkforce/article/details/83384747

  7. SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)

    如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...

  8. 聊聊kafka consumer offset lag increase异常

    序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常. 查看consumer消费情况 Group Topic Pid Offset logSize Lag O ...

  9. java 获取kafka lag,聊聊kafka consumer offset lag的监控

    序 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JM ...

最新文章

  1. 目前常用的服务器端网络操作系统有,目前常用的服务器端网络操作系统是()。...
  2. 【转】css行高line-height的一些深入理解及应用
  3. 5G NGC — 关键技术 — 网络切片 — 概述
  4. 奥鹏数据库应用系统设计下列关于php_[南开大学(本部)]《数据库应用系统设计》20春期末考核(参考答案)...
  5. Jmeter模拟不同带宽进行测试
  6. 【转】RocketMQ的一些特性(生产者消费者配置参数的含义)
  7. HTML5网站大观:10个精美的复古风格 HTML5 网站作品
  8. Android的滑动分析
  9. java intent 传递集合对象_Android系列之Intent传递对象的几种实例方法
  10. HDU 12O3 I NEED A OFFER!
  11. TypeError: Cannot set property ‘styles‘ of undefined
  12. 客户旅程分析 Customer Journey Mapping
  13. 微信小程序实现五星评分效果
  14. 肠道菌群与睡眠的双向桥接
  15. 官方通报:kissreiko博文因涉嫌诈骗广告 将永久封号
  16. 【转】Web实现前后端分离,前后端解耦
  17. R语言绘图中图片的组合(cowplot、patchwork宏包、layout、par()、gridExtra)
  18. 【转】键盘灯亮无反映解决方法
  19. IJCAI-18 阿里妈妈广告转化预测
  20. Ai形状模式与路径查找器

热门文章

  1. 《敏捷革命》读书笔记
  2. 开源资产扫描系统-ARL资产灯塔系统
  3. Ubuntu下使用NTP同步对时
  4. 这些操作技巧能够让你的公众号迅速增粉
  5. 秀米svg点击显示另一张图_这个svg也太好玩了吧,居然可以自动展开全文!
  6. 电脑c盘分区太小如何可以扩大,电脑c盘不够用了,如何给电脑分区
  7. iOS各版本发布时间和特点
  8. Android Rooting for Programmers
  9. Java正则表达式简单入门
  10. 比迅雷好用,下载速度快5倍的下载软件IDM(Internet Download Manager)