版本

這個看起來有點多此一舉,

我一開始也是這麼想的。

後來經過測試發現,新版本的kafka已經不再兼容老版本的kafka中的命令了,所以本篇記錄是爲了針對新版本的kafka的相關操作的。

組件 版本
Zookeeper 3.6.0
Kafka 2.5.0

概念

①生产者Offset

生产者写入topic的各个partition时,有多少个partition就有多少个offset

②消费者Offset

这是某一个分区的offset情况,我们已经知道生产者写入的offset是最新最大的值也就是12,

而当Consumer A进行消费时,他从0开始消费,一直消费到了9,他的offset就记录在了9,

Consumer B就纪录在了11。

等下一次他们再来消费时,他们可以选择接着上一次的位置消费,当然也可以选择从头消费,或者跳到最近的记录并从“现在”开始消费。

此时,每个partition有多少消费组,那就有多少个offset

消费者组

费者组的概念其实并不影响对offset的理解,上面的情况Consumer A,Consumer B如果是同组就不能同时消费一个分区的消息,不同组的消费者可以同时消费一个分区的消息。

还有一种offset的说法,就是consumer消费未提交时,本地是有另外一个offset的,这个offset不一定与集群中记录的offset一致。

所以,kafka每一个topic分区和生产者,消费者不同,是有多个offset的。

概念总结如下:

offset是指某一个分区的偏移量。

topic partition offset 这三个唯一确定一条消息。

生产者的offset其实就是最新的offset。

消费者的offset是他自己维护的,他可以选择分区最开始,最新,也可以记住他消费到哪了。

消费者组是为了不同组的消费者可以同时消费一个分区的消息。

關於Group這個概念

首先注意,producer沒有group的說法,

kafka提到group一定是consumer這邊。

Group-id位置

具體文件 變量
$KAFKA/config/consumer.properties group.id
$KAFKA/config/connect-distributed.properties group.id

如果在新建topic的時候,不特別指定,那麼默認使用的是consumer.properties裏面的group.id

__consumer_offsets的哪個partition保存了consumer group的位移信息

查看$KAFKA/config/consumer.properties

得到group.id是test-consumer-group,

填入下面的代碼並運行

public class kafka_hash {public static void main(String args[]){System.out.println(Math.abs("test-consumer-group".hashCode()) % 50);}}

實驗結果爲31

也就是說__consumer_offsets的partition 31保存了consumer group的位移信息

記住31這個數字,後面會用到

操作

操作

具體命令

创建topic $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --create --topic mytopic --replication-factor 3 --partitions 3
查看topic列表 $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181
生產數據 $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic mytopic
存放在各個partition的offset終點 $KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -1
存放在各個partition的offset起點

$KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -2

查询__consumer_offsets topic所有内容 $KAFKA/bin/kafka-console-consumer.sh --consumer.config $KAFKA/config/consumer.properties \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server Desktop:9091 --topic __consumer_offsets --from-beginning
查詢__consumer_offsets的partition 31包含的關於mytopic的offset信息
$KAFKA/bin/kafka-console-consumer.sh  \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server Desktop:9091 --topic __consumer_offsets  --partition 31 --from-beginning

上面的最後一句命令會得到:

[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,0]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)
[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)

上述表格中produce端需要的数据如下:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

其他

另外,根據[5],在同一個consumer group中,對於同一個topic而言,只能有一個consumer消費到其中特定的一條數據。

[6]中提到了Coordinator

Reference:

[1]Kafka到底有几个Offset?——Kafka核心之偏移量机制
[2]自己维护kafka_offset中的坑

[3]kafka查询最新producer offset的命令

[4]Kafka 如何读取offset topic内容 (__consumer_offsets)

[5]多个consumer使用同一个group.id消费同一个topic

[6]Kafka消费组(consumer group)

kafka的offset笔记相关推荐

  1. Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆)

    Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆) 认识AK 快速入门 安装和启动 小案例 消息引擎系统 消息引擎范型 AK的概要设计 吞吐量/延时 消息持久化 负载均衡和故障转移: 伸缩性 ...

  2. Kafka auto.offset.reset

    要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...

  3. flink 写kafka_flink消费kafka的offset与checkpoint

    生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...

  4. 【kafka】kafka consumer offset lag获取的三者方式

    1.概述 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方 ...

  5. 【Kafka】kafka Current offset xxx for partition xxx out range

    文章目录 1.背景 1.背景 kafka报错 kafka Current offset xxx for partition xxx out range 该问题和以下2个问题有所关系 [Kafka]ka ...

  6. 一次kafka的offset回退事件及相关知识点

    一次kafka的offset回退事件及相关知识点 原文链接:https://blog.csdn.net/lkforce/article/details/83384747

  7. SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)

    如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...

  8. 聊聊kafka consumer offset lag increase异常

    序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常. 查看consumer消费情况 Group Topic Pid Offset logSize Lag O ...

  9. java 获取kafka lag,聊聊kafka consumer offset lag的监控

    序 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JM ...

最新文章

  1. nginx反代理服务器
  2. apache 编译支持php_apache2 不支持php文件 解决办法(示例代码)
  3. Android应用开发—PendingIntent:如何判断两个PendingIntent对等
  4. 无惧海量并发,运维准点下班全靠它
  5. vertx:Flink报错 严重: Caught unexpected Throwable IllegalAccessError: tried to access class io.netty.uti
  6. java泛型实例化_java基础-泛型举例详解
  7. linux tcp_nodelay,仔细看参数--NGINX之tcp_nodelay
  8. [置顶]团队开发经验:如何带领一个项目团队并做好项目总结
  9. CNBlog客户端--第一阶段记录
  10. linux zk服务 关闭_linux上安装zookeeper 启动和关闭的教程
  11. WebLogic部署配置
  12. 计算指定位数的圆周率
  13. 动态html函数的写法,如何将html div id的动态传递给js函数
  14. 关于字符集的测试报告(转)
  15. JAVA个版本新特性
  16. 内核spinlock raw_spin_lock spin_lock_bh
  17. C++ OpenCV【人脸识别人眼识别】
  18. 独立按键控制继电器开关
  19. 关于人们感知与数字视音频编码的关系入门-视觉篇01.
  20. 谷粒商城高级篇笔记1

热门文章

  1. 正则实现二代身份证号码验证详解
  2. selective gaussian blur /adaptive-blur
  3. 看AppStore评价
  4. 论坛一大早白屏,无法访问
  5. access中总计为first_用Access开发生产管理系统
  6. x86已安装该产品 剑灵vcredist_MySQL Server v5.7正式版(附安装和配置数据库教程)
  7. JQuery中元素的数据存储
  8. JQuery中的元素选择器
  9. Iterator 遍历器的简单使用
  10. OpenCV的Python接口