简介

MessageQueue用于解决跨进程、跨线程、跨应用、跨网络的通信问题。

RabbitMQ使用erlang开发,在windows上使用时要先安装erlang。

官方的示例比较容易理解,可以点这里去看看。

结构

生产者 ---> exchange ---> queue ---> 消费者。

生产者负责提供消息,exchange负责分发消息到指定queue,queue存储消息(默认临时,可设置持久化),消费者接收处理消息。

基本模型

流程:

  1. 建立到rabbitmq的连接
  2. 建立通道
  3. 声明使用的队列(生产者和消费者都要声明,因为不能确定两者谁先运行)
  4. 生产/消费
  5. 持续监听/关闭连接

python中使用pika模块来处理与rabbitmq的连接。

# 生产者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
r = channel.queue_declare(queue='name', exclusive=False, durable=False) # exclusive设置True是随机生成一个queue名字并返回,durable设置True是队列持久化
queue_name = r.method.queuechannel.basic_publish(exchange = '', # 使用默认分发器routing_key = queue_name,properties = pika.BasicProperties(delivery_mode = 2 # 这个参数用于设置消息持久化),body = [data]
)connection.close()# 消费者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
r = channel.queue_declare(queue='name', exclusive=False, durable=False)
queue_name = r.method.queuedef callback(channel, method, properties, body):pass# channel.basic_ack(delivery_tag = method.delivery_tag) 在回调函数最后调用手工应答,表示消息处理完毕,queue可以删除消息了channel.basic_consume(callback, # 这是个回调函数,接收生产者发来的bodyqueue = queue_name,no_ack = True # 设置True表示消息一经获取就被从queue中删除,如果这时消费者崩溃,则这条消息将永久丢失,所以去掉这个属性,在回调函数中手工应答比较安全
)channel.basic_qos(prefetch_count = [num]) # 设置消费者的消费能力,数字越大,则说明该消费者能力越强,往往与设备性能成正比channel.start_consuming() # 阻塞模式获取消息
# connection.process_data_events() 非阻塞模式获取消息

发布订阅模型

类似收音机广播,订阅者只要打开收音机就能收听信息,但接收不到它打开之前的消息。

包括发布订阅模型以及接下来的一些模型,都是通过exchange和routing_key这两个属性来控制的。直接用官网的源码来做注释。

流程:

  1. 发布者设置发布频道
  2. 收听者配置频道信息
  3. 收听者通过随机queue绑定频道接收消息
# 发布者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 创建一个命名exchange,并设置其type为fanout,表示广播
channel.exchange_declare(exchange='logs',exchange_type='fanout')# 从命令行接收输入
message = ' '.join(sys.argv[1:]) or "info: Hello World!"# 由于是广播模式,任意消费者只要设置同样的exchange,就能以任意queue来接收消息,所以这里routing_key置空
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()# 收听者
#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 这里使用同样的exchange配置,就像调节收音机频道
channel.exchange_declare(exchange='logs',exchange_type='fanout')# 在基础模型中提到过,设置exclusive=True表示生成随机的queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue# 生成了queue,还要将它与exchange进行绑定,这样消息才能通过exchange进入queue
channel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

路由/级别模型

将消息发送到指定的路由处,类似于logging模块的分级日志消息。

主要利用channel.queue_bind(routing_key=[route])这个方法,来为queue增加路由。

流程:

  1. 生产者向指定路由发送消息
  2. 消费者绑定路由
  3. 根据路由接收到不同的消息
# 生产者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 同样使用命名exchange,主要是type为direct
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')# 将命令行输入的路由作为接收消息的queue的属性,只有匹配的才能接收到消息
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()# 消费者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs',exchange_type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue# 指定该消费者接收的消息路由
severities = sys.argv[1:]
if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)# 对该消费者的queue绑定路由
for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

细目模型/更细致的划分

这个模型比前几种更强大,但是原理与路由模型是相同的。

如果routing_key='#',它就相当于发布订阅模式,向所有queue发送消息,如果routing_key值中不包含*,#,则相当于路由模型。

该模型主要是实现了模糊匹配。

# 生产者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()# 消费者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queuebinding_keys = sys.argv[1:]
if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

RPC模型

前面的几种模型都只能是一端发消息,另一端接收,RPC模型实现的就是单端收发功能。

主要是通过两个队列实现,一个发,一个收。

流程:

  1. 客户端发送消息到约定队列,并且附带返回队列的名称和验证id
  2. 服务器接到消息,将处理过的消息发送给指定队列并附带验证id
  3. 客户端接到消息先验证id,通过则处理消息
# 服务器
#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = \props.correlation_id),body=str(response))ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")
channel.start_consuming()# 客户端
#!/usr/bin/env python
import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = self.callback_queue,correlation_id = self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events()return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

转载于:https://www.cnblogs.com/ikct2017/p/9434468.html

RabbitMQ/pika模块相关推荐

  1. rabbitmq订单模块_RabbitMQ播放模块! 构架

    rabbitmq订单模块 RabbitMQ提供了具有可预测且一致的吞吐量和延迟的高可用性,可伸缩和便携式消息传递系统. RabbitMQ是AMQP (业务消息传递的开放标准)的领先实现 ,并且通过适配 ...

  2. RabbitMQ播放模块! 构架

    RabbitMQ提供了具有可预测且一致的吞吐量和延迟的高可用性,可伸缩和便携式消息系统. RabbitMQ是AMQP (业务消息传递的开放标准)的领先实现 ,并且通过适配器支持XMPP,SMTP,ST ...

  3. springboot jpa 创建数据库以及rabbitMQ分模块扫描问题

    在使用jpa过程中,如果没有在配置中加入自动创建实体对于的sql,则需要提前创建建表语句 spring.jpa.properties.hibernate.show_sql=true spring.jp ...

  4. python安装pika模块rabbitmq

    1.pip install pika 2.如找不到 拷贝 D:\python\testmq\venv\Lib\site-packages  \pika目录 转载于:https://www.cnblog ...

  5. RabbitMQ pika错误处理 delivery acknowledgement on channel 1 timed out

    AMQPChannelError 异常捕获信息: (406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed ou ...

  6. rabbitMQ pika demo

    python3 有兴趣可以跑下 github: https://github.com/chujiangke/RabbitMQ_demo.git

  7. 刚安装的python如何使用-python中RabbitMQ的使用(安装和简单教程)

    1,简介 RabbitMQ(Rabbit Message Queue)是流行的开源消息队列系统,用erlang语言开发. 1.1关键词说明: Broker:消息队列服务器实体. Exchange:消息 ...

  8. 第二百九十二节,RabbitMQ多设备消息队列-Python开发

    RabbitMQ多设备消息队列-Python开发 首先安装Python开发连接RabbitMQ的API,pika模块 pika模块为第三方模块  对于RabbitMQ来说,生产和消费不再针对内存里的一 ...

  9. python rabitmq_python RabbitMQ队列使用

    原博文 2019-01-17 21:17 − python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queu ...

最新文章

  1. yum安装etcd集群
  2. GP通过外部表装载数据时遇到ERROR:extra data after last expected column解决方法
  3. 【Java 网络编程】TCP 连接 断开 机制 ( 三次握手 | 四次挥手 )
  4. 计算机网络技术及应用 课程 英语,计算机网络应用—现代英语课堂中的第三种语言...
  5. 【CyberSecurityLearning 69】反序列化漏洞
  6. wp如何代码实现锁屏
  7. 14.理解copy_if算法的正确实现
  8. 使用Recast.AI创建具有人工智能的聊天机器人
  9. Games101现代图形学入门Lecture 4: Transformation Cont知识点总结
  10. (2) MongoDB基本概念及与关系型数据库的对照
  11. Python3 模块相关及输入输出模式
  12. sql 语句-初级进阶(二)
  13. mysql 怎么修改成新字段_Mysql入门第一课《建表、改表、删表》
  14. code review手记2
  15. [转] 电子技术*笔记4【2013-03】
  16. deepin驱动精灵_深度Linux Deepin系统安装教程使用体验
  17. 杂题 P1640 [SCOI2010]连续攻击游戏
  18. 定时器0练习,利用左循环函数_crol_(a,b)
  19. 高级信息系统项目管理师(高项)高分通过经验分享
  20. ArcGIS制图之阴影效果的表达与运用

热门文章

  1. html定位fix,html 定位fixed
  2. CSS之background-size属性
  3. Github项目|几行代码即可实现人脸检测、目标检测的开源计算机视觉库
  4. oracle出错如何备份数据,备份oracle数据库出错
  5. cocos creator 数组_CoCos Creator中的数据类型
  6. iPhone X Web 设计
  7. String.equals用法注意
  8. CodeForces 139C Literature Lesson(模拟)
  9. testNG之组测试
  10. Linux 命令快捷键