kafka的offset笔记
版本
這個看起來有點多此一舉,
我一開始也是這麼想的。
後來經過測試發現,新版本的kafka已經不再兼容老版本的kafka中的命令了,所以本篇記錄是爲了針對新版本的kafka的相關操作的。
組件 | 版本 |
Zookeeper | 3.6.0 |
Kafka | 2.5.0 |
概念
①生产者Offset
生产者写入topic的各个partition时,有多少个partition就有多少个offset
②消费者Offset
这是某一个分区的offset情况,我们已经知道生产者写入的offset是最新最大的值也就是12,
而当Consumer A进行消费时,他从0开始消费,一直消费到了9,他的offset就记录在了9,
Consumer B就纪录在了11。
等下一次他们再来消费时,他们可以选择接着上一次的位置消费,当然也可以选择从头消费,或者跳到最近的记录并从“现在”开始消费。
此时,每个partition有多少消费组,那就有多少个offset
消费者组
费者组的概念其实并不影响对offset的理解,上面的情况Consumer A,Consumer B如果是同组就不能同时消费一个分区的消息,不同组的消费者可以同时消费一个分区的消息。
还有一种offset的说法,就是consumer消费未提交时,本地是有另外一个offset的,这个offset不一定与集群中记录的offset一致。
所以,kafka每一个topic分区和生产者,消费者不同,是有多个offset的。
概念总结如下:
offset是指某一个分区的偏移量。
topic partition offset 这三个唯一确定一条消息。
生产者的offset其实就是最新的offset。
消费者的offset是他自己维护的,他可以选择分区最开始,最新,也可以记住他消费到哪了。
消费者组是为了不同组的消费者可以同时消费一个分区的消息。
關於Group這個概念
首先注意,producer沒有group的說法,
kafka提到group一定是consumer這邊。
Group-id位置
具體文件 | 變量 |
$KAFKA/config/consumer.properties | group.id |
$KAFKA/config/connect-distributed.properties | group.id |
如果在新建topic的時候,不特別指定,那麼默認使用的是consumer.properties裏面的group.id
__consumer_offsets的哪個partition保存了consumer group的位移信息
查看$KAFKA/config/consumer.properties
得到group.id是test-consumer-group,
填入下面的代碼並運行
public class kafka_hash {public static void main(String args[]){System.out.println(Math.abs("test-consumer-group".hashCode()) % 50);}}
實驗結果爲31
也就是說__consumer_offsets的partition 31保存了consumer group的位移信息
記住31這個數字,後面會用到
操作
操作 |
具體命令 |
创建topic | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --create --topic mytopic --replication-factor 3 --partitions 3 |
查看topic列表 | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 |
生產數據 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic mytopic |
存放在各個partition的offset終點 | $KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -1 |
存放在各個partition的offset起點 |
$KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -2 |
查询__consumer_offsets topic所有内容 |
$KAFKA/bin/kafka-console-consumer.sh --consumer.config $KAFKA/config/consumer.properties \ --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \ --bootstrap-server Desktop:9091 --topic __consumer_offsets --from-beginning |
查詢__consumer_offsets的partition 31包含的關於mytopic的offset信息 |
|
上面的最後一句命令會得到:
[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,0]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)
[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)
上述表格中produce端需要的数据如下:
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
其他
另外,根據[5],在同一個consumer group中,對於同一個topic而言,只能有一個consumer消費到其中特定的一條數據。
[6]中提到了Coordinator
Reference:
[1]Kafka到底有几个Offset?——Kafka核心之偏移量机制
[2]自己维护kafka_offset中的坑
[3]kafka查询最新producer offset的命令
[4]Kafka 如何读取offset topic内容 (__consumer_offsets)
[5]多个consumer使用同一个group.id消费同一个topic
[6]Kafka消费组(consumer group)
kafka的offset笔记相关推荐
- Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆)
Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆) 认识AK 快速入门 安装和启动 小案例 消息引擎系统 消息引擎范型 AK的概要设计 吞吐量/延时 消息持久化 负载均衡和故障转移: 伸缩性 ...
- Kafka auto.offset.reset
要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...
- flink 写kafka_flink消费kafka的offset与checkpoint
生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...
- 【kafka】kafka consumer offset lag获取的三者方式
1.概述 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方 ...
- 【Kafka】kafka Current offset xxx for partition xxx out range
文章目录 1.背景 1.背景 kafka报错 kafka Current offset xxx for partition xxx out range 该问题和以下2个问题有所关系 [Kafka]ka ...
- 一次kafka的offset回退事件及相关知识点
一次kafka的offset回退事件及相关知识点 原文链接:https://blog.csdn.net/lkforce/article/details/83384747
- SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)
如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...
- 聊聊kafka consumer offset lag increase异常
序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常. 查看consumer消费情况 Group Topic Pid Offset logSize Lag O ...
- java 获取kafka lag,聊聊kafka consumer offset lag的监控
序 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JM ...
最新文章
- nginx反代理服务器
- apache 编译支持php_apache2 不支持php文件 解决办法(示例代码)
- Android应用开发—PendingIntent:如何判断两个PendingIntent对等
- 无惧海量并发,运维准点下班全靠它
- vertx:Flink报错 严重: Caught unexpected Throwable IllegalAccessError: tried to access class io.netty.uti
- java泛型实例化_java基础-泛型举例详解
- linux tcp_nodelay,仔细看参数--NGINX之tcp_nodelay
- [置顶]团队开发经验:如何带领一个项目团队并做好项目总结
- CNBlog客户端--第一阶段记录
- linux zk服务 关闭_linux上安装zookeeper 启动和关闭的教程
- WebLogic部署配置
- 计算指定位数的圆周率
- 动态html函数的写法,如何将html div id的动态传递给js函数
- 关于字符集的测试报告(转)
- JAVA个版本新特性
- 内核spinlock raw_spin_lock spin_lock_bh
- C++ OpenCV【人脸识别人眼识别】
- 独立按键控制继电器开关
- 关于人们感知与数字视音频编码的关系入门-视觉篇01.
- 谷粒商城高级篇笔记1