消费kafka消息时,有时可能需要消费某个时间段的消息,写个demo记录下:

public class KafkaConsumerByTime {public static void main(String[] args) throws Exception {String topic = "start_log";String startTime = "2019-11-07 10:26:00";String endTime = "2019-11-07 10:28:00";Properties  kafkaProp = new Properties();kafkaProp.put("bootstrap.servers", "localhost:9092");kafkaProp.put("group.id", "testByTime");kafkaProp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");kafkaProp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumerByTime task = new KafkaConsumerByTime(topic, kafkaProp, startTime, endTime);task.doTask();}private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";/*** Pattern:yyyy-MM-dd HH:mm:ss*/private String startTime;/*** Pattern:yyyy-MM-dd HH:mm:ss*/private String endTime;/*** kafka配置*/private Properties kafkaProp;private String topic;private boolean isCancel = false;public KafkaConsumerByTime(String topic, Properties kafkaProp, String startTime, String endTime) {this.topic = topic;this.kafkaProp = kafkaProp;this.startTime = startTime;this.endTime = endTime;}public void doTask() throws Exception{long sTime = DateUtils.parseDate(startTime, PATTERN).getTime();long eTime =  DateUtils.parseDate(endTime, PATTERN).getTime();Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = KafkaUtil.fetchOffsetsWithTimestamp(topic, sTime, kafkaProp);try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProp)) {consumer.assign(startOffsetMap.keySet());startOffsetMap.forEach((k, v) -> {consumer.seek(k, v.offset());System.out.println(topic + ":partition:" + k + ", offsets:" + v.offset());});System.out.println("开始消费");while(!isCancel) {System.out.println("循环...");ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {if (eTime == 0 || record.timestamp() <= eTime) {// doSomethingSystem.out.printf("offset = %d,p = %d, timestamp = %d key = %s, value = %s \r\n", record.offset(), record.partition(), record.timestamp(), record.key(), record.value());}}}System.out.println("结束消费");}catch (Exception e) {e.printStackTrace();}}public void setCancel(boolean cancel) {isCancel = cancel;}
}

kafka设置起止时间消费消息相关推荐

  1. superset设置起止时间为明天

    使用superset的时候发现 修改时间总是从昨天开始 就很郁闷 这什么设定啊 然后通过查询源码,找到了修改方式 首先找到superset的安装目录 /root/anaconda3/envs/supe ...

  2. 利用Kafka发送/消费消息-Java示例

    利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...

  3. kafka comsumer消费消息后不commit offset的情况分析

    kakfa用offset来记录某消费者消费到的位置,由于kafka是个分布式结构,数据被存放在多个partition上,那么要为每个partition单独记录一个offset,该offset保存在一个 ...

  4. Kafka消费消息自动提交与手动提交

    消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了) 消费者建立了与broker之间的⻓连接,开始poll消息. 默认一次poll 500条消息 props.pu ...

  5. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

  6. kafka如何消费消息

    转载自:http://generalcode.cn/archives/255 消费者与消费组 假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据.我们可以创建一个消费者实例去做 ...

  7. kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    文章目录 前言 1. 基础概念 Broker Producer Consumer Consumer Group Topic Partition Replica 2. 命令行操作 2.1 查看所有top ...

  8. Kafka学习(十)--Kafka消费者Consumer消费消息配置实战

    一. Kafka消费者Consumer消费消息配置实战 配置: public static Properties getProperties() {Properties props = new Pro ...

  9. weblogic jms消息 删除_利用 Kafka 设置可靠的高性能分布式消息传递基础架构

    世界已经迈进"移动"时代,现在应用程序必须能够实时提供数据,这不仅包括数据库表中存储的重要最终结果,还包括用户使用应用程序时执行的所有操作.任何可用信息,例如,用户点击量.日志数据 ...

最新文章

  1. 电子学会青少年编程等级考试Python一级题目解析11
  2. wpf Command 携带当前窗口
  3. 教你如何区分描述统计学与推断统计学
  4. 为什么要用内插字符串代替string.format
  5. 普通技术人员如何快速成长为合格的CTO
  6. python xml实例_python解析xml文档实例
  7. 对996最客观的描述,一叶知秋
  8. c语言提高,C语言提高-day2
  9. 科普 | USB4的全面解读
  10. 软件测试——集成测试篇
  11. 资源共享(不限领域,持续更新)
  12. 东财《组织行为学B》综合作业
  13. Flowable工作流引擎表用途整理
  14. Excel将两个图片合并为一张
  15. 后台管理----首页布局分析1
  16. Domino Web网页中更改密码比你想得简单得多
  17. 后缀名为jnlp的文件的打开方式
  18. TextView.setText()为什么会出错
  19. c语言程序设计教程南京大学出版社答案,《新编C语言程序设计教程》习题解答与实验指导...
  20. 安全的哲学思辨 - 从Facebook ATO 漏洞到区块链安全事件

热门文章

  1. android Criteria
  2. 什么是Java集合?
  3. Ubuntu16使用小米WIFI做为AP
  4. 在PHP中全面阻止SQL注入式攻击之三
  5. python3 安装 pip (PyPI)
  6. 单精度、双精度和半精度浮点格式之间的区别
  7. Java基础篇:八大基本数据类型
  8. 备份与还原相关的基本概念
  9. 去除字符串前面的几个逗号
  10. 卷积和反卷积(deconv)