Confluent-kafka是由Confluent公司维护的一个kafka-client,同产品下还有c/c++,java、Go、.net和JMS。它是企业级支持的一款产品。

coufluent-kafka是Python模块,是对librdkafka的轻量级封装,librdkafka又是基于c/c++的kafka库,性能上不必多说。使用上要优于kafka-python。

参考:kafka干货(四):kafka-python和confluent-kafka比较

#安装
####首先安装librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure
make
sudo make install

####安装confluent-kafka
pip install confluent-kafka

#测试
##Producer

from confluent_kafka import Producer##producer配置,dict格式
p = Producer({'bootstrap.servers': '192.168.56.101,192.168.56.103,192.168.56.102'})##回调函数
def delivery_report(err, msg):if err is not None:print('Message delivery failed: {}'.format(err))else:print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))##发送
for data in ['hello','word’]:p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)p.poll(10)  ##等待返回结果最大时常,单位秒
p.flush()

###参数详解
Producer()
指定生产者配置,参数类型:dict。

len(int)
等待发送的消息数,参数类型:int。

flush(timeout)
发送调用poll(),发送消息,直到len()为0。参数类型:timeout

poll(timeout)
轮询发送消息,并调用相应的返回函数,Callbacks。参数为等待返回的超市时间。

produce()
发送消息,支持回调函数。参数包含:

  1. topic
  2. value
  3. key
  4. partition
  5. on_delivery(err,msg)
  6. timestamp
  7. dict|list

##Consumer

from confluent_kafka import Consumer, KafkaErrorc = Consumer({'bootstrap.servers': '192.168.56.101','group.id': 'mygroup','default.topic.config': {'auto.offset.reset': 'smallest'}
})c.subscribe(['mytopic'])while True:msg = c.poll()if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueelse:print(msg.error())breakprint('Received message: {}'.format(msg.value().decode('utf-8')))c.close()

###参数详解
Consumer(config)
指定消费者配置,参数类型:dict。bootstrap.servers and group.id是必须配置。

on_commit(err, partitions)
回调函数

close()
关闭消费者,主要有三步:

  1. 停止消费。
  2. 提交偏移量。前提是enable.auto.commit=true。
  3. 离开消费者组。

commit([message=None][, offsets=None][, asynchronous=True])
提交消息或偏移量。如果两者都没有输入,则提交当前分区偏移。前提是enable.auto.commit=true。
asynchronous=True为异步提交,不回会塞。

poll(timeout)
消费消息,参数为等待超时时间。

consume([num_messages=1][, timeout=-1])
批量消费,指定一次消费条数和等待超时时间。返回一个list。关于返回报错:

  1. RuntimeError - 如果消费者已经关闭
  2. KafkaError - 如果出现kafka内部错误
  3. ValueError - 如果num_messages> 1M

pause(partitions)
暂停。

resume(partitions)
恢复。

position(partitions[, timeout=None])
返回分区偏移量。

seek(partition)
指定偏移量消费。参数:TopicPartition(topic[, partition][, offset])。

store_offsets()
提交偏移量。参数:message或offset。前提是enable.auto.offset.store=false。

subscribe(topics[, listener=None])
指定消费的主题。参数:list。可使用正则。包含两个回调函数:

  1. on_assign(consumer, partitions)
  2. on_revoke(consumer, partitions)

unassign()
删除当前分区分配

unsubscribe()
删除订阅


下班了,更多关于序列化/topic操作/配置修改/返回报错处理等更多内容会在近期更新


更多文章关注公众号

更多:kafka深入理解专栏
——————————————————————————————————
作者:桃花惜春风
转载请标明出处,原文地址:
https://blog.csdn.net/xiaoyu_BD/article/details/82051993
如果感觉本文对您有帮助,您的支持是我坚持写作最大的动力,谢谢!

kafka干货(五):kakfka的python客户端----Confluent-kafka相关推荐

  1. 【C++】【python】【kafka】使用C++调用python函数向kafka发送消息

    1.python操作kafka的代码: import sys import time import jsonfrom kafka import KafkaProducer from kafka imp ...

  2. Hello Kafka(八)——Confluent Kafka简介

    一.Confluent Kafka简介 1.Confluent Kafka简介 2014年,Kafka的创始人Jay Kreps.NahaNarkhede和饶军离开LinkedIn创立Confluen ...

  3. kafka干货(一):Confluent

    引自网络:"由于该技术平台能够实时处理业务数据,在过去的几年时间里,部署 Apache Kafka 的企业数量"如火箭般飙升".而 Confluent 技术的" ...

  4. 基于Confluent.Kafka实现的Kafka客户端操作类使用详解

    一.引言 有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续.今天正好是周末,有点时间,来写新东西吧.最近公司用了Kafka做为消息的中间件,最开始写的那个版本 ...

  5. python子进程进行kinit认证_使用kafka-python客户端进行kafka kerberos认证

    之前说过python confluent kafka客户端做kerberos认证的过程,如果使用kafka python客户端的话同样也可以进行kerberos的认证,具体的认证机制这里不再描述,主要 ...

  6. kafka python客户端连接风暴_kafka配置单向ssl加密,以及加密后python客户端访问方式(kafka v1.1.0)...

    一.kafka broker配置以及sh客户端的使用 最近在使用kafka集群的过程中,为了保证安全性,配置了ssl加密,首先按照官网的配置进行如下的设置 #!/bin/bash #Step 1 ke ...

  7. Hello Kafka(五)——Kafka管理

    一.Kafka工具脚本简介 1.Kafka工具脚本简介 Kafka默认提供了很多个命令行脚本,用于实现各种各样的功能和运维管理.默认情况下,不加任何参数或携带--help运行Kafka shell脚本 ...

  8. 2021年大数据Kafka(五):❤️Kafka的java API编写❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...

  9. 干货 | 五千字长文带你快速入门FlinkSQL

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

最新文章

  1. mysql存储、function、触发器等实例
  2. 分割字符串_[话俾你知]Python使用正则处理字符串技巧(分割、替换)
  3. Vue后台管理系统实现登录功能
  4. 修改VMOS2的SID 并成为成员服务器,求助SAS9.4服务器版的sid!!!急急!
  5. mysql 优惠卷表设计_这些年MySQL表设计踩过的坑!
  6. 电脑功耗测试软件_聊一款“躺着都中枪”的笔记本电脑
  7. java消费者中url找不到,java – URL可以使用浏览器访问,但是仍然是具有URLConnection的FileNotFoundException...
  8. Swift-类、结构体、枚举
  9. cacti 监控平台部署心得
  10. Nginx+Lua 积累
  11. 4.9.5 通用注释
  12. Data-Mediator入门系列4----常用类说明
  13. oracle中主键的建立,oracle 建立主键与索引
  14. Atitit. 数据约束 校验 原理理论与 架构设计 理念模式java php c#.net js javascript mysql oracle
  15. python wav转pcm
  16. 解决——完美解决Anaconda打开Spyder5报错:link image0 hasn’t been detected!
  17. thingworx集中常见数据存储方法
  18. 基于libusb库、uac协议,获取Audio声音数据
  19. 解决打包过程中出现 ModuleNotFoundError: No module named ‘pydicom.encoders.gdcm‘ 报错的方法
  20. 计算机桌面怎么全屏显示,电脑显示器如何设置全屏 把电脑屏幕调成满屏的方法有哪些...

热门文章

  1. python模块 之 xlwt模块
  2. win10如何去掉电脑桌面快捷方式图标小箭头?(强迫症必看)--附还原方法
  3. 一文搞懂│王者游戏中荣耀水晶难抽?探索游戏中的抽奖算法
  4. 14、文件指令集与变量
  5. Python数据收集入门
  6. RAID的几种常用模式
  7. CRF as RNN 代码解读
  8. nn.CrossEntropyLoss的ignore_index标签(CE loss)
  9. 22-09-02 西安 JVM 类加载器、栈、堆体系、堆参数调优、GC垃圾判定、垃圾回收算法、对象的finalize机制
  10. 华为通信算法岗(实习)-- 准备到入职全记录