欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/how-to-get-kafka-consumer-details/


前文摘要

在上一篇文章《Kafka的Lag计算误区及正确实现》中介绍了如何计算消费者的消费滞后量(Lag),并且讲解了如何调用Kafka的kafka.admin.ConsumerGroupCommand文件中的KafkaConsumerGroupService来发送OffsetRequest和OffsetFetchRequest两个请求,进而通过两个请求结果之间的差值来获得结果。不过如果你不想修改kafka-core的代码并重新编译的话,这种实现方式无法成功,所以本文的主要目的就是通过调用更底层的API来实现不修改kafka-core的代码来实现KafkaConsumerGroupService的功能,即通过Java调用Scala的代码来实现获取Kafka的消费者详情的功能。

目标及实现

实现如同 bin/kafka-consumer-group.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID的效果:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-ID
topic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID

KafkaConsumerGroupService的核心方法是CollectGroupAssignment,其方法参数为一个consumer group的groupId,方法输出为上面示例中的列表信息。CollectGroupAssignment方法主要有以下几个步骤:

  1. 根据groupId调用describeConsumerGroup方法(内部原理是发送DescribeGroupsRequest请求)来获取consumer group的基本信息,参考上面示例中的CONSUMER-ID、HOST、CLIENT-ID以及TopicPartition信息,但是没有CURRENT-OFFSET、LOG-END-OFFSET、LAG信息。注意这里的LOG-END-OFFSET是消费者可见的LEO,不是生产者可见的LEO,也就是通俗意义上的HW。
  2. 根据groupId调用listGroupOffsets方法(内部原理是发送OffsetFetchRequest请求)来获取各个分区(Partition)的对应的消费位移CURRENT-OFFSET。
  3. 通过调用KafkaConsumer的endOffsets方法来获取TopicPartition对应的HW,即示例中的LOG-END-OFFSET。
  4. 计算Lag并组合成信息列表List<PartitionAssignmentState>。

改造

对应Java版的KafkaConsumerGroupService改造代码可以参见代码,目录结构如下图所示:

其中model中的ConsumerGroupSummary、ConsumerSummary和PartitionAssignmentState是简单的JavaBean, PartitionAssignmentState是用来保存每个TopicPartition的消费者信息的,具体内容参考如下。KafkaConsumerGroupCustomService就是本文所要陈述的Java改造办的KafkaConsumerGroupSerivice,ConsumerGroupUtils用来存放一些公用的代码。

@Data
@Builder
public class PartitionAssignmentState {private String group; // groupIdprivate Node coordinator; // consumer coodinator节点信息private String topic;private int partition;private long offset;private long lag;private String consumerId;private String host;private String clientId;private long logEndOffset;
}

初始化KafkaConsumerGroupCustomService需要Kafka的服务端地址,然后初始化AdminClient和KafkaConsumer,AdminClient中包含了众多管理类方法,主要是通过发送各种自定义协议请求来完成,上面步骤中所说的describeConsumerGroup和listGroupOffsets方法也是通过AdminClient来实现的;KafkaConsumer主要是用来获取TopicPartition对应的HW(消费者可见的LogEndOffsets)的。

KafkaConsumerGroupCustomService中与scala版对应的collectGroupAssignment方法如下(详细步骤参考代码注释):

public List<PartitionAssignmentState> collectGroupAssignment(AdminClient adminClient, KafkaConsumer<String, String> consumer,String group) {//1. 获取consumer group的基本信息,包括CONSUMER-ID、HOST、// CLIENT-ID以及TopicPartition信息AdminClient.ConsumerGroupSummary consumerGroupSummary= adminClient.describeConsumerGroup(group, 0);List<TopicPartition> assignedTopicPartitions = new ArrayList<>();List<PartitionAssignmentState> rowsWithConsumer = new ArrayList<>();scala.collection.immutable.List<AdminClient.ConsumerSummary> consumers= consumerGroupSummary.consumers().get();if (consumers != null) {//2. 获取各个分区(Partition)的对应的消费位移CURRENT-OFFSETscala.collection.immutable.Map<TopicPartition, Object> offsets= adminClient.listGroupOffsets(group);if (offsets.nonEmpty()) {String state = consumerGroupSummary.state();// 3. 还有一个状态是Dead表示"group"对应的consumer group不存在if (state.equals("Stable") || state.equals("Empty")|| state.equals("PreparingRebalance")|| state.equals("AwaitingSync")) {List<ConsumerSummary> consumerList = changeToJavaList(consumers);// 4. 获取当前有消费者的消费信息,即包含CONSUMER-ID、HOST、CLIENT-IDrowsWithConsumer = getRowsWithConsumer(consumerGroupSummary, offsets,consumer, consumerList, assignedTopicPartitions, group);}}//5. 获取当前没有消费者的消费信息List<PartitionAssignmentState> rowsWithoutConsumer =getRowsWithoutConsumer(consumerGroupSummary,offsets, consumer, assignedTopicPartitions, group);//6. 合并结果rowsWithConsumer.addAll(rowsWithoutConsumer);}return rowsWithConsumer;
}

KafkaConsumerGroupCustomService类中包含有getRowsWithConsumer()、getRowsWithoutConsumer()、changeToJavaList等私有方法也都是在Scala语言与Java语言之间进行切换,这样可以不需要修改kafka-core的原生代码而通过外部的封装调用既可以实现获取Kafka消费者详情的功能。光看代码比较抽象,建议对此感兴趣的同学可以亲自对比一下kafka-core包中kafka.admin.ConsumerGroupCommand的KafkaConsumerGroupSerivice与笔者自定义的KafkaConsumerGroupCustomService的实现来了解下Scala语言到Java语言的转换。

如果需要打印详情可以调用KafkaConsumerGroupCustomService同目录的ConsumerGroupUtils类中的printPasList(List list)方法。注意要运行这些代码需要JDK8的环境,笔者为了让代码显得“骚气”一点就用来一点Java8的语法,如果需要Java7的代码实现可以关注私聊。

或许有些同学对于Scala和Java交叉的代码并不感冒,想要寻求一种存Java式的实现方式,那么在这里怎么实现呢?答案是通过KafkaAdminClient,它是AdminClient的Java版实现,从Kafka0.11.0.0版本开始引入的,不过KafkaAdminClient本身并没有提供describeConsumerGroup、listGroupOffsets之类的方法给我们直接使用,扩展一下也很方便,由于篇幅限制,这部分的内容将在下一篇文章中进行介绍,如果想要先一睹为快,可以参考下代码实现,详细的逻辑解析敬请期待….

欢迎跳转到本文的原文链接:https://honeypps.com/mq/how-to-get-kafka-consumer-details/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


如何获取Kafka的消费者详情——从Scala到Java的切换相关推荐

  1. Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式

    Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方 ...

  2. Apache Kafka Consumer 消费者集

    1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...

  3. Kafka的消费者概念

    应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 . 从 Kafka 读取数据不同于从其他悄息系统读取数据,它涉及一些独特的概念和想法.如果不先理解 这些概念 ...

  4. kafka实战-消费者offset重置问题

    kafka实战-消费者offset重置问题 背景 问题现象 分析原因 问题解决 附-常见的消费者配置描述和调优方案 1. max.poll.records 2. fetch.max.bytes 3. ...

  5. CC00034.kafka——|Hadoopkafka.V19|——|kafka.v19|消费者位移管理.v02|

    一.消费者位移管理数据准备 ### --- 准备数据~~~ # 生成消息文件 [root@hadoop ~]# for i in `seq 60`; do echo "hello yanqi ...

  6. 2021年大数据Kafka(十一):❤️Kafka的消费者负载均衡机制和数据积压问题❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的消费者负载均衡机制和数据积压问题 一.kafka ...

  7. kafka java获取topic_通过编程方式获取Kafka中Topic的Metadata信息

    如果我们需要通过编程的方式来获取到TopicMetadataRequest请求到 def findLeader(topic: String): Unit = { val consumer = conn ...

  8. 【Kafka】kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小

    1.概述 kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小 package com.dtwave.kafka.storage;import org. ...

  9. 钉钉审批回调 获取单个审批实例详情  遇见System.Collections.Generic.List`1[DRMS.DingTalk.FormRowValue+ExtendValue] 错误

    /processinstance/get 获取单个审批实例详情 接口 错误:Error converting value "[{"emplId":"111111 ...

最新文章

  1. 整数中内存中的保存方式:大端、小端
  2. Scroller解析
  3. AE开发使用内存图层
  4. 玩转Numpy——linspace()函数使用详解
  5. mysql数据存储方式_数据存储在mysql的两种方式
  6. 浅析若干Java序列化工具
  7. Google AI面试题
  8. Android虚拟机Dalvik介绍
  9. 关于Dos窗口的设置
  10. 聊天社交即时通信源码IM 群聊/语音/视频/红包支付/不依赖第三方sdk即时通讯
  11. Yanobox Moods for mac(FCPX/AE/PR滤镜插件)激活版
  12. 编译原理构造词法分析器C语言,编译原理C语言词法分析器
  13. 更改C盘用户文件夹名
  14. 窃取式调度器(Stealing Scheduler)-高并发
  15. 国内有名的文化与教育调查研究咨询公司
  16. Vins-fusion GPS融合部分测试(自己的数据ZED+RTK)
  17. 共享文件与打印机设置
  18. 人一般长到几岁才会停止长高?
  19. Go语言Web项目搭建
  20. 一些常见warning的原因和解决方法

热门文章

  1. 计算机游戏 综述,计算机游戏对玩家认知能力影响的研究综述
  2. basequickadapter详解_BaseRecyclerViewAdapter(持续更新!)
  3. C# 类、对象、方法和属性详解(重新排版,页面整洁)
  4. 五周第四次课(4月23日)
  5. 人工智能化发展已经到了哪一步?
  6. docker-ce私有仓库搭建
  7. webservice linux 杀进程
  8. 一条insert语句导致的性能问题分析(一)
  9. 超级组合:用户中心+云平台
  10. 深入理解javascript 中的 delete(转)