延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行。

DLX + TTL 方式存在的时序问题

对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有介绍过如何使用 RabbitMQ 死信队列(DLX) + TTL 的方式来模拟实现延迟队列,这也是通常的一种做法,可参见我的另一篇文章 利用 RabbitMQ 死信队列和 TTL 实现定时任务。

今天我想说的是这种方式会存在一个时序问题,看下图:

左侧队列 queue1 分别两条消息 msg1、msg2 过期时间都为 1s,输出顺序为 msg1、msg2 是没问题的

右侧队列 queue2 分别两条消息 msg1、msg2 注意问题来了,msg2 的消息过期时间为 1S 而 msg1 的消息过期为 2S,你可能想谁先过期就谁先消费呗,显然不是这样的,因为这是在同一个队列,必须前一个消费,第二个才能消费,所以就出现了时序问题

如果你的消息过期时间是有规律的,例如,有的 1S、有的 2S,那么我们可以以时间为维度设计为两个队列,如下所示:

上面我们将 1S 过期的消息拆分为队列 queue_1s,2S 过期的消息拆分为队列 queue_2s,事情得到进一步解决。如果此时消息的过期时间不确定或者消息过期时间维度过多,在消费端我们就要去监听多个消息队列且对于消息过期时间不确定的也是很难去设计的。

针对消息无序的不妨看下以下解决方案。

Delayed Message 插件

这里要感谢 @神奇的包子,掘金(juejin.im/user/5bfc1b9d6fb9a049b347a9e2) 提出的 Delayed Message 插件方案。

这里将使用的是一个 RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange,目前维护在 RabbitMQ 插件社区,我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。

实现原理

上面使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)

这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

插件安装

根据你的 RabbitMQ 版本来安装相应插件版本,RabbitMQ community-plugins 上面有版本对应信息可参考。

注意:需要 RabbitMQ 3.5.3 和更高版本。

# 注意要下载至你的 RabbitMQ 服务器的 plugins 目录下,例如:/usr/local/rabbitmq/plugins

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

# 解压unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

# 解压之后得到如下文件rabbitmq_delayed_message_exchange-20171215-3.6.x.ez

启用插件

使用 rabbitmq-plugins enable 命令启用插件,启动成功会看到如下提示:

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchangeThe following plugins have been enabled:  rabbitmq_delayed_message_exchange

Applying plugin configuration to rabbit@xxxxxxxx... started 1 plugin.

管理控制台声明 x-delayed-message 交换机

在开始代码之前先打开 RabbitMQ 的管理 UI 界面,声明一个 x-delayed-message 类型的交换机,否则你会遇到下面的错误:

Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type"

这个问题困扰我了一会儿,详情可见 Github Issues rabbitmq-delayed-message-exchange/issues/19,正确操作如下图所示:

Nodejs 代码实践

上面准备工作完成了,开始我们的代码实践吧,官方没有提供 Nodejs 示例,只提供了 Java 示例,对于一个写过 Spring Boot 项目的 Nodeer 这不是问题(此处,兄得你有点飘了啊 )其实如果有时间能多了解点些,你会发现还是有益的。

构建生产者

几个注意点:

  • 交换机类型一定要设置为 x-delayed-message
  • 设置 x-delayed-type 为 direct,当然也可以是 topic 等
  • 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递
const amqp = require('amqplib');

async function producer(msg, expiration) {try {const connection = await amqp.connect('amqp://localhost:5672');const exchange = 'my-delayed-exchange';const exchangeType = 'x-delayed-message'; // x-delayed-message 交换机的类型const routingKey = 'my-delayed-routingKey';

const ch = await connection.createChannel();await ch.assertExchange(exchange, exchangeType, {durable: true,'x-delayed-type': 'direct'        });

console.log('producer msg:', msg);await ch.publish(exchange, routingKey, Buffer.from(msg), {headers: {'x-delay': expiration, // 一定要设置,否则无效            }        });

        ch.close();    } catch(err) {console.log(err)    }}

producer('msg0 1S Expire', 1000) // 1Sproducer('msg1 30S Expire', 1000 * 30) // 30Sproducer('msg2 10S Expire', 1000 * 10) // 10Sproducer('msg3 5S Expire', 1000 * 5) // 5S

构建消费端

消费端改变不大,交换机声明处同生产者保持一样,设置交换机类型(x-delayed-message)和 x-delayed-type

const amqp = require('amqplib');

async function consumer() {const exchange = 'my-delayed-exchange';const exchangeType = 'x-delayed-message';const routingKey = 'my-delayed-routingKey';const queueName = 'my-delayed-queue';

try {const connection = await amqp.connect('amqp://localhost:5672');const ch = await connection.createChannel();

await ch.assertExchange(exchange, exchangeType, {durable: true,'x-delayed-type': 'direct'        });await ch.assertQueue(queueName);await ch.bindQueue(queueName, exchange, routingKey);await ch.consume(queueName, msg => {console.log('consumer msg:', msg.content.toString());        }, { noAck: true });    } catch(err) {console.log('Consumer Error: ', err);    }}

consumer()

以上示例源码地址:

https://github.com/Q-Angelo/project-training/tree/master/rabbitmq/rabbitmq-delayed-message-node

最后,让我们对以上程序做个测试,左侧窗口展示了生产端信息,右侧窗口展示了消费端信息,这次实现了同一个队列里不同过期时间的消息,可以按照我们预先设置的 TTL 时间顺序性消费,我们的目的达到了。

局限性

Delayed Message 插件实现 RabbitMQ 延迟队列这种方式也不完全是一个银弹,它将延迟消息存在于 Mnesia 表中,并且在当前节点上具有单个磁盘副本,它们将在节点重启之后幸存。

目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,详情参见 #/issues/72 另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。

插件的禁用要慎重,以下方式可以实现将插件禁用,但是注意如果此时还有延迟消息未消费,那么禁掉此插件后所有的未消费的延迟消息将丢失。

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

如果你采用了 Delayed Message 插件这种方式来实现,对于消息可用性要求较高的,在发现消息之前可以先落入 DB 打标记,消费之后将消息标记为已消费,中间可以加入定时任务做检测,这可以进一步保证你的消息的可靠性。

总结

经过一番实践测试、学习之后发现,DLX + TTLDelayed Message 插件这两种 RabbitMQ 延迟消息解决方案都有一定的局限性。

如果你的消息 TTL 是相同的,使用 DLX + TTL 的这种方式是没问题的,对于我来说目前还是优选。

如果你的消息 TTL 过期值是可变的,可以尝试下使用 Delayed Message 插件,对于某些应用而言它可能很好用,对于那些可能会达到高容量延迟消息应用而言,则不是很好。

关于 RabbitMQ 延迟队列,如果你有更多其它实现,欢迎关注公众号 “Nodejs技术栈” 在后台取得我的联系方式进行讨论,我很期待。

Reference

  • github.com/rabbitmq/rabbitmq-delayed-message-exchange
  • www.rabbitmq.com/community-plugins.html

作者简介:五月君,Nodejs Developer,慕课网认证作者,热爱技术、喜欢分享的 90 后青年,欢迎关注 Nodejs技术栈(id:NodejsRoadmap) 和 Github 开源项目 https://www.nodejs.red

敬请关注「Nodejs技术栈」微信公众号,获取优质文章

▼往期精彩回顾▼Node.js 如何利用 Libuv 实现事件循环和异步Nodejs 进阶:解答 Cluster 模块的几个疑问多维度分析 Express、Koa 之间的区别你需要了解的有关 Node.js 的所有信息Node.js 服务 Docker 容器化应用实践一文零基础教你学会 Docker 入门到实践JavaScript 浮点数之迷:大数危机Node.js 是什么?我为什么选择它?分享 10 道 Nodejs 进程相关面试题不容错过的 Node.js 项目架构

rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列相关推荐

  1. Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  2. 【外行也能看懂的RabbitMQ系列(四)】—— RabbitMQ进阶篇之通过插件实现延迟队列(内含实现代码及rabbitmq_delayed_message_exchange安装)

    系列文章目录 准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶 ...

  3. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  4. Rabbitmq超级详细的笔记,包括安装,基本命令,rabbitmq的七种消息模式,以及死信队列,延迟队列,优先级队列和惰性队列的介绍

    RabbitMQ 文章目录 RabbitMQ 1 RabbitMQ介绍 1.1 基本介绍 1.2 RabbitMQ的安装 1.2.1 ubuntu20.04 安装rabbitmq 1.2.2 cent ...

  5. 【分布式】Rabbitmq死信队列模型、实战场景---订单延迟30min支付处理

    分布式 内容管理 死信队列 死信队列demo 死信队列消息模型 平台订单支付超时 --- 演示 业务分析 代码实现 RabbitMQ 死信队列/ 延迟队列 - 延迟业务逻辑 最近可能分布式进入Redi ...

  6. rabbitmq 延迟队列_框架系列|中间件RabbitMQ必看17道面试题

    46. RabbitMQ 的使用场景有哪些? 抢购活动,削峰填谷,防止系统崩塌. 延迟信息处理,比如 10 分钟之后给下单未付款的用户发送邮件提醒. 解耦系统,对于新增的功能可以单独写模块扩展,比如用 ...

  7. 【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

    死信队列实现篇,参考文章:[SpringBoot]60.SpringBoot中整合RabbitMQ实现延时队列(死信队列篇) 一.介绍 1.什么是延时队列? 延时队列即就是放置在该队列里面的消息是不需 ...

  8. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  9. 【重难点】【RabbitMQ 01】消息队列的作用、主流的消息队列、RabbitMQ 基于什么传输消息、RabbitMQ 模型架构、死信队列和延迟队列

    [重难点][RabbitMQ 01]消息队列的作用.主流的消息队列.RabbitMQ 基于什么传输消息.RabbitMQ 模型架构.死信队列和延迟队列 文章目录 [重难点][RabbitMQ 01]消 ...

最新文章

  1. SQL删除重复数据方法
  2. 使用CruiseControl.Net全面实现持续集成
  3. c# 睡眠3秒_C#中的闭包和意想不到的坑
  4. faster rcnn在自己的数据集上训练
  5. AVS高清立体视频编码器
  6. NLP—6.数据不平衡处理
  7. p6spy oracle,springboot p6spy 打印完整sql
  8. python修改pdf内容_python3.6调整字体Python处理pdf文件库 - PyPDF2详解
  9. Powerdesigner概念模型并将概念模型转换成物理模型
  10. Android Thermal
  11. Word生成目录后,前面还有摘要,怎么样让页码从正文第一页开始
  12. 中英文切换_值得收藏|不重装软件实现ArcGIS中英文版本之间切换
  13. 服务器性能监控之New Relic 入门教程
  14. 硬件设计中电容电感磁珠总结
  15. JavaSE_day02【类型转换、进制、运算符】
  16. python中使用matplotlib.pyplot画函数图像
  17. 没有鼠标就无法对计算机进行操作,电脑鼠标不灵敏是什么原因?怎么解决?
  18. 一周 Go World 新鲜事-2019W11
  19. 超详细的计算机视觉学习书籍pdf汇总(涉及CV、深度学习、多视图几何、SLAM、点云处理等)
  20. Linux 发展历程

热门文章

  1. recycleview 使用详解,添加头部尾部,混合item,侧滑菜单,跳转到指定位置,实现九宫格布局
  2. mysql 触发器 for each row 理解_“for each row”如何在mysql中的触发器中工作?
  3. InstallShield SdShowMsg未关闭导致安装程序无法停止
  4. execCommand全集
  5. android密码可见不可见的光标控制,Android EditText 在设置为输入密码的时候 密码是否可见 光标在最后显示...
  6. jQuery 实现Ajax
  7. (五)官方Neo4j 3.3.9 Java API例子
  8. 计算机专业sci二区论文难吗,通信专业二区sci难吗
  9. 【C语言进阶深度学习记录】三十七 C/C++中造成程序内存错误的原因(野指针)
  10. hive的row_number()、rank()和dense_rank()的区别以及具体使用