rabbitmq python amqp user_python 与rabbitmq
一、rabbitmq简介、安装
简介:
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现的产品,遵循Mozilla Public License开源协议,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收messag。
安装(linux)
1、安装erlang
以root身份执行下面命令
yum install erlang xmlto
2、安装epel源
rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo
3、安装rabbitmq rpm包
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5-1.noarch.rpm
rpm -ivh rabbitmq-server-3.1.5-1.noarch.rpm
4、启动rabbitmq,并验证启动情况
rabbitmq-server --detached &ps aux |grep rabbitmq
5、以服务的方式启动
service rabbitmq-server start
6、检查端口5672是否打开
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart
/etc/init.d/iptables status
7、启用维护插件
rabbitmq-plugins enable rabbitmq_management
8、重启rabbitmq
service rabbitmq-server restart
9、登录
http://192.168.110.60:15672/ 用户名密码 guest
无法登陆解决办法
vim /etc/rabbitmq/rabbitmq.config
写入信息,并保存
[{rabbit, [{loopback_users, []}]}].
其他相关:
1、服务器启动与关闭
启动:service rabbitmq-server start
关闭:service rabbitmq-server stop
重启:service rabbitmq-server restart
2、用户管理
新增 rabbitmqctl add_user admin admin
删除 rabbitmqctl delete_user admin
修改 rabbitmqctl change_password admin admin123
用户列表 rabbitmqctl list_users
设置角色 rabbitmqctl set_user_tags admin administrator monitoring policymaker management
设置用户权限 rabbitmqctl set_permissions -p VHostPath admin ConfP WriteP ReadP
查询所有权限 rabbitmqctl list_permissions [-p VHostPath]
指定用户权限 rabbitmqctl list_user_permissions admin
清除用户权限 rabbitmqctl clear_permissions [-p VHostPath] admin
tips:
设置远程用户密码
创建一个admin用户:rabbitmqctl add_user admin 1234qwer
设置该用户为administrator角色:rabbitmqctl set_user_tags admin administrator
设置权限:rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
重启rabbitmq服务: service rabbitmq-server restart
二、rabbitmq python API
详细的api请查看rabbitmq官网:http://www.rabbitmq.com/devtools.html
安装:pip install pika
1.简单的消费者生产者模型
生产者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672)) #创建连接
channel = connection.channel()#建立管道
channel.queue_declare(queue='hello')#声明queue
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')print("Sent 'Hello World!'")
connection.close()
product
消费者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672))#建立连接
channel = connection.channel()#建立管道
channel.queue_declare(queue='hello')#声明从那个管道接受消息
def callback(ch, method, properties, body):#回调函数,收到消息后执行的函数,body指消息主题
print("[x] Received %r" %body)
channel.basic_consume(callback,
queue='hello',
no_ack=True) #如果设置no_ack=Flase,会把消费的消息重写添加到队列中
print('[*] Waiting for messages.')
channel.start_consuming()#阻塞模式
consumer
2.work模式(轮询)
在这种模式下,RabbitMQ会默认把p发的消息依次分发给连接该条队列的各个消费者(c),跟负载均衡差类似,如果在消费者段设置了no_ack=Flase(默认),也就是确认消息,如果在回调函数中不手动进行确认,那么该消息将一直存在,此时我们需要在回调函数周手动确认消息接收完毕,此时队列中的消息才会被删除。
假如消费者处理消息需要15秒,当消费者断开了,那这个消息处理明显还没处理完,并设置了no_ack=Flase(默认),此时该条消息会发给下一个消费者。
上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢? 因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。
生产者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpikaimporttime
connection=pika.BlockingConnection(pika.ConnectionParameters('10.0.0.241'))
channel=connection.channel()#声明queue
channel.queue_declare(queue='task_queue')
message= "Hello World! %s" %time.time()
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
)print("[x] Sent %r" %message)
connection.close()
product
消费者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpika, time
connection=pika.BlockingConnection(pika.ConnectionParameters('10.0.0.241'))
channel=connection.channel()defcallback(ch, method, properties, body):print("[x] Received %r" %body)
time.sleep(10)print("[x] Done")print("method.delivery_tag", method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)#主动向服务器发确认消息,此时delivery_tag为消费消息的tag号
channel.basic_consume(callback,
queue='task_queue',#no_ack=True 如果在回掉函数中手动确认必须把no_ack设置为Flase或者不带该参数
)print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
consumer
公平的分发消息:
在实际的应用中,每个客户端的消费消息的能力是不一样的,如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。如下图:
消费者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpikaimporttime
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241'))
channel=connection.channel()
channel.queue_declare(queue='task_queue')print('[*] Waiting for messages. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] Received %r" %body)
time.sleep(body.count(b'.'))print("[x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)#设置消费的条数为1,当当前消费者有一条消息未消费完时,该消费者不会主动接受消息了。
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
按消费能力接受消息
三、消息持久化
当rabbitmq队列中有很多消息,此时rabbitmq server宕机了,会导致数据丢下,那么如何将消息进行持久化呢。分两步:
1.持久化管道:
在生产者和消费者两端声明管道时候加参数:
channel.queue_declare(queue='hello2', durable=True)
2.持久化消息:
在生产者端设置properties参数:
properties=pika.BasicProperties( delivery_mode=2, )# 消息持久化
完整的demo:
生产者:
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost',5672)) #默认端口5672,可不写
channel =connection.channel()#声明queue
channel.queue_declare(queue='hello2', durable=True)
channel.basic_publish(exchange='',
routing_key='hello2',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, #make message persistent
)
)print("[x] Sent 'Hello World!'")
connection.close()
product
消费者:
importpikaimporttime
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
channel.queue_declare(queue='hello2', durable=True)defcallback(ch, method, properties, body):print("[x] Received %r" %body)
time.sleep(10)
ch.basic_ack(delivery_tag= method.delivery_tag) #告诉生产者,消息处理完成
channel.basic_qos(prefetch_count=1) #类似权重,按能力分发,如果有一个消息,就不在给你发
channel.basic_consume( #消费消息
callback, #如果收到消息,就调用callback
queue='hello2',#no_ack=True # 一般不写,处理完接收处理结果。宕机则发给其他消费者
)print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
consumer
四、rabbitmq发布/订阅的三种模式
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,定义的类型有三种:
fanout: 所有绑定到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
TIPS:以上三种模式都是广播形式,时时接收,如果消费者不在线该条消息将不会再次接收,类似收音机。
1.fanout
fanout模式是纯广播模式,所有绑定了相同的exchange的消费者都能收到来自生产者的一条消息,收取消息时需要queue和exchange绑定,因为消费者不是和exchange直连的,消费者是连在queue上,queue绑定在exchange上,消费者只会在queu里收消息。如下图:
demo:
发布者:
importpikaimportsys
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel=connection.channel()#注意:这里是广播,不需要声明queue
channel.exchange_declare(exchange='logs', #声明广播管道
type='fanout')#message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"channel.basic_publish(exchange='logs',
routing_key='', #注意此处空,必须有
body=message)print("[x] Sent %r" %message)
connection.close()
订阅者:
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')#不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=True)#获取随机的queue名字
queue_name =result.method.queueprint("random queuename:", queue_name)
channel.queue_bind(exchange='logs', #queue绑定到转发器上
queue=queue_name)print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r" %body)
channel.basic_consume(callback,
queue=queue_name,)
channel.start_consuming()
2.direct模式
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,此时的关键字由参数routing_key指定。模式如下图:
发布者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpikafrom random importrandint
credentials= pika.PlainCredentials('admin','1234qwer')#使用用户名密码连接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672,virtual_host='/',credentials=credentials))
channel=connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')#声明type类型
index=randint(0,3)
log_level=['info','wraning','error','nothing']
message='{}--->Hello World!'.format(log_level[index])
channel.basic_publish(exchange='direct_logs',
routing_key=log_level[index], #发消息随机绑定一个关键字
body=message)print("[x] Sent %r:%r" % (log_level[index], message))
订阅者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpika
credentials= pika.PlainCredentials('admin','1234qwer')#使用用户名密码连接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672,virtual_host='/',credentials=credentials))
channel=connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result= channel.queue_declare(exclusive=True)#随机生成队列名字,断开后删除
queue_name =result.method.queue#获取运行脚本所有的参数
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key='info')#只绑定了info关键字,接受只接受info关键字的消息
print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,)
channel.start_consuming()
3.topic(主题)模式
topic相比于dirct而言,提供了更为详细的消息接受规则,可使用*、#等来匹配关键字来接受消息。
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。
例如:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
topic消费模式如下图:
demo:
发布者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpikaimportsys
credentials= pika.PlainCredentials('admin','1234qwer')
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672,virtual_host="/",credentials=credentials))
channel=connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key= sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'message= ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)print("[x] Sent %r:%r" %(routing_key, message))
connection.close()
订阅者:
#!/usr/bin/env python3#_*_ coding:utf-8 _*_#Author:wd
importpikaimportsys
credentials= pika.PlainCredentials('admin','1234qwer')
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672,virtual_host="/",credentials=credentials))
channel=connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result= channel.queue_declare(exclusive=True)
queue_name=result.method.queue
binding_keys= sys.argv[1:]if notbinding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" %sys.argv[0])
sys.exit(1)for binding_key in binding_keys:#循环绑定routing_key,如果绑定*.info,就接受以.info结尾的routing_key所发的消息。
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,)
channel.start_consuming()
五、rabbitmq应用场景(简单RPC)
RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。真正的RPC有更为标准的定义,这里我们可以使用rabbitmq来实现简单的RPC模型,其原理图如下:
上述图中,client和server对于rabbitmq来说都具有两个角色,即:即是生产者又是消费者。client端通过生产者角色发送命令,服务端此时充当消费者接受客户端的命令消息,当接受到消息以后又以生产者角色发送命令结果给客户端,此时客户端是消费者接受客户端的消息。
过程:
客户端 Client 设置消息的 routing key 为 Service 的队列 op_q,设置消息的 reply-to 属性为返回的 response 的目标队列 reponse_q,设置其 correlation_id 为以随机UUID,然后将消息发到 exchange。比如channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
Exchange 将消息转发到 Service 的 op_q
Service 收到该消息后进行处理,然后将response 发到 exchange,并设置消息的 routing_key 为原消息的 reply_to 属性,以及设置其 correlation_id 为原消息的 correlation_id
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))Exchange 将消息转发到 reponse_q
Client 逐一接受 response_q 中的消息,检查消息的 correlation_id 是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。
代码实现:
clinet:
importpikaimportuuidclassFibonacciRpcClient(object):def __init__(self):
self.connection=pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel=self.connection.channel()
result= self.channel.queue_declare(exclusive=True)
self.callback_queue=result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)defon_response(self, ch, method, props, body):if self.corr_id ==props.correlation_id:
self.response=bodydefcall(self, n):
self.response=None
self.corr_id=str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))while self.response isNone:
self.connection.process_data_events()returnint(self.response)
fibonacci_rpc=FibonacciRpcClient()print("[x] Requesting fib(30)")
response= fibonacci_rpc.call(30)print("[.] Got %r" % response)
server:
importpikaimporttime
connection=pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel=connection.channel()
channel.queue_declare(queue='rpc_queue')deffib(n):if n ==0:return0elif n == 1:return 1
else:return fib(n-1) + fib(n-2)defon_request(ch, method, props, body):
n=int(body)print("[.] fib(%s)" %n)
response=fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id =\
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')print("[x] Awaiting RPC requests")
channel.start_consuming()
rabbitmq python amqp user_python 与rabbitmq相关推荐
- rabbitmq python 发送失败_python rabbitmq no_ack=false
发送端:import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel ...
- kafka rabbitmq优劣对比_Kafka、RabbitMQ、RocketMQ等消息中间件的对比
原文链接:Kafka.RabbitMQ.RocketMQ等消息中间件的对比 消息中间件现在有不少,网上很多文章都对其做过对比,在这我对其做进一步总结与整理. RocketMQ 淘宝内部的交易系统使用了 ...
- RabbitMQ(七):RabbitMQ 消费端限流、TTL、死信队列是什么?
消费端限流 1. 为什么要对消费端限流 假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我 ...
- RabbitMQ狂神说笔记(RabbitMQ B站狂神说笔记、KuangStudy、学相伴飞哥)
一. 引用文章 RabbitMQ狂神说笔记(B站狂神说笔记.KuangStudy.学相伴飞哥) RabbitMQ狂神说笔记(B站狂神说笔记.KuangStudy.学相伴飞哥)百度云盘地址,提取码:07 ...
- [RabbitMQ+Python入门经典] 兔子和兔子窝
RabbitMQ作为一个工业级的消息队列服务器,在其客户端手册列表的Python段当中推荐了一篇blog,作为RabbitMQ+Python的入门手册再合适不过了.不过,正如其标题Rabbit and ...
- [转][RabbitMQ+Python入门经典] 兔子和兔子窝
来源:http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/ RabbitMQ作为一个工业 ...
- Python三方库:RabbitMQ基本使用
Python有多种插件都支持RabbitMQ,本文介绍的是RabbitMQ推荐的Pika插件.使用pip直接安装即可 pip install pika . 一.RabbitMQ简介 1. MQ简介 M ...
- python使用pika操作rabbitmq总结
python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...
- python使用pika操作rabbitmq总结(一)
python 连接操作rabbitMQ 主要是使用pika库 安装: pip install pika==1.0.1 注意: pika 1.x 与 pika 0.x 有一些不同,使用的时候需要看清版本 ...
最新文章
- 无线渗透--wifiphisher之wifi钓鱼获取wifi密码
- MySQL为关联表添加数据
- 存储器的保护(一)——《x86汇编语言:从实模式到保护模式》读书笔记18
- sender分析之Selector
- java监听器的原理与实现
- opencv计算图像亮度调节_图像数据集增强方式总结和实现【数字图像处理系列四】...
- 从阿尔法狗元(AlphaGo Zero)的诞生看终极算法的可能性
- pm2 启动 Node + TS 项目
- 腐蚀rust电脑分辨率调多少_腐蚀Rust帧数优化指南 游戏FPS提高方法说明
- 谈逻辑与数学界线之淡化
- godaddy不支持java_godaddy主机被墙的解决方案
- 163免费企业邮箱服务地址
- 模型包装,答辩吹牛方法论!
- 批量压缩图片软件 JAVA
- 2020年中山大学CS夏令营
- 录音文件下载_录音转文字app有哪些?录音转文字助手怎么样?
- 【软件安装分享】FME使用三年后的使用体验及任意版本安装教程
- 拒绝服务攻击漏洞-Hash
- 离线安装—Tensorflow教程
- 《社群》思维导图读书笔记精华分享
热门文章
- 这里天刚黑,而家里都已经后半夜了
- 同软件多个线程设置不同ip_中学校园广播-中学IP网络广播系统解决方案
- 【UAV】高度控制代码分析
- 【控制】《鲁棒控制-线性矩阵不等式处理方法》-俞立老师-第8章-鲁棒方差控制
- 【控制】《多智能体机器人系统信息融合与协调》范波老师-第8章-Agent 技术在机器人智能控制系统的应用
- 2.7 Inception 网络-深度学习第四课《卷积神经网络》-Stanford吴恩达教授
- Python whl文件制作简介
- 【SVM】通过SVM对数据进行训练和分类测试,matlab仿真
- 通过MATLAB将数据转化为mif文件,供Quartusii软件的ROM核读取调用
- Ubifs文件系统和mkfs.ubifs以及ubinize工具的用法