python消费kafka_Python脚本消费kafka数据
kafka简介(摘自百度百科)
一、简介:
详见:https://blog.csdn.net/Beyond_F4/article/details/80310507
二、安装
详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689
三、按照官网的样例,先跑一个应用
1、生产者:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
msg = "msg%d" % i
producer.send('test', msg)
producer.close()
2、消费者(简单demo):
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
启动后生产者、消费者可以正常消费。
3、消费者(消费群组)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力
4、消费者(读取目前最早可读的消息)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{'smallest': 'earliest', 'largest': 'latest'}
5、消费者(手动设置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
print consumer.partitions_for_topic("test") #获取test主题的分区信息
print consumer.topics() #获取主题列表
print consumer.subscription() #获取当前消费者订阅的主题
print consumer.assignment() #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第5个偏移量消费
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
6、消费者(订阅多个主题)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0')) #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
7、消费者(手动拉取消息)
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) #从kafka获取消息
print msg
time.sleep(1)
8、消费者(消息挂起与恢复)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
print num
print consumer.paused() #获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num + 1
if num == 10:
print "resume..."
consumer.resume(TopicPartition(topic=u'test', partition=0))
print "resume......"
pause执行后,consumer不能读取,直到调用resume后恢复。
如果对您有帮助,记得给我点赞诺
如果对您有帮助,记得给我点赞诺
python消费kafka_Python脚本消费kafka数据相关推荐
- kafka 的pom文件_Flink 消费 Kafka 数据
kafka核心概念: Kafka 是一个消息队列,生产者向消息队列中写入数据,消费者从队列中获取数据并进行消费.可以认为一个 Topic 就是一个队列,每个 Topic 又会被分成多个 Partiti ...
- sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案
下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...
- 客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中
目录 将消费的kafka数据同步到Kudu中 一.导入表名映射关系类
- Storm 消费Kafka数据及相关异常解决
Storm 消费Kafka数据及相关异常解决 问题 KafkaTopoDemo类 bolt类 问题 storm报错:Exception in thread "main" java. ...
- java消费kafka数据之后,进行堆积之后在插入数据库
java高频的获取kafka数据,导致数据库数据一致在高频读写,为了降低数据库的高频连接搞高频读写,可以将数据堆积一段时间之后,进行插入数据库操作. 主要采用了队列和缓存,将获取到的数据放入java队 ...
- SparkStreaming安全消费Kafka数据
前言 在这之前做SparkStreaming连接Kafka,我会这么写: val sparkConf = new SparkConf().setAppName("Spark2Kafka&qu ...
- java debug非同期ski,简记kafka group id相同导致的不同consumers启动后不消费和延时消费问题...
场景: 在一个线程内,使用相同的brokers和group id等配置,根据传入的topic数量N,分别定义N个consumer,按定义顺序先后调用consumers消费 现象: 程序启动后,kafk ...
- kafka控制台模拟消费_Flink初试——对接Kafka
本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...
- mac系统下使用flink消费docker运行的kafka
版本 flink 1.12.0 scala 2.11 java 1.8 kafka 2.0.2 首先使用maven创建一个新的工程 mvn archetype:generate -Darchetype ...
- 【python】使用python脚本将LFW数据中1672组同一个人多张照片拷贝出来
使用python脚本将LFW数据中1672组同一个人多张照片拷贝出来 dataCleaning4multiple.py 源码如下: import os, random, shutil import s ...
最新文章
- 为什么神经网络会把乌龟识别成步枪?现在的 AI 值得信任吗?
- 哈利波特 pdf_干货!哈利波特英文原版pdf免费领,(含音频)词汇量大于新概念!...
- dec++如何查看机器指令_机器指令到汇编再到高级编程语言!
- 谈谈对Canal( 增量数据订阅与消费 )的理解
- 修正 010 Editor 模板文件 MachO.bt 的错误
- Web Service security UserNameToken 使用
- 管理exchange 2010用户邮箱本地移动请求
- 一些html5和css3的一些常见面试题
- 从泰勒展开到牛顿迭代
- 如何安全地终止线程interrupt()、isInterrupted()、interrupted()的区别与使用
- html安装方正兰亭,方正兰亭字体
- 如何设计带限流功能的5V供电电路?快来学!
- 神经网络ANN——SPSS实现
- ruby与ruby on rails环境部署
- 各种UML图的应用场景
- 基于C++和QT实现的简单数独游戏软件
- java后端处理Apple Pay流程
- Ubuntu-18.04版本网络配置,连接网络的方法
- matlab在线_正版MATLAB向中国人民大学全校师生免费开放!
- 串口工具推荐——串口监视精灵v4.0