Rabbitmq 基础
Rabbitmq基础
1 MQ 基本概念
1.1 MQ概述
MQ全称Message Queue,是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。
分布式系统通信两种方式:直接远程调用(如:http请求)和 借助第三方完成间接通信(如:MQ)。
1.2 MQ优势
应用解耦:提高系统容错性和可维护性。
异步提速:提升用户体验和系统吞吐量。
削峰填谷:提高系统稳定性。
1.2.1 应用解耦
如下系统,直接使用远程调用。 用户下单后,订单系统先后访问库存、支付、物流。
- 如果库存系统异常,会导致整个系统异常,即整个系统耦合性太高。
- 调用流程中新增X系统,那么订单系统也需要适配,即订单系统可维护性太低。
综上,系统的耦合性越高,容错性就越低,可维护性就越低。
如下,使用MQ作为中间件,用户下订单只需要将消息发送给MQ,而其他系统只需要从MQ中取出消息进行消费。
- 如果,库存系统存在短暂的异常,最终可以恢复并消费MQ中的消息,即整个系统的耦合性就可以降低。
- 加入新增一个X系统,X系统只需要从MQ取出消息进行消费即可,订单系统不再需要适配,即可维护性提高。
综上,使用 MQ 使得应用间解耦,提升容错性和可维护性。
1.2.2 异步提速
如下:
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
使用MQ后:
用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
1.2.3 削峰填谷
如下图:
A系统最多每秒处理1000个请求,当用户请求瞬间增多到每秒5000请求,A系统处理不了。
使用MQ后,请求可以先存在MQ中,A系统每秒从MQ中拿1000个请求进行处理。
使用了 MQ 之后,系统消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
综上,使用MQ后可以提高系统稳定性。
1.3 MQ劣势
系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
1.4 MQ使用场景
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
- 容许短暂的不一致性,保证最终一致性。
- 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
实际上,在openstack中rpc.call方式可以做到同步请求的效果,即请求+响应。
1.5 MQ产品
1.6 RabbitMQ简介
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:
RabbitMQ 中的相关概念:
- Broker:接收和分发消息的应用,即RabbitMQ Server。如果有多个rabbitmq组成集群,那么就会有多个Broker。
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
- Connection:publisher/consumer 和 broker 之间的 TCP 连接。
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
- Queue:消息最终被送到这里等待 consumer 取走。
- Binding key:用于设定exchange 和 queue 之间的绑定关系。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
- routing key:每条消息都要带着routing key,传递给exchange后,exchange通过routing key 和 binding key决定将消息转发到哪个queue。
1.6 RabbitMQ安装
下载安装包:
http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
http://www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
启动rabbitmq:
为了能在浏览器访问mq管理控制台,还需要做以下操作:
- 开启管理界面: rabbitmq-plugins enable rabbitmq_management
- 修改默认配置信息:vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
- 重启rabbitmq:service rabbitmq-server restart
前台登录的端口默认为15672,tcp连接的端口为5672:
1.7 配置虚拟主机及用户
1.7.1 用户角色
RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
1.7.1.1 角色
- 超级管理员(administrator):
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 - 监控者(monitoring):
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) - 策略制定者(policymaker):
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 - 普通管理者(management):
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 - 其他:
无法登陆管理控制台,通常就是普通的生产者和消费者。
1.7.2 Virtual Hosts配置
1.7.2.1 创建 Virtual Hosts
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以设置虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
1.7.2.2 设置Virtual Hosts权限
点击虚拟主机名进入配置页面:
选择可以管理这个虚拟主机的用户:
2 MQ 基本用法
2.1 简单模式
参考官网:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
2.1.1 生产者
send.py
import pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"
# 1. 连接broker,创建connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))
# 2. 创建channel
channel = connection.channel()# 3. 创建queue
channel.queue_declare(queue='hello')# 4. 简单模式下使用默认exchange发送消息给queue,
# 只需要保证routing_key和队列名称一样即可
channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')# 5. 关闭连接
connection.close()
执行后send.py后,在后台或前台都可以看到创建的 hello 队列信息:
2.1.2 消费者
import os
import sysimport pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"# 1. ch:channel信息
# 2. method:consumer_tag、exchange、routing 等信息
# 3. properties:配置信息
# 4. 消息内容
def callback(ch, method, properties, body):print(" [x] ch %s" % ch)print(" [x] method %s" % method)print(" [x] properties %s" % properties)print(" [x] Received %r" % body)def main():# 1. 连接broker,创建connectionconnection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))# 2. 创建channelchannel = connection.channel()# 3. 创建queue,因为不确定生产者和消费者哪个先启动,# 所以在生产者和消费者启动时都去创建queue,保证queue存在channel.queue_declare(queue='hello')# 4. 创建消费者,callback会在接收到消息时调用# auto_ack=True参数即消费者接收到消息(没有处理)需要给 rabbitmq 发送确认消息,# 然后MQ再删除队列中的消息,这种方式相对于处理完业务手动确认要高效一些,但是有消息丢失的风险。channel.basic_consume(queue='hello',auto_ack=True,on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')# 5. 启动循环监听channel.start_consuming()if __name__ == '__main__':try:main()except KeyboardInterrupt:print('Interrupted')try:sys.exit(0)except SystemExit:os._exit(0)
2.1.3 小结
- 简单模式不需要创建exchange,使用默认exchange=""即可,需要保证routing_key和queue名称一致。
- 消费者处理完消息后需要发送ack确认信息,然后MQ再删除队列中的消息,保证消息不会丢失。
2.2 Work Queues
Work Queues:与简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息,多个消费者之间是竞争关系。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
2.2.1 生产者
send.py:
import pika
import randomcredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"
# 1. 连接broker,创建connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))
# 2. 创建channel
channel = connection.channel()# 3. 创建queue
channel.queue_declare(queue='worker_queue', durable=True)# 4. 使用默认exchange发送消息给queue
# delivery_mode=2,代表消息持久化到磁盘,如果MQ重启也不会消息丢失
# 唯一可能丢失的是,收到消息,还未保存到磁盘时。更可靠的方式可以采用publisher confirms.
for i in range(5):channel.basic_publish(exchange='',routing_key='worker_queue',body='Hello World ' + str(i),properties=pika.BasicProperties(delivery_mode=2))# 5. 关闭连接
connection.close()
2.2.2 消费者
worker.py:
import os
import sys
import timeimport pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"def callback(ch, method, properties, body):print(" [x] Received and start do: %r" % body)time.sleep(15)print(" [x] Received and do end: %r" % body)# 处理完成业务后再手动向 rabbitmq 发送确认消息,# 然后MQ再删除队列中的消息,保证消息不丢失。ch.basic_ack(delivery_tag=method.delivery_tag)def main():# 1. 连接broker,创建connectionconnection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))# 2. 创建channelchannel = connection.channel()# 3. 创建queue,因为不确定生产者和消费者哪个先启动,# 所以在生产者和消费者启动时都去创建queue,保证queue存在channel.queue_declare(queue='worker_queue', durable=True)# 4. prefetch_count=1代表每次只推送一个消息给消费者,如果消费者1在处理其他消息,# 则将消息推送给其他空闲的消费者,这样不会造成某个消费者堆积很多消息来不及处理channel.basic_qos(prefetch_count=1)# 5. 创建消费者,callback会在接收到消息时调用channel.basic_consume(queue='worker_queue',on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()if __name__ == '__main__':try:main()except KeyboardInterrupt:print('Interrupted')try:sys.exit(0)except SystemExit:os._exit(0)
启动3个消费者,通过命令行可以查到worker_queue当前有3个消费者在等待消费。
消费者中通过time.sleep(15)模拟消费者处理消息的过程,如下生产者一次发送5个消息(messages),其中3个消息正在处理(message_unackpneledged),2个消息等待处理(messages_reday)。
2.2.3 小结
2.2.3.1 持久化问题
如果MQ收到消息还未发送发送给消费者处理,这时MQ重启则会导致消息丢失。
为了防止这种情况,可以使用将消息持久化到磁盘。
- queue持久化:
# durable=True参数指定队列持久化channel.queue_declare(queue='worker_queue', durable=True)
- 消息持久化:
# properties=pika.BasicProperties(delivery_mode=2)
# 生产者发送消息时,properties参数指定delivery_mode=2,即消息持久化
channel.basic_publish(exchange='',routing_key='worker_queue',body='Hello World ' + str(random.randint(0, 10)),properties=pika.BasicProperties(delivery_mode=2))
即使消息持久化到磁盘还存在一种可能,就是收到消息还未来得及持久化到磁盘时MQ重启。
更可靠的一种方案是 publisher confirms:https://www.rabbitmq.com/confirms.html。
2.2.3.2 消息分发问题
MQ 分发消息使用的是 Fair dispatch,类似于轮询分发的方式,如果 消费者1 还未处理完消息,MQ又推送一条消息给 消费者1,这样就会造成 消费者1 的消息积压,处理不及时。
解决方案:指定每次只发送给消费者一条消息,未处理完时,消息先积压在MQ中,如果有其他空闲的消费者,也可以分发给其他消费者。
消费者channel指定prefetch_count=1,即每次只取1条消息进行消费
channel.basic_qos(prefetch_count=1)
2.2.3.3 消息确认
消费者发送消息确认有如下两种方式:
# 自动确认auto_ack=True
channel.basic_consume(queue='hello',auto_ack=True,on_message_callback=callback)# 在回调函数中手动确认
# ch.basic_ack(delivery_tag=method.delivery_tag)
def callback(ch, method, properties, body):print(" [x] Received and start do: %r" % body)time.sleep(15)print(" [x] Received and do end: %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)
2.3 发布订阅(Publish/Subscribe)
在订阅模型中,多了一个 Exchange 角色,简单模式和worker queue中设置了exchange="",这时会将消息转发给routing_key同名的队列中。
Exchange(交换机):负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
2.3.1 生产者
send.py:
import pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"# 1. 连接broker,创建connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))
# 2. 创建channel
channel = connection.channel()# 3. 创建名为logs的exchange,类型为fanout
channel.exchange_declare(exchange="logs", exchange_type="fanout")# 4. 发送消息,fanout类型不需要指定routing_key,
# 所有与exchange绑定的queue都会收到消息
channel.basic_publish(exchange="logs", routing_key="", body="Hello world")# 5. 关闭连接
connection.close()
2.3.2 消费者
recevie.py:
import os
import sys
import timeimport pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"def callback(ch, method, properties, body):print(" [x] Received and start do: %r" % body)time.sleep(15)print(" [x] Received and do end: %r" % body)def main():# 1. 连接broker,创建connectionconnection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))# 2. 创建channelchannel = connection.channel()# 3. 创建名为logs的exchange,类型为fanoutchannel.exchange_declare(exchange="logs", exchange_type="fanout")# 4. 创建随机queue,设置queue=“”会生成amq.gen-XXX的队列。# exclusive=True,即消费者连接断开后会自动删除该队列result = channel.queue_declare(queue="", exclusive=True)queue_name = result.method.queue# 5. 绑定exchange和queuechannel.queue_bind(queue=queue_name, exchange="logs")# 6. 接收消息channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)# 7. 循环监听channel.start_consuming()if __name__ == '__main__':try:main()except KeyboardInterrupt:print('Interrupted')try:sys.exit(0)except SystemExit:os._exit(0)
使用rabbitmqctl list_bindings查看 exchange 和 queue 的绑定关系:
2.3.3 小结
2.3.3.1 fanout类型exchange
exchange有direct,、topic、headers、fanout四种类型。 发布订阅模式使用的exchange是fanout类型,即会将消息广播给所有与exchange绑定的queue。
fanout类型的exchange,消费者发送消息时不需要指定具体的routing_key,因为会转发给所有的queue。
channel.basic_publish(exchange="logs", routing_key="", body="Hello world")
2.3.3.2 生成随机队列
消费者生成随机队列与exchange绑定,可以设置exclusive=True,在消费者断开连接后自动删除随机队列。
# 4. 创建随机queue,设置queue=“”会生成amq.gen-XXX的队列。
# exclusive=True,即消费者连接断开后会自动删除该队列
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue# 5. 绑定exchange和queue
channel.queue_bind(queue=queue_name, exchange="logs")
2.4 Routing
队列与交换机的绑定时指定一个 routing_key(路由key);在向 Exchange 发送消息时,也必须指定消息的 routing_key。
Exchange 会根据消息的 routing_key 进行判断,只有队列的 routing_key 与消息的 routing_key 完全一致,才会接收到消息。
2.4.1 生产者
import sys
import pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"# 1. 连接broker,创建connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))
# 2. 创建channel
channel = connection.channel()# 3. 创建exchange,类型为direct
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")# 4. 发送消息,根据传入的命令行参数决定routing_key(debug/info/error)
routing_key = sys.argv[1]
channel.basic_publish(exchange="direct_logs", routing_key=routing_key,body="Hello world")# 5. 关闭连接
connection.close()
2.4.2 消费者
import os
import sys
import timeimport pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"def callback(ch, method, properties, body):print(" [x] Received and start do: %r" % body)time.sleep(15)print(" [x] Received and do end: %r" % body)def main():# 1. 连接broker,创建connectionconnection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))# 2. 创建channelchannel = connection.channel()# 3. 创建exchange,类型为directchannel.exchange_declare(exchange="direct_logs", exchange_type="direct")# 4. 创建随机queueresult = channel.queue_declare(queue="", exclusive=True)queue_name = result.method.queue# 5. 绑定exchange和queue# 通过命令行参数传入当前要创建的queue和exchange的绑定关系,即binding keylevels = sys.argv[1:]for level in levels:channel.queue_bind(queue=queue_name, exchange="direct_logs",routing_key=level)# 6. 接收消息channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)# 7. 循环监听channel.start_consuming()if __name__ == '__main__':try:main()except KeyboardInterrupt:print('Interrupted')try:sys.exit(0)except SystemExit:os._exit(0)
如下图: debug、info作为binding key绑定了同一个队列1;error作为binding key绑定了另一个队列2。
当发送消息时,指定routing_key为info或debug时,消息会分发到队列1;
当发送消息时,指定routing_key为error时,消息会分发到队列2;
通过 rabbitmqctl list_bindings -p /wy | grep direct_logs 查看绑定关系:
2.4.3 小结
2.4.3.1 direct类型exchange
direct类型的exchange,在创建时指定类型为direct。
- 在创建exchange时指定类型为direct类型。
- 生产者发送消息时,需要指定消息的routing_key,exchange 根据routing key判断将消息交给哪个队列。
- 消费者绑定 queue和exchange 时指定一个或多个routing_key,即queue和exchange的路由关系。
# 创建exchange,类型为direct
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
# 生产者
routing_key = sys.argv[1]
channel.basic_publish(exchange="direct_logs", routing_key=routing_key,body="Hello world")
# 消费者
levels = sys.argv[1:]
for level in levels:channel.queue_bind(queue=queue_name, exchange="direct_logs",routing_key=level)
下面这种direct类型,绑定两个相同routing_key的queue,与fanout类型效果是一样的,这样做也是合法的。
2.5 Topics
Topic 类型与 Direct 都是根据 routing_key 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符;routing_key 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert 。
通配符规则:# 匹配一个或多个词;* 匹配1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert;item.* 只能匹配 item.insert
2.5.1 生产者
send.py:
import sysimport pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"
# 1. 连接broker,创建connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))
# 2. 创建channel
channel = connection.channel()# 3. 创建exchange
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")# 4. 从命令行获取message和routing_key
routing_key = sys.argv[1]
message = sys.argv[2]
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)# 5. 关闭连接
connection.close()
2.5.2 消费者
receive.py:
import os
import sys
import timeimport pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"def callback(ch, method, properties, body):print(" [x] Received and start do: %r" % body)time.sleep(15)print(" [x] Received and do end: %r" % body)def main():# 1. 连接broker,创建connectionconnection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))# 2. 创建channelchannel = connection.channel()# 3. 创建queueresult = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queue# 4. 创建exchangechannel.exchange_declare(exchange="topic_logs", exchange_type="topic")# 5. 绑定exchange和queuebinding_keys = sys.argv[1:]for key in binding_keys:channel.queue_bind(queue=queue_name, exchange="topic_logs",routing_key=key)# 6. 创建消费者channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()if __name__ == '__main__':try:main()except KeyboardInterrupt:print('Interrupted')try:sys.exit(0)except SystemExit:os._exit(0)
如下:
创建binding_key为 #,代表匹配任意多个单词,即匹配所有单词,空字符串也可以匹配
创建binding_key为 kern.*,代表匹配 kern.任意一个或0个单词。
注意:kern.可以匹配到,kern匹配不到。
创建两个消费者,发送test.critical时两个消费者都能接收到消息;发送kern.test时只有消费者1能接受到消息。
2.5.3 小结
使用Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
2.6 Remote procedure call (RPC)
2.6.1 rpc_client
import sysimport pika
import uuidcredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"class FibonacciRpcClient(object):def __init__(self):# 1. 创建connectionself.connection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))# 2. 创建channelself.channel = self.connection.channel()# 3. 创建回调队列,用于接收rpc_server返回的消息result = self.channel.queue_declare(queue="", exclusive=True)self.callback_queue = result.method.queue# 4. 创建rpc_client消费者,用于接收rpc_server返回的消息self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)def on_response(self, ch, method, properties, body):# RPC服务端可能会在向client端发送答案之后,发送请求的确认消息之前死亡。# 如果发生这种情况,RPC服务器重新启动后将再次处理该请求,返回结果。# 而 client 端已经接受过响应,如果这时 client 正在等待另一个响应,# 通过对比client端的correlation_id和响应的correlation_id就可以直到是不是需要response# 在客户端上我们必须妥善处理重复的响应,理想情况下RPC应该是幂等的。if properties.correlation_id == self.corr_id:self.response = bodydef call(self, n):self.response=Noneself.corr_id = str(uuid.uuid4())# 5. 创建生产者,发送消息给rpc_server# a. exchange="",消息会发送给routing_key字段值同名的queue,即rpc_queue# b. 队列指定为rpc_queue,一般能调用rpc.call,那么server肯定启动了,在server端会创建queue# c. properties的reply_to参数指定rpc_server返回数据时传递的队列名# d. properties的correlation_id参数用于关联rpc request和 responseself.channel.basic_publish(exchange="",routing_key="rpc_queue",properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id),body=str(n))# 6. 阻塞等待服务端响应,直到收到响应while self.response is None:self.connection.process_data_events()print(self.response)print("Got %s" % self.response)fibonacci_rpc = FibonacciRpcClient()number = sys.argv[1]
print("Requesting fib(%s)" % number)
fibonacci_rpc.call(number)
2.6.2 rpc_server
import pikacredentials = pika.credentials.PlainCredentials("mq_dev", "123456")
virtual_host = "/wy"def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)def on_request(ch, method, proprity, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)# routing_key设置为proprity.reply_to,即消息转给client端的callback_queue# correlation_id设置为client端传过来的proprity.correlation_idch.basic_publish(exchange="",routing_key=proprity.reply_to,properties=pika.BasicProperties(correlation_id=proprity.correlation_id),body=str(response))# 1. 创建connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="XXX.XXX.XXX.XXX",port=5672,credentials=credentials,virtual_host=virtual_host))# 2. 创建channel
channel = connection.channel()# 3. 设置消费者每次只接受一条消息
channel.basic_qos(prefetch_count=1)# 4. 创建队列
channel.queue_declare(queue='rpc_queue')# 5. 创建消费者
channel.basic_consume(queue="rpc_queue",on_message_callback=on_request,auto_ack=True)channel.start_consuming()
2.6.2 小结
2.6.2.1 rpc_client
- 定义接收rpc_server响应的callback_queue。
# 创建channel
self.channel = self.connection.channel()# 创建回调队列,用于接收rpc_server返回的消息
result = self.channel.queue_declare(queue="", exclusive=True)
self.callback_queue = result.method.queue# 创建rpc_client消费者,用于接收rpc_server返回的消息
self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)
- callback_queue消息的处理中需要对比properties.correlation_id是否和rpc_client端保存correlation_id一致,防止重复消息。
if properties.correlation_id == self.corr_id:self.response = body
发送消息时exchange="", 消息会发送给routing_key字段值同名的queue,即rpc_queue。
通过properties的reply_to参数指定rpc_server返回数据时rpc_client端接收数据queue。
properties的correlation_id参数用于关联rpc request和response;client端发送消息时设置correlation_id,server端响应时设置收到的correlation_id;在client端的callback_queue回调函数中做防重复消息处理。
self.channel.basic_publish(exchange="",routing_key="rpc_queue",properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id),body=str(n))
2.6.2.2 rpc_server
rpc_queue在服务端启动的时候创建。
server端在callback中发送消息给client端,并指定routing_key=proprity.reply_to,即发送给client端定义的callback_queue。property.correlation_id指定当前响应的唯一id(client端传入的)。
# routing_key设置为proprity.reply_to,即消息转给client端的callback_queue# correlation_id设置为client端传过来的proprity.correlation_idch.basic_publish(exchange="",routing_key=proprity.reply_to,properties=pika.BasicProperties(correlation_id=proprity.correlation_id),body=str(response))
Rabbitmq 基础相关推荐
- java B2B2C Springcloud电子商务平台源码-RabbitMQ基础概念...
RabbitMQ是一个由erlang开发的AMQP的开源实现. 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六 AMQP,即Adva ...
- RabbitMQ基础知识详解
RabbitMQ基础知识详解 2017年08月28日 20:42:57 dreamchasering 阅读数:41890 标签: RabbitMQ 什么是MQ? MQ全称为Message Queue, ...
- Ruby使用RabbitMQ(基础)
Ruby使用RabbitMQ(基础) RabbitMQ documentation rabbitmq-tutorials rabbitmq-configure bunny 前提 最近刚刚接触到mq, ...
- rabbitmq基础1——消息中间件概念、Rabbitmq的发展起源和基本组件的作用流程
文章目录 一.消息中间件 1.1 概念 1.2 作用 1.2.1 消息队列持久化 1.2.2 消息队列分发策略 1.2.3 消息队列的高可用和高可靠 1.2.3.1 一主多从共享集群 1.2.3.2 ...
- rabbitmq基础2——rabbitmq二进制安装和docker安装、基础命令
文章目录 一.RabbitMQ安装 1.1 二进制安装 1.2 rabbitmqctl工具 1.3 docker安装 二.rabbitmq基础命令 2.1 多租户与权限类 2.1.1 创建虚拟主机 2 ...
- (2)RabbitMQ基础概念及工作流程详解
上一节中我们对MQ做了一个概要介绍,这一节开始我们选取RabbitMQ开始进行学习,本节将会RabbitMQ做个简单介绍,并且会对其常见的基础概念做个讲解,最后会简单介绍一下RabbitMQ的工作流程 ...
- RabbitMQ基础知识介绍、RabbitMQ的安装
RabbitMQ基础知识介绍 官方解释:MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过 读写出入队列的消息 ...
- 转 RabbitMQ 基础概念及 Spring 的配置和使用 推荐好文 举例讲解
从不知道到了解-RabbitMQ 基础概念及 Spring 的配置和使用 转: sumile.cn » 从不知道到了解-RabbitMQ 基础概念及 Spring 的配置和使用 序言 你在系统中是 ...
- RabbitMQ基础概念详细介绍
转至:http://www.ostest.cn/archives/497 引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼. ...
最新文章
- Java数字包装类基本程序,Java基本数据类型包装类
- android listview divider color,android listview 属性
- Linq的简介和基础知识学习
- Visual Entity 手册(十一)代码生成设置
- [css] 你会经常用到伪元素吗?一般都用在哪方面?
- python 2048源码_一个python的2048简单实现
- FileOutputStream输出流
- pythonwin下载中文版_Python官方下载 v3.9.0中文版_Win10镜像官网
- Python-GeoPandas地图、专题地图绘制
- 西数文件共享服务器,数据轻松共享 西数Live网络硬盘首测
- React中文文档 9. 表单
- 2021年数据泄露成本报告解读
- 2、Class和Subclass
- iTop4412 Booting Sequence
- 深度学习--解决模型过拟合的问题
- 最近失业了,在做副业的路上走了很多坑
- 《电子测量与仪器学报》杂志投稿总结
- NBA中的那些黑科技
- 总结2012年世界经济形势主要特征
- 迅睿CMS 网站安全权限划分
热门文章
- python中def main是什么意思_Python中’__main__’模块的作用
- kmeans python interation flag_机器学习经典算法-logistic回归代码详解
- 计算机画图更改,如何用电脑画图功能修改图片与加字
- python装饰器和异常处理_装饰器异常处理-面向对象编程-Python教程自动化开发_Python视频教程...
- 百分比收益率和对数收益率
- 有趣的手机壁纸——水印壁纸
- std::set用法
- java拼图游戏(带文档资料)
- 音乐计算机曲谱狂妄之人,undertale狂妄之人简谱
- 《Python基础教程》学习笔记——异常