Kafka拉取某一个时间段內的消息
一般来说我们都使用Kafka来记录用户的操作记录以便后续分析。
但是通常使用的时候需要按天来统计每天的去重用户数、点击量之类的。
这个时候如果直接拉某个topic的数据的话,就需要判断每个消息的时间戳,还要兼顾把所有的Partition都拉完才能保证数据的完整。
因此如果能只拉取某一个时间段内的消息,就能极大的简化后续的处理逻辑。
拉取时段内消息实现
为了实现这个目的借助于根据时间戳获取Partition内部偏移的方法,获取两个时间点在Partition内部的偏移,然后从第一个时间点的偏移开始拉取指定Partition的消息,当偏移超过第二个时间点的偏移的时候取消订阅。逐个partition操作拉全topic所有的数据。
实验例子,python+confluence kafka
具体代码如下:
#coding=utf8from confluent_kafka import Consumer, KafkaError, TopicPartition, Message
import datetimeconf = {'bootstrap.servers': 'xxx','group.id': 'xxx','session.timeout.ms': 6000,'security.protocol': 'SASL_PLAINTEXT','sasl.mechanism' : 'PLAIN','sasl.username': 'xxx','sasl.password': 'xxx','auto.offset.reset': 'earliest'
}topic = 'topic'consumer = Consumer(conf)# 拉取昨天一天的数据,start_time、end_time这两个时间可以随便设置
now = datetime.datetime.now() - datetime.timedelta(days=1)
start_time = datetime.datetime.strptime(now.strftime('%Y-%m-%d 00:00:00'),'%Y-%m-%d %H:%M:%S')
end_time = datetime.datetime.strptime(now.strftime('%Y-%m-%d 23:59:59'),'%Y-%m-%d %H:%M:%S')# 5 是partition的数量
for index in range(5):# 查询开始时间的针对于某个partition的偏移start_tps = [TopicPartition(topic, index, int(start_time.timestamp() * 1000))]start_offset = consumer.offsets_for_times(start_tps)# 查询结束时间的针对于某个partition的偏移end_tps = [TopicPartition(topic, index, int(end_time.timestamp() * 1000))]end_offset = consumer.offsets_for_times(end_tps)# 从拉取指定partition的offset开始拉取数据consumer.assign(start_offset)while True:try:msg = consumer.poll(1.0)if msg == None:breakoffset = msg.offset()if offset > end_offset[0].offset:# 如果超过当前partition的偏移之后不再继续订阅当前的topicconsumer.unassign()breakpassexcept:pass
Kafka拉取某一个时间段內的消息相关推荐
- 如果可以,我想并行消费Kafka拉取的数据库Binlog
关注 "Java艺术" 我们一起成长! 本篇所述内容只在试用阶段,可行性有待考验! 笔者在上一篇提到:由于Binlog需要顺序消费,所以阿里数据订阅服务DTS只将Binlog放入t ...
- python调用kafka拉取数据失败_无法使用kafkapython从另一个容器向Kafka容器发出请求...
环境:services: zookeeper: image: wurstmeister/zookeeper ports: - 2181 kafka: image: wurstmeister/kafka ...
- java kafka 拉取_java获取kafka consumer lag
maven依赖 org.apache.kafka kafka-clients 0.10.1.0 注意:kafka-clients版本需要0.10.1.0以上,因为调用了新增接口endOffsets; ...
- kafka拉取mysql数据库_kafka里信息用flink获取后放入mysql
1. 安装zookeeper, kafka 2. 启动zookeeper, kafka server 3. 准备工作 在Mysql数据库创建一个table, t_student 加入maven需要的f ...
- kafka consumer配置拉取速度慢_Kafka消费者的使用和原理
这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...
- kafka中kafkaConsumer的poll拉取方法
1.poll()方法里传的是时间(ms),而不是Kafka返回的记录条数. 2.Kafka轮询一次就相当于拉取(poll)一定时间段broker中可消费的数据, 在这个指定时间段里拉取,时间到了就立刻 ...
- RocketMQ:Consumer概述及启动流程与消息拉取源码分析
文章目录 Consumer 概述 消费者核心类 消费者启动流程 消息拉取 PullMessageService实现机制 ProcessQueue实现机制 消息拉取基本流程 客户端发起消息拉取请求 消息 ...
- 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解
导语 在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...
- Consumer消息拉取和消费流程分析
1. 前言 MQConsumer是RocketMQ提供的消费者接口,从接口定义上可以看到,它主要的功能是订阅感兴趣的Topic.注册消息监听器.启动生产者开始消费消息. 消费者获取消息的模式有两种 ...
最新文章
- Activiti Explorer 源码浅析
- php中this,self,parent三个关键字之间的区别(转载)
- 什么是C语言中的条件编译?
- html制作一个简单的邮件,使用HTML编写简单的邮件模版
- bootstrap3 表单构建器_FastReport.NET报表设计器连接到OracleDB关系数据库
- Zabbix的简单使用
- java hibernate 多对多_java - hibernate多对多问题
- Vue-cli3配置教程入门
- C#调用C++编写的COM DLL
- join丢失数据_15、Hive数据倾斜与解决方案
- Android--建立能与访问者进行相互通信的本地服务
- Git入门(本地使用)
- S7-200SMART PLC通过RS485接口与西门子SMART LINE系列触摸屏通信具体步骤
- 我为大家分享永久免费空间 云专家
- 从零到一制作个人网站
- Mac配置item2高亮颜色
- 物联网为什么需要5G?
- 篮球比赛分组问题(动态规划)
- Fortran写nc文件nbsp;f90nbsp;netcdf
- FormData用法详解
热门文章
- 【风马一族_php】常用的语句
- BUPT OJ143 Triangle
- 【转载】上帝粒子证实存在宇宙末日来临?(图)
- Python 爬虫心得
- jQuery源码分析系列(一)初识jQuery
- 计算机组成原理笔记(王道考研) 第七章:输入输出系统
- 教程篇(7.0) 06. FortiGate基础架构 单点登录(FSSO) ❀ Fortinet 网络安全专家 NSE 4
- 清音驱腐启鸿蒙,竹韵清音-格律诗词41期
- 程序员们一个一个的都挺神的,堪称 35 岁毕业之后再就业的标兵,不服不行
- 泰拉瑞亚自建服务器,泰拉瑞亚1.4版本服务器创建教程