maven依赖

org.apache.kafka

kafka-clients

0.10.1.0

注意:kafka-clients版本需要0.10.1.0以上,因为调用了新增接口endOffsets;

lag=logsize-offset

logsize通过consumer的endOffsets接口获得;offset通过consumer的committed接口获得;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.TopicPartition;

public class KafkaConsumeLagMonitor {

public static Properties getConsumeProperties(String groupID, String bootstrap_server) {

Properties props = new Properties();

props.put("group.id", groupID);

props.put("bootstrap.servers", bootstrap_server);

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

return props;

}

public static void main(String[] args) {

String bootstrap_server = args[0];

String groupID = args[1];

String topic = args[2];

Map endOffsetMap = new HashMap();

Map commitOffsetMap = new HashMap();

Properties consumeProps = getConsumeProperties(groupID, bootstrap_server);

System.out.println("consumer properties:" + consumeProps);

//查询topic partitions

KafkaConsumer consumer = new KafkaConsumer(consumeProps);

List topicPartitions = new ArrayList();

List partitionsFor = consumer.partitionsFor(topic);

for (PartitionInfo partitionInfo : partitionsFor) {

TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());

topicPartitions.add(topicPartition);

}

//查询log size

Map endOffsets = consumer.endOffsets(topicPartitions);

for (TopicPartition partitionInfo : endOffsets.keySet()) {

endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));

}

for (Integer partitionId : endOffsetMap.keySet()) {

System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s", System.currentTimeMillis(), topic, partitionId, endOffsetMap.get(partitionId)));

}

//查询消费offset

for (TopicPartition topicAndPartition : topicPartitions) {

OffsetAndMetadata committed = consumer.committed(topicAndPartition);

commitOffsetMap.put(topicAndPartition.partition(), committed.offset());

}

//累加lag

long lagSum = 0l;

if (endOffsetMap.size() == commitOffsetMap.size()) {

for (Integer partition : endOffsetMap.keySet()) {

long endOffSet = endOffsetMap.get(partition);

long commitOffSet = commitOffsetMap.get(partition);

long diffOffset = endOffSet - commitOffSet;

lagSum += diffOffset;

System.out.println("Topic:" + topic + ", groupID:" + groupID + ", partition:" + partition + ", endOffset:" + endOffSet + ", commitOffset:" + commitOffSet + ", diffOffset:" + diffOffset);

}

System.out.println("Topic:" + topic + ", groupID:" + groupID + ", LAG:" + lagSum);

} else {

System.out.println("this topic partitions lost");

}

consumer.close();

}

}

另外一个思路可参考kafka源码kafka.tools.ConsumerOffsetChecker实现,offset直接读取 zk节点内容,logsize通过consumer的getOffsetsBefore方法获取,整体来说,较麻烦;

java kafka 拉取_java获取kafka consumer lag相关推荐

  1. git 拉取和获取 pull 和 fetch 区别

    使用Git  直接提交的话   直接 push 获取最新版本  有两种  拉取 和 获取 pull 和 fetch git  pull     从远程拉取最新版本 到本地  自动合并 merge   ...

  2. kafka拉取mysql数据库_kafka里信息用flink获取后放入mysql

    1. 安装zookeeper, kafka 2. 启动zookeeper, kafka server 3. 准备工作 在Mysql数据库创建一个table, t_student 加入maven需要的f ...

  3. Kafka拉取某一个时间段內的消息

    一般来说我们都使用Kafka来记录用户的操作记录以便后续分析. 但是通常使用的时候需要按天来统计每天的去重用户数.点击量之类的. 这个时候如果直接拉某个topic的数据的话,就需要判断每个消息的时间戳 ...

  4. 如果可以,我想并行消费Kafka拉取的数据库Binlog

    关注 "Java艺术" 我们一起成长! 本篇所述内容只在试用阶段,可行性有待考验! 笔者在上一篇提到:由于Binlog需要顺序消费,所以阿里数据订阅服务DTS只将Binlog放入t ...

  5. python调用kafka拉取数据失败_无法使用kafkapython从另一个容器向Kafka容器发出请求...

    环境:services: zookeeper: image: wurstmeister/zookeeper ports: - 2181 kafka: image: wurstmeister/kafka ...

  6. java生产者消费者代码_Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键 ...

  7. java 获取秒数_Java获取精确到秒的时间戳(转)

    1.时间戳简介: 时间戳的定义:通常是一个字符序列,唯一地标识某一刻的时间.数字时间戳技术是数字签名技术一种变种的应用.是指格林威治时间1970年01月01日00时00分00秒(北京时间1970年01 ...

  8. java中的随机数_Java获取随机数

    随机数在实际中使用很广泛,比如要随即生成一个固定长度的字符串.数字.或者随即生成一个不定长度的数字.或者进行一个模拟的随机选择等等.Java提供了最基本的工具,可以帮助开发者来实现这一切. 一.Jav ...

  9. java 文件md5校验_Java 获取 文件md5校验码

    讯雷下载的核心思想是校验文件的md5值,两个文件若md5相同则为同一文件. 当得到用户下载某个文件的请求后它根据数据库中保留的文件md5比对出拥有此文件的url, 将用户请求挂接到此url上并仿造一个 ...

最新文章

  1. Ubuntu_Win10双系统互换注意事项以及蓝屏解决方案
  2. 泛化,过拟合,欠拟合素材(part2)--机器学习入门之道
  3. codeforces1453 E. Dog Snacks
  4. ffmpeg.exe 笔记
  5. Java笔记-DH密钥交换获取密钥及AES加解密
  6. 使用了 23 年的 Java 不再免费!
  7. 硅谷公司:我们称他们为软件工程师,而非打工人
  8. 单片机c语言期末考试题(a)的答案,单片机C语言期末考试题(A).doc
  9. Mac 更改Apache文件系统目录
  10. 分享12306全自动验证码识别提交,春运抢票准备时
  11. 为什么要分层?数据仓库分层架构深度讲解
  12. java拼音查询_Java汉字获取拼音、笔划、偏旁部首
  13. (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  14. http://nianjian.xiaze.com/tags.php?/%E5%B9%BF%E5%B7%9E%E7%BB%8F%E6%B5%8E%E5%B9%B4%E9%89%B4/1/1360241
  15. 0x120-从头开始写操作系统-启动扇区与内存的关系及内存寻址的应用
  16. 【C语言练习】求名次、找凶手
  17. RABBIT API (随机ACG图片接口推荐)
  18. C语言实现逆波兰表达式计算函数(含浮点型、整型混合运算)
  19. Android Retrofit通过OkHttp设置Interceptor拦截器统一打印请求报文及返回报文
  20. RabbitMQ 延迟队列详解

热门文章

  1. 首次落地中国大陆的OpenInfra:中国对于开源做出的贡献力量已不可忽视
  2. 华为面试改革,我们该怎么跟进?
  3. 我那么拼命,为什么还会被裁掉?
  4. Cloud一分钟|茅台4.5亿入股云上贵州大数据,后者已接管苹果中国iCloud; 阿里云进入印度市场,增长速度远超当地平均水平...
  5. 数组正遍历,数组倒遍历
  6. ORA-01858: 在要求输入数字处找到非数字字符 13行
  7. 工作流实战_20_flowable 任务签收 反签收
  8. 软件设计师 - 算法思想
  9. Vue 实现 Open Graph 分享预览
  10. qt中根据数据解析的结果动态的创建控件并布局