java kafka 拉取_java获取kafka consumer lag
maven依赖
org.apache.kafka
kafka-clients
0.10.1.0
注意:kafka-clients版本需要0.10.1.0以上,因为调用了新增接口endOffsets;
lag=logsize-offset
logsize通过consumer的endOffsets接口获得;offset通过consumer的committed接口获得;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
public class KafkaConsumeLagMonitor {
public static Properties getConsumeProperties(String groupID, String bootstrap_server) {
Properties props = new Properties();
props.put("group.id", groupID);
props.put("bootstrap.servers", bootstrap_server);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
public static void main(String[] args) {
String bootstrap_server = args[0];
String groupID = args[1];
String topic = args[2];
Map endOffsetMap = new HashMap();
Map commitOffsetMap = new HashMap();
Properties consumeProps = getConsumeProperties(groupID, bootstrap_server);
System.out.println("consumer properties:" + consumeProps);
//查询topic partitions
KafkaConsumer consumer = new KafkaConsumer(consumeProps);
List topicPartitions = new ArrayList();
List partitionsFor = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionsFor) {
TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
topicPartitions.add(topicPartition);
}
//查询log size
Map endOffsets = consumer.endOffsets(topicPartitions);
for (TopicPartition partitionInfo : endOffsets.keySet()) {
endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
}
for (Integer partitionId : endOffsetMap.keySet()) {
System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s", System.currentTimeMillis(), topic, partitionId, endOffsetMap.get(partitionId)));
}
//查询消费offset
for (TopicPartition topicAndPartition : topicPartitions) {
OffsetAndMetadata committed = consumer.committed(topicAndPartition);
commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
}
//累加lag
long lagSum = 0l;
if (endOffsetMap.size() == commitOffsetMap.size()) {
for (Integer partition : endOffsetMap.keySet()) {
long endOffSet = endOffsetMap.get(partition);
long commitOffSet = commitOffsetMap.get(partition);
long diffOffset = endOffSet - commitOffSet;
lagSum += diffOffset;
System.out.println("Topic:" + topic + ", groupID:" + groupID + ", partition:" + partition + ", endOffset:" + endOffSet + ", commitOffset:" + commitOffSet + ", diffOffset:" + diffOffset);
}
System.out.println("Topic:" + topic + ", groupID:" + groupID + ", LAG:" + lagSum);
} else {
System.out.println("this topic partitions lost");
}
consumer.close();
}
}
另外一个思路可参考kafka源码kafka.tools.ConsumerOffsetChecker实现,offset直接读取 zk节点内容,logsize通过consumer的getOffsetsBefore方法获取,整体来说,较麻烦;
java kafka 拉取_java获取kafka consumer lag相关推荐
- git 拉取和获取 pull 和 fetch 区别
使用Git 直接提交的话 直接 push 获取最新版本 有两种 拉取 和 获取 pull 和 fetch git pull 从远程拉取最新版本 到本地 自动合并 merge ...
- kafka拉取mysql数据库_kafka里信息用flink获取后放入mysql
1. 安装zookeeper, kafka 2. 启动zookeeper, kafka server 3. 准备工作 在Mysql数据库创建一个table, t_student 加入maven需要的f ...
- Kafka拉取某一个时间段內的消息
一般来说我们都使用Kafka来记录用户的操作记录以便后续分析. 但是通常使用的时候需要按天来统计每天的去重用户数.点击量之类的. 这个时候如果直接拉某个topic的数据的话,就需要判断每个消息的时间戳 ...
- 如果可以,我想并行消费Kafka拉取的数据库Binlog
关注 "Java艺术" 我们一起成长! 本篇所述内容只在试用阶段,可行性有待考验! 笔者在上一篇提到:由于Binlog需要顺序消费,所以阿里数据订阅服务DTS只将Binlog放入t ...
- python调用kafka拉取数据失败_无法使用kafkapython从另一个容器向Kafka容器发出请求...
环境:services: zookeeper: image: wurstmeister/zookeeper ports: - 2181 kafka: image: wurstmeister/kafka ...
- java生产者消费者代码_Java实现Kafka生产者消费者代码实例
Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键 ...
- java 获取秒数_Java获取精确到秒的时间戳(转)
1.时间戳简介: 时间戳的定义:通常是一个字符序列,唯一地标识某一刻的时间.数字时间戳技术是数字签名技术一种变种的应用.是指格林威治时间1970年01月01日00时00分00秒(北京时间1970年01 ...
- java中的随机数_Java获取随机数
随机数在实际中使用很广泛,比如要随即生成一个固定长度的字符串.数字.或者随即生成一个不定长度的数字.或者进行一个模拟的随机选择等等.Java提供了最基本的工具,可以帮助开发者来实现这一切. 一.Jav ...
- java 文件md5校验_Java 获取 文件md5校验码
讯雷下载的核心思想是校验文件的md5值,两个文件若md5相同则为同一文件. 当得到用户下载某个文件的请求后它根据数据库中保留的文件md5比对出拥有此文件的url, 将用户请求挂接到此url上并仿造一个 ...
最新文章
- Ubuntu_Win10双系统互换注意事项以及蓝屏解决方案
- 泛化,过拟合,欠拟合素材(part2)--机器学习入门之道
- codeforces1453 E. Dog Snacks
- ffmpeg.exe 笔记
- Java笔记-DH密钥交换获取密钥及AES加解密
- 使用了 23 年的 Java 不再免费!
- 硅谷公司:我们称他们为软件工程师,而非打工人
- 单片机c语言期末考试题(a)的答案,单片机C语言期末考试题(A).doc
- Mac 更改Apache文件系统目录
- 分享12306全自动验证码识别提交,春运抢票准备时
- 为什么要分层?数据仓库分层架构深度讲解
- java拼音查询_Java汉字获取拼音、笔划、偏旁部首
- (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
- http://nianjian.xiaze.com/tags.php?/%E5%B9%BF%E5%B7%9E%E7%BB%8F%E6%B5%8E%E5%B9%B4%E9%89%B4/1/1360241
- 0x120-从头开始写操作系统-启动扇区与内存的关系及内存寻址的应用
- 【C语言练习】求名次、找凶手
- RABBIT API (随机ACG图片接口推荐)
- C语言实现逆波兰表达式计算函数(含浮点型、整型混合运算)
- Android Retrofit通过OkHttp设置Interceptor拦截器统一打印请求报文及返回报文
- RabbitMQ 延迟队列详解
热门文章
- 首次落地中国大陆的OpenInfra:中国对于开源做出的贡献力量已不可忽视
- 华为面试改革,我们该怎么跟进?
- 我那么拼命,为什么还会被裁掉?
- Cloud一分钟|茅台4.5亿入股云上贵州大数据,后者已接管苹果中国iCloud; 阿里云进入印度市场,增长速度远超当地平均水平...
- 数组正遍历,数组倒遍历
- ORA-01858: 在要求输入数字处找到非数字字符 13行
- 工作流实战_20_flowable 任务签收 反签收
- 软件设计师 - 算法思想
- Vue 实现 Open Graph 分享预览
- qt中根据数据解析的结果动态的创建控件并布局