kafka监控获取logSize, offset, lag等信息
由于项目需要,需要查看kafka消费信息lag(lag = logSize - offset)
参考https://www.aliyun.com/jiaocheng/775267.html 的实现方式在有些场景无法获取offset的值(具体原因暂不知晓后续研究下)
因此决定直接从zookeeper中取offset值
一、springboot项目添加依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
二、相关代码
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class KafkaUtil {private static Logger logger = LoggerFactory.getLogger(KafkaUtil.class);private static final int ZOOKEEPER_TIMEOUT = 30000;private final CountDownLatch latch = new CountDownLatch(1);public ZooKeeper getZookeeper(String connectionString) {ZooKeeper zk = null;try {zk = new ZooKeeper(connectionString, ZOOKEEPER_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected.equals(event.getState())) {latch.countDown();}}});latch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return zk;}public static Properties getConsumerProperties(String groupId, String bootstrap_servers) {Properties props = new Properties();props.put("group.id", groupId);props.put("bootstrap.servers", bootstrap_servers);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}/*** 获取logSize, offset, lag等信息* @param zk* @param bootstrap_servers* @param groupId* @param topics null查询groupId消费过的所有topic* @param sorted* @return* @throws Exception*/public List<Map<String, Object>> getLagByGroupAndTopic(ZooKeeper zk, String bootstrap_servers, String groupId,String[] topics, boolean sorted) throws Exception {List<Map<String, Object>> topicPatitionMapList = new ArrayList<>();// 获取group消费过的所有topicList<String> topicList = null;if (topics == null || topics.length == 0) {try {topicList = zk.getChildren("/consumers/" + groupId + "/offsets", false);} catch (KeeperException | InterruptedException e) {logger.error("从zookeeper获取topics失败:zkState: {}, groupId:{}", zk.getState(), groupId);throw new Exception("从zookeeper中获取topics失败");}} else {topicList = Arrays.asList(topics);}Properties consumeProps = getConsumerProperties(groupId, bootstrap_servers);logger.info("consumer properties:{}", consumeProps);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumeProps);// 查询topic partitionsfor (String topic : topicList) {List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);//由于有时延, 尽量逐个topic查询, 减少lag为负数的情况List<TopicPartition> topicPartitions = new ArrayList<>();// 获取topic对应的 TopicPartitionfor (PartitionInfo partitionInfo : partitionsFor) {TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());topicPartitions.add(topicPartition);}// 查询logSizeMap<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);for (Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {TopicPartition partitionInfo = entry.getKey();// 获取offsetString offsetPath = MessageFormat.format("/consumers/{0}/offsets/{1}/{2}", groupId, partitionInfo.topic(),partitionInfo.partition());byte[] data = zk.getData(offsetPath, false, null);long offset = Long.valueOf(new String(data));Map<String, Object> topicPatitionMap = new HashMap<>();topicPatitionMap.put("group", groupId);topicPatitionMap.put("topic", partitionInfo.topic());topicPatitionMap.put("partition", partitionInfo.partition());topicPatitionMap.put("logSize", endOffsets.get(partitionInfo));topicPatitionMap.put("offset", offset);topicPatitionMap.put("lag", endOffsets.get(partitionInfo) - offset);topicPatitionMapList.add(topicPatitionMap);}}consumer.close();if(sorted) {Collections.sort(topicPatitionMapList, new Comparator<Map<String,Object>>() {@Overridepublic int compare(Map<String, Object> o1, Map<String, Object> o2) {if(o1.get("topic").equals(o2.get("topic"))) {return ((Integer)o1.get("partition")).compareTo((Integer)o2.get("partition"));}return ((String)o1.get("topic")).compareTo((String)o2.get("topic"));}});}return topicPatitionMapList;}public static void main(String[] args) throws Exception {String bootstrap_servers = "localhost:9092";String groupId = "interface-group-new";String[] topics = null;//{"test1", "test2", test3};KafkaUtil kafkaUtil = new KafkaUtil();String connectionString = "localhost:2181";ZooKeeper zk = kafkaUtil.getZookeeper(connectionString);if (zk == null) {throw new RuntimeException("获取zookeeper连接失败");}List<Map<String, Object>> topicPatitionMapList = kafkaUtil.getLagByGroupAndTopic(zk, bootstrap_servers,groupId, topics, true);for (Map<String, Object> map : topicPatitionMapList) {System.out.println(map);}zk.close();}
}
三、说明
调用时参数topics为空会获取到groupId所有消费过的topic(zookeeper会保存消费过的groupId的offset值)
List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
获取到 List<PartitionInfo> 后要尽快查询zookeeper对应的offset,避免由于继续生产消费或时延导致offset > logSize
参考:https://www.aliyun.com/jiaocheng/775267.html
kafka监控获取logSize, offset, lag等信息相关推荐
- 【Kafka】Kafka使用代码设置offset值
1.概述 转载:https://www.cnblogs.com/jinniezheng/p/6379639.html package com.kafka.consumer.offset.update; ...
- java 获取kafka lag,聊聊kafka consumer offset lag的监控
序 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JM ...
- 【kafka】kafka consumer offset lag获取的三者方式
1.概述 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方 ...
- 聊聊kafka consumer offset lag increase异常
序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常. 查看consumer消费情况 Group Topic Pid Offset logSize Lag O ...
- java查看kafka数据量_Java kafka监控 topic的数据量count情况,每个topic的offset,
Java使用kafka的API来监控kafka的某些topic的数据量增量,offset,定时查总量之后,然后计算差值,然后就可以算单位间隔的每个topic的增量,kafka监控一般都是监控的吞吐量, ...
- 【kafka系列教程41】kafka监控
Monitoring Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This c ...
- DataPipeline |《Apache Kafka实战》作者胡夕:Apache Kafka监控与调优
胡夕,<Apache Kafka实战>作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM.搜狗.微博等公司.国内活跃的Kafka代码贡献者. 前言 虽然目前Apache ...
- Kafka监控KafkaOffsetMonitor【转】
1.概述 前面给大家介绍了Kafka的背景以及一些应用场景,并附带上演示了Kafka的简单示例.然后,在开发的过程当中,我们会发现一些问题,那就是消息的监控情况.虽然,在启动Kafka的相关服务后,我 ...
- kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2
1.JAVA API操作kafka 修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...
最新文章
- vue-devTools插件安装流程
- java对象的内存分配
- MASM5.0汇编环境安装
- Php传图缩图,使用以下用于上传图像的PHP代码上传时缩小图像大小
- Starting MySQL.. ERROR! The server quit without updating PID file (/usr/local/mysql/data/vm10-0-0-19
- iis7设置html支持asp,Win7下启用IIS7配置ASP运行环境的详细方法
- Springboot 2.x 单元测试 JUnit 5
- js 日期星期 带农历
- KMP算法———模板
- 网络篇:朋友面试之TCP/IP,回去等通知吧
- oracle 层次查询判断叶子和根节点
- 工业大数据发展面临四方面挑战
- 【更新】PDF控件Spire.PDF V3.9.463发布 | 修复多个PDF转换bug
- ROSCon 2019 机器人操作系统国际盛会
- 关于芯片、CPU的区别的简单理解
- 信息系统项目管理师必背知识点(完整版)
- 利穗IT网络工程师面试试题
- 常见的配置中心:Apollo(二)-接入Apollo
- Typora使用详解
- 机器学习-8(单调函数)
热门文章
- tensorboard 拒绝了我们的连接请求
- mysql查询总成绩高于240_Egret应用开发实践(02) MVC 模块化 - SegmentFault 思否
- Aspose.Words for .NET Crack 23.1.0
- 我的iphone6退货之路
- 在c语言中pwm的作用,PWM调速的C语言程序编写(非常简单);
- MySQL数据库入门实战教程
- 凸优化——凸优化问题与算法
- 微信开放平台开发第三方授权登陆(二):PC网页端
- 【数字IC/FPGA】电平同步、脉冲同步、边沿同步
- iphoneX的safari浏览器操作栏隐藏时兼容底部Home键