1.概述

转载: https://www.cnblogs.com/huxi2b/p/7284767.html

本文阐述如何使用Kafka自带的kafka-consumer-groups.sh脚本随意设置消费者组(consumer group)的位移。需要特别强调的是, 这是0.11.0.0版本提供的新功能且只适用于新版本consumer。

在新版本之前,如果要为已有的consumer group调整位移必须要手动编写Java程序调用KafkaConsumer#seek方法,费时费力不说还容易出错。0.11.0.0版本丰富了kafka-consumer-groups脚本的功能,用户可以直接使用该脚本很方便地为已有的consumer group重新设置位移,但前提是:consumer group状态必须是inactive的,即不能是处于正在工作中的状态。

先务虚一下。总体来说,重设位移的流程由3步组成,如下图所示:


确定topic作用域——当前有3种作用域指定方式:

--all-topics(为consumer group下所有topic的所有分区调整位移)
--topic t1 --topic t2(为指定的若干个topic的所有分区调整位移)
--topic t1:0,1,2(为指定的topic分区调整位移)

确定位移重设策略——当前支持8种设置规则:

--to-earliest:把位移调整到分区当前最小位移
--to-latest:把位移调整到分区当前最新位移
--to-current:把位移调整到分区当前位移
--to-offset <offset>: 把位移调整到指定位移处
--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
--from-file <file>:从CSV文件中读取调整策略

确定执行方案——当前支持3种方案:

什么参数都不加:只是打印出位移调整方案,不具体执行
--execute:执行真正的位移调整
--export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

针对上面的8种策略,本文重点演示前面7种策略。
首先,我们创建一个测试topic,5个分区,并发送5,000,000条测试消息:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic topic_lccCreated topic "topic_lcc".

模拟发送数据

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-producer-perf-test.sh --topic  topic_lcc --num-records 5000000 --throughput -1 --record-size 100 --producer-props bootstrap.servers=localhost:9092 acks=-1
254093 records sent, 50345.4 records/sec (4.80 MB/sec), 279.0 ms avg latency, 675.0 max latency.
525399 records sent, 104828.2 records/sec (10.00 MB/sec), 1454.9 ms avg latency, 2530.0 max latency.
739109 records sent, 147821.8 records/sec (14.10 MB/sec), 2224.5 ms avg latency, 2659.0 max latency.

然后,启动一个console consumer程序,组名设置为test-group:


[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_lcc --from-beginning --consumer-property group.id=test-groupProcessed a total of 133596 messages

待运行一段时间后关闭consumer程序将group设置为inactive。现在运行kafka-consumer-groups.sh脚本首先确定当前group的消费进度:

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'test-group' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
topic_lcc       0          298155          2064715         1766560         -               -               -
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

由上面输出可知,当前5个分区LAG列的值不是0,表示没有消费完毕。现在我们演示下如何重设位移。

2.–to-earliest

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164559
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

上面输出表明,所有分区的位移都已经被重设为164559

3.–to-latest

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          2064715
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

上面输出表明,所有分区的位移都已经被重设为最新位移,即2064715

4.–to-offset <offset>

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 10000  --execute
Note: This will not show information about old Zookeeper-based consumers.
[2020-10-28 23:04:13,055] WARN New offset (10000) is lower than earliest offset for topic partition topic_lcc-0. Value will be set to 164559 (kafka.admin.ConsumerGroupCommand$)TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164559
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 200000  --execute
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          200000
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

上面输出表明,所有分区的位移都已经调整为给定的200000,如果你的值小于最老的Offset,那么自动定位到最老的位置

5.–to-current

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          200000
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

输出表明所有分区的位移都已经被移动到当前位移(这个有点傻,因为位移距上一步没有变动)

6.–shift-by N

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 200000  --execute
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          200000
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -1000 --execute
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          199000
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

输出表明所有分区的位移被移动到(200000 - 1000) = 199000处

7.–to-datetime

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
WARN: In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164559
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2020-10-28T20:00:00.000
WARN: In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          2064715
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

将所有分区的位移调整为2020-10-28T20:00:00.000之后的最早位移

8.–by-duration

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S
WARN: In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
Note: This will not show information about old Zookeeper-based consumers.TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164571
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$

将所有分区位移调整为30分钟之前的最早位移

【kafka】Kafka 1.1.0 consumer group位移重设相关推荐

  1. Kafka consumer group位移0ffset重设

    本文阐述如何使用Kafka自带的kafka-consumer-groups.sh脚本随意设置消费者组(consumer group)的位移.需要特别强调的是, 这是0.11.0.0版本提供的新功能且只 ...

  2. Kafka 关于消费者组名Consumer Group

    由于本人是在Windows个人机上搭建了一个Kafka服务.所以使用中遇到了一个棘手的问题,也是第一次使用Kafka消息队列,所以对其核心知识知之甚少. 一个困扰了些许时日的问题,经常因为log文件正 ...

  3. 消费者组 Consumer Group 和 重平衡 Rebalance

    kafka设计了consumer group: 具有可扩展性和容错性的consumer机制,consumer group有3个特性: 1. Consumer Group 下可以有一个或多个 Consu ...

  4. kafka 主动消费_Kafka消费组(consumer group)

    在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东西. 一. 误区澄清与概念明确 1 Kafk ...

  5. Kafka设计解析(十三)Kafka消费组(consumer group)

    转载自 huxihx,原文链接 Kafka消费组(consumer group) 一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka ...

  6. Kafka(Go)教程(十一)---Consumer Group Rebalance

    来自:指月 https://www.lixueduan.com 原文:https://www.lixueduan.com/post/kafka/11-consumer-group-rebalance/ ...

  7. Kafka消费组(consumer group)(转)

    转载自:http://www.cnblogs.com/huxi2b/p/6223228.html 一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少 ...

  8. kafka 分组消费topic_Kafka消费组(consumer group)(转)

    在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东西. 一. 误区澄清与概念明确 1 Kafk ...

  9. Kafka消费组(consumer group)

    原文出处:https://www.cnblogs.com/huxi2b/p/6223228.html 转载请注明出处. ---------------------------------------- ...

最新文章

  1. 使用 collections 来创建类似元组对象
  2. CSS选择器分类与优先级
  3. AngularJS集合数据遍历显示
  4. centos环境下使用percona-xtrabackup对mysql5.6数据库innodb和myisam进行快速备份及恢复...
  5. 洛谷P1527 [国家集训队] 矩阵乘法 [整体二分,二维树状数组]
  6. 为什么中国开发不出流行的操作系统和编程语言呢?
  7. node怎么把token放到redis_从零开始手写 redis(八)朴素 LRU 淘汰算法性能优化
  8. HDU 5533 Dancing Stars on Me( 有趣的计算几何 )
  9. iOS开发--XMPPFramework--用户登录(三)
  10. Node.js 0.8.20 稳定版发布
  11. JAVA商城项目(微服务框架)——第4天 乐优商城项目搭建
  12. hibernate查询的方式 都有哪些
  13. 如何在Linux上安装和使用TeamViewer
  14. 你要问我应用层?我就和你扯扯扯
  15. Craw the picture of the specific handle
  16. cesium实现立体墙(垂直、水平)渐变泛光效果
  17. opencv---c++
  18. 回首2022,展望2023
  19. 【转】刀锋一样的眼神
  20. 抖音seo源码 短视频seo源码二次开发,怎么使用抖音seo源码,视频seo源码私有化部署?

热门文章

  1. keep公众号就“借鉴”原创文章致歉:将停更一周
  2. 电影《你好,李焕英》进入全球票房榜前100
  3. Redmi K30S更多细节曝光:骁龙865加持 提供多款配色
  4. 交出娃哈哈,宗庆后还是不放心?
  5. 《八佰》正式上映不到两天 累计票房破6亿元
  6. 英特尔回应苹果换芯:将继续支持老客户,但我的CPU才是最好的
  7. 荣耀推出MOSCHINO联名款荣耀20 PRO手机 售价3799元
  8. iPhone越来越难打动你?从iOS平台转投安卓阵营 这招你得学会!
  9. 亚马逊无人商店因拒收现金被美国多地禁止:被认定歧视消费者
  10. Spring技术原理之Spring IOC