简介

python连接kafka的标准库,kafka-python和pykafka。kafka-python使用的人多是比较成熟的库,kafka-python并没有zk的支持。pykafka是Samsa的升级版本,使用samsa连接zookeeper,生产者直接连接kafka服务器列表,消费者才用zookeeper。

安装

# PyPI安装
pip install kafka-python# conda安装
conda install -c conda-forge kafka-python# anaconda自带pip安装
/root/anaconda3/bin/pip install kafka-python

官方链接

  • 官网:https://kafka-python.readthedocs.io/en/master/index.html
  • git:https://github.com/dpkp/kafka-python

注意:1.4.0 以上的 kafka-python 版本使用了独立的心跳线程去上报心跳

生产者

API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html。

生产者代码是线程安全的,支持多线程,而消费者则不然。

类 KafkaProducer

class kafka.KafkaProducer(**configs)

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • key_serializer (可调用对象) –用于转换用户提供的key值为字节,必须返回字节数据。 如果为None,则等同调用f(key)。 默认值: None.
  • value_serializer(可调用对象) – 用于转换用户提供的value消息值为字节,必须返回字节数据。 如果为None,则等同调用f(value)。 默认值: None.

方法

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

函数返回FutureRecordMetadata类型的RecordMetadata数据

  • topic(str) – 设置消息将要发布到的主题,即消息所属主题
  • value(可选) – 消息内容,必须为字节数据,或者通过value_serializer序列化后的字节数据。如果为None,则key必填,消息等同于“删除”。( If value is None, key is required and message acts as a ‘delete’)
  • partition (int, 可选) – 指定分区。如果未设置,则使用配置的partitioner
  • key (可选) – 和消息对应的key,可用于决定消息发送到哪个分区。如果partition为None,则相同key的消息会被发布到相同分区(但是如果key为None,则随机选取分区)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必须为字节数据或者通过配置的key_serializer序列化后的字节数据.
  • headers (可选) – 设置消息header,header-value键值对表示的list。list项为元组:格式 (str_header,bytes_value)
  • timestamp_ms (int, 可选) –毫秒数 (从1970 1月1日 UTC算起) ,作为消息时间戳。默认为当前时间

flush(timeout=None)

发送所有可以立即获取的缓冲消息(即时linger_ms大于0),线程block直到这些记录发送完成。当一个线程等待flush调用完成而block时,其它线程可以继续发送消息。

注意:flush调用不保证记录发送成功

metrics(raw=False)

获取生产者性能指标。

#-*- encoding:utf-8 -*-
from kafka import KafkaProducer
import jsonproducer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(0, 100):producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)# Block直到单条消息发送完或者超时
future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')
result = future.get(timeout=60)
print(result)
# future.get函数等待单条消息发送完成或超时,经测试,必须有这个函数,不然发送不出去,或用time.sleep代替,待验证# Block直到所有阻塞的消息发送到网络
# 注意: 该操作不保证传输或者消息发送成功,仅在配置了linger_ms的情况下有用。(It is really only useful if you configure internal batching using linger_ms# 序列化json数据
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('MY_TOPIC1', {'shouke':'kafka'})# 序列化字符串key
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)
producer.send('MY_TOPIC1', b'shouke', key='strKey')# 压缩
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')
for i in range(2):producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))# 消息记录携带header
producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])# 获取性能数据(注意,实践发现分区较多的情况下,该操作比较耗时
metrics = producer.metrics()
print(metrics)
producer.flush()

实践中遇到错误: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解决方案如下:

进入到配置目录(config),编辑server.properties文件,查找并设置listener,配置监听端口,格式:listeners = listener_name://host_name:port,供kafka客户端连接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

消费者

参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

消费者代码不是线程安全的,最好不要用多线程

类KafkaConsumer

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可选,设置需要订阅的topic,如果未设置,需要在消费记录前调用subscribe或者assign。

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’
  • group_id (str or None) – 消费组名称。如果为None,则通过group coordinator auto-partition分区分配,offset提交被禁用。默认为None
  • auto_offset_reset (str) – 重置offset策略: 'earliest'将移动到最老的可用消息, 'latest'将移动到最近消息。 设置为其它任何值将抛出异常。默认值:'latest'。
  • enable_auto_commit (bool) – 如果为True,将自动定时提交消费者offset。默认为True。
  • auto_commit_interval_ms (int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。
  • value_deserializer(可调用对象) - 携带原始消息value并返回反序列化后的value
  • consumer_timeout_ms – 毫秒数,若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
  • max_poll_interval_ms – 毫秒数,它表示最大的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该 consumer 处于 livelock 状态,进行 reblancing
  • session_timout_ms – 毫秒数,控制心跳超时时间。在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了
  • heartbeat_interval_ms – 毫秒数,控制心跳发送频率,频率越高越不容易被误判,但也会消耗更多资源。
  • max_pool_record(int),kafka 每次 pool 拉取消息的最大数量

subscribe(topics=(), pattern=None, listener=None)

订阅需要的主题

  • topics (list) – 需要订阅的主题列表
  • pattern (str) – 用于匹配可用主题的模式,即正则表达式。注意:必须提供topics、pattern两者参数之一,但不能同时提供两者。

metrics(raw=False)

获取消费者性能指标。

#-*- encoding:utf-8 -*-
from kafka import KafkaConsumer
from kafka import TopicPartition
import json
consumer = KafkaConsumer('MY_TOPIC1',bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest',   # 消费 kafka 中最近的数据,如果设置为 earliest 则消费最早的未被消费的数据enable_auto_commit=True,      # 自动提交消费者的 offsetauto_commit_interval_ms=3000, # 自动提交消费者 offset 的时间间隔group_id='MY_GROUP1',consumer_timeout_ms= 10000,   # 如果 10 秒内 kafka 中没有可供消费的数据,自动退出client_id='consumer-python3'
)for msg in consumer:print (msg)print('topic: ', msg.topic)print('partition: ', msg.partition)print('key: ', msg.key, 'value: ', msg.value)print('offset:', msg.offset)print('headers:', msg.headers)# Get consumer metrics
metrics = consumer.metrics()
print(metrics)# 通过assign、subscribe两者之一为消费者设置消费的主题
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest',enable_auto_commit=True,    # 自动提交消费数据的 offsetconsumer_timeout_ms= 10000, # 如果 10 秒内 kafka 中没有可供消费的数据,自动退出value_deserializer=lambda m: json.loads(m.decode('ascii')), #消费json 格式的消息client_id='consumer-python3'
)# consumer.assign([TopicPartition('MY_TOPIC1', 0)])
# msg = next(consumer)
# print(msg)
consumer.subscribe('MY_TOPIC1')
for msg in consumer:print (msg)

客户端

  • 参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

    • 用于异步请求/响应网络I / O的网络客户端。
    • 这是一个内部类,用于实现面向用户的生产者和消费者客户端。
    • 此类不是线程安全的!
  • 参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html 
    • 管理Kafka集群

类 KafkaClient

class kafka.client.KafkaClient(**configs)

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’
  • request_timeout_ms (int) – 客户端请求超时时间,单位毫秒。默认值: 30000.

方法

brokers()

获取所有broker元数据

available_partitions_for_topic(topic)

返回主题的所有分区

#-*- encoding:utf-8 -*-
from kafka.client import KafkaClientclient = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)# 获取所有broker
brokers = client.cluster.brokers()
for broker in brokers:print('broker: ', broker)  # broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)print('broker nodeId: ', broker.nodeId)  # broker nodeId:  0# 获取主题的所有分区
topic = 'MY_TOPIC1'
partitions = client.cluster.available_partitions_for_topic(topic)
print(partitions)  # {0}partition_dict = {}
partition_dict[topic] = [partition for partition in partitions]
print(partition_dict)  # {'MY_TOPIC1': [0]}

类 KafkaAdminClient

class kafka.client.KafkaAdminClient(**configs)

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’
  • request_timeout_ms (int) – 客户端请求超时时间,单位毫秒。默认值: 30000.

方法

list_topics()

获取所有的 topic

create_partitions(topic_partitions,timeout_ms = None,validate_only = False )

为现有主题创建其他分区。返回值:合适版本的CreatePartitionsResponse类。

  • topic_partitions –主题名称字符串到NewPartition对象的映射。
  • timeout_ms –代理返回之前等待创建新分区的毫秒数。
  • validate_only –如果为True,则实际上不创建新分区。默认值:False

create_topics(new_topics,timeout_ms = None,validate_only = False )

在集群中创建新主题。返回值:合适版本的CreateTopicResponse类。

  • new_topics – NewTopic对象的列表。
  • timeout_ms –代理返回之前等待创建新主题的毫秒。
  • validate_only –如果为True,则实际上不创建新主题。并非所有版本都支持。默认值:False

delete_topics(主题,timeout_ms =无)

从集群中删除主题。返回值:合适版本的DeleteTopicsResponse类。

  • 主题-主题名称的字符串列表。
  • timeout_ms –代理返回之前等待删除主题的毫秒数。

describe_consumer_groups(group_ids,group_coordinator_id = None,include_authorized_operations = False)

描述一组消费者group。返回值:组说明列表。目前,组描述是DescribeGroupsResponse的原始结果。

  • group_ids –消费者组ID的列表。这些通常是作为字符串的组名。
  • group_coordinator_id –组的协调器代理的node_id。如果设置为None,它将查询群集中的每个组以找到该组的协调器。如果您已经知道组协调器,则明确指定此选项对于避免额外的网络往返很有用。这仅在所有group_id具有相同的协调器时才有用,否则会出错。默认值:无。
  • include_authorized_operations –是否包括有关允许组执行的操作的信息。仅在API版本> = v3上受支持。默认值:False。

list_consumer_group_offsets(group_id,group_coordinator_id = None,partitions = None)

获取单个消费者组的消费者offset。注意:这不会验证group_id或分区在集群中是否实际存在。一旦遇到任何错误,就会立即报错。   返回字典:具有TopicPartition键和OffsetAndMetada值的字典。省略未指定且group_id没有记录偏移的分区。偏移值-1表示group_id对于该TopicPartition没有偏移。一个-1只能发生于显式指定的分区。

  • group_id –要获取其偏移量的消费者组ID名称。
  • group_coordinator_id –组的协调代理的node_id。如果设置为None,将查询群集以查找组协调器。如果您已经知道组协调器,则明确指定此选项对于防止额外的网络往返很有用。默认值:无。
  • partitions –要获取其偏移量的TopicPartitions列表。在> = 0.10.2上,可以将其设置为“无”以获取使用者组的所有已知偏移量。默认值:无。

list_consumer_groups(broker_ids = None)

列出集群已知的所有消费者组。这将返回消费者组元组的列表。元组由使用者组名称和使用者组协议类型组成。仅返回将偏移量存储在Kafka中的消费者组。对于使用Kafka <0.9 API创建的群组,协议类型将为空字符串,因为尽管它们将偏移量存储在Kafka中,但它们并不使用Kafka进行群组协调。对于使用Kafka> = 0.9创建的群组,协议类型通常为“消费者”。

  • broker_ids –用于查询使用者组的代理节点ID的列表。如果设置为None,将查询集群中的所有代理。明确指定经纪人对于确定哪些消费者组由这些经纪人进行协调很有用。默认值:无
from kafka.admin import KafkaAdminClient, NewTopic
client = KafkaAdminClient(bootstrap_servers="localhost:9092")
topic_list = []# 创建自定义分区的topic 可以使用以下方法创建名称为test,12个分区3份副本的topic
topic_list.append(NewTopic(name="test", num_partitions=12, replication_factor=3))
client.create_topics(new_topics=topic_list, validate_only=False)# 获取所有的 topic
client.list_topics()# 删除 topic
client.delete_topics(['test', 'ssl_test'])  # 传入要删除的 topic 列表# list_consumer_groups()的返回值是一个元组(消费者组的名称,消费组协议类型)组成的列表。
client.list_consumer_groups()
#  [('xray', 'consumer'), ('awvs', 'consumer')]# 返回值是一个字典,字典的key是TopicPartition,值是OffsetAndMetada
client.list_consumer_group_offsets('awvs')
#  {TopicPartition(topic='scan, partition=0): OffsetAndMetadata(offset=17, metadata='')

python-kafka 常用 api 汇总相关推荐

  1. python命令大全下载-Python pip 常用命令汇总

    使用了这么就pip命令,但是一直是简单使用,很多命令都是用了查,查了用,今天把常用的命令汇总一下,方便使用. 命令: pip 由上图可以看到 pip 支持一下命令 Commands: install ...

  2. python常用命令大全-Python pip 常用命令汇总

    使用了这么就pip命令,但是一直是简单使用,很多命令都是用了查,查了用,今天把常用的命令汇总一下,方便使用. 命令: pip 由上图可以看到 pip 支持一下命令 Commands: install ...

  3. Paho MQTT Python客户端常用API、安装与使用

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的即时通信协议,相关介绍可见:MQTT简介. Paho 是Eclipse的开源 MQTT 客户端项目,提供 ...

  4. c、c++ 常用API汇总

    前言 本文汇总c.cpp里常用API,会持续更新,便于查阅. C语言部分参考:C语言 基础知识整理 <string.h> 传入此类函数的指针必须是以空字符为结尾的. char *strch ...

  5. python flash_Python常用知识点汇总(Flash)

    一.Python中的数据结构 python的元组.列表.字典数据类型是很python(there python is a adjective)的数据结构.这些结构都是经过足够优化后的,所以如果使用好的 ...

  6. kafka 常用命令汇总

    启动 kafka 服务 # 使用 -daemon 选项表示后台运行kafka服务 ./kafka-server-start.sh -daemon ../config/server.properties ...

  7. 工作、生活免费常用API汇总

    free-api: https://www.free-api.com/ 短信验证码:可用于登录.注册.找回密码.支付认证等等应用场景.支持三大运营商,3秒可达,99.99%到达率,支持大容量高并发. ...

  8. Python最常用库汇总

    一.数据处理 1. 数据分析:Numpy, Pandas, SciPy 2. 数据可视化:Matplotlib, Seaborn, Mayavi (3D) 3. 文本处理:PyPDF2, NLTK, ...

  9. jquery的常用API汇总

    1.$.contains()方法 定义与用法:$.contains() 方法用于判断指定元素内是否包含另一个元素.即判断另一个DOM元素是否是指定DOM元素的后代.语法: $.contains( co ...

最新文章

  1. PTA 基础编程题目集 7-12 两个数的简单计算器 C语言
  2. 蓝桥杯:入门训练 圆的面积
  3. mysql数据库备份到oss_备份MySQL数据库并上传到阿里云OSS存储
  4. java包含关系图_Java——Spring框架完整依赖关系图!再复习了解加工一下吧?
  5. hive java udf_UDF_Hive教程_田守枝Java技术博客
  6. Xshell5 访问虚拟机Ubuntu16.04
  7. 解决sklearn库使用过程中No module named model_selection的错误
  8. pyqt5设置dialog的标题_PyQt5教程——对话框(6)
  9. 怎么用计算机编程算术,总算认识怎么用scratch做计算题答题程序
  10. 如何把空间数据从CGCS2000转换到WGS84和BD09 ——JAVA语言实现
  11. 解决无法删除文件夹的情况:文件夹正在使用,操作无法完成,因为其中的文件,或文件夹已在另一个程序中打开...
  12. Alpha测试与Beta测试
  13. ctfshow (ssrf学习和实践)
  14. win11系统输入法增加小鹤双排
  15. Python的PIL库中的getpixel方法 putpixel方法
  16. 自己的家用电脑怎么架设传奇私服??
  17. Java词云--Kumo使用
  18. 2008北京奥运会足球赛程(男足)
  19. 吴恩达机器学习第十周测试
  20. 基于Sentinel-2的杞县大蒜提取试验

热门文章

  1. 4G发牌或提早 电信联通面临艰难抉择
  2. mp4(H264容器)的详细文件格式分析
  3. 解决:springcloud 启动 config-client 报错:... .integration.config.HandlerMethodArgumentResolversHolder
  4. 解决:Error response from daemon: Get https://index.docker.io/v1/search?q=openjdkn=25: dial tcp: looku
  5. IDEA中怎么设置黑色或白色背景?
  6. IPv6 解说 ,与IPv4的同异
  7. uni-app—从安装到卸载
  8. 计算机专业 程序员技术练级攻略(转载)
  9. 【原创】Performanced C++ 经验规则 第五条:再谈重载、覆盖和隐藏
  10. 1566:基础练习 十六进制转八进制