问题来源

实际项目中,我们会遇到kafka消费不及时,系统发现最新的数据一致无法出现,这时候通过其他kafka工具发现原来指定的消费组lag太大,也就是我们系统要么出问题,要么需要启动更多的实例加快消费消息。每次通过kafka工具去查询lag基本都是手工而且耗时且慢,能否在自己的系统中集成查询指定消费组lag的功能,然后出现问题是可以管理界面中迅速查看lag呢?答案是当然可以。 kafka提供了这些API。

备注:这里的kafka工具是指kafka自带的命令行工具,或者其他第三方提供的kafka工具。
注意事项:本博客所有代码是为了介绍相关内容而编写或者引用的,示例代码并非可直接用于生产的代码。仅供参考而已。

实现思路

首先我们需要引入的依赖如下。 其中kafka-clients是作为kafka客户端访问kakfa需要的依赖包,kafka_2.12是管理端需要的依赖包。
备注:我的kafka版本是kafka_2.12-1.0.0。

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>1.0.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency>

kafka管理端的AdminClient可以查询消费组的详情。

AdminClient.ConsumerGroupSummary  consumerGroupSummary =  adminClient.describeConsumerGroup(GROUP_ID, 5000);scala.collection.immutable.Map<TopicPartition, Object> maps =  adminClient.listGroupOffsets(GROUP_ID);

listGroupOffsets可以查询消费组的在指定topic的指定分区中的offset. 然后我们可以通过查询该TopicPartition最新的postion得到endOffset。
两者之差就是指定消费组在指定topic的某个分区上的lag信息。

 KafkaConsumer<String, String> consumer = getNewConsumer();consumer.assign(Arrays.asList(topicPartition));consumer.seekToEnd(Arrays.asList(topicPartition));long endOffset = consumer.position(topicPartition);

具体代码

完整的代码在这里,欢迎加星和fork。 谢谢!

package com.yq;import kafka.admin.AdminClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import scala.Option;
import scala.collection.immutable.List;import java.util.Arrays;
import java.util.Properties;/*** Simple to Introduction* className: SendMessageMain** @author EricYang* @version 2019/01/10 11:30*/
@Slf4j
public class AdminMain {private static final String SERVERS = "ubuntu:9092";private static final String GROUP_ID = "yq-consumer09";public static long getLogEndOffset(TopicPartition topicPartition){KafkaConsumer<String, String> consumer = getNewConsumer();consumer.assign(Arrays.asList(topicPartition));consumer.seekToEnd(Arrays.asList(topicPartition));long endOffset = consumer.position(topicPartition);consumer.close();return endOffset;}public static KafkaConsumer getNewConsumer(){Properties props = new Properties();props.put("bootstrap.servers", SERVERS);props.put("group.id", "test001");props.put("enable.auto.commit", "true");props.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);return consumer;}public static void main(String[] args) throws InstantiationException, IllegalAccessException {Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, SERVERS);AdminClient adminClient = AdminClient.create(props);AdminClient.ConsumerGroupSummary  consumerGroupSummary =  adminClient.describeConsumerGroup(GROUP_ID, 5000);if(consumerGroupSummary.state().equals("Empty")){System.out.println("No grp summary");}System.out.println("consumerGrpSummary State " +  consumerGroupSummary.state());Option<List<AdminClient.ConsumerSummary>> consumerSummaryOption =  consumerGroupSummary.consumers();scala.collection.immutable.Map<TopicPartition, Object> maps =  adminClient.listGroupOffsets(GROUP_ID);scala.collection.Set<TopicPartition> topicPartitions = maps.keySet();scala.collection.immutable.List<TopicPartition> topicPartitionList = topicPartitions.reversed();for(int j =0; j< topicPartitionList.size(); j++){TopicPartition topicPartition = topicPartitionList.apply(j);String currentOffset = maps.get(topicPartition).get().toString();long groupLastEndOffset = getLogEndOffset(topicPartition);long lag =  groupLastEndOffset -Long.valueOf(currentOffset);System.out.println("topic:"+topicPartition.topic()+",  partition:" + topicPartition.partition() + ", offset:"+ currentOffset + ", groupLastEndOffset:"+ groupLastEndOffset + ", lag:"+ lag);}adminClient.close();}
}

效果截图

kakfa如何查询指定消费组lag相关推荐

  1. 跟我学Kafka:Kafka消费组运维详解

    作为一个Kafka初学者,需要快速成长,承担维护公司Kafka的重任,对Kafka的学习,我按照三步走策略: 阅读Kafka相关书籍 从运维实战的角度学习Kafka 阅读源码,体系化,精细化掌握其实现 ...

  2. Kafka学习笔记(十)kakfa消费组和重平衡

    版权声明:本文为转载文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 原文链接:https://blog.csdn.net/weixin_39468305/articl ...

  3. 【flink】flink 消费组死掉 Lag不变 kafka不提交 重启恢复 非常诡异

    文章目录 1.场景1 1.1 钙素 2.场景2 2.场景2 1.场景1 1.1 钙素 是这样的,我有一个环境,遇到一个这样的现象,flink任务运行了 13天了,然后看消费组状态的时候 发现lag很大 ...

  4. 大名鼎鼎又臭名昭著的消费组和重平衡

    点击上方蓝色"胖滚猪学编程",选择"设为星标" 跟着胖滚猪学编程!好玩!有趣! 摘要:Consumer Group 指多个消费者实例组成一个组来共同消费一组主题 ...

  5. kafka专题:kafka的Topic主题、Partition分区、消费组偏移量offset等基本概念详解

    文章目录 1. kafka集群整体架构 2. kafka相关元素的基本概念 2.1 主题Topic和分区Partition 2.2 kafka消息存储在哪里? 2.3 分区副本 2.4 消费组和偏移量 ...

  6. Kafka生成者/消费组详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  7. Spring Cloud构建微服务架构:消息驱动的微服务(消费组)【Dalston版】

    通过之前的<消息驱动的微服务(入门)>一文,相信很多朋友已经对Spring Cloud Stream有了一个初步的认识.但是,对于<消息驱动的微服务(核心概念)>一文中提到的一 ...

  8. kafka 主动消费_Kafka消费组(consumer group)

    在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东西. 一. 误区澄清与概念明确 1 Kafk ...

  9. 【kafka】kafka 查看 GroupCoordinator 以及 kafka Group dead 消费组死掉 以及 GroupCoordinatorRequest 使用

    本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 文章目录 1.概述 2.主类 2.1 乌龙测试 2.2 正常返回节点 2.3 小结 1.概述 因为这篇博 ...

最新文章

  1. HBase常用API操作
  2. Java动态代理的实现
  3. 皮一皮:当你在上海地铁里被夹住后...
  4. 高桥盾react和boost_boost与react的战斗
  5. 任务切换的基础:模拟任务切换时寄存器的保存与恢复
  6. java月份去0_java – 使用月份解析日期而不是前导0
  7. mysql编译安装原理_MySQL编译安装全过程
  8. jep(java表达式分析器)简介
  9. 阿里技术副总裁贾扬清:我对人工智能的一点浅见 | 技术头条
  10. centos命令行xkill
  11. 网络工程师Day6--实验3-2 NAT配置
  12. How to live?
  13. C++windows内核编程笔记day11 win32静态库和动态库的使用
  14. 微信开门,给你简单极致的开门体验!
  15. 共享文件 麒麟系统_银河麒麟操作系统上共享文件目录的方法实践
  16. 慕课秒杀项目seckill
  17. 通过扫码下载安卓和ios安装包
  18. unity-shader 2D - Sprite 影子
  19. 云原生 | 混沌工程工具 ChaosBlade Operator Pod 篇(文末赠书)
  20. 【手机】手机选购指南

热门文章

  1. 计算机控制键盘,键盘装置及其计算机控制系统的制作方法
  2. java字符转转长整型_P104 将数字字符串转换成长整型整数 ★★
  3. Ngrok的注册使用
  4. 【人工智能 AI 2.0】阿里VP贾扬清被曝将离职创业:建大模型基础设施 已火速锁定首轮融资
  5. seetaface6 GPU版本windows编译
  6. StackOverflow和OutOfMemory
  7. c++ 回车键无法换行
  8. splay的一些操作
  9. 《趣味知识博文》小W与小L带你聊天式备考CDA Level Ⅰ(六)
  10. 从数据仓库到数据集市