一般来说我们都使用Kafka来记录用户的操作记录以便后续分析。
但是通常使用的时候需要按天来统计每天的去重用户数、点击量之类的。
这个时候如果直接拉某个topic的数据的话,就需要判断每个消息的时间戳,还要兼顾把所有的Partition都拉完才能保证数据的完整。
因此如果能只拉取某一个时间段内的消息,就能极大的简化后续的处理逻辑。

拉取时段内消息实现

为了实现这个目的借助于根据时间戳获取Partition内部偏移的方法,获取两个时间点在Partition内部的偏移,然后从第一个时间点的偏移开始拉取指定Partition的消息,当偏移超过第二个时间点的偏移的时候取消订阅。逐个partition操作拉全topic所有的数据。

实验例子,python+confluence kafka
具体代码如下:

#coding=utf8from confluent_kafka import Consumer, KafkaError, TopicPartition, Message
import datetimeconf = {'bootstrap.servers': 'xxx','group.id': 'xxx','session.timeout.ms': 6000,'security.protocol': 'SASL_PLAINTEXT','sasl.mechanism' : 'PLAIN','sasl.username': 'xxx','sasl.password': 'xxx','auto.offset.reset': 'earliest'
}topic = 'topic'consumer = Consumer(conf)# 拉取昨天一天的数据,start_time、end_time这两个时间可以随便设置
now = datetime.datetime.now() - datetime.timedelta(days=1)
start_time = datetime.datetime.strptime(now.strftime('%Y-%m-%d 00:00:00'),'%Y-%m-%d %H:%M:%S')
end_time = datetime.datetime.strptime(now.strftime('%Y-%m-%d 23:59:59'),'%Y-%m-%d %H:%M:%S')# 5 是partition的数量
for index in range(5):# 查询开始时间的针对于某个partition的偏移start_tps = [TopicPartition(topic, index, int(start_time.timestamp() * 1000))]start_offset = consumer.offsets_for_times(start_tps)# 查询结束时间的针对于某个partition的偏移end_tps = [TopicPartition(topic, index, int(end_time.timestamp() * 1000))]end_offset = consumer.offsets_for_times(end_tps)# 从拉取指定partition的offset开始拉取数据consumer.assign(start_offset)while True:try:msg = consumer.poll(1.0)if msg == None:breakoffset = msg.offset()if offset > end_offset[0].offset:# 如果超过当前partition的偏移之后不再继续订阅当前的topicconsumer.unassign()breakpassexcept:pass

Kafka拉取某一个时间段內的消息相关推荐

  1. 如果可以,我想并行消费Kafka拉取的数据库Binlog

    关注 "Java艺术" 我们一起成长! 本篇所述内容只在试用阶段,可行性有待考验! 笔者在上一篇提到:由于Binlog需要顺序消费,所以阿里数据订阅服务DTS只将Binlog放入t ...

  2. python调用kafka拉取数据失败_无法使用kafkapython从另一个容器向Kafka容器发出请求...

    环境:services: zookeeper: image: wurstmeister/zookeeper ports: - 2181 kafka: image: wurstmeister/kafka ...

  3. java kafka 拉取_java获取kafka consumer lag

    maven依赖 org.apache.kafka kafka-clients 0.10.1.0 注意:kafka-clients版本需要0.10.1.0以上,因为调用了新增接口endOffsets; ...

  4. kafka拉取mysql数据库_kafka里信息用flink获取后放入mysql

    1. 安装zookeeper, kafka 2. 启动zookeeper, kafka server 3. 准备工作 在Mysql数据库创建一个table, t_student 加入maven需要的f ...

  5. kafka consumer配置拉取速度慢_Kafka消费者的使用和原理

    这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...

  6. kafka中kafkaConsumer的poll拉取方法

    1.poll()方法里传的是时间(ms),而不是Kafka返回的记录条数. 2.Kafka轮询一次就相当于拉取(poll)一定时间段broker中可消费的数据, 在这个指定时间段里拉取,时间到了就立刻 ...

  7. RocketMQ:Consumer概述及启动流程与消息拉取源码分析

    文章目录 Consumer 概述 消费者核心类 消费者启动流程 消息拉取 PullMessageService实现机制 ProcessQueue实现机制 消息拉取基本流程 客户端发起消息拉取请求 消息 ...

  8. 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解

    导语   在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...

  9. Consumer消息拉取和消费流程分析

    1. 前言 MQConsumer是RocketMQ提供的消费者接口,从接口定义上可以看到,它主要的功能是订阅感兴趣的Topic.注册消息监听器.启动生产者开始消费消息. ​ 消费者获取消息的模式有两种 ...

最新文章

  1. Activiti Explorer 源码浅析
  2. php中this,self,parent三个关键字之间的区别(转载)
  3. 什么是C语言中的条件编译?
  4. html制作一个简单的邮件,使用HTML编写简单的邮件模版
  5. bootstrap3 表单构建器_FastReport.NET报表设计器连接到OracleDB关系数据库
  6. Zabbix的简单使用
  7. java hibernate 多对多_java - hibernate多对多问题
  8. Vue-cli3配置教程入门
  9. C#调用C++编写的COM DLL
  10. join丢失数据_15、Hive数据倾斜与解决方案
  11. Android--建立能与访问者进行相互通信的本地服务
  12. Git入门(本地使用)
  13. S7-200SMART PLC通过RS485接口与西门子SMART LINE系列触摸屏通信具体步骤
  14. 我为大家分享永久免费空间 云专家
  15. 从零到一制作个人网站
  16. Mac配置item2高亮颜色
  17. 物联网为什么需要5G?
  18. 篮球比赛分组问题(动态规划)
  19. Fortran写nc文件nbsp;f90nbsp;netcdf
  20. FormData用法详解

热门文章

  1. 【风马一族_php】常用的语句
  2. BUPT OJ143 Triangle
  3. 【转载】上帝粒子证实存在宇宙末日来临?(图)
  4. Python 爬虫心得
  5. jQuery源码分析系列(一)初识jQuery
  6. 计算机组成原理笔记(王道考研) 第七章:输入输出系统
  7. 教程篇(7.0) 06. FortiGate基础架构 单点登录(FSSO) ❀ Fortinet 网络安全专家 NSE 4
  8. 清音驱腐启鸿蒙,竹韵清音-格律诗词41期
  9. 程序员们一个一个的都挺神的,堪称 35 岁毕业之后再就业的标兵,不服不行
  10. 泰拉瑞亚自建服务器,泰拉瑞亚1.4版本服务器创建教程