安装kafka-python

pip install kafka-python

生产者

from kafka import KafkaProducer # 有时候导入包会报错,使用pip uninstall kafka-python,卸载后重装可以解决

import json

# 创建producer对象

producer = KafkaProducer(

value_serializer=lambda v: json.dumps(v).encode('utf-8'), # 对发送的数据进行序列化处理

bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092'] # 安装了kafka的集群

)

for i in range(10):

# 创建 data

data={

"name":"李四",

"age":23,

"gender":"男",

"id":i

}

# 将data发送到kafka,主题'test_topic'(自定义)

producer.send('test_topic', data)

producer.close()

消费者

from kafka import KafkaConsumer

import json

# 建立消费者对象

consumer = KafkaConsumer('test_topic', # 与消费者中发送消息的 topic对应

bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092'],

value_deserializer=json.loads # 反序列化数据

)

# 生产者中send()一次数据,消费者中就会接收到一次数据,所以需要遍历

for message in consumer:

print(message.value) # 通过.value方法获取到值

consumer.close()

注:有时候建立 生产者 或消费者 对象时会报错,反复多试几次就可以建立成功,具体什么原因还得多研究,后续补充

标签:消费,producer,python,9092,kafka,192.168,import

来源: https://www.cnblogs.com/jaysonteng/p/14182755.html

python kafka消费实时数据,python生产和消费kafka数据相关推荐

  1. kafka java_Kafka 使用Java实现数据的生产和消费demo

    前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...

  2. 获取props里面的数据_Kafka 使用Java实现数据的生产和消费demo

    Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB ...

  3. kafka练习:创建topic以及生产与消费

    1.java和scala版本的kafka adminclient去创建topic主题? scala版本 import java.util import java.util.Properties imp ...

  4. python stdout.read_实时读取python STDOUT

    我的代码如下,基本上,该模块将运行所需的命令并逐行捕获其输出,但是在我的情况下,当命令运行时,仅需一秒钟多的时间即可返回命令提示符,即child.stdout. read(1)挂起,如果我使用此命令运 ...

  5. KAFKA 最新版 Shell API单机生产与消费

    文章目录 一.KAFKA 启动与监控 二.KAFKA 主题创建于查看生产与消费 2.1. 查看主题列表 2.2. 创建主题 2.3. 查看主题信息 2.4. 主题信息分析 三.KAFKA 主题创建于查 ...

  6. 数据治理价值链模型与数据基础制度分析

    数据治理价值链模型与数据基础制度分析 黄科满1, 杜小勇1,2 1中国人民大学信息学院 2数据工程与知识工程教育部重点实验室 摘要:培育数据要素市场是实现数据价值充分释放的重要机制.而数据要素市场的繁 ...

  7. 日志服务Python消费组实战(三):实时跨域监测多日志库数据

    解决问题 使用日志服务进行数据处理与传递的过程中,你是否遇到如下监测场景不能很好的解决: 特定数据上传到日志服务中需要检查数据内的异常情况,而没有现成监控工具? 需要检索数据里面的关键字,但数据没有建 ...

  8. 使用python读取kafka实时topic数据demo,包括安装kafka module

    1. 安装kafka module kafka-python安装,转载:https://blog.csdn.net/see_you_see_me/article/details/78468421 1. ...

  9. python同花顺股票实时数据_web实时股票数据展示

    广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 所有这些都是实时发生的,并推送到仪表板供用户评估事物和行为. 最终,为了能够从任 ...

最新文章

  1. C#语言与面向对象技术(5)
  2. 【 C 】用动态数组实现堆栈
  3. 【资源推荐】良心之作!超过 10000+ 的互联网团队正在使用的在线 API 文档、技术文档工具...
  4. 熊猫直播P2P分享率优化(下):ASN组网
  5. Android开发之解决ListView和ScrollView滑动冲突的方法
  6. python程序会监控错误的语句_python装饰器实现对异常代码出现进行自动监控
  7. windows下MBCS和UNICODE编码的转换
  8. android语音识别和合成第三方
  9. C# Using 用法
  10. Updatepanel jquery 失效解决方案
  11. 使用Connector/C++操作MySQL
  12. 51单片机 模块化编程
  13. matlab神经网络常用函数
  14. 菜鸟好文推荐(七)——他改了密码,姑娘说了“Yes, I do”
  15. vue结合Waterfall做图片瀑布流展示
  16. 如何在数据库中存储用户密码_如何在数据库中存储密码
  17. 阿里云盘小白羊版,带分享功能可转存115文件的第三方客户端
  18. 【Axure基础教程】第19章 树节点
  19. echarts引入省份地图 失败原因
  20. java新特性--03--Stream简介

热门文章

  1. #单机只打开一次窗口_[2019年11月27日]CCWOW单机版修复内容
  2. log4j slf4j实现_日志那点事儿——slf4j源码剖析
  3. python 类继承方法_python类的继承、多继承及其常用魔术方法
  4. hive sql 报错后继续执行_Hive优化之Spark执行引擎参数调优(二)
  5. sql增加字段默认为0_OUP2.0:mysql乐观锁不生效
  6. C语言:存储类型,内存管理
  7. mysql访问被拒绝1045_mysqlimport:错误:1045,访问被拒绝
  8. vue 组件 父向子传值
  9. CUDA C编程权威指南 第四章 全局内存
  10. 图解TCPIP-OSI7层网络模型