python使用mq-fw包
文章目录
- 前言
- 一、mq-fw 是什么?
- 二、功能介绍
- 三、安装
- 四、示例
- 1. pulsar 生产
- 2.pulsar消费
- 3.pulsar服务端
- 4.pulsar调用端
- 5.rabbitmq生产
- 6.rabbitmq消费
- 7.rabbitmq服务端
- 8.rabbitmq调用端
- 9.pulsar与rabbitmq 互相调用
- 五、pulsar与rabbitmq 互相调用(进阶)
- 总结
前言
pulsar和 rabbitmq都是消息队列。本文介绍了mq-fw这个包,使用这个包可用很少的代码就能实现pulsar和rabbitmq使用
Pulsar是一个企业级分布式消息系统,最初由雅虎在2016年开源,目前由 Apache 软件基金会管理。Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。极低的发布延迟和端到端延迟。可无缝扩展到超过一百万个 topic。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
一、mq-fw 是什么?
mq-fw 是一个消息队列的框架,包含pulsar和 rabbitmq的消费和生产,还包含pulsar和 rabbitmq互相调用。
示例代码地址:https://gitee.com/maxbanana/mq-fw-examples
二、功能介绍
使用mq-fw能快速实现下列功能:
- pulsar 生产
- pulsar 消费
- pulsar 服务端
- pulsar 调用端
- rabbitmq 生产
- rabbitmq 消费
- rabbitmq 服务端
- rabbitmq 调用端
- pulsar与rabbitmq 互相调用
注意:rabbitmq默认是 topic模式,不能选择其他模式
三、安装
pip install mq-fw
四、示例
1. pulsar 生产
代码如下(示例):
"""
生产消息
"""import pulsar_mq
import json# pulsar服务地址
pulsar_url = 'pulsar://0.0.0.0:6650'# 生产的topic
produce_topic = ''"""
1. 连接pulsar
"""
client = pulsar_mq.client(pulsar_url)"""
2. 创建生产者
"""
producer = client.create_producer(produce_topic)msg = {"data": {"a": "1", "b": "2"}}"""
3.发送消息
默认参数: _async=True, callback=None, random_topic=None
_async: 是否异步发送消息, True异步发送, Flase 同步发送
callback: 异步发送时的回调函数
random_topic: 随机topic"""
producer.send(json.dumps(msg))# 一次发多条消息
# producer.send([json.dumps(msg), json.dumps(msg2)])
2.pulsar消费
代码如下(示例):
"""
消费数据
"""
import pulsar_mq# pulsar服务地址
pulsar_url = 'pulsar://0.0.0.0:6650'# 消费者订阅的topic
consumer_topic = ''# 消费者的名字
consumer_name = ''"""
1. 连接pulsar
"""
client = pulsar_mq.client(pulsar_url)"""
2. 创建消费者
默认参数 schema=pulsar.schema.StringSchema(), consumer_type='Shared'
'Shared': 共享模式
'Exclusive': 独占模式
'Failover': 灾备模式
'KeyShared': 关键字共享模式
"""
consumer = client.create_consumer(consumer_topic, consumer_name)def task(msg):"""3. 回调函数处理接收的消息:param msg: 消费的消息:return:"""print(msg)"""
4.开始消费
一直监听进行消费
默认参数 thread_count=None, logger=None
若设置thread_count=5 程序将开启5个线程进行消费
logger 日志收集器
"""
consumer.receive(task)# 只消费一个就停止监听,关闭消费者
# consumer.receive_one(task)# 关闭消费者
# consumer.close()# 取消订阅,并关闭消费者
# consumer.unsubscribe()
3.pulsar服务端
代码如下(示例):
"""
服务端
先消费消息,再生产消息
"""import pulsar_mq
import jsonclient = pulsar_mq.client('pulsar://0.0.0.0:6650')
"""
默认使用随机队列模式
随机队列模式是:
消费的消息里带一个random_topic,
生产的消息网random_topic里发送当传入参数 producer_topic,使producer_topic不等于None时,
生产的消息往 producer_topic里发送
"""
service = client.consume_produce(consumer_topic='', consumer_name='')def task(msg):"""回调函数:param msg::return:"""msg = json.loads(msg)print(msg)random_topic = msg.get('random_topic')print(random_topic)import timetime.sleep(6)msg = {"data": {"a": "1", "b": "2"}}return json.dumps(msg)service.run(task)
4.pulsar调用端
代码如下(示例):
"""
调用端
先生产发送消息,然后消费消息
"""import pulsar_mq
import jsonclient = pulsar_mq.client('pulsar://0.0.0.0:6650')"""
默认使用随机队列模式
随机队列模式是:
当使用默认参数 consumer_topic=None, consumer_name=None时
pulsar_mq包自动生成一个random_topic,然后和消息一起发送出去,同时监听random_topic
服务端接收到消息后将本服务生产好的消息往random_topic发送。当传入参数 consumer_topic、consumer_name时,不使用随机队列模式
"""
service = client.produce_consume(producer_topic='')msg = {"data": {"a": "1", "b": "2"}}
msg2 = {"data": {"c": "3", "d": "4"}}msg_list = [json.dumps(msg), json.dumps(msg2)]data = service.run(msg_list)
print(data)
for d in data:print(d)# 只发送一个消息
# data = service.run(json.dumps(msg))
# print(data)
5.rabbitmq生产
代码如下(示例):
import rabbitmq
import json
"""
rabbitmq 使用topic模式,不能更改
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''
exchange = ''
routing_key = ''# 连接rabbitmq
rq = rabbitmq.connect(host, port, username, password)
# 创建生产者
producer = rq.create_producer(exchange, routing_key)msg = {"data": {"a": "1", "b": "2"}}
msg2 = {"data": {"c": "3", "d": "4"}}msg_list = [json.dumps(msg), json.dumps(msg2)]# 发送数据
producer.send(msg_list)
6.rabbitmq消费
代码如下(示例):
import rabbitmq
"""
rabbitmq 默认使用topic模式,不能更改
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''
exchange = ''
routing_key = ''# 连接rabbitmq
rq = rabbitmq.connect(host, port, username, password)# 创建消费者
consumer = rq.create_consumer(exchange, routing_key)def task(msg):import timetime.sleep(5)print("接收 {} 成功.......".format(msg))# 一直消费
consumer.receive(task)# consume_num = 1 ,只消费一次, consume_num = n : 消费 n 次就停止消费
# consumer.receive(task, consume_num=1)
7.rabbitmq服务端
代码如下(示例):
import json
import rabbitmq
import time
"""
rabbitmq 默认使用topic模式,不能更改
该示例是 rabbitmq 服务端,先消费在生产
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''consumer_exchange = ''
consumer_routing_key = ''
producer_exchange = consumer_exchange
producer_routing_key = ''connect = rabbitmq.connect(host, port, username, password)# 使用服务端, 推荐使用随机模式
# 随机模式:生产时使用random_exchange, random_routing_key
service = connect.consume_produce(consumer_exchange, consumer_routing_key, durable=True)# 生产时 使用固定exchange, routing_key
# service = connect.consume_produce(consumer_exchange, consumer_routing_key,
# producer_exchange, producer_routing_key, durable=True)def task(body):print(body)time.sleep(5)return [json.dumps({'result': body})]# 运行服务端
service.run(task, thread_count=2)
8.rabbitmq调用端
代码如下(示例):
import rabbitmq
import json
"""
rabbitmq 默认使用topic模式,不能更改
该示例是 rabbitmq 调用端,先生产在消费
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''consumer_exchange = ''
consumer_routing_key = ''
producer_exchange = consumer_exchange
producer_routing_key = ''connect = rabbitmq.connect(host, port, username, password)# 使用调用端, 推荐使用随机模式
# 随机模式:消费时 使用random_exchange, random_routing_key
service = connect.produce_consume(producer_exchange, producer_routing_key, durable=True)# 消费时 使用固定exchange, routing_key
# service = connect.produce_consume(producer_exchange, producer_routing_key, consumer_exchange, consumer_routing_key,
# durable=True)msg_list = []
for i in range(5):msg_list.append(json.dumps({'data': i}))# 运行调用端
result = service.run(msg_list, thread_count=2)
for i in result:print(i)
9.pulsar与rabbitmq 互相调用
代码如下(示例):
from loguru import logger
import RabbitmqPulsar
"""
Rabbitmq 与 Pulsar 互相连接
Rabbitmq 消费的消息发送至 pulsar
pulsar 消费的消息发送至 Rabbitmq
"""# pulsar 配置
pulsar_url = ''
producer_topic = ''
consumer_topic = ''
consumer_name = ''# rabbitmq 配置
host = ''
port = 5672
username = ''
password = ''
rb_mq_send_ex = ''
rb_mq_send_key = ''
rb_mq_cons_ex = ''
rb_mq_cons_key = ''connect = RabbitmqPulsar.connect(host, port, username, password, pulsar_url)"""
1.默认模式(消息从rabbitmq开始流转):inter_services() 里 默认参数是 start_with_rabbitmq=True, random_queue=True
1). 从 rabbitmq 订阅,将数据发送至 pulsar; 再从 pulsar 订阅,将数据发送至 rabbitmq
2). 使用使用随机队列来生产消息
3). 消息数据流向: rabbitmq调用端 --> 本互联服务(rabbitmq服务端,pulsar调用端)--> pulsar服务端
"""
# 使用默认模式
service = connect.inter_services()# 运行服务
service.run(producer_topic, rb_mq_cons_ex, rb_mq_cons_key, durable=True, thread_count=5, logger=logger)"""
2.消息从pulsar开始流转模式:inter_services() 里参数是 start_with_rabbitmq=False, random_queue=True
1). 从 pulsar 订阅,将数据发送至 rabbitmq; 再从 rabbitmq 订阅,将数据发送至 pulsar
2). random_queue=True, 使用使用随机队列来生产消息
3). 消息数据流向: pulsar调用端 --> 本互联服务(pulsar服务端,rabbitmq调用端)--> rabbitmq服务端
"""
# 使用从pulsar开始流转模式
# service = connect.inter_services(start_with_rabbitmq=False)# 运行服务
# service.run(consumer_topic, consumer_name,
# rb_mq_send_ex, rb_mq_send_key, durable=True, thread_count=5, logger=logger)"""
3.通用模式:inter_services() 里 参数是 random_queue=False
1). 从 pulsar 订阅,将数据发送至 rabbitmq; 再从 rabbitmq 订阅,将数据发送至 pulsar
2). random_queue=False, 不使用使用随机队列来生产消息
3). 消息数据流向, 两条独立的数据流同时运行1. pulsar生产 --> 本互联服务(pulsar消费,rabbitmq生产)--> rabbitmq消费2. rabbitmq生产 --> 本互联服务(rabbitmq消费,pulsar生产)--> pulsar消费
"""
# 使用从pulsar开始流转模式
# service = connect.inter_services(random_queue=False)# 运行服务
# service.run(producer_topic, consumer_topic, consumer_name,
# rb_mq_send_ex, rb_mq_send_key, rb_mq_cons_ex, rb_mq_cons_key, durable=True, logger=logger)
五、pulsar与rabbitmq 互相调用(进阶)
上一步(9.pulsar与rabbitmq 互相调用)中的示例是将rabbitmq的消息原封不动的发送到pulsar,也会将pulsar的消息原封不动的发送到rabbitmq。假如想对rabbitmq 或 pulsar消息进行处理该怎么办?
可以在service.run里添加参数rabbitmq_task和pulsar_task来实现
代码如下(示例):
"""
进阶使用
Rabbitmq 与 Pulsar 互相连接
"""from loguru import logger
import RabbitmqPulsar# pulsar 配置
pulsar_url = ''
producer_topic = ''
consumer_topic = ''
consumer_name = ''# rabbitmq 配置
host = ''
port = 5672
username = ''
password = ''
rb_mq_send_ex = ''
rb_mq_send_key = ''
rb_mq_cons_ex = ''
rb_mq_cons_key = ''connect = RabbitmqPulsar.connect(host, port, username, password, pulsar_url)# 使用默认模式(消息从rabbitmq开始流转)
service = connect.inter_services()def rabbitmq_task(msg):"""处理消费的rabbitmq消息,返回结果将发送至 pulsar:param msg: rabbitmq消息:return:"""print('rabbitmq消息:', msg)return msgdef pulsar_task(msg):"""处理消费的pulsar消息,返回结果将发送至 rabbitmq:param msg: pulsar消息:return:"""print('pulsar消息:', msg)return [msg]# 运行服务
service.run(producer_topic, rb_mq_cons_ex, rb_mq_cons_key, rabbitmq_task=rabbitmq_task, pulsar_task=pulsar_task,durable=True, thread_count=5, logger=logger)
总结
本文简单介绍了mq-fw的使用,欢迎留言交流学习,有不足之处还望指正,感谢!!
python使用mq-fw包相关推荐
- python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比
python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比 目录
- python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比、自定义设置每一个数据格的颜色
python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比.自定义设置每一个数据格的颜色 目录
- python中使用squarify包可视化treemap图:使用treemap图可视化个人或者集体的股票、基金的持仓结构(treemap with squarify package)
python中使用squarify包可视化treemap图:使用treemap图可视化个人或者集体的股票.基金的持仓结构(treemap with squarify package) 目录
- python入门须知:包、模块、库的含义以及导入以及包下__init__.py的作用
概念 包:(Package) 包:是一个有层级的目录结构,包含n个模块或者n个子包,包中一定要有__init__.py文件,所以包只是一个组织方式,更加有条理,并不是必须的. 模块:(Module) ...
- python恶搞表情包-Python自动生成表情包,python在手,从此斗图无敌手
作为一个数据分析师,应该信奉一句话----"一图胜千言".不过这里要说的并不是数据可视化,而是一款全民向的产品形态----表情包!!!! 表情包不仅仅是一种符号,更是一种文化:是促 ...
- python怎么导入包-Python模块导入与包构建最佳实践
[TOC] 最开始写程序的时候,都是一个文件里输入几行源码(python 的一个 web 框架bottle就特别强调自己是单文件框架).随着程程式变大变复杂,一个文件很难承载如此多的功能,因此将代码拆 ...
- python哪里下载import包-【Python实战】模块和包导入详解(import)
1.模块(module) 1.1 模块定义 通常模块为一个.py文件,其他可作为module的文件类型还有".pyo".".pyc".".pyd&qu ...
- python 模块(Module)和包
阿里云大学人工智能学前小测验-Python测验 19.以下关于模块说法正确的是 A. 一个.py就是一个模块 B. 任何一个普通的xx.py文件可以作为模块导入 C. 模块文件的扩展名一定是 .py ...
- Python编程语言学习:包导入和模块搜索路径简介、使用方法之详细攻略
Python编程语言学习:包导入和模块搜索路径简介.使用方法之详细攻略 目录 包导入和模块搜索路径简介 1.Pyhon搜索模块路径的机制 2.自定义配置搜索路径
- python 聚类算法包_Python聚类算法之DBSACN实例分析 python怎么用sklearn包进行聚类
python 怎么可视化聚类的结果 science 发表的聚类算法的python代码 测试数据长什...说明你的样本数据中有nan值,通常是因为原始数据中包含空字符串或None值引起的. 解决办法是把 ...
最新文章
- Java 分布式 RPC 框架性能大比拼,Dubbo 排第几?
- SAP CRM RDS快速部署解决方案
- linux下kegg注释软件,KEGG数据中全部代谢反应和代谢物注释信息的下载
- 自定义按键_王者荣耀:自定义按键让你的百里守约百发百中
- Android 获取app 地址,获取手机设备信息、app版本信息、ip地址
- Python画图库Turtle库详解篇
- 上科大提出:对抗神经网络 动态人像系统SofGAN!
- 【王道计组笔记】高速缓存器:局部性原理及性能分析
- PBR理论基础2:光照、材质与微面元理论
- python列表嵌套字典取值_Python学习100天-Day03(字符串、列表、字典、元组)
- Turbo码编码举例计算
- 怎么在linux上网络功能,Linux系统如何通过手机GPRS功能无线上网
- python随机密码生成以整数17为随机数种子_简述pythonpytorch 随机种子的实现
- uniapp中,H5端使用html2canvas生成海报
- 怎么制作游戏脚本_怎么剪游戏视频?五步教你制作绝地求生击杀合集
- 文件cpy改进,文件加密,对文件两次运算可解密,密码65
- 在家中搭建网站服务器可行吗?
- Swift将改变一切
- mysql 升级系列 楔子
- PAT_乙级 1016 部分A+B(15)