For a project requirement, we need to monitor Kafka consumer lag (lag = logSize - offset).

The approach described at https://www.aliyun.com/jiaocheng/775267.html fails to fetch the offset in some scenarios (the exact cause is unclear; to be investigated later).

So we decided to read the offset directly from ZooKeeper instead.

1. Add the dependency to the Spring Boot project

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
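The code below also talks to ZooKeeper directly via org.apache.zookeeper.ZooKeeper. If the ZooKeeper client is not already on the classpath (spring-kafka itself does not bring it in), a dependency along these lines may be needed; the version shown is an assumption, so pick one matching your ZooKeeper server:

```xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <!-- assumption: match this to your ZooKeeper server version -->
    <version>3.4.13</version>
</dependency>
```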

2. The code

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaUtil.class);

    private static final int ZOOKEEPER_TIMEOUT = 30000;

    private final CountDownLatch latch = new CountDownLatch(1);

    public ZooKeeper getZookeeper(String connectionString) {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(connectionString, ZOOKEEPER_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // Release the latch once the session is established
                    if (Event.KeeperState.SyncConnected.equals(event.getState())) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            logger.error("Failed to connect to zookeeper: {}", connectionString, e);
        }
        return zk;
    }

    public static Properties getConsumerProperties(String groupId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Collects logSize, offset and lag for each partition.
     *
     * @param zk a connected ZooKeeper client
     * @param bootstrapServers Kafka bootstrap servers
     * @param groupId consumer group id
     * @param topics topics to query; null or empty means every topic the group has consumed
     * @param sorted whether to sort the result by topic and partition
     */
    public List<Map<String, Object>> getLagByGroupAndTopic(ZooKeeper zk, String bootstrapServers,
            String groupId, String[] topics, boolean sorted) throws Exception {
        List<Map<String, Object>> topicPartitionMapList = new ArrayList<>();

        // Find all topics the group has consumed, unless topics were given explicitly
        List<String> topicList;
        if (topics == null || topics.length == 0) {
            try {
                topicList = zk.getChildren("/consumers/" + groupId + "/offsets", false);
            } catch (KeeperException | InterruptedException e) {
                logger.error("Failed to read topics from zookeeper: zkState: {}, groupId: {}",
                        zk.getState(), groupId);
                throw new Exception("Failed to read topics from zookeeper", e);
            }
        } else {
            topicList = Arrays.asList(topics);
        }

        Properties consumeProps = getConsumerProperties(groupId, bootstrapServers);
        logger.info("consumer properties: {}", consumeProps);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumeProps);

        // Query topic by topic to keep the time between reading logSize and offset short,
        // which reduces the chance of a spurious negative lag
        for (String topic : topicList) {
            List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);

            // Build the TopicPartition list for this topic
            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (PartitionInfo partitionInfo : partitionsFor) {
                topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }

            // logSize: the latest offset of each partition
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();

                // The committed offset is stored in zookeeper (old consumer offset storage)
                String offsetPath = MessageFormat.format("/consumers/{0}/offsets/{1}/{2}",
                        groupId, topicPartition.topic(), topicPartition.partition());
                byte[] data = zk.getData(offsetPath, false, null);
                long offset = Long.parseLong(new String(data));

                Map<String, Object> topicPartitionMap = new HashMap<>();
                topicPartitionMap.put("group", groupId);
                topicPartitionMap.put("topic", topicPartition.topic());
                topicPartitionMap.put("partition", topicPartition.partition());
                topicPartitionMap.put("logSize", entry.getValue());
                topicPartitionMap.put("offset", offset);
                topicPartitionMap.put("lag", entry.getValue() - offset);
                topicPartitionMapList.add(topicPartitionMap);
            }
        }
        consumer.close();

        if (sorted) {
            Collections.sort(topicPartitionMapList, new Comparator<Map<String, Object>>() {
                @Override
                public int compare(Map<String, Object> o1, Map<String, Object> o2) {
                    if (o1.get("topic").equals(o2.get("topic"))) {
                        return ((Integer) o1.get("partition")).compareTo((Integer) o2.get("partition"));
                    }
                    return ((String) o1.get("topic")).compareTo((String) o2.get("topic"));
                }
            });
        }
        return topicPartitionMapList;
    }

    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        String groupId = "interface-group-new";
        String[] topics = null; // e.g. {"test1", "test2", "test3"}

        KafkaUtil kafkaUtil = new KafkaUtil();
        String connectionString = "localhost:2181";
        ZooKeeper zk = kafkaUtil.getZookeeper(connectionString);
        if (zk == null) {
            throw new RuntimeException("Failed to connect to zookeeper");
        }
        List<Map<String, Object>> topicPartitionMapList =
                kafkaUtil.getLagByGroupAndTopic(zk, bootstrapServers, groupId, topics, true);
        for (Map<String, Object> map : topicPartitionMapList) {
            System.out.println(map);
        }
        zk.close();
    }
}
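Each entry in the returned list describes one partition. For monitoring it is often useful to roll the per-partition rows up into a single total lag per topic; a minimal sketch of that (the class and method names here are my own, not part of the code above):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LagRollup {

    /** Sums the "lag" value of each per-partition row into a total per topic. */
    public static Map<String, Long> totalLagByTopic(List<Map<String, Object>> rows) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Object> row : rows) {
            String topic = (String) row.get("topic");
            long lag = ((Number) row.get("lag")).longValue();
            totals.merge(topic, lag, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Two partitions of the same topic, as produced by getLagByGroupAndTopic
        List<Map<String, Object>> rows = new ArrayList<>();
        Map<String, Object> r1 = new HashMap<>();
        r1.put("topic", "test1"); r1.put("partition", 0); r1.put("lag", 3L);
        rows.add(r1);
        Map<String, Object> r2 = new HashMap<>();
        r2.put("topic", "test1"); r2.put("partition", 1); r2.put("lag", 2L);
        rows.add(r2);
        System.out.println(totalLagByTopic(rows)); // {test1=5}
    }
}
```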

3. Notes

If the topics parameter is null, the call returns every topic the groupId has consumed (ZooKeeper keeps the committed offsets for each consumer group).
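The offsets live under fixed paths in ZooKeeper. A small sketch of how those paths are built (mirroring the MessageFormat call in the code above; note that MessageFormat applies locale number formatting to Number arguments, so converting the partition to a String first is safer for partitions >= 1000):

```java
import java.text.MessageFormat;

public class ZkOffsetPaths {

    /** Path whose children are all topics a group has committed offsets for. */
    public static String groupOffsetsPath(String groupId) {
        return "/consumers/" + groupId + "/offsets";
    }

    /** Path of the committed offset for one partition. The partition is
     *  converted to a String first, because MessageFormat would otherwise
     *  apply locale number formatting to it (e.g. 1000 -> "1,000"). */
    public static String partitionOffsetPath(String groupId, String topic, int partition) {
        return MessageFormat.format("/consumers/{0}/offsets/{1}/{2}",
                groupId, topic, String.valueOf(partition));
    }

    public static void main(String[] args) {
        System.out.println(groupOffsetsPath("interface-group-new"));
        // /consumers/interface-group-new/offsets
        System.out.println(partitionOffsetPath("interface-group-new", "test1", 0));
        // /consumers/interface-group-new/offsets/test1/0
    }
}
```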

List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);

After obtaining the List<PartitionInfo>, query the corresponding offsets in ZooKeeper as quickly as possible; otherwise ongoing production and consumption, or plain latency, can leave offset > logSize.
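If the end offsets and the ZooKeeper offsets are still sampled at slightly different times, the computed lag can come out negative. A defensive option (my own addition, not part of the code above) is to clamp it at zero:

```java
public class LagMath {

    /** lag = logSize - offset, clamped at 0: a committed offset read slightly
     *  later than logSize can momentarily overtake the sampled logSize. */
    public static long lag(long logSize, long offset) {
        return Math.max(0L, logSize - offset);
    }

    public static void main(String[] args) {
        System.out.println(lag(100L, 90L));  // 10
        System.out.println(lag(100L, 103L)); // 0: offset briefly ahead of the sampled logSize
    }
}
```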

Reference: https://www.aliyun.com/jiaocheng/775267.html
