如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数据丢失,这时候重置偏移量就是一剂后悔药,让消费者能够重新来过,当然后悔药也是有保质期的,还得取决于数据的保留策略。

这里讨论一下kafka_2.11.0.10.1.0版本重置偏移量的方案

该版本kafka不像其他版本一样,通过执行一句方便的命令就可以重置到指定的偏移量,本文给出了一种通过Java代码来重置偏移量的方式,若有不足,还望指教。

先来看一下如下代码

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.util.HashMap;
import java.util.Properties;public class ResetKafkaOffset {public ResetKafkaOffset() {}public static void main(String[] args) {try {HashMap<String, String> params = new HashMap();// 解析参数for (String param : args) {String[] split = param.split("=");if (split.length == 2) {System.out.println(split[0] + "=" + split[1]);params.put(split[0], split[1]);}}String topic = params.remove("topic");//参数校验 topic、消费组、分区、sever地址没有设置则提示并退出if (null == topic) {System.out.println("please set topic==XXX");System.exit(-1);}String groupId = params.remove("group.id");if (null == groupId) {System.out.println("please set group.id==XXX");System.exit(-1);}String server = params.remove("server");if (null == server) {System.out.println("please set server==XXX");System.exit(-1);}if (params.size() == 0) {System.out.println("please set at lease one partition_x==XXX");System.exit(-1);}// kafka设置Properties properties = new Properties();properties.put("bootstrap.servers", server);properties.put("group.id", groupId);properties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "6000");properties.put("session.timeout.ms", "10000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("auto.offset.reset", "earliest");KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);HashMap offsets = new HashMap();// 设置每个分区的偏移量for (String key : params.keySet()) {String[] partitions = key.split("_");TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitions[1]));OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(Long.parseLong(params.get(key)), "manual");offsets.put(topicPartition, offsetAndMetadata);}// 提交kafkaConsumer.commitSync(offsets);System.out.println("ok");} catch (Exception e) {e.printStackTrace();}}
}

我们把kafka的连接以及我们需要重置的偏移量参数以等号“=”分割的方式传入程序中,然后解析,校验,kafka连接设置,分区偏移量设置,最后提交达到偏移量重置的目的。

参数设置如:

server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=1 partition_1=1

server:kafka连接地址

group.id:消费组

topic:kafka的topic

partition_a = b:a表示几号分区数,b表示要重置到指定的偏移量,既要把分区a重置到b位置

现在console-consumer-48585消费组信息如下,两个分区的数据都已经完全消费了

现在停掉console-consumer-48585的消费者进程,把分区0的偏移量指到59,分区2的偏移量指到58,并运行程序

参数

server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=59 partition_1=58

程序正常运行

这时候启动我们的消费者进程,会看到console-consumer-48585组下的消费者会重新消费分区1的后一个数和分区2的后一个数,表示重置成功。

sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --consumer.config ../config/consumer.properties

当然生产、灰度环境等遇到需要像这样来重置偏移量的时候,在本地肯定是连不上生产、灰度的地址,所以需要把改代码编译成class文件,然后把该class文件放到kafka的bin目录下(如果是集群放到任意一个下即可),停掉消费应用,然后执行如下的脚本命令(按需要替换掉分区、偏移量、地址、分组)

dir=../libs
for file in "$dir"/*.jar
dopath="$path":"$file"
done
java -cp $path ResetKafkaOffset server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=59 partition_1=58

到此 SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)介绍完成。

SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)相关推荐

  1. oracle %date 0 10%,“date:~0,10%“是什么意思?

    基础知识,基础知识太差了.查了半天,先补补课吧. "date:~0,10%是DOS里取日期的命令. echo :~1,5%      //指针向右→偏移1位,然后从指针处开始向右→提取5个字 ...

  2. Kafka偏移量(Offset)管理

    1.定义 Kafka中的每个partition都由一系列有序的.不可变的消息组成,这些消息被连续的追加到partition中.partition中的每个消息都有一个连续的序号,用于partition唯 ...

  3. mysql安装11.0.10,MySQL8.0.11版本的安装 win10的

    MySQL8.0.11版本的安装 不知道为什么之前的Navicat for MySQL的密码忘记了,导致这个软件根本用不了,在经过一个多小时的折磨之下(在网上尝试了很多种方法去找回密码或者修改密码,但 ...

  4. 最简单的kafka接入方式(kafka配置),kafka整合Spring

    文章目录 一.前言. 二.主要流程. 三.各个细节,步骤 一.前言. 本文主要介绍了Springboot项目整合kafka的最简单的方式. 二.主要流程. 1.引入Maven 2.增加消费者和生产者配 ...

  5. kafka java_Java操作Kafka

    java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用.下面我贴出代码. pom.xml org.apache.kafka kafka-clie ...

  6. Angular rxjs源代码分析:range(0, 10)的实现

    源代码: const source$ = range(0, 10); range(0,10)返回一个新的Observable,但是不会立即执行,直到遇到subscribe调用为止: 下图高亮的这段代码 ...

  7. 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用

    目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 ​​​​​​​API 注意 ​​​​​​​代码实现-自动提交偏移量到默认主题 ​​​​​​​代码实现- ...

  8. kafka中文文档(0.10.0)

    kafka中文文档(0.10.0) 作者:链上研发-老杨叔叔 时间:2016-07-22 版本:Apache Kafka 0.10.0 (2016年5月底发布) .目录 kafka中文文档0100 目 ...

  9. SparkStreaming整合Kafka(Offset保存在zookeeper上,Spark2.X + kafka0.10.X)

    先来一段到处都有的原理(出处到处都有,就不注明了) Streaming和Kafka整合有两种方式--Receiver和Direct,简单理解为:Receiver方式是通过zookeeper来连接kaf ...

最新文章

  1. 自贡市职称计算机考试,四川省自贡市2012年职称计算机考试时间
  2. amazon redshift 分析型数据库特点——本质还是列存储
  3. Flask Vue.js全栈开发
  4. 数据分析之道:抽丝剥茧,存乎一心(中)
  5. Hadoop分布式文件系统--HDFS结构分析
  6. 理解Go语言中的方法和接收者
  7. node:jwt、拦截器-学习笔记
  8. .netcore项目docker化,以及docker之间通信
  9. 贝壳如何docker安装openwrt_OpenWrt 中 使用 Docker 安装可道云出错。
  10. dtc mysql_DTCC归来-高可用可扩展数据库架构探讨
  11. YUV 后面数字的含义_奔富红酒“Bin”后的数字,是什么意思?
  12. 尔雅 2017大学计算机基础答案,2018超星尔雅大学计算机基础答案
  13. java excel 设置列为日期,POI设置Excel单元格格式 (数值,日期,文本等等)
  14. Scheme 协议收集总结
  15. Spring的Bean意义
  16. 网络游戏软件销售渠道模式举例及分析
  17. win10系统可禁用的服务器,window10哪些服务可以禁止
  18. 世界第一台通用计算机:ENIAC
  19. 任务3、监控界面设计
  20. 淘宝网2条新开发者规则,堪称黑虎掏心直接秒杀90%以上,个人淘宝客开发者

热门文章

  1. 深度学习与自然语言处理教程(8) - NLP中的卷积神经网络(NLP通关指南·完结)
  2. 递归概述与递归能解决的问题和规则 [数据结构][Java]
  3. YGG 在 Branch.gg 的免费游戏 Castaways 中购买了 Genesis NFT
  4. 企业招投标采购管理系统源码 一站式全流程采购招标系统
  5. MySQL:数据完整性
  6. 每日一个小技巧:1分钟告诉你截图翻译软件哪个好用
  7. 安卓8.1系统SDK去掉系统设置中的自动调节亮度
  8. QQ Account
  9. oracle index alter,Oracle alter index rebuild 一系列问题
  10. python使用什么来区分代码块_Python 小数据池、代码块以及代码块缓存机制