Kafka Shell Lag

Kafka version: 2.1.0

Preface

In production, say you are consuming a topic with the group kafka-lag and you have no monitoring system set up yet. How do you check how far each partition has fallen behind? Most people reach for this command:

# normal usage
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default

# when the cluster uses Kerberos authentication
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default --command-config /home/xiahu/client.properties

client.properties

security.protocol=PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
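
For an active group, the --describe output of Kafka 2.1 looks roughly like the sketch below (the values here are illustrative only); CURRENT-OFFSET, LOG-END-OFFSET and LAG are the columns we are after:

TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID     HOST         CLIENT-ID
kafka_lag  0          2000            5000            3000  consumer-1-...  /10.0.0.12   consumer-1
kafka_lag  1          2000            5000            3000  consumer-1-...  /10.0.0.12   consumer-1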

That's the right command. Today we'll dig into how it works, starting from the kafka-consumer-groups launch script.

1. kafka-consumer-groups.sh

# This script simply delegates to kafka-run-class.sh, passing all arguments through
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

2. kafka-run-class.sh

# There is a lot more in this script, but the key point for us is:
# running: kafka-consumer-groups --bootstrap-server master:9092 --describe --group default
# ultimately invokes: kafka.admin.ConsumerGroupCommand --bootstrap-server master:9092 --describe --group default
# so the source to read is the kafka.admin.ConsumerGroupCommand class
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

3. ConsumerGroupCommand

def main(args: Array[String]) {
  val opts = new ConsumerGroupCommandOptions(args)

  if (args.length == 0)
    CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")

  val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has)
  if (actions != 1)
    CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets")

  // validate the arguments
  opts.checkArgs()

  // build a ConsumerGroupService from the ConsumerGroupCommandOptions
  val consumerGroupService = new ConsumerGroupService(opts)

  try {
    if (opts.options.has(opts.listOpt))
      consumerGroupService.listGroups().foreach(println(_))
    else if (opts.options.has(opts.describeOpt))
      // we are chasing the lag numbers, so this is the method to follow
      consumerGroupService.describeGroup()
    else if (opts.options.has(opts.deleteOpt))
      ...
}

4. describeGroup()

def describeGroup(): Unit = {
  // read the relevant options
  val group = opts.options.valuesOf(opts.groupOpt).asScala.head
  val membersOptPresent = opts.options.has(opts.membersOpt)
  val stateOptPresent = opts.options.has(opts.stateOpt)
  val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
  val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)

  if (subActions == 0 || offsetsOptPresent) {
    // the lag query lives behind this call
    val offsets = collectGroupOffsets()
    printOffsets(group, offsets._1, offsets._2)
  } else if (membersOptPresent) {
    val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
    printMembers(group, members._1, members._2, opts.options.has(opts.verboseOpt))
  } else
    printState(group, collectGroupState())
}
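
Note that when no sub-option is given, describeGroup falls through to the offsets view, which is why the bare --describe call at the top prints lag. The other two views can be requested explicitly (same cluster and group as above):

kafka-consumer-groups --bootstrap-server master:9092 --describe --group default --members
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default --state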

5. collectGroupOffsets()

def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
  val groupId = opts.options.valueOf(opts.groupOpt)

  // First build an AdminClient (for background on AdminClient, see:
  // https://blog.csdn.net/zc0565/article/details/102791488), then use it to fetch the
  // ConsumerGroupDescription for the groupId.
  // ConsumerGroupDescription: "A detailed description of a single consumer group in the cluster."
  val consumerGroup = adminClient.describeConsumerGroups(List(groupId).asJava,
    withTimeoutMs(new DescribeConsumerGroupsOptions())).describedGroups.get(groupId).get
  val state = consumerGroup.state

  // For the groupId, fetch a Map[TopicPartition, OffsetAndMetadata]:
  //   TopicPartition wraps a topic and a partition;
  //   OffsetAndMetadata wraps the group's current committed offset (plus metadata) for that topic-partition.
  // e.g. topic: kafka_lag_test, partition: 0, groupId: kafka-lag
  // topic + partition + groupId together identify exactly one currentOffset
  val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
  var assignedTopicPartitions = ListBuffer[TopicPartition]()

  // The code below drops members with no assigned TopicPartitions and pairs every
  // remaining TopicPartition with its currentOffset
  val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
    .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size)
    .flatMap { consumerSummary =>
      val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
      assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
      val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala.map { topicPartition =>
        topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
      }.toMap
      // the method to look at next
      collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
        partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
        Some(s"${consumerSummary.clientId}"))
    }

  val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
    case (topicPartition, offset) =>
      collectConsumerAssignment(
        groupId,
        Option(consumerGroup.coordinator),
        Seq(topicPartition),
        Map(topicPartition -> Some(offset.offset)),
        Some(MISSING_COLUMN_VALUE),
        Some(MISSING_COLUMN_VALUE),
        Some(MISSING_COLUMN_VALUE))
  }

  (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
}
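
As an aside, these two lookups take only a few lines against the AdminClient API outside the shell tool. A minimal Java sketch, with broker address and group name as placeholders and error handling omitted:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class DescribeGroupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            String groupId = "kafka-lag"; // placeholder group
            // which consumers are in the group and which partitions each one owns
            ConsumerGroupDescription description = admin
                    .describeConsumerGroups(Collections.singletonList(groupId))
                    .describedGroups().get(groupId).get();
            // the group's committed offset per topic-partition (the currentOffset column)
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata().get();
            System.out.println(description.state() + " " + committed);
        }
    }
}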

6. collectConsumerAssignment

// returns one PartitionAssignmentState per partition
private def collectConsumerAssignment(group: String,
                                      coordinator: Option[Node],
                                      topicPartitions: Seq[TopicPartition],
                                      getPartitionOffset: TopicPartition => Option[Long],
                                      consumerIdOpt: Option[String],
                                      hostOpt: Option[String],
                                      clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
  if (topicPartitions.isEmpty) {
    // no assigned partitions: emit a single placeholder row
    Array[PartitionAssignmentState](
      PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None),
        consumerIdOpt, hostOpt, clientIdOpt, None))
  }
  else
    // the method to look at next
    describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset,
      consumerIdOpt, hostOpt, clientIdOpt)
}

7. describePartitions

private def describePartitions(group: String,
                               coordinator: Option[Node],
                               topicPartitions: Seq[TopicPartition],
                               getPartitionOffset: TopicPartition => Option[Long],
                               consumerIdOpt: Option[String],
                               hostOpt: Option[String],
                               clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {

  def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]): PartitionAssignmentState = {
    val offset = getPartitionOffset(topicPartition)
    PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition), offset,
      getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
  }

  // getLogEndOffsets:
  // 1. builds a KafkaConsumer from the bootstrap servers and the groupId
  // 2. calls KafkaConsumer.endOffsets for the TopicPartitions to get each partition's largest offset
  // 3. getLag then subtracts the group's committed currentOffset (fetched earlier) from that
  //    end offset, and the result is wrapped into a PartitionAssignmentState and returned
  getLogEndOffsets(topicPartitions).map {
    case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
    case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
  }.toArray
}
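
And getLogEndOffsets itself boils down to a single KafkaConsumer.endOffsets call. A minimal sketch, again with placeholder broker, group and topic names:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class EndOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-lag");            // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = Arrays.asList(
                    new TopicPartition("kafka_lag_test", 0),   // placeholder topic
                    new TopicPartition("kafka_lag_test", 1));
            // log-end offset per partition: the endOffset side of lag = endOffset - currentOffset
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            endOffsets.forEach((tp, end) -> System.out.println(tp + " -> " + end));
        }
    }
}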

Concepts

Kafka has the following core concepts:

  1. broker
  2. topic
  3. partition
  4. group
  5. offset

Taking them in turn:

1. broker

A broker can be thought of as one machine running Kafka; multiple brokers form a Kafka cluster, and with a single broker the Kafka service is standalone.

2. topic

A topic is a named category of messages; one Kafka cluster hosts many topics.

3. partition

A partition is a slice of a topic, similar in spirit to partitions in Hive; one topic contains multiple partitions.

4. groupId

Let's make this concrete:
I have a topic kafka_lag with two partitions, and I produce 10,000 messages into it. With the default partitioner, partition 0 and partition 1 end up with 5,000 messages each.
On top of that, I have two groups: kafka-consumer-lag-1 and kafka-consumer-lag-2.

First, I consume the topic with kafka-consumer-lag-1; suppose partitions 0 and 1 each consume 2,000 messages. The offsets then look like this:

groupId               topic      partition  currentOffset  lag   endOffset
kafka-consumer-lag-1  kafka_lag  0          2000           3000  5000
kafka-consumer-lag-1  kafka_lag  1          2000           3000  5000
kafka-consumer-lag-2  kafka_lag  0          0              5000  5000
kafka-consumer-lag-2  kafka_lag  1          0              5000  5000

Then I consume the topic with kafka-consumer-lag-2; suppose partitions 0 and 1 each consume 4,000 messages. The offsets become:

groupId               topic      partition  currentOffset  lag   endOffset
kafka-consumer-lag-1  kafka_lag  0          2000           3000  5000
kafka-consumer-lag-1  kafka_lag  1          2000           3000  5000
kafka-consumer-lag-2  kafka_lag  0          4000           1000  5000
kafka-consumer-lag-2  kafka_lag  1          4000           1000  5000
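
The lag column in both tables is nothing more than a subtraction per row. For the first kafka-consumer-lag-1 row:

long endOffset = 5000L;               // log-end offset of kafka_lag partition 0
long currentOffset = 2000L;           // offset committed by kafka-consumer-lag-1
long lag = endOffset - currentOffset; // 3000, matching the table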

Summary

The tables above show two invariants:

topic + partition maps to a unique endOffset

topic + partition + group maps to a unique currentOffset

The kafka-consumer-groups tool (that is, kafka.admin.ConsumerGroupCommand) is built on exactly this principle:

  1. Build an AdminClient and call its listConsumerGroupOffsets() with the groupId to get the unique currentOffset for each topic + partition + groupId
  2. Instantiate a KafkaConsumer and, from the set of TopicPartition objects (topic + partition), fetch each partition's unique endOffset
  3. Compute endOffset - currentOffset for each partition to get the group's lag, and print the result

The Kafka source is written in Scala, which can be hard to follow if you have never used the language, so I reimplemented the logic in Java:

package com.clb.lag;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

/**
 * @author Xiahu
 * @create 2021/1/11
 */
public class KafkaOffsetTool {

    private static final String MISSING_COLUMN_VALUE = "-";

    private final AdminClient adminClient;
    private KafkaConsumer<String, String> consumer;

    public KafkaOffsetTool() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node2:9092");
        // Kerberos authentication: enable and adapt for your environment
        if (false) {
            properties.put("security.protocol", "PLAINTEXT");
            properties.put("sasl.mechanism", "GSSAPI");
            properties.put("sasl.kerberos.service.name", "kafka");
        }
        this.adminClient = AdminClient.create(properties);
    }

    /**
     * One row per (group, topic, partition): the committed offset plus consumer metadata.
     * Mirrors collectGroupOffsets() in kafka.admin.ConsumerGroupCommand.
     */
    public List<PartitionOffsetState> collectGroupOffsets(String group) throws Exception {
        List<PartitionOffsetState> result = new ArrayList<>();

        Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups =
                adminClient.describeConsumerGroups(Collections.singletonList(group)).describedGroups();
        ConsumerGroupDescription consumerGroup = describedGroups.get(group).get();
        ConsumerGroupState state = consumerGroup.state(); // available if you also want to print the group state

        Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommittedOffsets(group);

        // members that actually own partitions, largest assignment first (like the sortWith in the Scala source)
        List<MemberDescription> members = new ArrayList<>();
        for (MemberDescription member : consumerGroup.members()) {
            if (member.assignment() != null && !member.assignment().topicPartitions().isEmpty()) {
                members.add(member);
            }
        }
        members.sort(Comparator.comparingInt(
                (MemberDescription m) -> m.assignment().topicPartitions().size()).reversed());

        // rows for partitions that have a live consumer
        Set<TopicPartition> assignedTopicPartitions = new HashSet<>();
        for (MemberDescription member : members) {
            for (TopicPartition tp : member.assignment().topicPartitions()) {
                assignedTopicPartitions.add(tp);
                OffsetAndMetadata committed = committedOffsets.get(tp);
                PartitionOffsetState row = new PartitionOffsetState();
                row.setGroup(group);
                row.setCoordinator(consumerGroup.coordinator().toString());
                row.setHost(member.host());
                row.setClientId(member.clientId());
                row.setConsumerId(member.consumerId());
                row.setTopic(tp.topic());
                row.setPartition(tp.partition());
                // an assigned partition may not have a committed offset yet
                row.setOffset(committed == null ? null : committed.offset());
                result.add(row);
            }
        }

        // rows for committed offsets with no live consumer attached (rowsWithoutConsumer in the Scala source)
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            if (assignedTopicPartitions.contains(entry.getKey())) {
                continue;
            }
            PartitionOffsetState row = new PartitionOffsetState();
            row.setGroup(group);
            row.setCoordinator(consumerGroup.coordinator().toString());
            row.setHost(MISSING_COLUMN_VALUE);
            row.setClientId(MISSING_COLUMN_VALUE);
            row.setConsumerId(MISSING_COLUMN_VALUE);
            row.setTopic(entry.getKey().topic());
            row.setPartition(entry.getKey().partition());
            row.setOffset(entry.getValue().offset());
            result.add(row);
        }
        return result;
    }

    private Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) throws Exception {
        return adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
    }

    /** Fills in logEndOffset and lag (endOffset - currentOffset) for every row. */
    public List<PartitionOffsetState> getLag(List<PartitionOffsetState> rows, String groupId) {
        getConsumer(new Properties(), groupId);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionOffsetState row : rows) {
            topicPartitions.add(new TopicPartition(row.getTopic(), row.getPartition()));
        }
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        for (PartitionOffsetState row : rows) {
            Long endOffset = endOffsets.get(new TopicPartition(row.getTopic(), row.getPartition()));
            if (endOffset != null) {
                row.setLogEndOffset(endOffset);
                if (row.getOffset() != null) {
                    row.setLag(endOffset - row.getOffset());
                }
            }
        }
        return rows;
    }

    private KafkaConsumer<String, String> getConsumer(Properties prop, String groupId) {
        if (consumer == null) {
            createConsumer(prop, groupId);
        }
        return consumer;
    }

    public void createConsumer(Properties prop, String groupId) {
        // Kerberos authentication: enable and point these at your own krb5/jaas files
        if (false) {
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");               // adapt path
            System.setProperty("java.security.auth.login.config", "/etc/kafka/jaas.conf"); // adapt path
            prop.put("security.protocol", "PLAINTEXT");
            prop.put("sasl.mechanism", "GSSAPI");
            prop.put("sasl.kerberos.service.name", "kafka");
        }
        String deserializer = StringDeserializer.class.getName();
        String broker = "node1:9092";
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        consumer = new KafkaConsumer<>(prop);
    }

    public static void main(String[] args) throws Exception {
        KafkaOffsetTool kafkaOffsetTool = new KafkaOffsetTool();
        List<PartitionOffsetState> partitionOffsetStates = kafkaOffsetTool.collectGroupOffsets("kafka-lag");
        partitionOffsetStates = kafkaOffsetTool.getLag(partitionOffsetStates, "kafka-lag");
        System.out.println(partitionOffsetStates);
    }
}

PartitionOffsetState

package com.clb.lag;

import lombok.Data;

/**
 * @author Xiahu
 * @create 2021/1/11
 */
@Data
public class PartitionOffsetState {
    private String group;
    private String coordinator;
    private String topic;
    private int partition;
    private Long offset;
    private Long lag;
    private String consumerId;
    private String host;
    private String clientId;
    private Long logEndOffset;
}
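
Lombok's @Data generates the getters and setters used above, plus a toString of the form PartitionOffsetState(group=kafka-lag, topic=kafka_lag, partition=0, offset=2000, lag=3000, ...), which is what main() prints; the field values shown here are illustrative.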
