文章目录

  • 前言
  • 一、mq-fw 是什么?
  • 二、功能介绍
  • 三、安装
  • 四、示例
    • 1. pulsar 生产
    • 2.pulsar消费
    • 3.pulsar服务端
    • 4.pulsar调用端
    • 5.rabbitmq生产
    • 6.rabbitmq消费
    • 7.rabbitmq服务端
    • 8.rabbitmq调用端
    • 9.pulsar与rabbitmq 互相调用
  • 五、pulsar与rabbitmq 互相调用(进阶)
  • 总结

前言

pulsar和 rabbitmq都是消息队列。本文介绍了mq-fw这个包,使用这个包可用很少的代码就能实现pulsar和rabbitmq使用
Pulsar是一个企业级分布式消息系统,最初由雅虎在2016年开源,目前由 Apache 软件基金会管理。Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。极低的发布延迟和端到端延迟。可无缝扩展到超过一百万个 topic。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

一、mq-fw 是什么?

mq-fw 是一个消息队列的框架,包含pulsar和 rabbitmq的消费和生产,还包含pulsar和 rabbitmq互相调用。
示例代码地址:https://gitee.com/maxbanana/mq-fw-examples

二、功能介绍

使用mq-fw能快速实现下列功能:

  1. pulsar 生产
  2. pulsar 消费
  3. pulsar 服务端
  4. pulsar 调用端
  5. rabbitmq 生产
  6. rabbitmq 消费
  7. rabbitmq 服务端
  8. rabbitmq 调用端
  9. pulsar与rabbitmq 互相调用

注意:rabbitmq默认是 topic模式,不能选择其他模式

三、安装

pip install mq-fw

四、示例

1. pulsar 生产

代码如下(示例):

"""
生产消息
"""import pulsar_mq
import json# pulsar服务地址
pulsar_url = 'pulsar://0.0.0.0:6650'# 生产的topic
produce_topic = ''"""
1. 连接pulsar
"""
client = pulsar_mq.client(pulsar_url)"""
2. 创建生产者
"""
producer = client.create_producer(produce_topic)msg = {"data": {"a": "1", "b": "2"}}"""
3.发送消息
默认参数: _async=True, callback=None, random_topic=None
_async: 是否异步发送消息, True异步发送, Flase 同步发送
callback: 异步发送时的回调函数
random_topic: 随机topic"""
producer.send(json.dumps(msg))# 一次发多条消息
# producer.send([json.dumps(msg), json.dumps(msg2)])

2.pulsar消费

代码如下(示例):

"""
消费数据
"""
import pulsar_mq# pulsar服务地址
pulsar_url = 'pulsar://0.0.0.0:6650'# 消费者订阅的topic
consumer_topic = ''# 消费者的名字
consumer_name = ''"""
1. 连接pulsar
"""
client = pulsar_mq.client(pulsar_url)"""
2. 创建消费者
默认参数 schema=pulsar.schema.StringSchema(), consumer_type='Shared'
'Shared': 共享模式
'Exclusive': 独占模式
'Failover': 灾备模式
'KeyShared': 关键字共享模式
"""
consumer = client.create_consumer(consumer_topic, consumer_name)def task(msg):"""3. 回调函数处理接收的消息:param msg:  消费的消息:return:"""print(msg)"""
4.开始消费
一直监听进行消费
默认参数 thread_count=None, logger=None
若设置thread_count=5 程序将开启5个线程进行消费
logger 日志收集器
"""
consumer.receive(task)# 只消费一个就停止监听,关闭消费者
# consumer.receive_one(task)# 关闭消费者
# consumer.close()# 取消订阅,并关闭消费者
# consumer.unsubscribe()

3.pulsar服务端

代码如下(示例):

"""
服务端
先消费消息,再生产消息
"""import pulsar_mq
import jsonclient = pulsar_mq.client('pulsar://0.0.0.0:6650')
"""
默认使用随机队列模式
随机队列模式是:
消费的消息里带一个random_topic,
生产的消息网random_topic里发送当传入参数 producer_topic,使producer_topic不等于None时,
生产的消息往 producer_topic里发送
"""
service = client.consume_produce(consumer_topic='', consumer_name='')def task(msg):"""回调函数:param msg::return:"""msg = json.loads(msg)print(msg)random_topic = msg.get('random_topic')print(random_topic)import timetime.sleep(6)msg = {"data": {"a": "1", "b": "2"}}return json.dumps(msg)service.run(task)

4.pulsar调用端

代码如下(示例):

"""
调用端
先生产发送消息,然后消费消息
"""import pulsar_mq
import jsonclient = pulsar_mq.client('pulsar://0.0.0.0:6650')"""
默认使用随机队列模式
随机队列模式是:
当使用默认参数 consumer_topic=None, consumer_name=None时
pulsar_mq包自动生成一个random_topic,然后和消息一起发送出去,同时监听random_topic
服务端接收到消息后将本服务生产好的消息往random_topic发送。当传入参数 consumer_topic、consumer_name时,不使用随机队列模式
"""
service = client.produce_consume(producer_topic='')msg = {"data": {"a": "1", "b": "2"}}
msg2 = {"data": {"c": "3", "d": "4"}}msg_list = [json.dumps(msg), json.dumps(msg2)]data = service.run(msg_list)
print(data)
for d in data:print(d)# 只发送一个消息
# data = service.run(json.dumps(msg))
# print(data)

5.rabbitmq生产

代码如下(示例):

import rabbitmq
import json
"""
rabbitmq 使用topic模式,不能更改
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''
exchange = ''
routing_key = ''# 连接rabbitmq
rq = rabbitmq.connect(host, port, username, password)
# 创建生产者
producer = rq.create_producer(exchange, routing_key)msg = {"data": {"a": "1", "b": "2"}}
msg2 = {"data": {"c": "3", "d": "4"}}msg_list = [json.dumps(msg), json.dumps(msg2)]# 发送数据
producer.send(msg_list)

6.rabbitmq消费

代码如下(示例):

import rabbitmq
"""
rabbitmq 默认使用topic模式,不能更改
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''
exchange = ''
routing_key = ''# 连接rabbitmq
rq = rabbitmq.connect(host, port, username, password)# 创建消费者
consumer = rq.create_consumer(exchange, routing_key)def task(msg):import timetime.sleep(5)print("接收 {} 成功.......".format(msg))# 一直消费
consumer.receive(task)# consume_num = 1 ,只消费一次, consume_num = n : 消费 n 次就停止消费
# consumer.receive(task, consume_num=1)

7.rabbitmq服务端

代码如下(示例):

import json
import rabbitmq
import time
"""
rabbitmq 默认使用topic模式,不能更改
该示例是 rabbitmq 服务端,先消费在生产
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''consumer_exchange = ''
consumer_routing_key = ''
producer_exchange = consumer_exchange
producer_routing_key = ''connect = rabbitmq.connect(host, port, username, password)# 使用服务端, 推荐使用随机模式
# 随机模式:生产时使用random_exchange, random_routing_key
service = connect.consume_produce(consumer_exchange, consumer_routing_key, durable=True)# 生产时 使用固定exchange, routing_key
# service = connect.consume_produce(consumer_exchange, consumer_routing_key,
#                                   producer_exchange, producer_routing_key, durable=True)def task(body):print(body)time.sleep(5)return [json.dumps({'result': body})]# 运行服务端
service.run(task, thread_count=2)

8.rabbitmq调用端

代码如下(示例):

import rabbitmq
import json
"""
rabbitmq 默认使用topic模式,不能更改
该示例是 rabbitmq 调用端,先生产在消费
"""host = '0.0.0.0'
port = 5672
username = ''
password = ''consumer_exchange = ''
consumer_routing_key = ''
producer_exchange = consumer_exchange
producer_routing_key = ''connect = rabbitmq.connect(host, port, username, password)# 使用调用端, 推荐使用随机模式
# 随机模式:消费时 使用random_exchange, random_routing_key
service = connect.produce_consume(producer_exchange, producer_routing_key, durable=True)# 消费时 使用固定exchange, routing_key
# service = connect.produce_consume(producer_exchange, producer_routing_key, consumer_exchange, consumer_routing_key,
#                                   durable=True)msg_list = []
for i in range(5):msg_list.append(json.dumps({'data': i}))# 运行调用端
result = service.run(msg_list, thread_count=2)
for i in result:print(i)

9.pulsar与rabbitmq 互相调用

代码如下(示例):

from loguru import logger
import RabbitmqPulsar
"""
Rabbitmq 与 Pulsar 互相连接
Rabbitmq 消费的消息发送至 pulsar
pulsar 消费的消息发送至 Rabbitmq
"""# pulsar 配置
pulsar_url = ''
producer_topic = ''
consumer_topic = ''
consumer_name = ''# rabbitmq 配置
host = ''
port = 5672
username = ''
password = ''
rb_mq_send_ex = ''
rb_mq_send_key = ''
rb_mq_cons_ex = ''
rb_mq_cons_key = ''connect = RabbitmqPulsar.connect(host, port, username, password, pulsar_url)"""
1.默认模式(消息从rabbitmq开始流转):inter_services() 里 默认参数是 start_with_rabbitmq=True, random_queue=True
1). 从 rabbitmq 订阅,将数据发送至 pulsar; 再从 pulsar 订阅,将数据发送至 rabbitmq
2). 使用使用随机队列来生产消息
3). 消息数据流向: rabbitmq调用端 --> 本互联服务(rabbitmq服务端,pulsar调用端)--> pulsar服务端
"""
# 使用默认模式
service = connect.inter_services()# 运行服务
service.run(producer_topic, rb_mq_cons_ex, rb_mq_cons_key, durable=True, thread_count=5, logger=logger)"""
2.消息从pulsar开始流转模式:inter_services() 里参数是 start_with_rabbitmq=False, random_queue=True
1). 从 pulsar 订阅,将数据发送至 rabbitmq; 再从 rabbitmq 订阅,将数据发送至 pulsar
2). random_queue=True, 使用使用随机队列来生产消息
3). 消息数据流向: pulsar调用端 --> 本互联服务(pulsar服务端,rabbitmq调用端)--> rabbitmq服务端
"""
# 使用从pulsar开始流转模式
# service = connect.inter_services(start_with_rabbitmq=False)# 运行服务
# service.run(consumer_topic, consumer_name,
#             rb_mq_send_ex, rb_mq_send_key, durable=True, thread_count=5, logger=logger)"""
3.通用模式:inter_services() 里 参数是 random_queue=False
1). 从 pulsar 订阅,将数据发送至 rabbitmq; 再从 rabbitmq 订阅,将数据发送至 pulsar
2). random_queue=False, 不使用使用随机队列来生产消息
3). 消息数据流向, 两条独立的数据流同时运行1. pulsar生产 --> 本互联服务(pulsar消费,rabbitmq生产)--> rabbitmq消费2. rabbitmq生产 --> 本互联服务(rabbitmq消费,pulsar生产)--> pulsar消费
"""
# 使用从pulsar开始流转模式
# service = connect.inter_services(random_queue=False)# 运行服务
# service.run(producer_topic, consumer_topic, consumer_name,
#             rb_mq_send_ex, rb_mq_send_key, rb_mq_cons_ex, rb_mq_cons_key, durable=True, logger=logger)

五、pulsar与rabbitmq 互相调用(进阶)

上一步(9.pulsar与rabbitmq 互相调用)中的示例是将rabbitmq的消息原封不动的发送到pulsar,也会将pulsar的消息原封不动的发送到rabbitmq。假如想对rabbitmq 或 pulsar消息进行处理该怎么办?
可以在service.run里添加参数rabbitmq_task和pulsar_task来实现
代码如下(示例):

"""
进阶使用
Rabbitmq 与 Pulsar 互相连接
"""from loguru import logger
import RabbitmqPulsar# pulsar 配置
pulsar_url = ''
producer_topic = ''
consumer_topic = ''
consumer_name = ''# rabbitmq 配置
host = ''
port = 5672
username = ''
password = ''
rb_mq_send_ex = ''
rb_mq_send_key = ''
rb_mq_cons_ex = ''
rb_mq_cons_key = ''connect = RabbitmqPulsar.connect(host, port, username, password, pulsar_url)# 使用默认模式(消息从rabbitmq开始流转)
service = connect.inter_services()def rabbitmq_task(msg):"""处理消费的rabbitmq消息,返回结果将发送至 pulsar:param msg: rabbitmq消息:return:"""print('rabbitmq消息:', msg)return msgdef pulsar_task(msg):"""处理消费的pulsar消息,返回结果将发送至 rabbitmq:param msg: pulsar消息:return:"""print('pulsar消息:', msg)return [msg]# 运行服务
service.run(producer_topic, rb_mq_cons_ex, rb_mq_cons_key, rabbitmq_task=rabbitmq_task, pulsar_task=pulsar_task,durable=True, thread_count=5, logger=logger)

总结

本文简单介绍了mq-fw的使用,欢迎留言交流学习,有不足之处还望指正,感谢!!

python使用mq-fw包相关推荐

  1. python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比

    python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比 目录

  2. python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比、自定义设置每一个数据格的颜色

    python中使用squarify包可视化treemap图:treemap将分层数据显示为一组嵌套矩形,每一组都用一个矩形表示,该矩形的面积与其值成正比.自定义设置每一个数据格的颜色 目录

  3. python中使用squarify包可视化treemap图:使用treemap图可视化个人或者集体的股票、基金的持仓结构(treemap with squarify package)

    python中使用squarify包可视化treemap图:使用treemap图可视化个人或者集体的股票.基金的持仓结构(treemap with squarify package) 目录

  4. python入门须知:包、模块、库的含义以及导入以及包下__init__.py的作用

    概念 包:(Package) 包:是一个有层级的目录结构,包含n个模块或者n个子包,包中一定要有__init__.py文件,所以包只是一个组织方式,更加有条理,并不是必须的. 模块:(Module) ...

  5. python恶搞表情包-Python自动生成表情包,python在手,从此斗图无敌手

    作为一个数据分析师,应该信奉一句话----"一图胜千言".不过这里要说的并不是数据可视化,而是一款全民向的产品形态----表情包!!!! 表情包不仅仅是一种符号,更是一种文化:是促 ...

  6. python怎么导入包-Python模块导入与包构建最佳实践

    [TOC] 最开始写程序的时候,都是一个文件里输入几行源码(python 的一个 web 框架bottle就特别强调自己是单文件框架).随着程程式变大变复杂,一个文件很难承载如此多的功能,因此将代码拆 ...

  7. python哪里下载import包-【Python实战】模块和包导入详解(import)

    1.模块(module) 1.1 模块定义 通常模块为一个.py文件,其他可作为module的文件类型还有".pyo".".pyc".".pyd&qu ...

  8. python 模块(Module)和包

    阿里云大学人工智能学前小测验-Python测验 19.以下关于模块说法正确的是 A. 一个.py就是一个模块 B. 任何一个普通的xx.py文件可以作为模块导入 C. 模块文件的扩展名一定是 .py ...

  9. Python编程语言学习:包导入和模块搜索路径简介、使用方法之详细攻略

    Python编程语言学习:包导入和模块搜索路径简介.使用方法之详细攻略 目录 包导入和模块搜索路径简介 1.Pyhon搜索模块路径的机制 2.自定义配置搜索路径

  10. python 聚类算法包_Python聚类算法之DBSACN实例分析 python怎么用sklearn包进行聚类

    python 怎么可视化聚类的结果 science 发表的聚类算法的python代码 测试数据长什...说明你的样本数据中有nan值,通常是因为原始数据中包含空字符串或None值引起的. 解决办法是把 ...

最新文章

  1. Java 分布式 RPC 框架性能大比拼,Dubbo 排第几?
  2. SAP CRM RDS快速部署解决方案
  3. linux下kegg注释软件,KEGG数据中全部代谢反应和代谢物注释信息的下载
  4. 自定义按键_王者荣耀:自定义按键让你的百里守约百发百中
  5. Android 获取app 地址,获取手机设备信息、app版本信息、ip地址
  6. Python画图库Turtle库详解篇
  7. 上科大提出:对抗神经网络 动态人像系统SofGAN!
  8. 【王道计组笔记】高速缓存器:局部性原理及性能分析
  9. PBR理论基础2:光照、材质与微面元理论
  10. python列表嵌套字典取值_Python学习100天-Day03(字符串、列表、字典、元组)
  11. Turbo码编码举例计算
  12. 怎么在linux上网络功能,Linux系统如何通过手机GPRS功能无线上网
  13. python随机密码生成以整数17为随机数种子_简述pythonpytorch 随机种子的实现
  14. uniapp中,H5端使用html2canvas生成海报
  15. 怎么制作游戏脚本_怎么剪游戏视频?五步教你制作绝地求生击杀合集
  16. 文件cpy改进,文件加密,对文件两次运算可解密,密码65
  17. 在家中搭建网站服务器可行吗?
  18. Swift将改变一切
  19. mysql 升级系列 楔子
  20. PAT_乙级 1016 部分A+B(15)

热门文章

  1. PCI Express (PCIe) 介绍
  2. 阿里云短信接口写法参照实例
  3. java ftps 证书_java – 连接到FTPS服务器
  4. S一文读懂应力集中与应力奇异
  5. java sql in语句,sql语句In查询的好
  6. 软件Hspice基础知识学习笔记(1)
  7. 小度计算机笔记,开售告罄、口碑炸裂、高语音交互率的小度耳机,全新升级语音笔记...
  8. 2021-01-05
  9. arcgis加载经纬度信息并导入91地图
  10. 详解ResNet残差网络