1.安装pykafka

pip install pykafka

2.下载安装

git clone https://github.com/Parsely/pykafka.git

然后将下载下来的pykafka文件夹下的pykafka文件(pykafka的库文件)放到/Library/Python/2.7/site-packages/路径下即可

3.假设你有至少一个卡夫卡实例在本地运行,你可以使用pykafka连接它。

consumer.py 消费者

#!/usr/bin/python

# -*- coding:utf-8 -*-

from pykafka import KafkaClient

#kafka默认端口为9092

client = KafkaClient(hosts='192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092')#这里连接多个客户端

topic = client.topics['test_kafka_topic']

#从zookeeper消费,zookeeper的默认端口为2181

balanced_consumer = topic.get_balanced_consumer(

consumer_group='test_kafka_group',

auto_commit_enable=True, # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息

zookeeper_connect='192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181'#这里就是连接多个zk

)

for message in balanced_consumer:

# print message

if message is not None:

print message.offset, message.value#打印接收到的消息体的偏移个数和值

producer.py 生产者

#!/usr/bin/python

# -*- coding:utf-8 -*-

from pykafka import KafkaClient

client = KafkaClient(hosts ="192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092") #可接受多个client

#查看所有的topic

client.topics

print client.topics

topic = client.topics['test_kafka_topic']#选择一个topic

message ="test message test message"

#当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式,

with topic.get_sync_producer() as producer:

producer.produce(message)

#The example above would produce to kafka synchronously -

#the call only returns after we have confirmation that the message made it to the cluster.

#以上的例子将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后

#但生产环境,为了达到高吞吐量,要采用异步的方式,通过delivery_reports =True来启用队列接口;

with topic.get_sync_producer() as producer:

producer.produce('test message',partition_key='{}'.)

producer=topic.get_producer()

producer.produce(message)

print message

python kafka收不到消息_python通过Pykafka库来连接kafka并收发消息相关推荐

  1. 怎么看rabbitmq的浏览器信息_没用过消息队列?一文带你体验RabbitMQ收发消息

    人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长. 楔子 先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理. 今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想 ...

  2. java连接rabbitmq_没用过消息队列?一文带你体验RabbitMQ收发消息

    楔子 先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理. 今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想要梳理一遍,还是需要很多时间,所以我打算慢慢写,先把MQ ...

  3. python撤销上一步操作_Python 神操作,还原已撤回的微信消息

    项目环境 语言:Python3 编辑器:Pycharm 导包效果展示 以下截图显示的撤回消息类型依次是文字消息.微信自带表情.图片.语音.定位地图.名片.公众号文章.音乐.视频.有群里撤回的,也有个人 ...

  4. python 微信公众号回复图片_Python webpy微信公众号开发之 回复图文消息

    新建图文回复模板reply_pictext.xml: $def with (toUser,fromUser,createTime,title1,description1,picurl1,url1) $ ...

  5. python中bin函数的用法_Python 3标准库用法--ascii()、bin()、breakpoint()函数

    ascii(object) 就像函数 repr(),返回一个对象可打印的字符串,但是 repr() 返回的字符串中非 ASCII 编码的字符,会使用 \x.\u 和 \U 来转义.生成的字符串和 Py ...

  6. python中re的安装步骤_Python中requent库的安装与卸载【原创】

    以管理员身份运行cmd,输入 pip install requests,点击回车键后,需要等待大概十几秒钟就可以安装成功了,出现如下界面 安装完成后进行安装测试,此时要先输入python,敲击回车,运 ...

  7. python接口测试之requests详解_Python接口测试-requests库

    一.requests库 Requests 是用Python语言编写,基于 urllib,采用 Apache2 Licensed 开源协议的 HTTP 库.它比 urllib 更加方便,可以节约我们大量 ...

  8. python xlrd读取文件报错_python中xlrd库如何实现文件读取?

    俗话说得好,技多不压身,虽然我们已经掌握了多种可以实现读取文件的方式,但是丝毫不影响我们要学会精益求精,他说学习文件读取的奥秘,况且,数据分析是十分重要的,一切的代码运行,总归都是要服务于数据,好啦, ...

  9. python处理一亿条数据_Python基础数据处理库

    Numpy 简介 import numpy as np Numpy是应用Python进行科学计算的基础库.它的功能包括多维数组.基本线性代数.基本统计计算.随机模拟等.Numpy的核心功能是ndarr ...

最新文章

  1. JPA的Column注解总结
  2. 圆环,扇形控件基本算法一种实现 - 代码库 - CocoaChina_让移动开发更简单
  3. 代码检查规则:Python语言案例详解
  4. ElasticSearch5.3插件开发(二)获取集群健康信息
  5. java open course_关于开闭原则 JavaDiscountCourse 类的设计
  6. 虚拟化系列-VMware vSphere 5.1 虚拟机管理
  7. PCM(脉冲编码调制)、iLBC编解码、opus(声音编码格式)、VP8视频压缩格式、H.264数字视频压缩格式
  8. 那年学过的Java笔记三核心类库二
  9. AFNetworking 图片的本地缓存问题
  10. 巧妙的实现 CSS 斜线
  11. 自动生成xml报文_使用python如何给xml报文进行签名 signXML库
  12. ELDD Chapter 2..3 Linux Kernel Facilities
  13. DB9串口定义及含义
  14. Cesium 纹理贴图
  15. linux系统带界面,linux系统界面详情介绍
  16. mysql ndb 安装_mysql NDB的安装配置使用示例
  17. Linux C 函数指针应用---回调函数
  18. oracle之物理数据库结构概述(数据文件、重做日志文件,控制文件等各种数据库文件)
  19. filco蓝牙不好用_蓝牙党+精简布局键位的选择:Filco Minila Air青轴两个月使用体验...
  20. Windos测试IP和端口是否能访问

热门文章

  1. 睿智的目标检测38——TF2搭建Efficientdet目标检测平台(tensorflow2)
  2. 码云上传文件夹_码云上传本地文件夹,码云只能上传20个文件的突破方法
  3. winform的Textbox设置只读之后ForeColor无效的解决方法
  4. rtx2080ti和gtx1080ti 对比哪个好
  5. 支撑掩护式液压支架设计(论文+CAD图纸+翻译)
  6. 八卦一下今年的菲尔兹数学奖
  7. 华为USG统一安全边界网关的设计、演示、经验鉴证实评-卷A
  8. 三国杀技术支持(内含数学建模)
  9. pyw3 windows 启动_用pywin32实现windows模拟鼠标及键盘动作
  10. Group coordinator 192.169.0.16:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cau