python操作RabbitMQ
RabbitMQ介绍
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接受者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message
- 内部架构:
说明
- Message (消息):RabbitMQ 转发的二进制对象,包括Headers(头)、Properties (属性)和 Data (数据),其中数据部分不是必要的。Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机
Exhange的应用。
- Consumer (消费者):使用队列 Queue 从 Exchange 中获取消息的应用。
- Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列
Queue (队列):一个存储Exchange 发来的消息的缓冲,并将消息主动发送给Consumer,或者 Consumer 主动来获取消息。见 1.4 部分的描述。
Binding (绑定):队列 和 交换机 之间的关系。Exchange 根据消息的属性和 Binding 的属性来转发消息。绑定的一个重要属性是 binding_key。
Connection (连接)和 Channel (通道):生产者和消费者需要和 RabbitMQ 建立 TCP 连接。一些应用需要多个connection,为了节省TCP 连接,可以使用 Channel,它可以被认为是一种轻型的共享 TCP 连接的连接。连接需要用户认证,并且支持 TLS (SSL)。连接需要显式关闭。
参考:RabbitMQ百度百科
RabbitMQ安装
centos安装:
http://www.rabbitmq.com/install-rpm.html
windows安装:
http://www.rabbitmq.com/install-windows.html
安装python rabbitMQ module:
pip install pika
or
easy_install pika
or
源码https://pypi.python.org/pypi/pika
Python操作RabbitMQ
1.实现简单消息队列
一个Product向queue发送一个message,一个Client从该queue接收message并打印
远程连接rabbitmq server,需要配置权限:
首先在rabbitmq server上创建一个用户
sudo rabbitmqctl add_user alex alex3714
同时还要配置权限,允许从外面访问
sudo rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"
set_permissions [-p vhost] {user} {conf} {write} {read}
- vhost
The name of the virtual host to which to grant the user access, defaulting to /.
- user
The name of the user to grant access to the specified virtual host.
- conf
A regular expression matching resource names for which the user is granted configure permissions.
- write
A regular expression matching resource names for which the user is granted write permissions.
- read
A regular expression matching resource names for which the user is granted read permissions.
- 发消息 product
import pikacredentials = pika.PlainCredentials('alex','alex3714') # 凭证connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.152.134',port=5672,credentials=credentials)) # 定义连接池channel = connection.channel() # 生成连接通道channel.queue_declare(queue='test') # 声明队列以向其发送消息channel.basic_publish(exchange='',routing_key='test',body='Hello World!') # 注意当未定义exchange时,routing_key需和queue的值保持一致print('send success msg to rabbitmq') connection.close() # 关闭连接
- 收消息,client
import pikacredentials = pika.PlainCredentials('alex','alex3714') # 凭证connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.152.134',port=5672,credentials=credentials)) # 连接参数channel = connection.channel() # 生成连接通道channel.queue_declare(queue='test') # 声明队列。之所以消费者也需要声明队列,是为了防止生产者未声明队列,导致运行报错。def callback(ch, method, properties, body):""" 回调函数,处理从rabbitmq中取出的消息:param ch: 通道:param method: 方法:param properties: 属性:param body: 内容:return: 接收到得信息""" print("[x] Received %r" % body)# print(ch,method,properties,body)""" <pika.adapters.blocking_connection.BlockingChannel object at 0x0000000002F1DB70><Basic.Deliver(['consumer_tag=ctag1.3c1d688587c447e5ac3a72ea99e98cac', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=test'])><BasicProperties> b'Hello World!'""" channel.basic_consume(callback, queue='test', no_ack=True) # no_ack 表示不需要发送ack。默认是False,表示开启状态。print('[*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开始监听,接收消息
执行效果:
#product端: send success msg to rabbitmq#client端:[*] Waiting for messages. To exit press CTRL+C[x] Received b'Hello World!'
- 消息确认
当客户端从队列中取出消息之后,可能需要一段时间才能处理完成,如果在这个过程中,客户端出错了,异常退出了,而数据还没用处理完成,那么非常不幸,这段数据就丢失了,因为rabbitmq默认会把此消息标记为已完成,然后从队列中移除。
消息确认是客户端从rabbitmq中取出消息,并处理完成之后,会发送一个ack告诉rabbitmq,消息处理完成,当rabbitmq收到客户端的获取消息请求之后,或标记为处理中,当再次收到ack之后,才会标记为已完成,然后从队列中删除。当rabbitmq检测到客户端和自己断开链接之后,还没收到ack,则会重新将消息放回消息队列,交给下一个客户端处理,保证消息不丢失,也就是说,RabbitMQ给了客户端足够长的时间来做数据处理。
在客户端使用no_ack来标记是否需要发送ack,默认是False,开启状态。
product向rabbitmq发送两条消息:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, )) #定义连接池 channel = connection.channel() #声明队列以向其发送消息消息channel.queue_declare(queue='test') channel.basic_publish(exchange='', routing_key='test', body='1') channel.basic_publish(exchange='', routing_key='test', body='2') channel.basic_publish(exchange='', routing_key='test', body='3') print('send success msg to rabbitmq') connection.close() #关闭连接
客户端接受消息,不发送ack
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.queue_declare(queue='test')def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(5)#ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_consume(callback,queue='test',no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
执行结果,发现消息并没有从队列中删除
第一次执行:[*] Waiting for messages. To exit press CTRL+C[x] Received b'1'[x] Received b'2'[x] Received b'3' 第二次执行:[*] Waiting for messages. To exit press CTRL+C[x] Received b'1'[x] Received b'2'[x] Received b'3'
加入ack之后:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.queue_declare(queue='test')def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(5)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_consume(callback,queue='test',no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
运行结果:发现第二次运行队列中已经没有消息
第一次:[*] Waiting for messages. To exit press CTRL+C[x] Received b'1'[x] Received b'2'[x] Received b'3 第二次:[*] Waiting for messages. To exit press CTRL+C
- 改变消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
默认情况:使用product往队列中放10个数字
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, )) #定义连接池 channel = connection.channel() #声明队列以向其发送消息消息channel.queue_declare(queue='test') for i in range(10):channel.basic_publish(exchange='', routing_key='test', body=str(i))print('send success msg[%s] to rabbitmq' %i) connection.close() #关闭连接
运行结果;
send success msg[1] to rabbitmq send success msg[2] to rabbitmq send success msg[3] to rabbitmq send success msg[4] to rabbitmq send success msg[5] to rabbitmq send success msg[6] to rabbitmq send success msg[7] to rabbitmq send success msg[8] to rabbitmq send success msg[9] to rabbitmq
客户端1收消息:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.queue_declare(queue='test')def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)#time.sleep(5)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_consume(callback,queue='test',no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
运行结果:
[*] Waiting for messages. To exit press CTRL+C[x] Received b'0'[x] Received b'2'[x] Received b'4'[x] Received b'6'[x] Received b'8'
客户端2收消息:和client1的区别是加了一个sleep(1)
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.queue_declare(queue='test')def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_consume(callback,queue='test',no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
执行结果:
[*] Waiting for messages. To exit press CTRL+C[x] Received b'1'[x] Received b'3'[x] Received b'5'[x] Received b'7'[x] Received b'9'
在两个客户端里加入channel.basic_qos(prefetch_count=1)参数
客户端1:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.queue_declare(queue='test')def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)##time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) #添加不按顺序分配消息的参数 channel.basic_consume(callback,queue='test',no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
执行效果:
[*] Waiting for messages. To exit press CTRL+C[x] Received b'0'[x] Received b'2'[x] Received b'3'[x] Received b'4'[x] Received b'5'[x] Received b'6'[x] Received b'7'[x] Received b'8'[x] Received b'9'
客户端2:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.queue_declare(queue='test')def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue='test',no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
执行结果:
[*] Waiting for messages. To exit press CTRL+C [x] Received b'1'
发现,加入channel.basic_qos(prefetch_count=1)参数之后,客户端2由于sleep了1s,所以只拿到了一个消息,其他的消息都被client1拿到了
- 消息持久化 消息确认机制使得客户端在崩溃的时候,服务端消息不丢失,但是如果rabbitmq奔溃了呢?该如何保证队列中的消息不丢失? 此就需要product在往队列中push消息的时候,告诉rabbitmq,此队列中的消息需要持久化,用到的参数:durable=True,再次强调,Producer和client都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的:
channel.basic_publish(exchange='', routing_key="test", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
具体代码:
product端:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, )) #定义连接池 channel = connection.channel() #声明队列以向其发送消息消息channel.queue_declare(queue='test_persistent',durable=True) for i in range(10):channel.basic_publish(exchange='', routing_key='test_persistent', body=str(i),properties=pika.BasicProperties(delivery_mode=2))print('send success msg[%s] to rabbitmq' %i) connection.close() #关闭连接
client端:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.queue_declare(queue='test_persistent',durable=True)def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)#time.sleep(5)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue='test_persistent',no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息 注意:client端也需配置durable=True,否则将报下面错误:pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - parameters for queue 'test_persistent' in vhost '/' not equivalent")
配置完之后,发现product往rabbitmq端push消息之后,重启rabbitmq,消息依然存在
[root@dns ~]# rabbitmqctl list_queues Listing queues ... abc 0 abcd 0 hello2 300 test 0 test1 20 test_persistent 10 ...done. [root@dns ~]# /etc/init.d/rabbitmq-server restart Restarting rabbitmq-server: SUCCESS rabbitmq-server. [root@dns ~]# rabbitmqctl list_queues Listing queues ... abc 0 abcd 0 hello2 300 test1 20 test_persistent 10 ...done.
参考文档:参考文档:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
2.使用Exchanges:
exchanges主要负责从product那里接受push的消息,根据product定义的规则,投递到queue中,是product和queue的中间件
Exchange 类型
- direct 关键字类型
- topic 模糊匹配类型
- fanout 广播类型
使用fanout实现发布订阅者模型
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中
订阅者:
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.exchange_declare(exchange='test123',type='fanout') #定义一个exchange ,类型为fanout rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange queue_name = rest.method.queue #获取队列名 channel.queue_bind(exchange='test123',queue=queue_name) #将随机队列名和exchange进行绑定def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue=queue_name,no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
发布者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, )) #定义连接池 channel = connection.channel() #声明队列以向其发送消息消息channel.exchange_declare(exchange='test123',type='fanout') for i in range(10):channel.basic_publish(exchange='test123', routing_key='', body=str(i),properties=pika.BasicProperties(delivery_mode=2))print('send success msg[%s] to rabbitmq' %i) connection.close() #关闭连接
注意:
需先定义订阅者,启动订阅者,否则发布者publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,msg是被丢弃的。
- 使用direct 实现根据关键字发布消息
消息发布订阅者模型是发布者发布一条消息,所有订阅者都可以收到,现在rabbitmq还支持根据关键字发送,在发送消息的时候使用routing_key参数指定关键字,rabbitmq的exchange会判断routing_key的值,然后只将消息转发至匹配的队列,注意,此时需要订阅者先创建队列
配置参数为exchange的type=direct,然后定义routing_key即可
订阅者1: 订阅error,warning,info
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.exchange_declare(exchange='test321',type='direct') #定义一个exchange ,类型为fanout rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange queue_name = rest.method.queue #获取队列名severities = ['error','warning','info'] #定义三个routing_keyfor severity in severities:channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue=queue_name,no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
订阅者2:订阅error,warning
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.exchange_declare(exchange='test321',type='direct') #定义一个exchange ,类型为fanout rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange queue_name = rest.method.queue #获取队列名severities = ['error','warning'] #定义两个routing_keyfor severity in severities:channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue=queue_name,no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
发布者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, )) #定义连接池 channel = connection.channel() #声明队列以向其发送消息消息channel.exchange_declare(exchange='test321',type='direct') channel.basic_publish(exchange='test321', routing_key='info', body='info msg',properties=pika.BasicProperties(delivery_mode=2)) #发送info msg到 info routing_key channel.basic_publish(exchange='test321', routing_key='error', body='error msg',properties=pika.BasicProperties(delivery_mode=2)) #发送error msg到 error routing_keyprint('send success msg[] to rabbitmq') connection.close() #关闭连接**
效果:发现订阅者1和订阅者2都收到error消息,但是只有订阅者1收到了info消息
订阅者1:[*] Waiting for messages. To exit press CTRL+C[x] Received b'info msg'[x] Received b'error msg' 订阅者2:[*] Waiting for messages. To exit press CTRL+C[x] Received b'error msg'
- 使用topic实现模糊匹配发布消息
direct实现了根据自定义的routing_key来标示不同的queue,使用topic可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列
# 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词如:
fuzj.test 和fuzj.test.test
fuzj.# 会匹配到 fuzj.test 和fuzj.test.test
fuzj.* 只会匹配到fuzj.test
订阅者1: 使用#匹配
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.exchange_declare(exchange='test333',type='topic') #定义一个exchange ,类型为fanout rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange queue_name = rest.method.queue #获取队列名channel.queue_bind(exchange='test333', routing_key='test.#',queue=queue_name)def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue=queue_name,no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
订阅者2:使用*匹配
import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672)) channel = connection.channel()channel.exchange_declare(exchange='test333',type='topic') #定义一个exchange ,类型为fanout rest = channel.queue_declare(exclusive=True) #创建一个随机队列,并启用exchange queue_name = rest.method.queue #获取队列名channel.queue_bind(exchange='test333', routing_key='test.*',queue=queue_name)def callback(ch, method, properties, body):'''回调函数,处理从rabbitmq中取出的消息'''print(" [x] Received %r" % body)time.sleep(1)ch.basic_ack(delivery_tag = method.delivery_tag) #发送ack消息channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue=queue_name,no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始监听 接受消息
发布者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, )) #定义连接池 channel = connection.channel() #声明队列以向其发送消息消息channel.exchange_declare(exchange='test333',type='topic') channel.basic_publish(exchange='test333', routing_key='test.123', body='test.123 msg',properties=pika.BasicProperties(delivery_mode=2)) channel.basic_publish(exchange='test333', routing_key='test.123.321', body=' test.123.321 msg',properties=pika.BasicProperties(delivery_mode=2))print('send success msg[] to rabbitmq') connection.close() #关闭连接
输出效果:
订阅者1:[*] Waiting for messages. To exit press CTRL+C[x] Received b'test.123 msg'[x] Received b' test.123.321 msg'订阅者2:[*] Waiting for messages. To exit press CTRL+C[x] Received b'test.123 msg'
- 实现RPC
- 过程:
客户端 Client 设置消息的 routing key 为 Service 的队列 op_q,设置消息的 reply-to 属性为返回的 response 的目标队列 reponse_q,设置其 correlation_id 为以随机UUID,然后将消息发到 exchange。比如
channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
Exchange 将消息转发到 Service 的 op_q
Service 收到该消息后进行处理,然后将response 发到 exchange,并设置消息的 routing_key 为原消息的 reply_to 属性,以及设置其 correlation_id 为原消息的 correlation_id 。
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))Exchange 将消息转发到 reponse_q
Client 逐一接受 response_q 中的消息,检查消息的 correlation_id 是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。
- 代码实现:
- 服务端:
import pika import subprocess connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, )) #定义连接池channel = connection.channel() #创建通道channel.queue_declare(queue='rpc_queue') #创建rpc_queue队列def operating(arg):p = subprocess.Popen(arg, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) #执行系统命令res = p.stdout.read() #取出标准输出if not res: #判断是否有执行结果responses_msg = p.stderr.read() #没有执行结果则取出标准错误输出else:responses_msg = resreturn responses_msgdef on_request(ch, method, props, body):command = str(body,encoding='utf-8')print(" [.] start Processing command : %s" % command)response_msg = operating(command) #调用函数执行命令ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = props.correlation_id),body=str(response_msg))ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1) #消息不平均分配,谁取谁得 channel.basic_consume(on_request, queue='rpc_queue') #监听队列print(" [x] Awaiting RPC requests") channel.start_consuming()
- 客户端
import pika import uuid import timeclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1',port=5672,)) #定义连接池self.channel = self.connection.channel() #创建通道result = self.channel.queue_declare(exclusive=True,auto_delete=True) #创建客户端短接受服务端回应消息的队列,\exclusive=True表示只队列只允许当前链接进行连接,auto_delete=True表示自动删除self.callback_queue = result.method.queue #获取队列名称self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue) #从队列中获取消息def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id: #判断self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = self.callback_queue, #回应消息的队列correlation_id = self.corr_id, #correlation id可以理解为请求的唯一标识码),body=str(n))while self.response is None: #不断从自己监听的队列里取消息,直到取到消息self.connection.process_data_events()return self.response.decode()fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting server" ) time.sleep(0.1) while True:command = input('>> ')response = fibonacci_rpc.call(command)print(" [.] Get %r \n" % response)
- 服务端:
转载于:https://www.cnblogs.com/luchuangao/p/RabbitMQ.html
python操作RabbitMQ相关推荐
- Python菜鸟之路:Python基础-Python操作RabbitMQ
RabbitMQ简介 rabbitmq中文翻译的话,主要还是mq字母上:Message Queue,即消息队列的意思.rabbitmq服务类似于mysql.apache服务,只是提供的功能不一样.ra ...
- Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy
Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度 ...
- python 操作RabbitMQ
pip install pika使用API操作RabbitMQ基于Queue实现生产者消费者模型View Code 对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务 ...
- python_day10のPython操作 RabbitMQ、Redis、Memcache
Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度 ...
- day12 Python操作rabbitmq及pymsql
一.rabbitmq介绍 RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消 ...
- 【Python之路Day12】网络篇之Python操作RabbitMQ
基础知识 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.消息中间件这块在我们前面的学习中,是使用python中的queue模块来提供,但这个模块仅限于在本机的内存中使用,假设 ...
- python操作rabbitmq操作数据
全栈工程师开发手册 (作者:栾鹏) 架构系列文章 ##一.RabbitMQ 消息队列介绍 RabbitMQ也是消息队列,那RabbitMQ和之前python的Queue有什么区别么? py 消息队列: ...
- Python之操作RabbitMQ
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序 ...
- python总线 rabbitmq_python - 操作RabbitMQ
介绍 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应 ...
最新文章
- python描述器做权限控制_Python装饰器14-描述器
- 技术图文:Python描述符 (descriptor) 详解
- 50颗传感器、超1亿像素,算力700TOPS,这个自动驾驶平台有点儿炫!
- DDD:用 “四色原型” 进行 “聚合设计”
- iBatis 的插入一个实体
- 红色警报 (25 分)【测试点分析】【两种解法】
- 基于层序+中序遍历序列构建二叉树
- Redis 学习之事务处理
- Atitit 怎么阅读一本书 消化 分析 检索 attilax总结 1. 读书的本质 是数据的处理,大量的数据,处理能力有限的大脑	2 2. ETL数据清洗转换 摘要,缩小数据规模	2 2.1
- comsol光学仿真03
- 20221103使用ffmpeg提取mp4视频的字幕
- winform backgroundWorker 用法
- 在线更换背景网站(白色背景换为蓝色背景证件照)
- 计算机桌面图标底纹,怎样去除桌面图标下的底色[XP系统]【图文教程】
- linux mv中途进程断掉,shell入门
- POJ-3368 Frequent values
- NX二次开发-UFUN创建圆柱UF_MODL_create_cyl1
- R语言lowess函数数据平滑实战(Locally Weighted Regression, Loess)
- 餐饮如何运用人工智能
- 三面向对象分析之UML核心元素之参与者