python消息队列中间件_python-RabbtiMQ消息队列
1.RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
2.RabbitMQ能为你做些什么?
消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶.
或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。
RabbitMQ是一个消息代理- 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。
3.RabbitMQ 安装使用
4.Python应用RabbitMQ
python操作RabbitMQ的模块有三种:pika,Celery,Haigha。
本文使用的是pika。
"""RabbitMQ-生产者。"""
importpika"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识"""channel.queue_declare(queue='hello')"""定义queue中的消息内容"""channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')print("[x] Sent 'Hello World!'")
"""RabbitMQ-消费者。"""
importpika"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识,与生产者队列中对应"""channel.queue_declare(queue='hello')defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""消费,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,则回调这个函数处理消息
queue='hello', #queue_declare(queue='hello') 对应
no_ack=True
)"""消费者会一直监听这queue,如果队列中没有消息,则会卡在这里,等待消息队列中生成消息。"""
print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
5.RabbitMQ消息持久化
importpika
queue_name= 'xiaoxi_'
"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识
queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=input_value,
properties=pika.BasicProperties( #消息持久化.....
delivery_mode=2,
)
)continue
producer.py
importpika,time
queue_name= 'xiaoxi_'
"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)#time.sleep(5) # 模拟消费者丢失生产者发送的消息,生产者消息队列中的这一条消息则不会删除。
print('rev messages-->',body)"""手动向生产者确认收到消息"""
#ch.basic_ack(delivery_tag=method.delivery_tag)
"""消费,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,则回调这个函数处理消息
queue=queue_name,#no_ack=True #接收到消息,主动向生产者确认已经接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
6.RabbitMQ消息公平分发
importpika
queue_name= 'xiaoxi_1'
"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识
queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=input_value,
)continue
producer.py
importpika,time
queue_name= 'xiaoxi_1'
"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识
queue,durable 持久化"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""模拟处理消息快慢速度"""time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)"""根据消费者处理消息的快慢公平分发消息"""channel.basic_qos(prefetch_count=1)"""消费,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,则回调这个函数处理消息
queue=queue_name,#no_ack=True #接收到消息,主动向生产者确认已经接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.RabbitMQ-广播模式。
消息的发送模式类型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
4.headers: 通过headers 来决定把消息发给哪些queue (少用)
7.1 topic 广播模式。
importpika"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'
"""定义exchage模式 direct广播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""消息的发送模式类型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。
2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
4.headers: 通过headers 来决定把消息发给哪些queue (少用)"""
whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=input_value,
)continue
producer.py
importpika,time"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消费,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,则回调这个函数处理消息
queue=queue_name,
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.2 direct 广播模式
importpika
connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel=connection.channel()"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'direct_messages'routing_key= 'my_direct'
"""定义exchage模式 direct广播模式
消息的发送模式类型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。
2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
4.headers: 通过headers 来决定把消息发给哪些queue (少用)"""channel.exchange_declare(exchange=exchange_name,exchange_type='direct')
channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body='hello word!',
)#while True:#input_value = input(">>:").strip()#if input_value:#"""定义queue中的消息内容"""#print('producer messages:{0}'.format(input_value))#channel.basic_publish(#exchange=exchange_name,#routing_key=routing_key,#body=input_value,#)#continue
producer.py
importpika,time
connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel=connection.channel()
exchange_name= 'direct_messages'routing_key= 'my_direct'channel.exchange_declare(exchange=exchange_name,exchange_type='direct')"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消费,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,则回调这个函数处理消息
queue=queue_name,
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.3 fanout 广播模式
importpika"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()
exchange_name= 'messages'
"""定义exchage模式 fanout广播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""消息的发送模式类型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。
2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
4.headers: 通过headers 来决定把消息发给哪些queue (少用)"""
whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange=exchange_name,
routing_key='',
body=input_value,
)continue
producer.py
importpika,time"""声明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""声明一个管道"""channel=connection.channel()"""
"""exchange_name= 'messages'channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name)"""每一个消费者随机一个唯一的queue_name"""
print('queue_name:{0}',format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消费,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,则回调这个函数处理消息
queue=queue_name,#no_ack=True #接收到消息,主动向生产者确认已经接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
8 RabbitMQ 实现 RPC
"""RabbitMQ-生产者。
利用rabbitMQ 实现一个能收能发的RPC小程序。
重点需要注意的是:queue的绑定。接收的一端必选预先绑定queue生成队列,发送端才能根据queue发送。"""
importpika,uuid,timeclassrabbitmqClient(object):def __init__(self,rpc_queue):
self.rpc_queue=rpc_queue
self.app_id=str(uuid.uuid4())
self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel=self.connection.channel()"""生成一个自动queue,传过去server,server再往这个自动queue回复数据"""autoqueue= self.channel.queue_declare(exclusive=True)
self.callback_queue=autoqueue.method.queue"""先定义一个接收回复的动作"""self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)defon_response(self,ch,method,properties,body):if self.app_id ==properties.app_id:
self.response=bodydefsend(self,msg):
self.response=None
self.channel.basic_publish(
exchange='',
routing_key=self.rpc_queue,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
app_id=self.app_id,
),
body=str(msg)
)#发送完消息,进入接收模式。
while self.response isNone:#print('callback_queue:{0} app_id:{1} wait...'.format(self.callback_queue,self.app_id))
self.connection.process_data_events()#time.sleep(0.5)
returnself.response
rpc_request_queue= 'rpc_request_queue'rb=rabbitmqClient(rpc_request_queue)whileTrue:
msg= input('input >> :').strip()ifmsg:print('rpc_queue:{0} app_id:{1}'.format(rb.rpc_queue,rb.app_id))print('send msg:{}'.format(msg))
reponses=rb.send(msg)print('reponses msg:{}'.format(reponses.decode('utf-8')))continue
client.py
"""RabbitMQ-消费者。"""
importpikaclassrabbitmqServer(object):def __init__(self,rpc_queue):
self.rpc_queue=rpc_queue
self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel=self.connection.channel()
self.channel.queue_declare(queue=self.rpc_queue)defon_reponses(self,ch,method,properties,body):ifbody:#reponser ...
ch.basic_publish(exchange='',
routing_key=properties.reply_to,
properties=pika.BasicProperties(
reply_to=properties.reply_to,
app_id=properties.app_id,
),
body='reponses ok! msg is:{}'.format(body.decode('utf-8')))defstart_consuming(self):
self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True)print('waiting for meassages, to exit press CTRL+C')
self.channel.start_consuming()
rpc_request_queue= 'rpc_request_queue'rd_server=rabbitmqServer(rpc_request_queue)
rd_server.start_consuming()
server.py
python消息队列中间件_python-RabbtiMQ消息队列相关推荐
- python多线程队列处理_Python线程和队列使用的一点思考
Python线程和队列使用的一点思考 1. 斗哥采访环节请问为什么要使用线程? 答:为了提高程序速度,代码效率呀. 请问为什么要使用队列? 答:个人认为队列可以保证线程安全,实现线程间的同步,比较稳. ...
- python 队列实现_python中实现队列的queue模块
python中的queue模块提供了同步的.线程安全的队列类,包括FIFO(先进先出)的Queue类和LIFO(后进先出,栈结构)LifoQueue类和优先队列PriorityQueue类,它们都实现 ...
- python websocket实现消息推送_Python Websocket消息推送---GoEasy
Goeasy, 它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送!个人感觉goeasy推送更稳定,推送 速度快,代码简单易懂上手快 浏览器兼容性:GoEasy推送 支持websocket ...
- python链表实现栈_python实现链表队列栈
#!/usr/bin/python # -*- coding: utf-8 -*- #便于测试 data 使用数字 class Node(object): def __init__(self,data ...
- python队列实现_Python 数据结构之队列的实现
Python 队列 Queue 队列是一种先进先出(FIFO)的数据类型, 新的元素通过 入队 的方式添加进 Queue 的末尾, 出队 就是从 Queue 的头部删除元素. 用列表来做 Queue: ...
- python队列溢出_python – 多处理队列maxsize限制是32767
我正在尝试使用多处理编写Python 2.6(OSX)程序,并且我想填充一个超过默认值32767项的Queue. from multiprocessing import Queue Queue(2** ...
- python消息框设置_Python Tkinter消息框(附带实例讲解)
在 messagebox 模块下提供了大量工具函数来生成各种消息框,这些消息框的结构大致如图 1 所示. 图 1 消息框的结构 在默认情况下,开发者在调用 messagebox 的工具函数时只要设置提 ...
- python 消息中间件_消息队列中间件 RabbitMQ 详细介绍——安装与基本应用(Python)...
RabbitMQ 是当前最流行的消息中间件(Message Broker)之一,支持多种消息协议(如 AMQP.MQTT). 同时它也是一个轻量级的非常易于部署的开源软件,可以运行在当前大多数操作系统 ...
- python消息队列中间件_常见的消息队列中间件介绍
题目 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区别,以及适合哪些场景? 什么是消息队列 在正式介绍和对比Kafka. ...
最新文章
- 45个优秀的国外电子商务网站设计实例
- linux+分配挂载点权限,Linux系统管理(一)——初学者建议
- Spring Cloud Config 和Spring Cloud Bus实现配置中心
- PowerDesigner的汉化破解安装到逆向工程(ORACLE)
- 2021牛客暑期多校训练营9
- 在 Confluence 6 中禁用 workbox 应用通知
- 斯坦福李纪为博士毕业论文:让机器像人一样交流
- 百度搜索遭遇“假德邦” 宣判结果来了...
- OpenShift 4 - 使用教程和免费试用环境
- 怎样在Xcode 4下编译发布与提交App到AppStore?(转)
- PDF拆分页面,免费拆分为多个PDF
- 优色专显教你led显示屏诺瓦刷屏教程
- 试题 算法提高 盾神与积木游戏
- excel-中心趋势-基本函数-离散程度-四分位数QUARTILE.EXC-画箱线图-标准差
- HIVE最全面入门指南
- ES-JOB——分布式定时任务高级使用——控制台修改任务
- Java:使用POI实现word的docx文件的模板功能
- uniapp开发技术
- java中singleton_java中singleton的几种实现方式
- python打包程序在win10不能运行、点击无反应_win10双击安装包没反应,win10运行exe无反应...