kafka设置起止时间消费消息
消费kafka消息时,有时可能需要消费某个时间段的消息,写个demo记录下:
public class KafkaConsumerByTime {public static void main(String[] args) throws Exception {String topic = "start_log";String startTime = "2019-11-07 10:26:00";String endTime = "2019-11-07 10:28:00";Properties kafkaProp = new Properties();kafkaProp.put("bootstrap.servers", "localhost:9092");kafkaProp.put("group.id", "testByTime");kafkaProp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");kafkaProp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumerByTime task = new KafkaConsumerByTime(topic, kafkaProp, startTime, endTime);task.doTask();}private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";/*** Pattern:yyyy-MM-dd HH:mm:ss*/private String startTime;/*** Pattern:yyyy-MM-dd HH:mm:ss*/private String endTime;/*** kafka配置*/private Properties kafkaProp;private String topic;private boolean isCancel = false;public KafkaConsumerByTime(String topic, Properties kafkaProp, String startTime, String endTime) {this.topic = topic;this.kafkaProp = kafkaProp;this.startTime = startTime;this.endTime = endTime;}public void doTask() throws Exception{long sTime = DateUtils.parseDate(startTime, PATTERN).getTime();long eTime = DateUtils.parseDate(endTime, PATTERN).getTime();Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = KafkaUtil.fetchOffsetsWithTimestamp(topic, sTime, kafkaProp);try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProp)) {consumer.assign(startOffsetMap.keySet());startOffsetMap.forEach((k, v) -> {consumer.seek(k, v.offset());System.out.println(topic + ":partition:" + k + ", offsets:" + v.offset());});System.out.println("开始消费");while(!isCancel) {System.out.println("循环...");ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {if (eTime == 0 || record.timestamp() <= eTime) {// doSomethingSystem.out.printf("offset = %d,p = %d, timestamp = %d key = %s, value = %s \r\n", record.offset(), record.partition(), record.timestamp(), record.key(), record.value());}}}System.out.println("结束消费");}catch (Exception e) {e.printStackTrace();}}public void setCancel(boolean cancel) {isCancel = cancel;}
}
kafka设置起止时间消费消息相关推荐
- superset设置起止时间为明天
使用superset的时候发现 修改时间总是从昨天开始 就很郁闷 这什么设定啊 然后通过查询源码,找到了修改方式 首先找到superset的安装目录 /root/anaconda3/envs/supe ...
- 利用Kafka发送/消费消息-Java示例
利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...
- kafka comsumer消费消息后不commit offset的情况分析
kakfa用offset来记录某消费者消费到的位置,由于kafka是个分布式结构,数据被存放在多个partition上,那么要为每个partition单独记录一个offset,该offset保存在一个 ...
- Kafka消费消息自动提交与手动提交
消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了) 消费者建立了与broker之间的⻓连接,开始poll消息. 默认一次poll 500条消息 props.pu ...
- kafka Java客户端之 consumer API 消费消息
背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...
- kafka如何消费消息
转载自:http://generalcode.cn/archives/255 消费者与消费组 假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据.我们可以创建一个消费者实例去做 ...
- kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)
文章目录 前言 1. 基础概念 Broker Producer Consumer Consumer Group Topic Partition Replica 2. 命令行操作 2.1 查看所有top ...
- Kafka学习(十)--Kafka消费者Consumer消费消息配置实战
一. Kafka消费者Consumer消费消息配置实战 配置: public static Properties getProperties() {Properties props = new Pro ...
- weblogic jms消息 删除_利用 Kafka 设置可靠的高性能分布式消息传递基础架构
世界已经迈进"移动"时代,现在应用程序必须能够实时提供数据,这不仅包括数据库表中存储的重要最终结果,还包括用户使用应用程序时执行的所有操作.任何可用信息,例如,用户点击量.日志数据 ...
最新文章
- 电子学会青少年编程等级考试Python一级题目解析11
- wpf Command 携带当前窗口
- 教你如何区分描述统计学与推断统计学
- 为什么要用内插字符串代替string.format
- 普通技术人员如何快速成长为合格的CTO
- python xml实例_python解析xml文档实例
- 对996最客观的描述,一叶知秋
- c语言提高,C语言提高-day2
- 科普 | USB4的全面解读
- 软件测试——集成测试篇
- 资源共享(不限领域,持续更新)
- 东财《组织行为学B》综合作业
- Flowable工作流引擎表用途整理
- Excel将两个图片合并为一张
- 后台管理----首页布局分析1
- Domino Web网页中更改密码比你想得简单得多
- 后缀名为jnlp的文件的打开方式
- TextView.setText()为什么会出错
- c语言程序设计教程南京大学出版社答案,《新编C语言程序设计教程》习题解答与实验指导...
- 安全的哲学思辨 - 从Facebook ATO 漏洞到区块链安全事件