这是小小本周的第五篇,本篇将会说出六种延迟队列的基本实现方案

延迟队列的基本应用

延迟队列,顾名思义,就是具有队列的特性,再增加一个延迟消费队列的功能,可以指定队列中的消息在哪个时间点被消费。延迟队列在项目中的基本应用场景。

  1. 订单成功以后,在30分钟内没有支付,自动取消订单。

  2. 外卖平台发送订餐通知,下单成功后60s给用户推送短信。

  3. 如果订单一直处于一个某一个为完结的状态,及时处理关闭订单,退还库存。

  4. 淘宝新建商户一个月内还没上传商品信息,冻结商铺。

延迟队列的实现

DelayQueue

这里使用的是Java.util.concurrent包下DelayQueue 简单的实现效果,添加三个订单入队,分别设置5s,10s,15s,出队。

代码如下

public class Order implements Delayed {/*** 延迟时间*/@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")private long time;String name;public Order(String name, long time, TimeUnit unit) {this.name = name;this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed o) {Order Order = (Order) o;long diff = this.time - Order.time;if (diff <= 0) {return -1;} else {return 1;}}
}

main方法如下

public class DelayQueueDemo {public static void main(String[] args) throws InterruptedException {Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);DelayQueue<Order> delayQueue = new DelayQueue<>();delayQueue.put(Order1);delayQueue.put(Order2);delayQueue.put(Order3);System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));while (delayQueue.size() != 0) {/*** 取队列头部元素是否过期*/Order task = delayQueue.poll();if (task != null) {System.out.format("订单:{%s}被取消, 取消时间:{%s}n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}Thread.sleep(1000);}}
}

执行效果如下

订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}

Quartz定时任务

这里使用QUartz实现定时任务框架。这里引入quartz框架

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

添加注解开启定时任务

@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {public static void main(String[] args) {SpringApplication.run(DelayqueueApplication.class, args);}
}

编写定时任务,每五秒执行

@Component
public class QuartzDemo {//每隔五秒@Scheduled(cron = "0/5 * * * * ? ")public void process(){System.out.println("我是定时任务!");}
}

Redis sorted set

Redis数据结构为Zset,同样可以实现延迟队列的效果,利用score属性,redis通过score类为集合中的成员从小到大进行排序。通过zadd命令,队列中delayqueue中添加元素,并设置score值表示元素过期时间,然后分别添加三个order1,order2,order3,然后判断什么时候过期

 zadd delayqueue 3 order3

消费端获取,轮询的结果,然后进行对比

    /*** 消费消息*/public void pollOrderQueue() {while (true) {Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);String value = ((Tuple) set.toArray()[0]).getElement();int score = (int) ((Tuple) set.toArray()[0]).getScore();Calendar cal = Calendar.getInstance();int nowSecond = (int) (cal.getTimeInMillis() / 1000);if (nowSecond >= score) {jedis.zrem(DELAY_QUEUE, value);System.out.println(sdf.format(new Date()) + " removed key:" + value);}if (jedis.zcard(DELAY_QUEUE) <= 0) {System.out.println(sdf.format(new Date()) + " zset empty ");return;}Thread.sleep(1000);}}

查看是否符合预期

2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty

Redis 过期回调

Redis 中的key的过期回调事件,也能达到延迟队列的效果。开启监听key是否过期的事件,如果过期,将会触发callback事件。修改redis.conf文件,开启

notify-keyspace-events Ex

Redis 监听配置,注入Bean的RedisMessageListenerContainer

@Configuration
public class RedisListenerConfig {@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}

编写Redis过期回调监听方法

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overridepublic void onMessage(Message message, byte[] pattern) {String expiredKey = message.toString();System.out.println("监听到key:" + expiredKey + "已过期");}
}

测试

添加key,设置3s过期

 set xiaofu 123 ex 3

成功监听到了

监听到过期的key为:xiaofu

RabbitMq 延迟队列

利用RabbitMq做延迟队列,通过TTL和DXL两个属性实现。

TTL

消息过期时间

设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。

如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。

DLX

DLX 为死信交换机,绑定在死信交换机上的队列,rabbitmq,可以配置两个参数,一旦出现死信,吃屎将会路由到另外一个交换机,消息重新消费。

逻辑

下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。

代码

发送消息指定延迟时间

public void send(String delayTimes) {amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {// 设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;});}
}

设置延迟队列出现死信后的转发规则

/*** 延时队列*/@Bean(name = "order.delay.queue")public Queue getMessageQueue() {return QueueBuilder.durable(RabbitConstant.DEAD_LETTER_QUEUE)// 配置到期后转发的交换.withArgument("x-dead-letter-exchange", "order.close.exchange")// 配置到期后转发的路由键.withArgument("x-dead-letter-routing-key", "order.close.queue").build();}

时间轮

基本概念

wheel

时间轮,图中的圆盘可以看做钟表的刻度,一圈round长度为24秒,刻度为8,每一个刻度为3秒,时间精度为3秒。添加一个定时,延时任务A,如果延时25s后自行,时间轮会根据长度,和刻度得到一个圈数和指针的位置,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的round和 index信息。当round=0,index=0 ,指针指向0格子 任务A并不会执行,因为 round=0不满足要求。所以每一个格子代表的是一些时间,比如1秒和25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。

下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。

ThreadFactory :表示用于生成工作线程,一般采用线程池;tickDuration和unit:每格的时间间隔,默认100ms;ticksPerWheel:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算。

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);}

TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。

public class NettyDelayQueue {public static void main(String[] args) {final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);//定时任务TimerTask task1 = new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order1  5s 后执行 ");timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册}};timer.newTimeout(task1, 5, TimeUnit.SECONDS);TimerTask task2 = new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order2  10s 后执行");timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册}};timer.newTimeout(task2, 10, TimeUnit.SECONDS);//延迟任务timer.newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order3  15s 后执行一次");}}, 15, TimeUnit.SECONDS);}
}

执行结果

order1  5s 后执行
order2  10s 后执行
order3  15s 后执行一次
order1  5s 后执行
order2  10s 后执行

小明菜市场

推荐阅读

● 实战 | Element UI 父子组件传值与事件绑定(逆向)

● 实战 | Element UI 父子组件传值与事件绑定(正向)

● 实战 | Vue + Element UI 表格组件二次封装

● 应用 | Redis实现 主从,单例,集群,哨兵,配置应用

● 了解 | 你必须了解的Mysql 三大日志

理论 | 六种延迟队列的实现方案相关推荐

  1. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列(续)

    背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场景是没问题的,毕竟J ...

  2. 你真的知道怎么实现一个延迟队列吗 ?

    作者:xiewang,腾讯 IEG 运营开发工程师 前言 延迟队列是我们日常开发过程中,经常接触并需要使用到的一种技术方案.前些时间在开发业务需求时,我也遇到了一个需要使用到延迟消息队列的需求场景,因 ...

  3. python延时队列_如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用rabbitmq做异步处理任务的中间件,所 ...

  4. 电商项目订单取消(Redis 延迟队列)--1

    现功能时的选择很重要,如果你的系统所处理的数据量不是很大,我觉得队列和缓存很适合你,这样你可以对消息的传递更加了解,但你使用MQ,kafka的中间件时,你会发现使用起来更加轻松,但对于数据量大的系统来 ...

  5. 你真的知道怎么实现一个延迟队列吗?

    原文地址:https://mp.weixin.qq.com/s/jL8_23pjYWV74rsjoWNPWg 目录 前言 延迟队列定义 应用场景 实现方案 Redis zset TimeWheel 时 ...

  6. 履约系统接单和制作流转方案优化-基于JDK延迟队列

    目录 改进思路 履约系统新的设计方案 创建制作单后加入待接单延迟队列

  7. haddler处理队列 netty_如何实现延迟队列

    延迟队列的需求各位应该在日常开发的场景中经常碰到.比如: 用户登录之后5分钟给用户做分类推送: 用户多少天未登录给用户做召回推送: 定期检查用户当前退款账单是否被商家处理等等场景. 一般这种场景和定时 ...

  8. 高可用延迟队列设计与实现

    延迟队列:一种带有 延迟功能 的消息队列 延时 → 未来一个不确定的时间 mq → 消费行为具有顺序性 这样解释,整个设计就清楚了.你的目的是 延时,承载容器是 mq. 背景 列举一下我日常业务中可能 ...

  9. 【RabbitMQ】一文带你搞定RabbitMQ延迟队列

    本文口味:鱼香肉丝   预计阅读:10分钟 0|1一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经 ...

最新文章

  1. Unity3D是怎么提升游戏运行效率的?
  2. vscode wecode的配置_使用体验神似VS Code?三步带你了解华为云CloudIDE前世今生
  3. 线程同步——内核对象实现线程同步——等待函数
  4. mysql 运维 最佳实践_Mysql 开发最佳实践
  5. jdk 安装_Jdk 安装使用教程
  6. 从Java直接输出到flume_js生成日志信息及实现java直接调用flume
  7. CCF NOI1046 打印方阵
  8. intel fortran免费版安装
  9. OPPO Find X5系列领衔OPPO春季新品发布会,多款产品亮相
  10. mysql数据库迁移工具_MysqlToMsSql(数据库迁移工具)
  11. ios 程序中安装 描述文件
  12. STM32三种BOOT模式
  13. 火车售票系统/C语言
  14. 改变命运的21个黄金法则
  15. 由于找不到d3dx9_42.dll,无法继续执行代码。
  16. android笔记:长按APP图标弹出快捷方式(shortcuts)
  17. 【转】安卓知道:手机IMEI是什么?有什么作用?如何检查它
  18. python计算圆的体积_[宜配屋]听图阁
  19. 8路抢答器proteus仿真 2种电路图
  20. UG模具设计干货!内滑块设计细节

热门文章

  1. 终于等到了scilab 5.1.1
  2. mysql 主主+keepalive
  3. 发布一个mmap的trie_midrmm02_新浪博客
  4. 湖北文理学院数学与计算机科学学院,数学与计算机科学学院计算机协会十一月总结会议...
  5. android 480p分辨率,[RK3399][Android7.1] HDMI显示屏(副屏)调试记录小结
  6. (43) 讨论和通知
  7. 我的Android进阶之旅------gt;Android Studio 快捷键整理分享
  8. [LeetCode]238.Product of Array Except Self
  9. 为什么使用 Redis及其产品定位
  10. Git学习系列(六)解决分支冲突及分支管理策略