python使用kafka生产和消费案例
// confluent_kafka 使用案例
import json
from confluent-kafka import Producertopic_name = ""
conf = {// 集群,或者服务器名
"bootstrap.servers":"",
// 安全隧道
"security.protocol":"sasl_plaintext",
//加密方式
"sasl.mechanism":"SCRAM-SHA-256"
// 账号密码
"sasl.username":"",
"sasl.password":"",
# >> [详细配置见](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
....
}produce = Producer(**conf)a= {"name":"12344","age":18,"sex":"man"}
produce.produce(topic_name,json.dumps(a,ensure_ascii=False).encoding("utf-8"))produce.pull()
produce.flush()
详解 confluent_kafka python版本的生产
使用案例。
1.需要注意的是confluent_kafka 安装时候可能会出现报错,建议使用conda 安装,主要出现的问题在sasl上
2.confluent_kafka,我是用的情况来看,必须有账号和密码
3.confluent_kafka 没有常见的batch方式,你可以通过produce.poll(timeout=0.2)来对每条数据强制等待返回结果,可以使用flush(),批量等待结果(内部调用poll()实现具体任务)
4.如果想要提高效率,可以取消掉produce.flush() 和 produce.pull()
5.每一个produce(),传入字符串有个长度限制和容量显示,可以在详细配置见找message.max.bytes
来提高传入字符串的长度。
import json
from confluent-kafka import Consumertopic_name = ""
conf = {// 集群,或者服务器名
"bootstrap.servers":"",
// 安全隧道
"security.protocol":"sasl_plaintext",
//加密方式
"sasl.mechanism":"SCRAM-SHA-256"
// 账号密码
"sasl.username":"",
"sasl.password":"",
"group.id":["xxx","xxx1"],
// 需要增加offset相关的配置和消费策略# >> [详细配置见](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
....
}consumer= Consumer(**conf)
batch_size = 200
while True:// 消费数据data = consumer.consume(batch_size)keep_datas = []// 判断是否消费失败if data.error() is not None:for dat in data:if dat.error() is not None:keep_datas.append(dat.value())else:print("当前此条message消费失败......")else:print("当前此批消费失败~~")# json 解析判断use_items = []for keep_data in keep_datas:try:use_items.append(json.loads(keep_data))except Exception as eprint("当前此条消息,json解析失败.....")
python使用kafka生产和消费案例相关推荐
- kafka 生产和消费信息入门
启动生产者 kafka-console-producer.sh \ --broker-list mypc01:9092,mypc02:9092,mypc03:9092 \ --topic pet 启动 ...
- Kafka生产与消费脚本工具
创建主题 [root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.10.141:9092,192.168.10.142:9 ...
- kafka 查看待消费数据_kafka查看消费数据
一.如何查看 在老版本中,使用kafka-run-class.sh 脚本进行查看.但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-co ...
- python kafka消费实时数据,python生产和消费kafka数据
安装kafka-python pip install kafka-python 生产者 from kafka import KafkaProducer # 有时候导入包会报错,使用pip uninst ...
- kafka 主动消费_SpringBoot2 整合Kafka组件,应用案例和流程详解
本文源码:Git || Gitee 一.搭建Kafka环境 1.下载解压 -- 下载wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.1 ...
- kafka java_Kafka 使用Java实现数据的生产和消费demo
前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...
- KAFKA 最新版 Shell API单机生产与消费
文章目录 一.KAFKA 启动与监控 二.KAFKA 主题创建于查看生产与消费 2.1. 查看主题列表 2.2. 创建主题 2.3. 查看主题信息 2.4. 主题信息分析 三.KAFKA 主题创建于查 ...
- Kafka实现消息生产和消费
文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...
- kafka channle的应用案例
kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...
最新文章
- DataGridView 中发生以下异常: System.Exception: 是 不是 Decimal 的有效值。 ---> System.FormatException: 输入字符串的格式不正确。
- C++程序员学Python:C与Python进行交互
- 自定义动画——animate()
- Apache支持ASP.NET方法浅析
- netdev: dev_watchdog timer(结合stmmac 分析)
- vue-cli3配置externals、jquery
- Qt:QSound无法播放.wav声音的解决办法
- docker 镜像修改的配置文件自动还原_所以到底该如何修改 docker 容器的端口映射!!!...
- Vue3(setup函数介绍)
- 从一开始,说出事java匿名内部类
- VRAR应该是工具,而不是“玩具”
- ES6--async函数
- 好工具推荐系列:Github客户端GitHub Desktop使用方法
- HTC官方通用解锁教程(附一键解锁工具)
- 懒惰还是懦弱?你真的不行吗?
- U盘格式化后容量变小了怎么恢复教程
- 玩转SOLIDWORKS的必备内功:机械传动基础原理
- 电脑假死卡的动不了_win10电脑突然卡死动不了怎么办 四种方法快速解决电脑卡死...
- arcgis 实验教程--ModelBuilder与空间建模
- 高管激励的有效手段----股权激励