一、基本概念

  • Topic:一组消息数据的标记符;
  • Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
  • Consumer:消费者,获取数据,可消费指定的Topic;
  • Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取;
  • Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。同一分区只能被一个消费者订阅。

二、本地安装与启动(基于Docker)

  1. 下载zookeeper镜像与kafka镜像:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2. 本地启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper  

3. 本地启动kafka

docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=localhost \
--env KAFKA_ADVERTISED_PORT=9092 \
wurstmeister/kafka:latest 

注意:上述代码,将kafka启动在9092端口

4. 进入kafka bash

docker exec -it kafka bash
cd /opt/kafka/bin

5. 创建Topic,分区为2,Topic name为'kafka_demo'

kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

6. 查看当前所有topic

kafka-topics.sh --zookeeper zookeeper:2181 --list

7. 安装kafka-python

pip install kafka-python

三、生产者(Producer)与消费者(Consumer)

个人封装

生产者和消费者的简易Demo,这里一起演示:

#!/usr/bin/env python
# -*- coding: utf-8 -*-import json
import tracebackfrom kafka import KafkaConsumer, KafkaProducer, TopicPartition"""
kafka 生产者
"""
class KProducer(object):def __init__(self, bootstrap_servers):""":param bootstrap_servers: 地址"""# json 格式化发送的内容self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers,value_serializer = lambda m: json.dumps(m).encode("ascii")# compression_type = "gzip"    # 压缩消息发送)def sync_producer(self, topic, data):"""同步发送 数据:param topic:  topic:param data_li:  发送数据:return:"""future = self.producer.send(topic, data)record_metadata = future.get(timeout=10)  # 同步确认消费partition = record_metadata.partition     # 数据所在的分区offset = record_metadata.offset           # 数据所在分区的位置print("save success, partition: {}, offset: {}".format(partition, offset))def asyn_producer(self, topic, data):"""异步发送数据:param topic:  topic:param data_li:发送数据:return:"""self.producer.send(topic, data)self.producer.flush()  # 批量提交def asyn_producer_callback(self, topic, data):"""异步发送数据 + 发送状态处理:param topic:  topic:param data_li:发送数据:return:"""self.producer.send(topic, data).add_callback(self.send_success).add_errback(self.send_error)self.producer.flush()  # 批量提交def send_success(self, *args, **kwargs):"""异步发送成功回调函数"""print('save success')returndef send_error(self, *args, **kwargs):"""异步发送错误回调函数"""print('save error')returndef close_producer(self):try:self.producer.close()except:pass"""
kafka 消费商
"""
class PConsumers(object):def __init__(self, bootstrap_servers, group_id):""":param bootstrap_servers: 地址"""self.bootstrap_servers = bootstrap_serversself.group_id = group_id# 获取规定个数的数据(可修改做无限持续获取数据)def get_message(self, topic, count=1):""":param topic:   topic:param count: 取的条数:return: msg"""counter = 0msg = []try:consumer = KafkaConsumer(topic,bootstrap_servers = self.bootstrap_servers,group_id = self.group_id,value_deserializer = lambda m: json.loads(m.decode("ascii")),   # 确定返回结果json还是strauto_offset_reset = "earliest")for message in consumer:print("%s:%d:%d: key=%s value=%s header=%s" % (message.topic, message.partition,message.offset, message.key, message.value, message.headers))msg.append(message.value)counter += 1if count == counter:breakelse:continueconsumer.close()except Exception as e:print("{0}, {1}".format(e, traceback.print_exc()))return Nonereturn msg# 查看剩余量def get_count(self, topic):""":param topic: topic:return: count"""try:consumer = KafkaConsumer(topic,bootstrap_servers = self.bootstrap_servers,group_id = self.group_id)partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]#print("start to cal offset:")# totaltoff = consumer.end_offsets(partitions)toff = [(key.partition, toff[key]) for key in toff.keys()]toff.sort()#print("total offset: {}".format(str(toff)))# currentcoff = [(x.partition, consumer.committed(x)) for x in partitions]coff.sort()#print("current offset: {}".format(str(coff)))# cal sum and lefttoff_sum = sum([x[1] for x in toff])cur_sum = sum([x[1] for x in coff if x[1] is not None])left_sum = toff_sum - cur_sum#print("kafka left: {}".format(left_sum))consumer.close()except Exception as e:print("{0}, {1}".format(e, traceback.print_exc()))return Nonereturn left_sumif __name__ == "__main__":send_data_li = {"test": 1}#kp = KProducer(topic="test", bootstrap_servers='127.0.0.1:9001,127.0.0.1:9002')kp = KProducer(bootstrap_servers="1.1.1.1:9092")# 同步发送#kp.sync_producer(send_data_li)# 异步发送# kp.asyn_producer(send_data_li)# 异步+回调kp.asyn_producer_callback(topic="test", data=send_data_li)#kp.close_producer()#cp = PConsumers(bootstrap_servers="1.1.1.1:9092", topic="detect-file")cp = PConsumers(bootstrap_servers="1.1.1.1:9092", group_id = "boxer")#cp = PConsumers(bootstrap_servers="1.1.1.1:9092", topic="custom-event")#print(cp.get_count(topic="test"))print(cp.get_message(topic="test"))

Python kafka操作实例相关推荐

  1. python数据库操作实例

    本篇文章主要讲解python3.9.6下数据库的链接和查询数据的方法 前置环境需要安装mysql和json两个模块,引入方式为import 模块名,不懂的朋友可以先看<python小白操作入门教 ...

  2. Python邮件操作实例:发个邮件用Python实现远程自动关机

    一.邮件收发 用于接收和发送邮件的邮箱可任意指定(可以是同一邮箱),不过要想使用邮箱的自动收发功能,还需要生成特定的授权码,一般来说QQ.163等邮箱都支持此类操作.以QQ邮箱为例,可以在" ...

  3. python对文件的读操作方法有哪些-Python文件操作实例大全

    目录: 一.打开文件 二.文件对象的方法 三.文件的关闭 四.文件的读取和定位 五.文件的写入 六.课时28课后习题及答案 大多数程序都遵循着:输入->处理->输出的模型,首先接受输入数据 ...

  4. python多线程操作_python多线程操作实例

    一.python多线程 因为CPython的实现使用了Global Interpereter Lock(GIL),使得python中同一时刻只有一个线程在执行,从而简化了python解释器的实现,且p ...

  5. Python文件操作详解

    打开和关闭文件 open 函数 你必须先用Python内置的open()函数打开一个文件,创建一个file对象,相关的方法才可以调用它进行读写.语法: file object = open(file_ ...

  6. python字典操作添加_Python字典常见操作实例小结【定义、添加、删除、遍历】

    本文实例总结了python字典常见操作.分享给大家供大家参考,具体如下: 简单的字典: 字典就是键值对key-value组合. #字典 键值对组合 alien_0 ={'color':'green', ...

  7. python数值运算实例_Python矩阵常见运算操作实例总结

    本文实例讲述了Python矩阵常见运算操作.分享给大家供大家参考,具体如下: python的numpy库提供矩阵运算的功能,因此我们在需要矩阵运算的时候,需要导入numpy的包. 一.numpy的导入 ...

  8. python查询oracle数据库_python针对Oracle常见查询操作实例分析

    本文实例讲述了python针对Oracle常见查询操作.分享给大家供大家参考,具体如下: 1.子查询(难): 当进行查询的时候,发现需要的数据信息不明确,需要先通过另一个查询得到, 此查询称为子查询: ...

  9. python dataframe 列_python pandas库中DataFrame对行和列的操作实例讲解

    用pandas中的DataFrame时选取行或列: import numpy as np import pandas as pd from pandas import Sereis, DataFram ...

最新文章

  1. 如何才能识别市场趋势?[转]
  2. R语言可视化包ggplot2绘制分组回归线实战(Regression Line by Group)
  3. poj 1753 Flip Game dfs 技巧
  4. android 之图文混排+GridView
  5. 编程方法学18:多维数组收尾
  6. hadoop大数据--深入讲解hdfs源码
  7. 广西 启动计算机教案,广西版六年级下册信息技术教案.docx
  8. 美好生活水果新食尚消费趋势报告
  9. 二维高斯滤波器(gauss filter)的实现
  10. android数字提示错误,从服务器接收数据时出现Android错误
  11. 1千条数据平均分配给15人_母狗一胎生下15只小狗,差点破纪录,1年后再相聚的场景让人泪目...
  12. 前端进阶之路-利用Jenkins快速打造前端项目自动化工作流
  13. win7x64 连接oracle 客户端 vs 2010调试 提示“ORA-12154: TNS: 无法解析指定的连接标识符 ”
  14. 视频教程-路由技术(CCNA魔鬼训练营系列)-思科认证
  15. failed with status 128
  16. php excel加密,表格加密怎么加密
  17. 单元测试总结反思_单元考试反思总结
  18. 网站中的PV是什么意思?
  19. 【金猿产品展】云简业财——满足中大型企业个性化需求的报销、费控、预算、业财一体化管理平台...
  20. 熊猫人表情包python 代码,Python熊猫替换特殊字符

热门文章

  1. 心脏为什么长在左边?原来是因为这个消失的器官
  2. macOS 新功能:【控制中心】让你的 Mac 系统更方便!
  3. xjc java_xjc命令转换成java类乱码
  4. 程序员之常用软件安装过程记录
  5. 手把手带你入门Python爬虫(三、PyMySQL)
  6. vue2.0 路由不显示router-view
  7. Java笔记-Semaphore简单应用实例
  8. canvas笔记-二次贝塞尔曲线与三次贝塞尔曲线的用法
  9. Java笔记-多线程相关
  10. 设计模式工作笔记-UML和设计模式导论