导读

最近在项目中需要将一些数据传输到另外一个程序里面以供调用,两个程序都处于同一服务器上,传输的内容少,但是频率很频繁,使用 HTTP 的方式虽然会方便一点,但是会占用一部分网络带宽,若是遇到服务器网络拥挤堵塞,就会导致消息无法发送,或者消息能发送,但是接收方无法接收的问题。于是想到使用 RabbitMQ 这一消息中间件。

要使用RabbitMQ功能,需要配合pika库。安装:

pip install pika

之前写过一个简单的 RabbitMQ demo可供参考,连接方式采用 BlockingConnection,gitee地址 。

项目框架使用Tornado,需要发布消息。一开始不熟悉pika库的使用方法,查了pika官方文档,只有简单的消费者代码示例,也查了其他作者发布的文档,讲述关于 TornadoConnection 内容的文章极少,来来回回都是那几行代码, 并且写的是消费者对象,并不具备参考性。

但是其中一篇文章给了我非常关键的提示,文章链接:链接地址 。

代码开始

一、先创建一个pika连接对象,在 MQHandler.py 文件里面

有关于pika库的相关API解释,可参考pika的官方文档,这里不做解释。

import pika
from pika.adapters.tornado_connection import TornadoConnectionclass PikaClient:def __init__(self, io_loop):self.host = "127.0.0.1"self.port = 5672self.username = "guest"self.password = "guest"self.io_loop = io_loopself.connected = Falseself.connecting = Falseself.connection = Noneself.channel = Nonedef connect(self):if self.connecting:returnself.connecting = Truecred = pika.PlainCredentials(username=self.username, password=self.password)param = pika.ConnectionParameters(self.host, self.port, "/", credentials=cred)self.connection = TornadoConnection(param, custom_ioloop=self.io_loop, on_open_callback=self.on_connected)self.connection.add_on_open_error_callback(self.err)self.connection.add_on_close_callback(self.on_closed)def err(self, conn):print('pika error!')def on_connected(self, conn):print('pika connected')self.connected = Trueself.connection = connself.connection.channel(on_open_callback=self.on_channel_open)def on_channel_open(self, channel):print(channel)channel.exchange_declare(exchange="test", durable=True)      # 交换机、持久化self.channel = channeldef on_closed(self, conn, c):print('pika close!')self.io_loop.stop()

二、web 接口代码示例,在 TestHandler.py 里,可以将数据放入到消息队列里面

import json
import pika
from tornado.web import RequestHandlerclass TestProduct(RequestHandler):def post(self):data = json.loads(self.request.body)print(f"推送内容:{data}")self.application.pika.channel.basic_publish(exchange="test", routing_key="id", body=self.request.body, properties=pika.BasicProperties(delivery_mode=2))    # 1-消息非持久化,2-持久化print("推送成功。")

三、程序入口运行文件 main.py

import tornado.web
import tornado.ioloop
from tornado_demo.MQHandler import PikaClient
from tornado_demo.api.TestHandler import TestProductdef make_app():handlers = [("/api/test", TestProduct)]app = tornado.web.Application(handlers=handlers, debug=True)return appdef run():ioloop = tornado.ioloop.IOLoop.instance()app = make_app()app.pika = PikaClient(ioloop)app.pika.connect()app.listen(8888, "0.0.0.0")ioloop.start()

结尾

这里的代码只围绕在Tornado框架里使用pika库的示例,希望可以帮助到大家。

全文结束

Tornado:使用RabbitMQ发布消息(pika:pika.adapters.tornado_connection.TornadoConnection)相关推荐

  1. Rabbitmq报错pika.exceptions.IncompatibleProtocolError: StreamLostError: ('Transport indicated EOF',)

    rabbitmq 报错 pika.exceptions.IncompatibleProtocolError: StreamLostError: ('Transport indicated EOF',) ...

  2. RabbitMQ—发布消息确认和消费消息确认

    目录 序言 消息发布流程 发布消息确认 一.事务使用 二.Confirm发送方确认模式 方式一:普通Confirm模式 方式二:批量Confirm模式 方式三:异步Confirm模式 扩展知识 消费消 ...

  3. rabbitmq 连接失败pika.exceptions.ProbableAccessDeniedError

  4. python使用pika操作rabbitmq总结

    python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...

  5. python使用pika操作rabbitmq总结(一)

    python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...

  6. pika详解 (一)

    pika pika处理消息可以简单分为以下几个步骤: 我们首先创建连接对象,然后启动事件循环. 当有连接时,调用on_connected方法.在该方法中创建channel channel创建完成,将调 ...

  7. python 连接 rabbitMQ以及rabbitMQssl注意事项,password

    pip3 install pika==1.1.0 官方对于pika有如下介绍# Since threads aren't appropriate to every situation, it does ...

  8. [427]pika missed heartbeats from client timeout 60s 的问题

    使用 rabbitmq 中 heartbeat 功能可能会遇到的问题 [问题场景] 客户端以 consumer 身份订阅到 rabbitmq server 上的 queue 上,客户端侧在 AMQP ...

  9. [1005]pika 线程不安全

    先说结论:Pika is not thread safe. Use a BlockingConnection per-thread. 即 Pika 并不是线程安全的,应该在每个线程里,都使用各种的 B ...

最新文章

  1. 一位10年程序员生涯的总结与经验忠告分享
  2. @成都的Coder ,一起探讨终端架构持续演进
  3. 利用HTTP watch观察SAP CRM WebClient UI popup window
  4. Win10 IIS本地部署网站运行时图片和样式不正常?
  5. 精选| 2021年6月R新包推荐(第55期)
  6. redis 分布式锁 看门狗_redis分布式锁原理及实现
  7. 在LINUX系统中安装KVM虚拟化
  8. [分享]N-Gage QD新手教程
  9. ECMA-262 6th Edition
  10. 文件i/o函数 open/close
  11. m2接口和nvme协议接口_B85主板有M.2接口吗?支持NVMe协议吗?
  12. 实物补贴和货币补贴的权衡
  13. 压缩包文件解压找回密码
  14. 【计算机组成原理】冯诺伊曼结构和计算机性能指标
  15. docker执行权限问题Got permission denied while trying to connect to the Docker daemon socket
  16. java-网页404(个例)
  17. PDF文件如何转成HTML格式?说一种思路
  18. Azure认证 Administrator Associate(AZ-104) 考试指南,资料分享
  19. 微信链接跳转浏览器实现微信中直接下载棋牌游戏类APP功能
  20. 联想服务器做完raid找不到硬盘,如何配置磁盘阵列(RAID)

热门文章

  1. 几率波量子雷达/反事实量子通信
  2. DTT年度收官圆桌π,华为云8位技术专家的年末盘点
  3. NQA和静态路由联动
  4. modify()函数的用法
  5. 编写一个程序,将两个字符串链接起来, 结果取代第一个字符串
  6. Mail::Sender
  7. 无论你想发展的方向是什么,关注这些总没错!
  8. python3 锦鲤第一步!了解随机抽样之蓄水池算法
  9. 传奇开区网站如何添加流量统计代码
  10. Android程序中完美解决Zxing二维码扫描图片变形问题