// confluent_kafka 使用案例
import json
from confluent-kafka import Producertopic_name = ""
conf = {// 集群,或者服务器名
"bootstrap.servers":"",
// 安全隧道
"security.protocol":"sasl_plaintext",
//加密方式
"sasl.mechanism":"SCRAM-SHA-256"
// 账号密码
"sasl.username":"",
"sasl.password":"",
# >> [详细配置见](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
....
}produce = Producer(**conf)a= {"name":"12344","age":18,"sex":"man"}
produce.produce(topic_name,json.dumps(a,ensure_ascii=False).encoding("utf-8"))produce.pull()
produce.flush()

详解 confluent_kafka python版本的生产使用案例。
1.需要注意的是confluent_kafka 安装时候可能会出现报错,建议使用conda 安装,主要出现的问题在sasl上
2.confluent_kafka,我是用的情况来看,必须有账号和密码
3.confluent_kafka 没有常见的batch方式,你可以通过produce.poll(timeout=0.2)来对每条数据强制等待返回结果,可以使用flush(),批量等待结果(内部调用poll()实现具体任务)
4.如果想要提高效率,可以取消掉produce.flush() 和 produce.pull()
5.每一个produce(),传入字符串有个长度限制和容量显示,可以在详细配置见找message.max.bytes 来提高传入字符串的长度。

import json
from confluent-kafka import Consumertopic_name = ""
conf = {// 集群,或者服务器名
"bootstrap.servers":"",
// 安全隧道
"security.protocol":"sasl_plaintext",
//加密方式
"sasl.mechanism":"SCRAM-SHA-256"
// 账号密码
"sasl.username":"",
"sasl.password":"",
"group.id":["xxx","xxx1"],
// 需要增加offset相关的配置和消费策略# >> [详细配置见](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
....
}consumer= Consumer(**conf)
batch_size = 200
while True:// 消费数据data = consumer.consume(batch_size)keep_datas = []// 判断是否消费失败if data.error() is not None:for dat in data:if dat.error() is not None:keep_datas.append(dat.value())else:print("当前此条message消费失败......")else:print("当前此批消费失败~~")# json 解析判断use_items = []for keep_data in keep_datas:try:use_items.append(json.loads(keep_data))except Exception as eprint("当前此条消息,json解析失败.....")

python使用kafka生产和消费案例相关推荐

  1. kafka 生产和消费信息入门

    启动生产者 kafka-console-producer.sh \ --broker-list mypc01:9092,mypc02:9092,mypc03:9092 \ --topic pet 启动 ...

  2. Kafka生产与消费脚本工具

    创建主题 [root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.10.141:9092,192.168.10.142:9 ...

  3. kafka 查看待消费数据_kafka查看消费数据

    一.如何查看 在老版本中,使用kafka-run-class.sh 脚本进行查看.但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-co ...

  4. python kafka消费实时数据,python生产和消费kafka数据

    安装kafka-python pip install kafka-python 生产者 from kafka import KafkaProducer # 有时候导入包会报错,使用pip uninst ...

  5. kafka 主动消费_SpringBoot2 整合Kafka组件,应用案例和流程详解

    本文源码:Git || Gitee 一.搭建Kafka环境 1.下载解压 -- 下载wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.1 ...

  6. kafka java_Kafka 使用Java实现数据的生产和消费demo

    前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...

  7. KAFKA 最新版 Shell API单机生产与消费

    文章目录 一.KAFKA 启动与监控 二.KAFKA 主题创建于查看生产与消费 2.1. 查看主题列表 2.2. 创建主题 2.3. 查看主题信息 2.4. 主题信息分析 三.KAFKA 主题创建于查 ...

  8. Kafka实现消息生产和消费

    文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...

  9. kafka channle的应用案例

      kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...

最新文章

  1. DataGridView 中发生以下异常: System.Exception: 是 不是 Decimal 的有效值。 ---> System.FormatException: 输入字符串的格式不正确。
  2. C++程序员学Python:C与Python进行交互
  3. 自定义动画——animate()
  4. Apache支持ASP.NET方法浅析
  5. netdev: dev_watchdog timer(结合stmmac 分析)
  6. vue-cli3配置externals、jquery
  7. Qt:QSound无法播放.wav声音的解决办法
  8. docker 镜像修改的配置文件自动还原_所以到底该如何修改 docker 容器的端口映射!!!...
  9. Vue3(setup函数介绍)
  10. 从一开始,说出事java匿名内部类
  11. VRAR应该是工具,而不是“玩具”
  12. ES6--async函数
  13. 好工具推荐系列:Github客户端GitHub Desktop使用方法
  14. HTC官方通用解锁教程(附一键解锁工具)
  15. 懒惰还是懦弱?你真的不行吗?
  16. U盘格式化后容量变小了怎么恢复教程
  17. 玩转SOLIDWORKS的必备内功:机械传动基础原理
  18. 电脑假死卡的动不了_win10电脑突然卡死动不了怎么办 四种方法快速解决电脑卡死...
  19. arcgis 实验教程--ModelBuilder与空间建模
  20. 高管激励的有效手段----股权激励

热门文章

  1. Electron中使用bytenode加密
  2. datanode无法启动Block pool ID needed, but service not yet registered with NN
  3. Web前端学习笔记(十五)---四色花瓣
  4. 【splay】BZOJ 1152 3506:[cqoi2014]排序机械臂
  5. 企业课堂----企业运营
  6. 【来日复制粘贴】使用公式提取数据
  7. 微信购物直播商城渠道定制开发
  8. 叮! Q币派送中,快来看看你中奖了吗?
  9. 一文读懂 druid连接池
  10. 文本分类概念类大总结(机器学习+深度学习)