简介

  众所周知,RabbitMQ是一个开源的高性能的消息队列,支持多种开发语言:Java,Python,.Net,C,C++,PHP等多种语言,那么如何通过Python语言调用RabbitMQ呢?Python中pika这个模块提供了完整的调用方法,通过这个包我们可以实现Rabbit的简单模式,交换机模式以及一些特殊的参数。那么我们如何使用pika模块呢,接下来,请看听我慢慢道来。

一、简单模式

生产者

  • 简单模式下,有多个消费者时,采用轮询方式处理消息。
import pikaif __name__ == '__main__':# 创建凭证,使用rabbitmq用户密码登录credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)# 创建频道channel = connection.channel()# 新建一个queue1队列,用于接收消息channel.queue_declare(queue='queue1')# 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),# 初学可以使用空字符串交换(exchange=''),# 它允许我们精确的指定发送给哪个队列(routing_key=''),# 参数body值发送的数据channel.basic_publish(exchange='',routing_key='queue1',body='哈哈哈。。。')print("已经发送了消息")# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接connection.close()

消费者

import pikaif __name__ == '__main__':# 建立与rabbitmq的连接credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)# 创建频道channel = connection.channel()# 生命队列,消费者也要生命队列,因为我们不能确定消费者与生产者谁先启动。channel.queue_declare(queue="queue1")# 回调函数,里面存放执行内容。def callback(ch, method, properties, body):print(type(body))print(body.decode(encoding='utf-8'))print("消费者接收到了任务:%r" % body)# 有消息来临,callback,没有消息则夯住,等待消息channel.basic_consume(on_message_callback=callback, queue='queue1', auto_ack=True)# 开始消费,接收消息channel.start_consuming()
  • 公平分发 此模式下,如果有多个消费者,将轮询获取消息,不会考虑消费者的消费能力,那么如何让高性能的消费者接受更多的任务呢,此时,可设置公平分发机制:
    消费者中加入以下代码
 # 公平分发参数,根据消费者的性能进行分发channel.basic_qos(prefetch_count=1)
  • 取消自动应答:如果消费者消费失败,消息会丢失,此时可以设置取消自动应答,在回调函数结尾处加入如下代码:
    ch.basic_ack(delivery_tag=method.delivery_tag)
    手动应答,如果回调函数处理过程中发生错误,该条语句不会执行,消息回滚,等待下次消费。示例代码如下:
# consumer.py
import pikaif __name__ == '__main__':# 建立与rabbitmq的连接credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.queue_declare(queue="queue2")def callback(ch, method, properties, body):print(type(body))# print(body.decode(encoding='utf-8'))print("消费者接收到了任务:%r" % body.decode(encoding='utf-8'))# 回调函数中进行手动应答。ch.basic_ack(delivery_tag=method.delivery_tag)# auto_ack=False取消自动应答channel.basic_consume(on_message_callback=callback, queue='queue2', auto_ack=False) # 开始消费,接收消息channel.start_consuming()
  • 消息持久化 : 如果消息进入消息队列后,RabbitMQ服务崩掉了怎么办,消息是不是丢失了,为了解决这个问题,可以设置持久化消息
# producer.py  创建队列时设置持久化参数
import pikaif __name__ == '__main__':# 创建凭证,使用rabbitmq用户密码登录credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)# 创建频道channel = connection.channel()# durable=True 设置消息持久化,即使rabbitmq重启也会保存消息,记得建立新的队列,因为以前的队列不支持持久化channel.queue_declare(queue='queue3', durable=True)# 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),# 初学可以使用空字符串交换(exchange=''),它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据channel.basic_publish(exchange='',routing_key='queue3',body='这是一个持久化的消息。。。')print("已经发送了消息")# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接connection.close()
# consumer.py
import pikaif __name__ == '__main__':# 建立与rabbitmq的连接credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.queue_declare(queue="queue3", durable=True) # 持久化参数def callback(ch, method, properties, body):print(type(body))# print(body.decode(encoding='utf-8'))print("消费者接收到了任务:%r" % body.decode(encoding='utf-8'))# 代用该方法进行手动应答ch.basic_ack(delivery_tag=method.delivery_tag)#  auto_ack=False取消自动应答channel.basic_consume(on_message_callback=callback, queue='queue3', auto_ack=False)# 开始消费,接收消息channel.start_consuming()

总结

  • 公平分发,取消自动应答,消息持久化等参数可以让用户更方便的使用消息队列和处理一些特殊的情况,有了这些参数,可以保证消息中间件更加稳定安全的运行。当然这些参数,简单模式中可以设置,交换机模式中也可以设置。

二、交换机模式-发布/订阅模式

  • 交换机的类型为fanout,此种模式下,每个消费者创建时,都会创建属于自己的队列,生产者会将消息传递到交换机,交换机传递到它绑定的所有队列上,此时所有生产者都会接收到消息。示例代码如下:

生产者

import pikaif __name__ == '__main__':# 创建凭证,使用rabbitmq用户密码登录credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.exchange_declare(exchange='logs',exchange_type='fanout', auto_delete=True)  # 发布订阅模式message = "info: Hello World!"channel.basic_publish(exchange='logs',routing_key='',body=message)print(" [x] Sent %r" % message)connection.close()

消费者

import pikaif __name__ == '__main__':# 建立与rabbitmq的连接credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.exchange_declare(exchange='logs',exchange_type='fanout', auto_delete=True)  # fanout发布订阅模式result = channel.queue_declare("", exclusive=True)  # exclusive参数可以生成一个随机的队列名queue_name = result.method.queuechannel.queue_bind(exchange='logs',routing_key='',  # 发布订阅模式下,routing_key必须是空字符串queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()

三、交换机模式-关键字模式

  • 发布订阅模式下,所有消费者都会接收消息,如果生产者发送的消息只想让部分消费者接收,应该如何实现呢?此时可以考虑关键字模式,当交换机类型为direct时,为关键字模式,示例代码如下:

生产者

import pikaif __name__ == '__main__':# 创建凭证,使用rabbitmq用户密码登录credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.exchange_declare(exchange='logs2',exchange_type='direct', auto_delete=True)  # direct:关键字模式message = "error: Hello World!11111111111"channel.basic_publish(exchange='logs2',routing_key='error',  # 设置关键字为error body=message)print(" [x] Sent %r" % message)connection.close()

消费者

import pikaif __name__ == '__main__':# 建立与rabbitmq的连接credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.exchange_declare(exchange='logs2',exchange_type='direct', auto_delete=True)  # direct:关键字模式result = channel.queue_declare("", exclusive=True)  # 自动生成队列名称queue_name = result.method.queuechannel.queue_bind(exchange='logs2',  queue=queue_name,routing_key='error'  # 绑定关键字error)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()
  • 问题:如果消费者想绑定多个关键字,应如何实现?
    解答:消费者代码中多次调用channel.queue_bind方法,一次绑定一个关键字
# consumer.py.........channel.queue_bind(exchange='logs2',  queue=queue_name,routing_key='error'  # 绑定关键字error)channel.queue_bind(exchange='logs2',  queue=queue_name,routing_key='info'  # 绑定关键字info)channel.queue_bind(exchange='logs2',  queue=queue_name,routing_key='warning'  # 绑定关键字warning).........

四、交换机模式-通配符模式

  • 关键字模式可以通过关键字匹配实现绑定队列。通配符模式就更加强大了,可以通过字符模糊匹配更加灵活的绑定队列。该模式的关键字是topic,该模式下还提供了两个通配符(有且仅有这两个通配符):

    1. 符号"#" :匹配一个或多个词,
    2. 符号"*" :仅匹配一个词

示例代码如下:

生产者

import pikaif __name__ == '__main__':# 创建凭证,使用rabbitmq用户密码登录credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.exchange_declare(exchange='logs3',exchange_type='topic')  # topic:通配符模式message = "weather"channel.basic_publish(exchange='logs3',routing_key='usa.weather',  # 复杂关键字body=message)print(" [x] Sent %r" % message)connection.close()

消费者

import pikaif __name__ == '__main__':# 建立与rabbitmq的连接credentials = pika.PlainCredentials("lhx", "123456")cpara = pika.ConnectionParameters(host='111.204.156.11', port=9015, credentials=credentials)connection = pika.BlockingConnection(cpara)channel = connection.channel()channel.exchange_declare(exchange='logs3', exchange_type='topic')result = channel.queue_declare("", exclusive=True)  # 自动生成队列名称queue_name = result.method.queuechannel.queue_bind(exchange='logs3',queue=queue_name,routing_key='#.weather'  # 匹配 以.weather结尾的关键字)# channel.queue_bind(exchange='logs3',#                    queue=queue_name,#                    routing_key='usa.#'  # 匹配 以usa.开头的关键字#                    )print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()

总结

作为专业的消息中间件,RabbitMQ提供了足够强大的功能帮助开发者实现各种各样炫酷的功能。当然,大部分情况下,简单模式足以应付绝大多数需求。

使用皮卡(pika)操作RabbitMQ相关推荐

  1. python使用pika操作rabbitmq总结

    python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...

  2. python使用pika操作rabbitmq总结(一)

    python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...

  3. Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

    Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度 ...

  4. Python之操作RabbitMQ

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序 ...

  5. python操作RabbitMQ

    RabbitMQ介绍 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue)的开源实现的产品,RabbitMQ是一个消息代理,从"生产者" ...

  6. Python菜鸟之路:Python基础-Python操作RabbitMQ

    RabbitMQ简介 rabbitmq中文翻译的话,主要还是mq字母上:Message Queue,即消息队列的意思.rabbitmq服务类似于mysql.apache服务,只是提供的功能不一样.ra ...

  7. rabbitmq python_Python操作RabbitMQ服务器实现消息队列的路由功能

    Python使用Pika库(安装:sudo pip install pika)可以操作RabbitMQ消息队列服务器(安装:sudo apt-get install rabbitmq-server), ...

  8. python 操作RabbitMQ

    pip install pika使用API操作RabbitMQ基于Queue实现生产者消费者模型View Code 对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务 ...

  9. python总线 rabbitmq_python - 操作RabbitMQ

    介绍 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应 ...

  10. python_day10のPython操作 RabbitMQ、Redis、Memcache

    Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度 ...

最新文章

  1. 盘点 | 近期活动信息都在这里啦~
  2. JavaScript学习记录总结(四)——js函数的特殊性
  3. 笔记:常用SQL语句
  4. 自定义ImageView 实现双击放大缩小还原,无极缩小和旋转及拖动(多机型测试很稳定)
  5. 近7万新冠域名一半是钓鱼网站?以色列老牌安全厂商Check Point推出全端保护新战略
  6. qt 复制字符串_QT中字符串的转化与拼接
  7. leetcode 190 python
  8. 小米11全系机型补齐,超大杯压场,但Pro版也不容忽视
  9. UVA 10048 - Audiophobia
  10. asp.net 点击查询跳转到查询结果页面_【免费毕设】ASP.NET交通信息网上查询系统的设计与实现(源代码+论文+开题报告)...
  11. 逻辑回归算法——乳腺癌检测
  12. html格式kindle能看吗,Kindle Voyage支持哪些文本格式
  13. 《GPU编程与CG语言之阳春白雪下里巴人》阅读笔记 第一章+第二章
  14. 实践数据湖iceberg 第十课 快照删除
  15. Flutter Navigator路由传参
  16. python读书心得体会范文_读书心得体会范文(通用3篇)
  17. CentOS 7下的软件安装方法总结
  18. Python编写无界面版打字练习程序
  19. python有n元人民币、其中有10元的_Python笔记-古灵阁小精灵金加隆金币兑换人民币...
  20. 使用接口测试活动的中奖概率(随机事件测试)

热门文章

  1. 波士顿大学计算机硕士排名,GPA3.25却获波士顿大学计算机硕士录取
  2. python存根文件_打包存根文件
  3. 11 款可替代 top 命令的工具
  4. day18私有化、关联、继承
  5. java 线程的插队运行_java笔记--线程的插队行为
  6. Google谷歌新手SEO优化教程篇【1】
  7. Word编辑页码不从第一页开始
  8. 大鹏教你python数据分析
  9. android 强制关闭键盘,Android关闭输入软键盘无效的问题
  10. vue的学习笔记(15)之Promise知识讲解