python kafka收不到消息_python通过Pykafka库来连接kafka并收发消息
1.安装pykafka
pip install pykafka
2.下载安装
git clone https://github.com/Parsely/pykafka.git
然后将下载下来的pykafka文件夹下的pykafka文件(pykafka的库文件)放到/Library/Python/2.7/site-packages/路径下即可
3.假设你有至少一个卡夫卡实例在本地运行,你可以使用pykafka连接它。
consumer.py 消费者
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
#kafka默认端口为9092
client = KafkaClient(hosts='192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092')#这里连接多个客户端
topic = client.topics['test_kafka_topic']
#从zookeeper消费,zookeeper的默认端口为2181
balanced_consumer = topic.get_balanced_consumer(
consumer_group='test_kafka_group',
auto_commit_enable=True, # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息
zookeeper_connect='192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181'#这里就是连接多个zk
)
for message in balanced_consumer:
# print message
if message is not None:
print message.offset, message.value#打印接收到的消息体的偏移个数和值
producer.py 生产者
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts ="192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092") #可接受多个client
#查看所有的topic
client.topics
print client.topics
topic = client.topics['test_kafka_topic']#选择一个topic
message ="test message test message"
#当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式,
with topic.get_sync_producer() as producer:
producer.produce(message)
#The example above would produce to kafka synchronously -
#the call only returns after we have confirmation that the message made it to the cluster.
#以上的例子将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后
#但生产环境,为了达到高吞吐量,要采用异步的方式,通过delivery_reports =True来启用队列接口;
with topic.get_sync_producer() as producer:
producer.produce('test message',partition_key='{}'.)
producer=topic.get_producer()
producer.produce(message)
print message
python kafka收不到消息_python通过Pykafka库来连接kafka并收发消息相关推荐
- 怎么看rabbitmq的浏览器信息_没用过消息队列?一文带你体验RabbitMQ收发消息
人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长. 楔子 先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理. 今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想 ...
- java连接rabbitmq_没用过消息队列?一文带你体验RabbitMQ收发消息
楔子 先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理. 今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想要梳理一遍,还是需要很多时间,所以我打算慢慢写,先把MQ ...
- python撤销上一步操作_Python 神操作,还原已撤回的微信消息
项目环境 语言:Python3 编辑器:Pycharm 导包效果展示 以下截图显示的撤回消息类型依次是文字消息.微信自带表情.图片.语音.定位地图.名片.公众号文章.音乐.视频.有群里撤回的,也有个人 ...
- python 微信公众号回复图片_Python webpy微信公众号开发之 回复图文消息
新建图文回复模板reply_pictext.xml: $def with (toUser,fromUser,createTime,title1,description1,picurl1,url1) $ ...
- python中bin函数的用法_Python 3标准库用法--ascii()、bin()、breakpoint()函数
ascii(object) 就像函数 repr(),返回一个对象可打印的字符串,但是 repr() 返回的字符串中非 ASCII 编码的字符,会使用 \x.\u 和 \U 来转义.生成的字符串和 Py ...
- python中re的安装步骤_Python中requent库的安装与卸载【原创】
以管理员身份运行cmd,输入 pip install requests,点击回车键后,需要等待大概十几秒钟就可以安装成功了,出现如下界面 安装完成后进行安装测试,此时要先输入python,敲击回车,运 ...
- python接口测试之requests详解_Python接口测试-requests库
一.requests库 Requests 是用Python语言编写,基于 urllib,采用 Apache2 Licensed 开源协议的 HTTP 库.它比 urllib 更加方便,可以节约我们大量 ...
- python xlrd读取文件报错_python中xlrd库如何实现文件读取?
俗话说得好,技多不压身,虽然我们已经掌握了多种可以实现读取文件的方式,但是丝毫不影响我们要学会精益求精,他说学习文件读取的奥秘,况且,数据分析是十分重要的,一切的代码运行,总归都是要服务于数据,好啦, ...
- python处理一亿条数据_Python基础数据处理库
Numpy 简介 import numpy as np Numpy是应用Python进行科学计算的基础库.它的功能包括多维数组.基本线性代数.基本统计计算.随机模拟等.Numpy的核心功能是ndarr ...
最新文章
- JPA的Column注解总结
- 圆环,扇形控件基本算法一种实现 - 代码库 - CocoaChina_让移动开发更简单
- 代码检查规则:Python语言案例详解
- ElasticSearch5.3插件开发(二)获取集群健康信息
- java open course_关于开闭原则 JavaDiscountCourse 类的设计
- 虚拟化系列-VMware vSphere 5.1 虚拟机管理
- PCM(脉冲编码调制)、iLBC编解码、opus(声音编码格式)、VP8视频压缩格式、H.264数字视频压缩格式
- 那年学过的Java笔记三核心类库二
- AFNetworking 图片的本地缓存问题
- 巧妙的实现 CSS 斜线
- 自动生成xml报文_使用python如何给xml报文进行签名 signXML库
- ELDD Chapter 2..3 Linux Kernel Facilities
- DB9串口定义及含义
- Cesium 纹理贴图
- linux系统带界面,linux系统界面详情介绍
- mysql ndb 安装_mysql NDB的安装配置使用示例
- Linux C 函数指针应用---回调函数
- oracle之物理数据库结构概述(数据文件、重做日志文件,控制文件等各种数据库文件)
- filco蓝牙不好用_蓝牙党+精简布局键位的选择:Filco Minila Air青轴两个月使用体验...
- Windos测试IP和端口是否能访问
热门文章
- 睿智的目标检测38——TF2搭建Efficientdet目标检测平台(tensorflow2)
- 码云上传文件夹_码云上传本地文件夹,码云只能上传20个文件的突破方法
- winform的Textbox设置只读之后ForeColor无效的解决方法
- rtx2080ti和gtx1080ti 对比哪个好
- 支撑掩护式液压支架设计(论文+CAD图纸+翻译)
- 八卦一下今年的菲尔兹数学奖
- 华为USG统一安全边界网关的设计、演示、经验鉴证实评-卷A
- 三国杀技术支持(内含数学建模)
- pyw3 windows 启动_用pywin32实现windows模拟鼠标及键盘动作
- Group coordinator 192.169.0.16:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cau