python rabitmq_python使用rabbitmq实例二,工作队列
上一篇介绍了rabbitmq的安装和经典的hello world!实例。这里将对工作队列(Work Queues)做一个了解。因为是接上一篇说明的,所以如果没看过上一篇,看这篇可能会比较难理解。上一篇的地址是:ubuntu安装rabbitmq和python的使用实现
消息也可以理解为任务,消息发送者可以理解为任务分配者,消息接收者可以理解为工作者,当工作者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,于是就需要多个工作者来共同处理这些任务,这些工作者,就称为工作队列。结构图如下:
rabbitmq的python实例工作队列
准备工作(Preparation)
在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工作者。
修改send.py,从命令行参数里接收信息,并发送
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print " [x] Sent %r" % (message,)
修改receive.py的回调函数。
import time
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
这边先打开两个终端,都运行worker.py,处于监听状态,这边就相当于两个工作者。打开第三个终端,运行new_task.py
$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
观察worker.py接收到任务,其中一个工作者接收到3个任务 :
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
另外一个工作者接收到2个任务 :
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep(5)
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
new_task.py完整代码
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
python rabitmq_python使用rabbitmq实例二,工作队列相关推荐
- python correlation_python使用rabbitmq实例七,相互关联编号correlation id
上一遍演示了远程结果返回的示例,但是有一个没有提到,就是correlation id,这个是个什么东东呢? 假设有多个计算节点,控制中心开启多个线程,往这些计算节点发送数字,要求计算结果并返回,但是控 ...
- (Python第七天)实例二玩转函数
实现一个程序,将分钟转为小时和分钟,代码中不要使用input()函数,否则挑战测试会卡住,出现Timeout报错 在文件中实现一个函数 Hours(),将用户输入的 分钟数 转化为 小时数和分钟数,并 ...
- python 分数序列求和公式_Python分数序列求和,编程练习题实例二十四
本文是关于Python分数序列求和的应用练习,适合菜鸟练习使用,python大牛绕行哦. Python练习题问题如下: 问题简述:有一分数序列:2/1,3/2,5/3,8/5,13/8,21/13 要 ...
- python 爬虫实例-Python 爬虫:Scrapy 实例(二)
原标题:Python 爬虫:Scrapy 实例(二) 稍微增加点难度,做个所需项目多一点的,并将的结果以多种形式保存起来.我们就从网络天气预报开始. 首先要做的是确定网络天气数据的来源.打开百度,搜索 ...
- python rabitmq_python RabbitMQ队列使用
原博文 2019-01-17 21:17 − python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queu ...
- 轻松搞定RabbitMQ(二)——工作队列之消息分发机制
上一篇博文中简单介绍了一下RabbitMQ的基础知识,并写了一个经典语言入门程序--HelloWorld.本篇博文中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务.同样是翻译的 ...
- 《 Python List列表全实例详解系列(二)》__创建列表(5种方式)
< Python List列表全实例详解系列(二)> __创建列表(5种方式) 上一篇:< Python List 列表全实例详解系列(一)>__系列总目录.列表概念 本章目录 ...
- RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发
内容翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...
- python程序使用RabbitMQ
目录 一.使用思路 二.Rabbitmq服务器的准备 三.代码示例:python程序中连接RabbitMQ和使用 四.名词解释 一.使用思路 RabbitMQ也称 面向消息的中间件.RabbitMQ以 ...
最新文章
- java dateformat类_JAVA--常量池,Date类,SimpleDateFormat类与Calendar类
- Python之selenium:selenium库的简介、安装、使用方法之详细攻略
- Python之sklearn:GridSearchCV()和fit()函数的简介、具体案例、使用方法之详细攻略
- mysql的等号是什么意思_整理思维——等于等于等于?{MySQL条件等号的异常}
- angularjs上传文件到服务器,AngularJS:如何使用multipart表单实现简单的文件上传?...
- 华为鸿蒙系统后续,不负期待!鸿蒙操作系统将于6月2日正式发布,你的手机更新了吗?...
- vue 中indexof_前端小知识-Vue中使用indexOf() 方法
- git sync fatal: Authentication failed for https://github.com/ did not exit cleanly (exit code 128)
- Chromium OS源码
- 关于未来几年的发展,闰土有话要说
- 天龙八部服务器维护怎么进去,天龙八部怎么进不去?维护了吗?到什么时候?...
- windows2003 php 加速,window_Win 2003 加速****,微软的Windown Server 2003尽管它是 - phpStudy...
- 林忆莲:白莲花,红玫瑰
- 今晚7:30 | 结构化知识的统一建模和多任务学习
- ei拼音的四个声调对应的字_幼儿园学前班拼音教案:复习 ei 以及四声调
- Java打印101-150之间所有的质数
- OKR 年度规划实践:如何在 2022 年做好准备
- 一分钟了解“#include命令是干啥的”
- Omnigraffle Pro 注册码/许可证
- 高效查询快递物流信息
热门文章
- 如何使用COMPUTER VISION将LEPRECHAUN-HATS放入您的网站
- javascript指南_JavaScript指南
- 学python分析双色球_我通过使用Python分析了80多个工作拒绝而学到的东西
- 智能合约怎么创建合约_可出售智能合约的协议
- pytorch 全局变量_Pytorch如何通过深度学习展现全局
- 小程序开发 宽度100%_这是您作为开发人员可以实现100%年度目标的方式
- 浅谈Java的数据结构
- 超大规模数据集类的创建
- Python老司机总结新手常见10大错误
- 列表合并变种题,map()函数扩展