kakfa如何查询指定消费组lag
问题来源
实际项目中,我们会遇到kafka消费不及时,系统发现最新的数据一致无法出现,这时候通过其他kafka工具发现原来指定的消费组lag太大,也就是我们系统要么出问题,要么需要启动更多的实例加快消费消息。每次通过kafka工具去查询lag基本都是手工而且耗时且慢,能否在自己的系统中集成查询指定消费组lag的功能,然后出现问题是可以管理界面中迅速查看lag呢?答案是当然可以。 kafka提供了这些API。
备注:这里的kafka工具是指kafka自带的命令行工具,或者其他第三方提供的kafka工具。
注意事项:本博客所有代码是为了介绍相关内容而编写或者引用的,示例代码并非可直接用于生产的代码。仅供参考而已。
实现思路
首先我们需要引入的依赖如下。 其中kafka-clients是作为kafka客户端访问kakfa需要的依赖包,kafka_2.12是管理端需要的依赖包。
备注:我的kafka版本是kafka_2.12-1.0.0。
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>1.0.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency>
kafka管理端的AdminClient可以查询消费组的详情。
AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(GROUP_ID, 5000);scala.collection.immutable.Map<TopicPartition, Object> maps = adminClient.listGroupOffsets(GROUP_ID);
listGroupOffsets可以查询消费组的在指定topic的指定分区中的offset. 然后我们可以通过查询该TopicPartition最新的postion得到endOffset。
两者之差就是指定消费组在指定topic的某个分区上的lag信息。
KafkaConsumer<String, String> consumer = getNewConsumer();consumer.assign(Arrays.asList(topicPartition));consumer.seekToEnd(Arrays.asList(topicPartition));long endOffset = consumer.position(topicPartition);
具体代码
完整的代码在这里,欢迎加星和fork。 谢谢!
package com.yq;import kafka.admin.AdminClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import scala.Option;
import scala.collection.immutable.List;import java.util.Arrays;
import java.util.Properties;/*** Simple to Introduction* className: SendMessageMain** @author EricYang* @version 2019/01/10 11:30*/
@Slf4j
public class AdminMain {private static final String SERVERS = "ubuntu:9092";private static final String GROUP_ID = "yq-consumer09";public static long getLogEndOffset(TopicPartition topicPartition){KafkaConsumer<String, String> consumer = getNewConsumer();consumer.assign(Arrays.asList(topicPartition));consumer.seekToEnd(Arrays.asList(topicPartition));long endOffset = consumer.position(topicPartition);consumer.close();return endOffset;}public static KafkaConsumer getNewConsumer(){Properties props = new Properties();props.put("bootstrap.servers", SERVERS);props.put("group.id", "test001");props.put("enable.auto.commit", "true");props.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);return consumer;}public static void main(String[] args) throws InstantiationException, IllegalAccessException {Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, SERVERS);AdminClient adminClient = AdminClient.create(props);AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(GROUP_ID, 5000);if(consumerGroupSummary.state().equals("Empty")){System.out.println("No grp summary");}System.out.println("consumerGrpSummary State " + consumerGroupSummary.state());Option<List<AdminClient.ConsumerSummary>> consumerSummaryOption = consumerGroupSummary.consumers();scala.collection.immutable.Map<TopicPartition, Object> maps = adminClient.listGroupOffsets(GROUP_ID);scala.collection.Set<TopicPartition> topicPartitions = maps.keySet();scala.collection.immutable.List<TopicPartition> topicPartitionList = topicPartitions.reversed();for(int j =0; j< topicPartitionList.size(); j++){TopicPartition topicPartition = topicPartitionList.apply(j);String currentOffset = maps.get(topicPartition).get().toString();long groupLastEndOffset = getLogEndOffset(topicPartition);long lag = groupLastEndOffset -Long.valueOf(currentOffset);System.out.println("topic:"+topicPartition.topic()+", partition:" + topicPartition.partition() + ", offset:"+ currentOffset + ", groupLastEndOffset:"+ groupLastEndOffset + ", lag:"+ lag);}adminClient.close();}
}
效果截图
kakfa如何查询指定消费组lag相关推荐
- 跟我学Kafka:Kafka消费组运维详解
作为一个Kafka初学者,需要快速成长,承担维护公司Kafka的重任,对Kafka的学习,我按照三步走策略: 阅读Kafka相关书籍 从运维实战的角度学习Kafka 阅读源码,体系化,精细化掌握其实现 ...
- Kafka学习笔记(十)kakfa消费组和重平衡
版权声明:本文为转载文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 原文链接:https://blog.csdn.net/weixin_39468305/articl ...
- 【flink】flink 消费组死掉 Lag不变 kafka不提交 重启恢复 非常诡异
文章目录 1.场景1 1.1 钙素 2.场景2 2.场景2 1.场景1 1.1 钙素 是这样的,我有一个环境,遇到一个这样的现象,flink任务运行了 13天了,然后看消费组状态的时候 发现lag很大 ...
- 大名鼎鼎又臭名昭著的消费组和重平衡
点击上方蓝色"胖滚猪学编程",选择"设为星标" 跟着胖滚猪学编程!好玩!有趣! 摘要:Consumer Group 指多个消费者实例组成一个组来共同消费一组主题 ...
- kafka专题:kafka的Topic主题、Partition分区、消费组偏移量offset等基本概念详解
文章目录 1. kafka集群整体架构 2. kafka相关元素的基本概念 2.1 主题Topic和分区Partition 2.2 kafka消息存储在哪里? 2.3 分区副本 2.4 消费组和偏移量 ...
- Kafka生成者/消费组详解
什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...
- Spring Cloud构建微服务架构:消息驱动的微服务(消费组)【Dalston版】
通过之前的<消息驱动的微服务(入门)>一文,相信很多朋友已经对Spring Cloud Stream有了一个初步的认识.但是,对于<消息驱动的微服务(核心概念)>一文中提到的一 ...
- kafka 主动消费_Kafka消费组(consumer group)
在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东西. 一. 误区澄清与概念明确 1 Kafk ...
- 【kafka】kafka 查看 GroupCoordinator 以及 kafka Group dead 消费组死掉 以及 GroupCoordinatorRequest 使用
本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 文章目录 1.概述 2.主类 2.1 乌龙测试 2.2 正常返回节点 2.3 小结 1.概述 因为这篇博 ...
最新文章
- HBase常用API操作
- Java动态代理的实现
- 皮一皮:当你在上海地铁里被夹住后...
- 高桥盾react和boost_boost与react的战斗
- 任务切换的基础:模拟任务切换时寄存器的保存与恢复
- java月份去0_java – 使用月份解析日期而不是前导0
- mysql编译安装原理_MySQL编译安装全过程
- jep(java表达式分析器)简介
- 阿里技术副总裁贾扬清:我对人工智能的一点浅见 | 技术头条
- centos命令行xkill
- 网络工程师Day6--实验3-2 NAT配置
- How to live?
- C++windows内核编程笔记day11 win32静态库和动态库的使用
- 微信开门,给你简单极致的开门体验!
- 共享文件 麒麟系统_银河麒麟操作系统上共享文件目录的方法实践
- 慕课秒杀项目seckill
- 通过扫码下载安卓和ios安装包
- unity-shader 2D - Sprite 影子
- 云原生 | 混沌工程工具 ChaosBlade Operator Pod 篇(文末赠书)
- 【手机】手机选购指南