前言

kafka提供了消费组命令工具管理消费组:kafka-consumer-groups.sh,在0.11版本之后引入位移重置功能,重置策略如下(引用自官方文档):

--reset-offsets also has following scenarios to choose from (at least one scenario must be selected):

  • --to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : Reset offsets to earliest offset.
  • --to-latest : Reset offsets to latest offset.
  • --shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
  • --from-file : Reset offsets to values defined in CSV file.
  • --to-current : Resets offsets to current offset.
  • --by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
  • --to-offset : Reset offsets to a specific offset.

就是可以根据时间重置、重置到最小位移、最大位移...等场景。

本文主要聊一下根据时间重置消费位点时候,这个时间格式的问题。

根据时间重置消费位移

示例命令:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer  \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000 \--execute  

为了看起来直观点,我加了"\"换行展示。

示例命令重置消费组为test_topic_consumer订阅test_topic的消费偏移为2021年11月29号中午12点整的时候消息位点。

实际上,结果不一定是这样,和时区有关。

协调世界时(UTC)

关于utc,查看维基百科

kafka脚本采用的是utc时间标准,与北京时间换算如下:

点击查看:图片来源

北京为东8时区,采用的是utc+08:00,这一段可以看维基百科,下面我复制出一部分,可以了解一下:

UTC+08:00是比世界协调时间快8小时的时区,理论上的位置在东经112度30分127度30分之间,是东盟标准时间的候选时区之一,居住在本时区的人数约有17亿人,占全世界人口的24%,是全世界人口最多的时区。

该时区亦为包括台湾、新加坡、马来西亚、中国、文莱、印尼中部及澳大利亚西部在内的绝大多数汉语使用者所居住的时区。所以互联网上的不少中文网站会使用该时区标记时间,而不论该网站所在地的官方时区为何。

所以,如果你现在使用的是北京时间,如果按示例重置位点,则实际上不是重置到12:00,而且是重置到了20:00的消息位点。

北京时间重置写法

如果是北京时间,则命令应该如下:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer  \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000+08:00 \--execute  

即时间格式为:2021-11-29T12:00:00.000+08:00 ,表示重置到11月29号的12点。当然这里还有其它写法,下面是源码注释中支持的写法:

(1) yyyy-MM-dd'T'HH:mm:ss.SSS, ex: 2020-11-10T16:51:38.198
(2) yyyy-MM-dd'T'HH:mm:ss.SSSZ, ex: 2020-11-10T16:51:38.198+0800
(3) yyyy-MM-dd'T'HH:mm:ss.SSSX, ex: 2020-11-10T16:51:38.198+08
(4) yyyy-MM-dd'T'HH:mm:ss.SSSXX, ex: 2020-11-10T16:51:38.198+0800
(5) yyyy-MM-dd'T'HH:mm:ss.SSSXXX, ex: 2020-11-10T16:51:38.198+08:00

重置流程

kafka根据时间重置消费位点这一块逻辑也是相当简单:

  1. 获取指定topic的分区(也可以是所有topic)
  2. 将时间转换为对应的时间戳,此时转换的时候就是上面提到的时区问题
  3. 根据时间戳获取对应的消息位点
  4. 修改消费位点为对应的消息位点

下面这段代码是根据时间戳查询位点的逻辑:

    private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {val timestampOffsets = topicPartitions.map { topicPartition =>// 指定根据时间戳类型查询位点,除此之外还有最小和最大日志位点等topicPartition -> OffsetSpec.forTimestamp(timestamp)}.toMap// 查询消息位点val offsets = adminClient.listOffsets(timestampOffsets.asJava,withTimeoutMs(new ListOffsetsOptions)).all.get//如果时间戳超过当前最新的消息时间了,就是查不到了,就是未知,下面会把未知这种转换为最新的消费位点val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {case (topicPartition, listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)}.toMapunsuccessfulOffsetsForTimes.foreach { entry =>println(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +" is empty. Falling back to latest known offset.")}// 将查询到的和未知这种转换为最新的日志位点一起返回,准备重置successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)}

可视化重置

如果觉得重置命令太麻烦,推荐一款可视化控制台:kafka-console-ui,github地址:https://github.com/xxd763795151/kafka-console-ui

新手用这个学习还是比较友好的:

Kafka使用脚本根据时间重置消费位移,格式你写对了么?相关推荐

  1. kafka linux 脚本测试,kafka shell命令操作

    1. 查看topic 选项说明: - --list :查看kafka所有的topic - --bootstrap-server : 连接kafka集群 - --hadoop102:9092:hadoo ...

  2. python向kafka发送json数据_python3实现从kafka获取数据,并解析为json格式,写入到mysql中...

    项目需求:将kafka解析来的日志获取到数据库的变更记录,按照订单的级别和订单明细级别写入数据库,一条订单的所有信息包括各种维度信息均保存在一条json中,写入mysql5.7中. 配置信息: [Gl ...

  3. 【Kafka】(四)Kafka使用 Consumer 接收消息消费

    Consumer概要 consumer中的关键术语: 消费者(consumer):从kafka中拉取数据并进行处理 消费者组(consumer group):一个消费者组由一个或者多个consumer ...

  4. kafka消息反复从头开始消费问题排查

    问题描述   最近线上的一个数据服务(服务B)出现了一个比较诡异的问题 ,该服务消费上游服务(服务A)产生的kafka消息数据,上线后一直运行平稳,最近一周在两次上线的时候出现了大量数据更新的情况,查 ...

  5. kafka通过脚本一次启动集群

    kafka 群起脚本kafka.sh #!/bin/bashcase $1 in "start"){for i in backup01 backup02 backup03do ec ...

  6. Kafka会不会重复消费

    本文来说下kafka会不会重复消费的问题.在单体架构时代,就存在着接口幂等性的问题,只不过到了分布式.高并发的场景之后,接口幂等性的问题会更加明显. 文章目录 概述 消息重复消费问题 解决方案 方案一 ...

  7. graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四)

    graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四) 问题背景 graylog+kafka+zookeeper(单机测 ...

  8. Kafka实现消息生产和消费

    文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...

  9. 【画框脚本】YOLO和COCO格式画框

    [画框脚本]YOLO和COCO格式画框 1. yolo格式画框 2. COCO格式画框 1. yolo格式画框 import cv2 import os import colorsys import ...

最新文章

  1. 自动驾驶系统关系与自动泊车原理
  2. python输入什么就输出什么意思_一文读懂Python的输入和输出
  3. 《漫画算法2》源码整理-6 两数之和 三数之和
  4. 实时SLAM的未来及与深度学习的比较The Future of Real-Time SLAM and “Deep Learning vs SLAM”
  5. Metadata Service 最高频的应用 - 每天5分钟玩转 OpenStack(164)
  6. data layui table 排序_具有排序、筛选、分组、虚拟化、编辑功能的React表格组件...
  7. 修改本地的hosts文件配置域名
  8. 常见条形码的用法和格式
  9. 机器学习原来这么有趣!第三章:图像识别【鸟or飞机】?深度学习与卷积神经网络
  10. 金融反欺诈 常用特征处理方法
  11. vue3中的beforeEach里面的next函数---刷新页面导致白页
  12. 世界上最神奇的网站收录--不是最无聊就是最有意思
  13. 通过Python爬虫技术获取小说信息
  14. Hung-yi Li Machine Learning 2019 Task1
  15. A. 拼音魔法 大学生程序设计邀请赛(华东师范大学)
  16. (转)2018最新Web前端经典面试试题及答案
  17. Ailurus 小熊猫
  18. 如何实现外网访问内网ip?公网端口映射或内网映射来解决
  19. 使用NeRF进行3D体素渲染
  20. 前锋html5费用,足坛转会费最高的5位前锋,1.05亿欧元仅第5,第1比C罗还高1.22亿...

热门文章

  1. 取序列 oracle,Oracle 创建序列 获取 序列号
  2. Oracle 创建序列及查询序列是否存在
  3. 解决因缺少msvcp71.dll无法启动Windows程序
  4. Bootstrap 内联标签和徽章,lable标签
  5. Spring注册Bean(提供Bean)系列--方法大全
  6. mysql无法导入数据怎么办_Mysql 导入数据库方法 及失败解决
  7. CentOS 防火墙配置(firewall)
  8. iostat 命令详解
  9. 这份商业策划模板让你写方案像煎鸡蛋一样简单
  10. zGYS变频电源中的调速电机有哪些