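This article explains how to use the kafka-consumer-groups.sh script bundled with Kafka to set the offsets of a consumer group to arbitrary positions. Note that this feature was introduced in version 0.11.0.0 and works only with the new consumer.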

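Before that release, adjusting the offsets of an existing consumer group meant writing a Java program that called KafkaConsumer#seek, which was tedious and error-prone. The release extended the kafka-consumer-groups script so that users can reset the offsets of an existing consumer group directly from the command line, on one condition: the consumer group must be inactive, that is, none of its members may be running.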



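The topic scope can be given in one of three ways:
--all-topics (reset offsets for all partitions of every topic in the consumer group)
--topic t1 --topic t2 (reset offsets for all partitions of the listed topics)
--topic t1:0,1,2 (reset offsets for the listed partitions of one topic)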
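
To make the `topic:partitions` syntax concrete, here is a minimal Python sketch that splits such a value into a topic name and a partition list. The function name `parse_topic_spec` is made up for this illustration; it is not how the script itself parses the option.

```python
def parse_topic_spec(spec):
    """Split 'topic' or 'topic:p0,p1,...' into (topic, partitions).

    partitions is None when no partition list is given, meaning
    "all partitions of the topic".
    """
    topic, sep, parts = spec.partition(":")
    if not sep:
        return topic, None
    return topic, [int(p) for p in parts.split(",")]


print(parse_topic_spec("t1"))        # ('t1', None)
print(parse_topic_spec("t1:0,1,2"))  # ('t1', [0, 1, 2])
```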


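Reset strategies include:
--to-offset <offset>: move the offset to the given offset
--shift-by N: move the offset to the current offset + N; N may be negative, which moves the offset backward (toward earlier messages)
--to-datetime <datetime>: move the offset to the earliest offset whose timestamp is at or after the given time; the datetime format is yyyy-MM-ddTHH:mm:ss.xxx, for example 2017-08-04T00:00:00.000
--by-duration <duration>: move the offset to the position that lies the given duration before the current time; the duration format is PnDTnHnMnS, for example PT0H5M0S
--from-file <file>: read the reset plan from a CSV file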
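
The two value formats above can be checked before they are passed to the script. The following Python sketch is only an illustration under stated assumptions: `parse_duration` and `parse_datetime` are hypothetical helper names, and the regular expression covers just the PnDTnHnMnS shape described here, not every ISO-8601 duration.

```python
import re
from datetime import datetime, timedelta

# Matches the PnDTnHnMnS shape; every component is optional.
_DURATION = re.compile(r"^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$")


def parse_duration(text):
    """Convert a PnDTnHnMnS string into a timedelta."""
    match = _DURATION.match(text)
    if match is None or all(g is None for g in match.groups()):
        raise ValueError(f"not a PnDTnHnMnS duration: {text!r}")
    days, hours, minutes, seconds = (int(g or 0) for g in match.groups())
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)


def parse_datetime(text):
    """Parse a yyyy-MM-ddTHH:mm:ss.xxx string into a naive datetime."""
    return datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f")


print(parse_duration("PT0H5M0S"))                 # 0:05:00
print(parse_datetime("2017-08-04T00:00:00.000"))  # 2017-08-04 00:00:00
```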




bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic topic_lcc
Created topic "topic_lcc".


[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-producer-perf-test.sh --topic  topic_lcc --num-records 5000000 --throughput -1 --record-size 100 --producer-props bootstrap.servers=localhost:9092 acks=-1
254093 records sent, 50345.4 records/sec (4.80 MB/sec), 279.0 ms avg latency, 675.0 max latency.
525399 records sent, 104828.2 records/sec (10.00 MB/sec), 1454.9 ms avg latency, 2530.0 max latency.
739109 records sent, 147821.8 records/sec (14.10 MB/sec), 2224.5 ms avg latency, 2659.0 max latency.

Next, start a console consumer with the group name set to test-group:

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_lcc --from-beginning --consumer-property group.id=test-group
Processed a total of 133596 messages


[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'test-group' has no active members.
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
topic_lcc       0          298155          2064715         1766560         -               -               -
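
The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET. As a small illustration, the Python sketch below splits the data row from the output above and recomputes the lag. The helper name `parse_describe_row` is made up for this example, and it assumes the offset columns hold numbers (a group with no committed offset would need extra handling).

```python
def parse_describe_row(row):
    """Split one data row of the --describe output into named fields."""
    topic, partition, current, log_end, lag = row.split()[:5]
    return {
        "topic": topic,
        "partition": int(partition),
        "current_offset": int(current),
        "log_end_offset": int(log_end),
        "lag": int(lag),
    }


row = "topic_lcc       0          298155          2064715         1766560         -               -               -"
fields = parse_describe_row(row)

# Lag is the number of messages the group has not consumed yet.
computed_lag = fields["log_end_offset"] - fields["current_offset"]
print(computed_lag)                   # 1766560
print(computed_lag == fields["lag"])  # True
```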



2. --to-earliest

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164559



3. --to-latest

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          2064715


4. --to-offset <offset>

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 10000  --execute
Note: This will not show information about old Zookeeper-based consumers.
[2020-10-28 23:04:13,055] WARN New offset (10000) is lower than earliest offset for topic partition topic_lcc-0. Value will be set to 164559 (kafka.admin.ConsumerGroupCommand$)
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164559
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 200000  --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          200000
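
The first run shows that a requested offset below the earliest available offset is raised to that earliest offset. The Python sketch below models this behavior with a hypothetical helper, `clamp_offset`. The lower bound matches the WARN message above; applying the same rule at the upper end (the log end offset) is an assumption of this sketch and is not shown in the output.

```python
def clamp_offset(requested, earliest, latest):
    """Keep a requested offset inside the partition's valid range."""
    if requested < earliest:
        return earliest  # the case reported by the WARN line above
    if requested > latest:
        return latest    # assumed symmetric behavior, not shown in the output
    return requested


EARLIEST, LATEST = 164559, 2064715  # values taken from the runs above

print(clamp_offset(10000, EARLIEST, LATEST))   # 164559
print(clamp_offset(200000, EARLIEST, LATEST))  # 200000
```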



5. --to-current

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          200000


6. --shift-by N

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 200000  --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          200000
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -1000 --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          199000

The output shows that the offset of every partition was moved to (200000 - 1000) = 199000.
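
The arithmetic can be written out per partition. This is a minimal Python sketch with a made-up helper, `shift_offsets`; keeping the result within the earliest and latest offsets is an assumption of the sketch, and the only data used is partition 0 from the output above.

```python
def shift_offsets(current_offsets, n, bounds):
    """Return current + n for each partition, kept within (earliest, latest)."""
    shifted = {}
    for partition, current in current_offsets.items():
        earliest, latest = bounds[partition]
        shifted[partition] = max(earliest, min(current + n, latest))
    return shifted


current = {0: 200000}            # partition -> current offset
bounds = {0: (164559, 2064715)}  # partition -> (earliest, latest)

print(shift_offsets(current, -1000, bounds))  # {0: 199000}
```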


7. --to-datetime <datetime>

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
WARN: In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164559
[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2020-10-28T20:00:00.000
WARN: In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          2064715
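
In the two runs above, the 2017 timestamp resolves to the earliest offset (164559) and the later timestamp resolves to the log end offset (2064715). One way to picture this is a lookup of the first record whose timestamp is at or after the target, falling back to the log end offset when no such record exists. The Python sketch below illustrates that idea only; the function name and the sample records are hypothetical and do not come from the test topic.

```python
from bisect import bisect_left


def offset_for_time(records, log_end_offset, target_ms):
    """Return the offset of the first record with timestamp >= target_ms.

    records is a list of (timestamp_ms, offset) pairs sorted by timestamp.
    If every record is older than target_ms, return log_end_offset.
    """
    timestamps = [ts for ts, _ in records]
    i = bisect_left(timestamps, target_ms)
    if i == len(records):
        return log_end_offset
    return records[i][1]


# Hypothetical partition holding three records; log end offset is 13.
records = [(1000, 10), (2000, 11), (3000, 12)]

print(offset_for_time(records, 13, 0))     # 10 (target before all records)
print(offset_for_time(records, 13, 2500))  # 12
print(offset_for_time(records, 13, 9999))  # 13 (target after all records)
```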



8. --by-duration <duration>

[lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S
WARN: In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
Note: This will not show information about old Zookeeper-based consumers.
TOPIC                          PARTITION  NEW-OFFSET
topic_lcc                      0          164571
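
PT0H30M0S points at the moment 30 minutes before the current time. Assuming the script then resolves that moment by timestamp in the same way as --to-datetime (the output above does not state this), the target can be expressed in the --to-datetime format. The Python sketch below does that conversion; `datetime_arg_for` and the sample "current time" are made up for illustration.

```python
from datetime import datetime, timedelta


def datetime_arg_for(now, ago):
    """Format (now - ago) as yyyy-MM-ddTHH:mm:ss.SSS."""
    target = now - ago
    millis = target.microsecond // 1000
    return target.strftime("%Y-%m-%dT%H:%M:%S") + f".{millis:03d}"


now = datetime(2020, 10, 28, 15, 30, 0)  # hypothetical current time
print(datetime_arg_for(now, timedelta(minutes=30)))  # 2020-10-28T15:00:00.000
```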


