最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好。因为系统本身一直在用rabbitmq做异步处理任务的中间件,所以想到是否可以利用rabbitmq实现延迟队列。功夫不负有心人,rabbitmq虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、time to live(ttl)消息超时机制;2、dead letter exchanges(dlx)死信队列。下面将具体描述实现原理以及实现代

延迟队列的基础原理time to live(ttl)

rabbitmq可以针对queue设置x-expires 或者 针对message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

rabbitmq消息的过期时间有两种方法设置。

通过队列(queue)的属性设置,队列中所有的消息都有相同的过期时间。(本次延迟队列采用的方案)对消息单独设置,每条消息ttl可以不同。

如果同时使用,则消息的过期时间以两者之间ttl较小的那个数值为准。消息在队列的生存时间一旦超过设置的ttl值,就成为死信(dead letter)

dead letter exchanges(dlx)

rabbitmq的queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

x-dead-letter-exchange:出现死信(dead letter)之后将dead letter重新发送到指定exchange

x-dead-letter-routing-key:出现死信(dead letter)之后将dead letter重新按照指定的routing-key发送

队列中出现死信(dead letter)的情况有:

消息或者队列的ttl过期。(延迟队列利用的特性)

队列达到最大长度

消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

综合上面两个特性,将队列设置ttl规则,队列ttl过期后消息会变成死信,然后利用dlx特性将其转发到另外的交换机和队列就可以被重新消费,达到延迟消费效果。

延迟队列设计及实现(python)

从上面描述,延迟队列的实现大致分为两步:

产生死信,有两种方式per-message ttl和 queue ttl,因为我的需求中是所有的消息延迟处理时间相同,所以本实现中采用 queue ttl设置队列的ttl,如果需要将队列中的消息设置不同的延迟处理时间,则设置per-message ttl()

设置死信的转发规则,dead letter exchanges设置方法()

完整代码如下:

"""

created on fri aug 3 17:00:44 2018

@author: bge

"""

import pika,json,logging

class rabbitmqclient:

def __init__(self, conn_str='amqp://user:pwd@host:port/%2f'):

self.exchange_type = "direct"

self.connection_string = conn_str

self.connection = pika.blockingconnection(pika.urlparameters(self.connection_string))

self.channel = self.connection.channel()

self._declare_retry_queue() #retryqueue and retryexchange

logging.debug("connection established")

def close_connection(self):

self.connection.close()

logging.debug("connection closed")

def declare_exchange(self, exchange):

self.channel.exchange_declare(exchange=exchange,

exchange_type=self.exchange_type,

durable=true)

def declare_queue(self, queue):

self.channel.queue_declare(queue=queue,

durable=true,)

def declare_delay_queue(self, queue,dlx='retryexchange',ttl=60000):

"""

创建延迟队列

:param ttl: ttl的单位是us,ttl=60000 表示 60s

:param queue:

:param dlx:死信转发的exchange

:return:

"""

arguments={}

if dlx:

#设置死信转发的exchange

arguments[ 'x-dead-letter-exchange']=dlx

if ttl:

arguments['x-message-ttl']=ttl

print(arguments)

self.channel.queue_declare(queue=queue,

durable=true,

arguments=arguments)

def _declare_retry_queue(self):

"""

创建异常交换器和队列,用于存放没有正常处理的消息。

:return:

"""

self.channel.exchange_declare(exchange='retryexchange',

exchange_type='fanout',

durable=true)

self.channel.queue_declare(queue='retryqueue',

durable=true)

self.channel.queue_bind('retryqueue', 'retryexchange','retryqueue')

def publish_message(self,routing_key, msg,exchange='',delay=0,ttl=none):

"""

发送消息到指定的交换器

:param exchange: rabbitmq交换器

:param msg: 消息实体,是一个序列化的json字符串

:return:

"""

if delay==0:

self.declare_queue(routing_key)

else:

self.declare_delay_queue(routing_key,ttl=ttl)

if exchange!='':

self.declare_exchange(exchange)

self.channel.basic_publish(exchange=exchange,

routing_key=routing_key,

body=msg,

properties=pika.basicproperties(

delivery_mode=2,

type=exchange

))

self.close_connection()

print("message send out to %s" % exchange)

logging.debug("message send out to %s" % exchange)

def start_consume(self,callback,queue='#',delay=1):

"""

启动消费者,开始消费rabbitmq中的消息

:return:

"""

if delay==1:

queue='retryqueue'

else:

self.declare_queue(queue)

self.channel.basic_qos(prefetch_count=1)

try:

self.channel.basic_consume( # 消费消息

callback, # 如果收到消息,就调用callback函数来处理消息

queue=queue, # 你要从那个队列里收消息

)

self.channel.start_consuming()

except keyboardinterrupt:

self.stop_consuming()

def stop_consuming(self):

self.channel.stop_consuming()

self.close_connection()

def message_handle_successfully(channel, method):

"""

如果消息处理正常完成,必须调用此方法,

否则rabbitmq会认为消息处理不成功,重新将消息放回待执行队列中

:param channel: 回调函数的channel参数

:param method: 回调函数的method参数

:return:

"""

channel.basic_ack(delivery_tag=method.delivery_tag)

def message_handle_failed(channel, method):

"""

如果消息处理失败,应该调用此方法,会自动将消息放入异常队列

:param channel: 回调函数的channel参数

:param method: 回调函数的method参数

:return:

"""

channel.basic_reject(delivery_tag=method.delivery_tag, requeue=false)

发布消息代码如下:

from mq.rabbitmq import rabbitmqclient

print("start program")

client = rabbitmqclient()

msg1 = '{"key":"value"}'

client.publish_message('test-delay',msg1,delay=1,ttl=10000)

print("message send out")

消费者代码如下:

from mq.rabbitmq import rabbitmqclient

import json

print("start program")

client = rabbitmqclient()

def callback(ch, method, properties, body):

msg = body.decode()

print(msg)

# 如果处理成功,则调用此消息回复ack,表示消息成功处理完成。

rabbitmqclient.message_handle_successfully(ch, method)

queue_name = "retryqueue"

client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持萬仟网。

希望与广大网友互动??

点此进行留言吧!

python延时队列_如何通过Python实现RabbitMQ延迟队列相关推荐

  1. RabbitMQ 延迟队列-对于入门来说可以快速上手

    RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ​ ...

  2. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  3. rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  4. Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  5. 谷粒商城笔记+踩坑(22)——库存自动解锁。RabbitMQ延迟队列

    导航: 谷粒商城笔记+踩坑汇总篇 目录 1 业务流程,订单失败后自动回滚解锁库存 可靠消息+最终一致性方案 2[仓库服务]RabbitMQ环境准备 2.1 导入依赖 2.2 yml配置RabbitMQ ...

  6. python len函数_知识清单Python必备的69个函数,你掌握了吗?

    本文纲要 Python 作为一门高级编程语言,为我们提供了许多方便易用的内置函数,节省了不少开发应用的时间.目前,Python 3.7 共有 69 个内置函数,一些是我们耳熟能详的函数,另一些却不是很 ...

  7. python优化网站_[练习] 用PYTHON来优化网站中的图片

    我到公司以来,第一次加班,哇,加一晚上加一上午,现在还没下班的迹象,555,困. 对于网站中的一些关键的页面,多重缓存.静态化.程序代码优化--之外,为了提高用户打开页面的速度,图片是必须要优化的. ...

  8. python递归函数例题_递归案例python

    广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 而对应的中文翻译 "递归" 却表达了两个意思:"递 ...

  9. 网易图灵学院python公开课_图灵学院 Python全系列教程全栈工程师 python视频教程下载...

    大家怎么说? 老师很好,我认为,若想学好python,应该多练.多想.多看.学习资料不能仅限于老师给定的这些内容,这些毕竟是入门资料 老师讲的真不错,对于我们这种小白来说 也比较容易懂,虽然有些时候自 ...

最新文章

  1. GNU make manual 翻译( 一百一十三)
  2. python文件排序
  3. LeetCode10.正则表达式匹配 JavaScript
  4. uni-app实现上拉加载更多
  5. idea工具debug断点红色变成灰色
  6. Python sum函数- Python零基础入门教程
  7. php 参数 只用一次,php中,用函数,如果有很多个参数,只使用最后一个参数,有什么优雅的写法?...
  8. 11gR2conceptes Memory Architecture中文翻译
  9. android toast 自定义时间,android自定义Toast设定显示时间
  10. python commands执行不连续_python中的commands模块,执行出错:'{' 不是内部或外部命令,也不是可运行的程序 或批处理文件。...
  11. IOS 学习笔记 2015-04-10 OC-常用常量
  12. Unity-WebGL-打包流程以及遇到的各种坑
  13. 计算机科学与技术专业成功人士,我校2002级计算机科学与技术专业校友重返母校...
  14. (Xcode)ipa上传APP Store鉴定报错
  15. 解决memoryerror
  16. 详细解读WordNet计算相似度的几种方法
  17. 数据结构-列出连通集(图的操作)
  18. 纳斯达克的区块链野望
  19. oh god job
  20. window.close

热门文章

  1. 模式识别与机器学习笔记(二)机器学习的基础理论
  2. Android之Bundle类
  3. 解决JPA的枚举局限性
  4. 并查集算法c语言版,并查集及其C程序实现.doc
  5. 在哪里可以找水系图_虹吸雨水排水系统对比传统重力排水,好在哪里?
  6. 在Eclipse中使用Git
  7. tensorflow 如何获取模型中想要的张量
  8. iPhone各版本屏幕尺寸
  9. python读取命令行输入-python获取命令行输入参数列表
  10. Linux的实际操作:权限管理(chmod)