上一篇介绍了rabbitmq的安装和经典的hello world!实例。这里将对工作队列(Work Queues)做一个了解。因为是接上一篇说明的,所以如果没看过上一篇,看这篇可能会比较难理解。上一篇的地址是:ubuntu安装rabbitmq和python的使用实现

消息也可以理解为任务,消息发送者可以理解为任务分配者,消息接收者可以理解为工作者,当工作者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,于是就需要多个工作者来共同处理这些任务,这些工作者,就称为工作队列。结构图如下:

rabbitmq的python实例工作队列

准备工作(Preparation)

在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工作者。

修改send.py,从命令行参数里接收信息,并发送

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',

routing_key='hello',

body=message)

print " [x] Sent %r" % (message,)

修改receive.py的回调函数。

import time

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep( body.count('.') )

print " [x] Done"

这边先打开两个终端,都运行worker.py,处于监听状态,这边就相当于两个工作者。打开第三个终端,运行new_task.py

$ python new_task.py First message.

$ python new_task.py Second message..

$ python new_task.py Third message...

$ python new_task.py Fourth message....

$ python new_task.py Fifth message.....

观察worker.py接收到任务,其中一个工作者接收到3个任务 :

$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

[x] Received 'First message.'

[x] Received 'Third message...'

[x] Received 'Fifth message.....'

另外一个工作者接收到2个任务 :

$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

[x] Received 'Second message..'

[x] Received 'Fourth message....'

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep(5)

print " [x] Done"

ch.basic_ack(delivery_tag = method.delivery_tag)

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',

routing_key="task_queue",

body=message,

properties=pika.BasicProperties(

delivery_mode = 2, # make message persistent

))

公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

new_task.py完整代码

#!/usr/bin/env python

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(

host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode = 2, # make message persistent

))

print " [x] Sent %r" % (message,)

connection.close()

worker.py完整代码

#!/usr/bin/env python

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters(

host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep( body.count('.') )

print " [x] Done"

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,

queue='task_queue')

channel.start_consuming()

python rabitmq_python使用rabbitmq实例二,工作队列相关推荐

  1. python correlation_python使用rabbitmq实例七,相互关联编号correlation id

    上一遍演示了远程结果返回的示例,但是有一个没有提到,就是correlation id,这个是个什么东东呢? 假设有多个计算节点,控制中心开启多个线程,往这些计算节点发送数字,要求计算结果并返回,但是控 ...

  2. (Python第七天)实例二玩转函数

    实现一个程序,将分钟转为小时和分钟,代码中不要使用input()函数,否则挑战测试会卡住,出现Timeout报错 在文件中实现一个函数 Hours(),将用户输入的 分钟数 转化为 小时数和分钟数,并 ...

  3. python 分数序列求和公式_Python分数序列求和,编程练习题实例二十四

    本文是关于Python分数序列求和的应用练习,适合菜鸟练习使用,python大牛绕行哦. Python练习题问题如下: 问题简述:有一分数序列:2/1,3/2,5/3,8/5,13/8,21/13 要 ...

  4. python 爬虫实例-Python 爬虫:Scrapy 实例(二)

    原标题:Python 爬虫:Scrapy 实例(二) 稍微增加点难度,做个所需项目多一点的,并将的结果以多种形式保存起来.我们就从网络天气预报开始. 首先要做的是确定网络天气数据的来源.打开百度,搜索 ...

  5. python rabitmq_python RabbitMQ队列使用

    原博文 2019-01-17 21:17 − python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queu ...

  6. 轻松搞定RabbitMQ(二)——工作队列之消息分发机制

    上一篇博文中简单介绍了一下RabbitMQ的基础知识,并写了一个经典语言入门程序--HelloWorld.本篇博文中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务.同样是翻译的 ...

  7. 《 Python List列表全实例详解系列(二)》__创建列表(5种方式)

    < Python List列表全实例详解系列(二)> __创建列表(5种方式) 上一篇:< Python List 列表全实例详解系列(一)>__系列总目录.列表概念 本章目录 ...

  8. RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

    内容翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...

  9. python程序使用RabbitMQ

    目录 一.使用思路 二.Rabbitmq服务器的准备 三.代码示例:python程序中连接RabbitMQ和使用 四.名词解释 一.使用思路 RabbitMQ也称 面向消息的中间件.RabbitMQ以 ...

最新文章

  1. java dateformat类_JAVA--常量池,Date类,SimpleDateFormat类与Calendar类
  2. Python之selenium:selenium库的简介、安装、使用方法之详细攻略
  3. Python之sklearn:GridSearchCV()和fit()函数的简介、具体案例、使用方法之详细攻略
  4. mysql的等号是什么意思_整理思维——等于等于等于?{MySQL条件等号的异常}
  5. angularjs上传文件到服务器,AngularJS:如何使用multipart表单实现简单的文件上传?...
  6. 华为鸿蒙系统后续,不负期待!鸿蒙操作系统将于6月2日正式发布,你的手机更新了吗?...
  7. vue 中indexof_前端小知识-Vue中使用indexOf() 方法
  8. git sync fatal: Authentication failed for https://github.com/ did not exit cleanly (exit code 128)
  9. Chromium OS源码
  10. 关于未来几年的发展,闰土有话要说
  11. 天龙八部服务器维护怎么进去,天龙八部怎么进不去?维护了吗?到什么时候?...
  12. windows2003 php 加速,window_Win 2003 加速****,微软的Windown Server 2003尽管它是 - phpStudy...
  13. 林忆莲:白莲花,红玫瑰
  14. 今晚7:30 | 结构化知识的统一建模和多任务学习
  15. ei拼音的四个声调对应的字_幼儿园学前班拼音教案:复习 ei 以及四声调
  16. Java打印101-150之间所有的质数
  17. OKR 年度规划实践:如何在 2022 年做好准备
  18. 一分钟了解“#include命令是干啥的”
  19. Omnigraffle Pro 注册码/许可证
  20. 高效查询快递物流信息

热门文章

  1. 如何使用COMPUTER VISION将LEPRECHAUN-HATS放入您的网站
  2. javascript指南_JavaScript指南
  3. 学python分析双色球_我通过使用Python分析了80多个工作拒绝而学到的东西
  4. 智能合约怎么创建合约_可出售智能合约的协议
  5. pytorch 全局变量_Pytorch如何通过深度学习展现全局
  6. 小程序开发 宽度100%_这是您作为开发人员可以实现100%年度目标的方式
  7. 浅谈Java的数据结构
  8. 超大规模数据集类的创建
  9. Python老司机总结新手常见10大错误
  10. 列表合并变种题,map()函数扩展