python kafka消费实时数据,python生产和消费kafka数据
安装kafka-python
pip install kafka-python
生产者
from kafka import KafkaProducer # 有时候导入包会报错,使用pip uninstall kafka-python,卸载后重装可以解决
import json
# 创建producer对象
producer = KafkaProducer(
value_serializer=lambda v: json.dumps(v).encode('utf-8'), # 对发送的数据进行序列化处理
bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092'] # 安装了kafka的集群
)
for i in range(10):
# 创建 data
data={
"name":"李四",
"age":23,
"gender":"男",
"id":i
}
# 将data发送到kafka,主题'test_topic'(自定义)
producer.send('test_topic', data)
producer.close()
消费者
from kafka import KafkaConsumer
import json
# 建立消费者对象
consumer = KafkaConsumer('test_topic', # 与消费者中发送消息的 topic对应
bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092'],
value_deserializer=json.loads # 反序列化数据
)
# 生产者中send()一次数据,消费者中就会接收到一次数据,所以需要遍历
for message in consumer:
print(message.value) # 通过.value方法获取到值
consumer.close()
注:有时候建立 生产者 或消费者 对象时会报错,反复多试几次就可以建立成功,具体什么原因还得多研究,后续补充
标签:消费,producer,python,9092,kafka,192.168,import
来源: https://www.cnblogs.com/jaysonteng/p/14182755.html
python kafka消费实时数据,python生产和消费kafka数据相关推荐
- kafka java_Kafka 使用Java实现数据的生产和消费demo
前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...
- 获取props里面的数据_Kafka 使用Java实现数据的生产和消费demo
Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB ...
- kafka练习:创建topic以及生产与消费
1.java和scala版本的kafka adminclient去创建topic主题? scala版本 import java.util import java.util.Properties imp ...
- python stdout.read_实时读取python STDOUT
我的代码如下,基本上,该模块将运行所需的命令并逐行捕获其输出,但是在我的情况下,当命令运行时,仅需一秒钟多的时间即可返回命令提示符,即child.stdout. read(1)挂起,如果我使用此命令运 ...
- KAFKA 最新版 Shell API单机生产与消费
文章目录 一.KAFKA 启动与监控 二.KAFKA 主题创建于查看生产与消费 2.1. 查看主题列表 2.2. 创建主题 2.3. 查看主题信息 2.4. 主题信息分析 三.KAFKA 主题创建于查 ...
- 数据治理价值链模型与数据基础制度分析
数据治理价值链模型与数据基础制度分析 黄科满1, 杜小勇1,2 1中国人民大学信息学院 2数据工程与知识工程教育部重点实验室 摘要:培育数据要素市场是实现数据价值充分释放的重要机制.而数据要素市场的繁 ...
- 日志服务Python消费组实战(三):实时跨域监测多日志库数据
解决问题 使用日志服务进行数据处理与传递的过程中,你是否遇到如下监测场景不能很好的解决: 特定数据上传到日志服务中需要检查数据内的异常情况,而没有现成监控工具? 需要检索数据里面的关键字,但数据没有建 ...
- 使用python读取kafka实时topic数据demo,包括安装kafka module
1. 安装kafka module kafka-python安装,转载:https://blog.csdn.net/see_you_see_me/article/details/78468421 1. ...
- python同花顺股票实时数据_web实时股票数据展示
广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 所有这些都是实时发生的,并推送到仪表板供用户评估事物和行为. 最终,为了能够从任 ...
最新文章
- C#语言与面向对象技术(5)
- 【 C 】用动态数组实现堆栈
- 【资源推荐】良心之作!超过 10000+ 的互联网团队正在使用的在线 API 文档、技术文档工具...
- 熊猫直播P2P分享率优化(下):ASN组网
- Android开发之解决ListView和ScrollView滑动冲突的方法
- python程序会监控错误的语句_python装饰器实现对异常代码出现进行自动监控
- windows下MBCS和UNICODE编码的转换
- android语音识别和合成第三方
- C# Using 用法
- Updatepanel jquery 失效解决方案
- 使用Connector/C++操作MySQL
- 51单片机 模块化编程
- matlab神经网络常用函数
- 菜鸟好文推荐(七)——他改了密码,姑娘说了“Yes, I do”
- vue结合Waterfall做图片瀑布流展示
- 如何在数据库中存储用户密码_如何在数据库中存储密码
- 阿里云盘小白羊版,带分享功能可转存115文件的第三方客户端
- 【Axure基础教程】第19章 树节点
- echarts引入省份地图 失败原因
- java新特性--03--Stream简介
热门文章
- #单机只打开一次窗口_[2019年11月27日]CCWOW单机版修复内容
- log4j slf4j实现_日志那点事儿——slf4j源码剖析
- python 类继承方法_python类的继承、多继承及其常用魔术方法
- hive sql 报错后继续执行_Hive优化之Spark执行引擎参数调优(二)
- sql增加字段默认为0_OUP2.0:mysql乐观锁不生效
- C语言:存储类型,内存管理
- mysql访问被拒绝1045_mysqlimport:错误:1045,访问被拒绝
- vue 组件 父向子传值
- CUDA C编程权威指南 第四章 全局内存
- 图解TCPIP-OSI7层网络模型