延时队列的几种实现方式

何为延迟队列?

顾名思义,首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

延时队列能做什么?

延时队列多用于需要延时工作的场景。最常见的是以下场景:

延迟消费,比如:

1 ,订单成功后,在 30 分钟内没有支付,自动取消订单

2 ,如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

3 ,支付成功后, 2 秒后查询支付结果

4 , ……

如何实现?

实现延时队列的方式有很多种,本文主要介绍以下几种常见的方式:

1. 基于 DelayQueue 实现的本地延时队列。
2. 基于 RabbitMQ 死信队列实现的延时队列。
3. 基于 RabbitMQ 插件实现的延时队列。

1. 基于 DelayQueue 实现的本地延时队列

一个最简单的解决方案就是使用 JDK Java.util.concurrent 包下 DelayQueue。(实际上,如无必要,我们应该尽可能使用 jdk 自带的一些类库,而非重复造轮子,或者过度设计)。

DelayQueue 是一个 BlockingQueue (无界阻塞)队列,它本质就是封装了一个 PriorityQueue (优先级队列),并加上了延时功能。可以这么说,DelayQueue 就是一个使用优先队列(PriorityQueue)实现的 BlockingQueue,优先队列的比较基准值是时间。即:

DelayQueue = BlockingQueue + PriorityQueue + Delayed

从继承层次上看:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E>

DelayQueue 是 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay (TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。

要实现 DelayQueue 延时队列,队中元素要 implements Delayed 接口,这个接口里只有一个 getDelay 方法,用于设置延期时间。Delayed 由实现了 Comparable 接口, compareTo 方法负责对队列中的元素进行排序。

下面看一个demo:

首先定义一个 我们自定义的 Delayed 实现:

public class DelayMessage<T extends Runnable> implements Delayed {private static final int MINUS_ONE = -1;private final long time;private final T task;/*** @param timeout 毫秒* @param t       T extends Runnable*/public DelayMessage(long timeout, T t) {this.time = System.nanoTime() + timeout;this.task = t;}/*** 返回与此对象相关的剩余延迟时间,以给定的时间单位表示*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed delayed) {// 过期时间长的放置在队列尾部if (this.getDelay(TimeUnit.MICROSECONDS) > getDelay(TimeUnit.MICROSECONDS)) {return 1;}// 过期时间短的放置在队列头if (this.getDelay(TimeUnit.MICROSECONDS) < getDelay(TimeUnit.MICROSECONDS)) {return -1;}return 0;}public T getTask() {return this.task;}@Overridepublic int hashCode() {return task.hashCode();}@Overridepublic boolean equals(Object obj) {return task.equals(obj);}}

可以看到,我们自定义的 DelayMessage 里面的元素是一个 Runnable 对象,这是为了我们方便把延时队列中的对象丢到线程池里面去执行。

接下来,再使用 DelayQueue + 线程池 实现一个工具类,用来从 DelayQueue 中取出 Runnable 对象, 放到线程池去执行:

@Component
public class DealyQueueManager {/*** 可缓冲的线程池*/private ExecutorService executor;/*** 延时队列*/private DelayQueue<DelayMessage<?>> delayQueue;/*** 初始化*/@PostConstruct@SuppressWarnings({"PMD.AvoidManuallyCreateThreadRule", "PMD.ThreadPoolCreationRule"})public void init() {executor = newCachedThreadPool();delayQueue = new DelayQueue<>();//后台线程,监听延时队列Thread daemonThread = new Thread(this::execute);daemonThread.setName("本地延时队列-DelayQueueMonitor");daemonThread.start();}private void execute() {while (true) {try {// 从延时队列中获取任务,如果队列为空, take 方法将会阻塞在这里DelayMessage<? extends Runnable> delayMessage = delayQueue.take();Runnable task = delayMessage.getTask();if (null == task) {continue;}// 提交到线程池执行 taskexecutor.execute(task);} catch (Exception e) {e.printStackTrace();}}}/*** 添加任务** @param task 待延迟执行的任务* @param time 延时时间* @param unit 时间单位*/public void put(Runnable task, long time, TimeUnit unit) {// 获取延时时间long timeout = TimeUnit.NANOSECONDS.convert(time, unit);// 将任务封装成实现 Delayed 接口的消息体DelayMessage<? extends Runnable> delayMessage = new DelayMessage<>(timeout, task);// 将消息体放到延时队列中delayQueue.put(delayMessage);}/*** 删除任务*/public boolean removeTask(Runnable task) {return delayQueue.remove(task);}}

这样,我们就基于 DelayQueue 实现了一个高效的本地延时队列, 但是缺点就是 在多节点实例部署时,不能同步消息,同步消费,也不能持久化。因此我们可以考虑使用 RabbitMQ 实现的延时队列解决这些问题。

2. 基于 RabbitMQ 死信队列实现的延时队列

使用 RabbitMQ 实现延时队列主要用到了它的两个特性:一个是 Time-To-Live Extensions(TTL),另一个是 Dead Letter Exchanges(DLX)。

Time-To-Live Extensions

RabbitMQ 允许我们为消息或者队列设置 TTL(time to live),也就是过期时间。TTL 表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了 TTL 或者当某条消息进入了设置了 TTL 的队列时,这条消息会在经过 TTL 秒后 “死亡”,成为 Dead Letter。如果既配置了消息的 TTL,又配置了队列的 TTL,那么较小的那个值会被取用。

Dead Letter Exchange

刚才提到了,被设置了 TTL 的消息在过期后会成为 Dead Letter。其实在 RabbitMQ 中,一共有三种消息的 “死亡” 形式:

  1. 消息被拒绝。通过调用 basic.reject 或者 basic.nack 并且设置的 requeue 参数为 false。
  2. 消息因为设置了 TTL 而过期。
  3. 消息进入了一条已经达到最大长度的队列。

如果队列设置了 Dead Letter Exchange(DLX),那么这些 Dead Letter 就会被重新 publish 到 Dead Letter Exchange,通过 Dead Letter Exchange 路由到其他队列。

聪明的你肯定已经想到了,如何将 RabbitMQ 的 TTL 和 DLX 特性结合在一起,实现一个延迟队列。

如上图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过 RabbitMQ 提供的 TTL 扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的 DLX 转发到实际消费队列(图中蓝色队列),以此达到延时消费的效果。

Demo 示例:

首先我们需要先准备好交换机和队列:

交换机(Exchange) 队列(Queue) 绑定 key 队列属性
my.dead.exchange my.dead.queue my.dead.key x-dead-letter-exchange: my.msg.exchange
x-dead-letter-routing-key: my.msg.key
my…msg.exchange my.msg.queue my.msg.key

我们先通过上面的表格定义添加好交换机和队列,首先 定义两个交换机,两个队列, 并分别绑定,注意我们在创建 队列: my.dead.queue 时需要添加两个属性:x-dead-letter-exchange:my.msg.exchangex-dead-letter-routing-key: my.msg.key, 这样,我们就只需要往队列 my.dead.queue 发送消息并设置过期时间, 等到 队列my.dead.queue 中的消息过期时,就会被转发到和交换机 my..msg.exchange 绑定的 key 为 my.msg.key 的队列 my.msg.queue 中,因此,我们只需要监听: my.msg.queue 队列就能收到 队列 my.dead.queue 中的延迟消息了。

下面代码以 SpringBoot + rabbitmq 为例:

发布消息:

@Component
@Slf4j
public class DeadDelayMessagePublisher<T> {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** @param payload 消息体* @param delay   延迟时间:单位 毫秒(ms)*/public void sendDelay(T payload, long delay) {MQMessage message = new MQMessage<T>(payload);rabbitTemplate.convertAndSend("my.dead.exchange", "my.dead.key", message, process -> {// 设置消息过期时间 单位: msprocess.getMessageProperties().setExpiration(String.valueOf(delay));return process;}, new CorrelationData(message.getId()));}
}

监听消息:

@Component
@Slf4j
public class DelayedListener {@RabbitListener(queues = "my.msg.queue")public void receiveMessage(MQMessage<PosMessage> message{// 在此处理收到延时消息的逻辑}}

这种实现方式其实是有一个弊端的,加入我们有两个消息一前一后进入 队列 my.dead.queue,前面的消息过期时间为 1 分钟, 后面的消息过期时间为 30 秒, 那以这种方式实现的延时队列, 是必须要等到 1分钟的消息消费完后才能轮到 30 秒那个消息。

为解决这个问题,我们可以使用下面这种方式:

3. 基于 RabbitMQ 插件实现的延时队列

这里使用的是一个 RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange,目前维护在 RabbitMQ 插件社区,我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。

实现原理:

上面使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia

这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =<ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

插件安装

根据你的 RabbitMQ 版本来安装相应插件版本,RabbitMQ community-plugins 上面有版本对应信息可参考。

注意:需要 RabbitMQ 3.5.3 和更高版本。

启用插件

使用 rabbitmq-plugins enable 命令启用插件,启动成功会看到如下提示:

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The following plugins have been enabled:rabbitmq_delayed_message_exchangeApplying plugin configuration to rabbit@xxxxxxxx... started 1 plugin.

管理控制台声明 x-delayed-message 交换机

在开始代码之前先打开 RabbitMQ 的管理 UI 界面,声明一个 x-delayed-message 类型的交换机,否则你会遇到下面的错误:

Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type"

详情可见 Github Issues rabbitmq-delayed-message-exchange/issues/19,正确操作如下图所示:

按照如下表格在 rabbitmq-admin 建好插件形式的交换机和队列,并填写正确的属性

交换机 type 队列 key 交换机属性
my-delayed-exchange x-delayed-meassage my-dealyed-queue my-delayed-key x-delayed-type:direct

代码示例和普通的交换机队列使用基本一致。

@Component
public class DelayMessagePublisher<T> {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** @param exchange mq exahange* @param key      bindKey* @param payload  消息体* @param delay    延迟时间 单位: ms*/public void sendDelay(T payload, long delay) {MQMessage message = new MQMessage<T>(payload);rabbitTemplate.convertAndSend("my-delayed-exchange", "my-delayed-key", message, process -> {// 插件形式设置消息延迟发送时间process.getMessageProperties().setDelay((int) delay);return process;}, new CorrelationData(message.getId()));}
}

监听消息:

@Component
@Slf4j
public class MQDelayedListener {@RabbitListener(queues = "my-dealyed-queue")public void receiveMessage(MQMessage<T> message{// 在此处理收到延时消息的逻辑}}

基于这种插件的形式,我们可以实现,过期消息立刻处理,以弥补死信队列的不足之处。

还有一些其它方法也可以实现延时队列,比如使用 redis 的 sortedset ,还有一些比较复杂的延时队列的算法实现的,比如: 时间轮 。Kafka、Netty 都有基于时间轮算法实现延时队列。这些就不再介绍,感兴趣的可以去网上了解一下,有很多文章讲解的很不错。

对比 :

使用难度 多实例 过期消息能都立刻处理
DelayQueue JDK 自带,集成方便 不支持
RabbitMQ 死信队列 依赖 RabbitMQ 中间件,集成略微复杂 支持 否(FIFO)
RabbitMQ 插件 依赖 RabbitMQ 中间件并且需要安装插件,集成复杂 支持

总结:

之所以写这篇文章,是因为项目中有个需求,支付完成后,需要延时获取支付结果,因此需要用到延时队列,由于一些列原因,进行了一些技术变动,开始项目使用的是:JDK 自带的 DelayQueue 实现,后来为了支持多实例,又采用了 RabbitMQ 延时插件 实现,再后来测试环境延时插件收不到消息,也不是很稳定,又改为了 RabbitMQ 死信队列 的方式实现延时队列。

我从中总结到的经验就是:

  1. 前期的技术选型要考虑充分,频繁变更技术细节的事情应当避免或减少发生。
  2. 用好依赖倒置原则,也就是用接口隔离底层实现,屏蔽掉底层细节的变更。在这次项目中,一开始就使用依赖倒置原则屏蔽了 延时队列的具体实现, 所以虽然变更了三次技术细节,但是改动起来还是很顺利的,变更的代码不影响上层业务逻辑。

延时队列的几种实现方式相关推荐

  1. RabbitMQ自学之路(九)——RabbitMQ实现延时队列的两种方式

    一.什么是延时队列 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 二.延时队列应用于什么场景 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间 ...

  2. 延时任务的四种实现方式

    什么是延迟任务? 顾明思议,我们把需要延迟执行的任务叫做延迟任务. 延迟任务的使用场景有以下这些: 红包 24 小时未被查收,需要延迟执退还业务: 每个月账单日,需要给用户发送当月的对账单: 订单下单 ...

  3. 队列的两种存储方式的介绍与实现

    简介 队列是一种特殊的线性表,它的特殊之处在于它必须在队列的头部进行删除元素,在队列尾部插入元素.我们把队列的头部称为 对头(front),队列尾部叫做 队尾(rear).进行删除元素位置叫队头(fr ...

  4. redis stream java消息队列_Redis-消息队列的两种实现方式

    索引: 基于list的实现方式 基于publish/subscribe 实战 消息队列简介 消息队列:是消息的顺序集合. 比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给P ...

  5. 队列的两种存储方式的介绍与实现(后续)

    简介 队列分为顺序存储结构和链式存储结构,链式存储结构其实就是线性表的单链表,只是只能在对头出元素,队尾进元素而已.从之前实现的队列的顺序存储结构中 我们可以看到他的缺点,我们为了避免"假溢 ...

  6. 【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

    死信队列实现篇,参考文章:[SpringBoot]60.SpringBoot中整合RabbitMQ实现延时队列(死信队列篇) 一.介绍 1.什么是延时队列? 延时队列即就是放置在该队列里面的消息是不需 ...

  7. 【240期】面试官问:说说基于 Redis 实现延时队列服务?

    点击上方"Java精选",选择"设为星标" 别问别人为什么,多问自己凭什么! 下方有惊喜,留言必回,有问必答! 每天 08:15 更新文章,每天进步一点点... ...

  8. PHP怎样写延时队列(定时器)

    背景 PHP没有定时器,依托的都是crontab这样的系统工具,也没有go中defer这样的延时方法,本文介绍几种PHP写延时队列的几种姿势. 延时队列的定义 普通的队列是先进先出,但是延时队列并不是 ...

  9. 一口气说出 6种 延时队列的实现方案,大厂offer稳稳的

    下边会介绍多种实现延时队列的思路,文末提供有几种实现方式的 github地址.其实哪种方式都没有绝对的好与坏,只是看把它用在什么业务场景中,技术这东西没有最好的只有最合适的. 一.延时队列的应用 什么 ...

最新文章

  1. [Asp.net]c#中的斜杠和反斜杠
  2. 剑指Offer(Java实现)删除链表中重复的结点
  3. 数据库-MySQL-Java数据库连接-JDBC
  4. 凝聚共识 聚力前行丨《数据库系统的分类和评测研究》报告发布
  5. OpenGL中的投影使用
  6. NotePad++ 调试PHP代码中文显示乱码
  7. FileInputStreamFileOutputStream
  8. 对python程序设计的学习心得_程序设计心得体会-精选模板
  9. 变异记录文件格式 vcf
  10. c语言同构数循环,C语言求同构数.pdf
  11. swift 实现音视频播放器
  12. Mixly 软件的基本应用
  13. 禾多科技与RTI达成合作,加速自动驾驶在中国量产落地
  14. 视频格式720P、1080i 和 1080P
  15. 大数据千亿级离线数仓项目第三天 维度数据分析与业务开发
  16. 1.1微信支付之现金红包 - Java 开发
  17. cobbler部署与示例
  18. 7-2 输出数组元素分数 20
  19. c语言常量定义规则,c语言常量(c语言常量定义规则)
  20. 热爱,对待人生的首要态度

热门文章

  1. 【无标题】 中国小龙虾市场消费状况与盈利前景预测报告(新版)2022-2027年
  2. 【MOOC浙大翁恺】C语言学习笔记
  3. java 程序设计 第八版,java语言程序设计第八版答案
  4. c语言经典例题及其答案详解,100个经典c语言例题(带答案)
  5. windows2008服务器IIS7下php程序伪静态处理 猫叔
  6. 计算机视觉:人工智能领域当下火热的计算机视觉技术综述
  7. 佳明520自制中文地图
  8. 北京故宫博物院钟表馆改陈完成 以新面貌与观众见面
  9. Excel如何批量从身份证中提取地址?
  10. 【ROS-Gazebo】将sdf文件转为urdf的方法