Tornado:使用RabbitMQ发布消息(pika:pika.adapters.tornado_connection.TornadoConnection)
导读
最近在项目中需要将一些数据传输到另外一个程序里面以供调用,两个程序都处于同一服务器上,传输的内容少,但是频率很频繁,使用 HTTP 的方式虽然会方便一点,但是会占用一部分网络带宽,若是遇到服务器网络拥挤堵塞,就会导致消息无法发送,或者消息能发送,但是接收方无法接收的问题。于是想到使用 RabbitMQ 这一消息中间件。
要使用RabbitMQ功能,需要配合pika库。安装:
pip install pika
之前写过一个简单的 RabbitMQ demo可供参考,连接方式采用 BlockingConnection,gitee地址 。
项目框架使用Tornado,需要发布消息。一开始不熟悉pika库的使用方法,查了pika官方文档,只有简单的消费者代码示例,也查了其他作者发布的文档,讲述关于 TornadoConnection 内容的文章极少,来来回回都是那几行代码, 并且写的是消费者对象,并不具备参考性。
但是其中一篇文章给了我非常关键的提示,文章链接:链接地址 。
代码开始
一、先创建一个pika连接对象,在 MQHandler.py 文件里面
有关于pika库的相关API解释,可参考pika的官方文档,这里不做解释。
import pika
from pika.adapters.tornado_connection import TornadoConnectionclass PikaClient:def __init__(self, io_loop):self.host = "127.0.0.1"self.port = 5672self.username = "guest"self.password = "guest"self.io_loop = io_loopself.connected = Falseself.connecting = Falseself.connection = Noneself.channel = Nonedef connect(self):if self.connecting:returnself.connecting = Truecred = pika.PlainCredentials(username=self.username, password=self.password)param = pika.ConnectionParameters(self.host, self.port, "/", credentials=cred)self.connection = TornadoConnection(param, custom_ioloop=self.io_loop, on_open_callback=self.on_connected)self.connection.add_on_open_error_callback(self.err)self.connection.add_on_close_callback(self.on_closed)def err(self, conn):print('pika error!')def on_connected(self, conn):print('pika connected')self.connected = Trueself.connection = connself.connection.channel(on_open_callback=self.on_channel_open)def on_channel_open(self, channel):print(channel)channel.exchange_declare(exchange="test", durable=True) # 交换机、持久化self.channel = channeldef on_closed(self, conn, c):print('pika close!')self.io_loop.stop()
二、web 接口代码示例,在 TestHandler.py 里,可以将数据放入到消息队列里面
import json
import pika
from tornado.web import RequestHandlerclass TestProduct(RequestHandler):def post(self):data = json.loads(self.request.body)print(f"推送内容:{data}")self.application.pika.channel.basic_publish(exchange="test", routing_key="id", body=self.request.body, properties=pika.BasicProperties(delivery_mode=2)) # 1-消息非持久化,2-持久化print("推送成功。")
三、程序入口运行文件 main.py
import tornado.web
import tornado.ioloop
from tornado_demo.MQHandler import PikaClient
from tornado_demo.api.TestHandler import TestProductdef make_app():handlers = [("/api/test", TestProduct)]app = tornado.web.Application(handlers=handlers, debug=True)return appdef run():ioloop = tornado.ioloop.IOLoop.instance()app = make_app()app.pika = PikaClient(ioloop)app.pika.connect()app.listen(8888, "0.0.0.0")ioloop.start()
结尾
这里的代码只围绕在Tornado框架里使用pika库的示例,希望可以帮助到大家。
全文结束
Tornado:使用RabbitMQ发布消息(pika:pika.adapters.tornado_connection.TornadoConnection)相关推荐
- Rabbitmq报错pika.exceptions.IncompatibleProtocolError: StreamLostError: ('Transport indicated EOF',)
rabbitmq 报错 pika.exceptions.IncompatibleProtocolError: StreamLostError: ('Transport indicated EOF',) ...
- RabbitMQ—发布消息确认和消费消息确认
目录 序言 消息发布流程 发布消息确认 一.事务使用 二.Confirm发送方确认模式 方式一:普通Confirm模式 方式二:批量Confirm模式 方式三:异步Confirm模式 扩展知识 消费消 ...
- rabbitmq 连接失败pika.exceptions.ProbableAccessDeniedError
- python使用pika操作rabbitmq总结
python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...
- python使用pika操作rabbitmq总结(一)
python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...
- pika详解 (一)
pika pika处理消息可以简单分为以下几个步骤: 我们首先创建连接对象,然后启动事件循环. 当有连接时,调用on_connected方法.在该方法中创建channel channel创建完成,将调 ...
- python 连接 rabbitMQ以及rabbitMQssl注意事项,password
pip3 install pika==1.1.0 官方对于pika有如下介绍# Since threads aren't appropriate to every situation, it does ...
- [427]pika missed heartbeats from client timeout 60s 的问题
使用 rabbitmq 中 heartbeat 功能可能会遇到的问题 [问题场景] 客户端以 consumer 身份订阅到 rabbitmq server 上的 queue 上,客户端侧在 AMQP ...
- [1005]pika 线程不安全
先说结论:Pika is not thread safe. Use a BlockingConnection per-thread. 即 Pika 并不是线程安全的,应该在每个线程里,都使用各种的 B ...
最新文章
- 一位10年程序员生涯的总结与经验忠告分享
- @成都的Coder ,一起探讨终端架构持续演进
- 利用HTTP watch观察SAP CRM WebClient UI popup window
- Win10 IIS本地部署网站运行时图片和样式不正常?
- 精选| 2021年6月R新包推荐(第55期)
- redis 分布式锁 看门狗_redis分布式锁原理及实现
- 在LINUX系统中安装KVM虚拟化
- [分享]N-Gage QD新手教程
- ECMA-262 6th Edition
- 文件i/o函数 open/close
- m2接口和nvme协议接口_B85主板有M.2接口吗?支持NVMe协议吗?
- 实物补贴和货币补贴的权衡
- 压缩包文件解压找回密码
- 【计算机组成原理】冯诺伊曼结构和计算机性能指标
- docker执行权限问题Got permission denied while trying to connect to the Docker daemon socket
- java-网页404(个例)
- PDF文件如何转成HTML格式?说一种思路
- Azure认证 Administrator Associate(AZ-104) 考试指南,资料分享
- 微信链接跳转浏览器实现微信中直接下载棋牌游戏类APP功能
- 联想服务器做完raid找不到硬盘,如何配置磁盘阵列(RAID)