#-*- coding: utf-8 -*-

"""""

使用kafka-Python 1.3.3模块

# pip install kafka==1.3.5

# pip install kafka-python==1.3.5"""

importsysimporttimeimportjsonfrom kafka importKafkaProducerfrom kafka importKafkaConsumerfrom kafka.errors importKafkaError

KAFAKA_HOST= "101.236.51.235"KAFAKA_PORT= 9092KAFAKA_TOPIC= "test"

classKafka_producer():"""""

生产模块:根据不同的key,区分消息"""

def __init__(self, kafkahost,kafkaport, kafkatopic, key):

self.kafkaHost=kafkahost

self.kafkaPort=kafkaport

self.kafkatopic=kafkatopic

self.key=keyprint("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)

bootstrap_servers= "{kafka_host}:{kafka_port}".format(

kafka_host=self.kafkaHost,

kafka_port=self.kafkaPort

)print("boot svr:",bootstrap_servers)

self.producer= KafkaProducer(bootstrap_servers =bootstrap_servers

)defsendjsondata(self, params):try:

parmas_message= json.dumps(params,ensure_ascii=False)

producer=self.producerprint(parmas_message)

v= parmas_message.encode("utf-8")

k= key.encode("utf-8")print("send msg:(k,v)",k,v)

producer.send(self.kafkatopic, key=k, value=v)

producer.flush()exceptKafkaError as e:print(e)classKafka_consumer():"""""

消费模块: 通过不同groupid消费topic里面的消息"""

def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):

self.kafkaHost=kafkahost

self.kafkaPort=kafkaport

self.kafkatopic=kafkatopic

self.groupid=groupid

self.key=key

self.consumer= KafkaConsumer(self.kafkatopic, group_id =self.groupid,

bootstrap_servers= "{kafka_host}:{kafka_port}".format(

kafka_host=self.kafkaHost,

kafka_port=self.kafkaPort )

)defconsume_data(self):try:for message inself.consumer:yieldmessageexceptKeyboardInterrupt as e:print(e)defmain(xtype, group, key):"""""

测试consumer和producer"""

if xtype == "p":#生产模块

producer =Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)print ("===========> producer:", producer)for _id in range(100):

params= "{"msg" : "%s"}" %str(_id)

params=[{"msg0" :_id},{"msg1":_id}]

producer.sendjsondata(params)

time.sleep(1)if xtype == "c":#消费模块

consumer =Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)print ("===========> consumer:", consumer)

message=consumer.consume_data()for msg inmessage:print ("msg---------------->k,v", msg.key,msg.value)print ("offset---------------->", msg.offset)if __name__ == "__main__":

xtype= sys.argv[1]

group= sys.argv[2]

key= sys.argv[3]

main(xtype, group, key)

python连接kafka-python连接kafka生产者,消费者脚本相关推荐

  1. python下载模块命令_python kafka模块操作命令集合

    1.安装pykafka pip install pykafka 2.生产者 from pykafka import KafkaClient from pykafka import KafkaClien ...

  2. Python多线程实现生产者消费者模式

    什么是生产者消费者模式 在软件开发的过程中,经常碰到这样的场景: 某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数.线程.进程等).产生数据的模块称为生产者,而处理数据的模块 ...

  3. RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现

    生产者消费者代码示例 上一章节中对消息通信概念做了详细的说明,本章节我们对 RabbitMQ 生产者和消费者代码分别做一示例说明. 1. 生产者代码 #!/usr/bin/env python # c ...

  4. Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型...

    一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例 ...

  5. kafka 怎么样连接图形化界面_从零开始搭建Kafka+SpringBoot分布式消息系统

    前言 由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群.由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境. (ps:默认您的cent ...

  6. python 异步 生产者 消费者_python 生产者消费者模式 - 刘江的python教程

    生产者消费者模式 阅读: 9884 评论:4 利用多线程和队列可以实现生产者消费者模式.该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度. 什么是生产者和消费者? 在线程世界里, ...

  7. python进阶09并发之五生产者消费者

    原创博客地址:python进阶09并发之五生产者消费者 这也是实际项目中使用较多的一种并发模式,用Queue(JoinableQueue)实现,是Python中最常用的方式(这里的queue特指mul ...

  8. kafka python client:PyKafka vs kafka-python

    引用:https://github.com/Parsely/pykafka/issues/334 @emmett9001写道 @microamp Thanks, this is a great ide ...

  9. 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)

    最新Kafka教程(包含kafka部署与基本操作.java连接kafka.spring连接kafka以及使用springboot) 欢迎转载,转载请注明网址:https://blog.csdn.net ...

  10. python 全栈开发,Day39(进程同步控制(锁,信号量,事件),进程间通信(队列,生产者消费者模型))...

    昨日内容回顾 python中启动子进程 并发编程 并发 :多段程序看起来是同时运行的 ftp 网盘 不支持并发 socketserver 多进程 并发 异步 两个进程 分别做不同的事情 创建新进程 j ...

最新文章

  1. php isset()与empty()详解
  2. 系统安装操作优化:chapter5 安装驱动程序与检测电脑
  3. log4j2 logger_简单一致的Log4j2 Logger命名
  4. mysql中预定义常量_PHP预定义常量
  5. python网络爬虫系列(六)——数据提取 lxml模块
  6. mycat两个mysql实例的搭建_Mycat-多实例的搭建
  7. 连续函数matlab采样,基于 MATLAB 的时域信号采样及频谱分析(转)
  8. 基于Spring Security的认证方式_编程理解PasswordEncoder工作原理_Spring Security OAuth2.0认证授权---springcloud工作笔记125
  9. 如何使用macOS自带网络测速功能?
  10. SSD【目标检测篇】
  11. html万花筒相册旋转效果,jquery css3 3D万花筒图片相册展示特效
  12. Nginx: 104: Connection reset by peer 错误
  13. 用k-mer分析进行基因组调查:(一)基本原理
  14. mysql-高级命令(1)和一些函数(悟已往之不谏,知来者之可追)
  15. 相似剩余金额宝数值添加的动画
  16. systemctl 是管制服务的主要工具
  17. 超过10的带圆圈的自动项目编号
  18. [Vue]@keyup.enter不起作用
  19. 2021年大一下网页期末作业(纯html+css实现)
  20. 书店销售管理系统----数据库原理及应用综合实验

热门文章

  1. AV1,实时编码READY
  2. LiveVideoStack线上分享第四季(二):基于内容的自适应视频传输算法及其应用...
  3. 王盛:QUIC让B站在20%丢包时实现零卡顿
  4. 音视频技术开发周刊 83期
  5. 性能测试工具curl-loader二---测试分析
  6. 机器学习之线性回归 (Python SKLearn)
  7. 基于注解进行bean的装配
  8. 算法设计与分析(第三周)递归/迭代求Fibonacci前n项 【以及递归算法速度慢的原因】
  9. C语言 链表 头插法
  10. java安全编码指南之:锁的双重检测