理论 | 六种延迟队列的实现方案
这是小小本周的第五篇,本篇将会说出六种延迟队列的基本实现方案
延迟队列的基本应用
延迟队列,顾名思义,就是具有队列的特性,再增加一个延迟消费队列的功能,可以指定队列中的消息在哪个时间点被消费。延迟队列在项目中的基本应用场景。
订单成功以后,在30分钟内没有支付,自动取消订单。
外卖平台发送订餐通知,下单成功后60s给用户推送短信。
如果订单一直处于一个某一个为完结的状态,及时处理关闭订单,退还库存。
淘宝新建商户一个月内还没上传商品信息,冻结商铺。
延迟队列的实现
DelayQueue
这里使用的是Java.util.concurrent包下DelayQueue 简单的实现效果,添加三个订单入队,分别设置5s,10s,15s,出队。
代码如下
public class Order implements Delayed {/*** 延迟时间*/@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")private long time;String name;public Order(String name, long time, TimeUnit unit) {this.name = name;this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed o) {Order Order = (Order) o;long diff = this.time - Order.time;if (diff <= 0) {return -1;} else {return 1;}}
}
main方法如下
public class DelayQueueDemo {public static void main(String[] args) throws InterruptedException {Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);DelayQueue<Order> delayQueue = new DelayQueue<>();delayQueue.put(Order1);delayQueue.put(Order2);delayQueue.put(Order3);System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));while (delayQueue.size() != 0) {/*** 取队列头部元素是否过期*/Order task = delayQueue.poll();if (task != null) {System.out.format("订单:{%s}被取消, 取消时间:{%s}n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}Thread.sleep(1000);}}
}
执行效果如下
订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}
Quartz定时任务
这里使用QUartz实现定时任务框架。这里引入quartz框架
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
添加注解开启定时任务
@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {public static void main(String[] args) {SpringApplication.run(DelayqueueApplication.class, args);}
}
编写定时任务,每五秒执行
@Component
public class QuartzDemo {//每隔五秒@Scheduled(cron = "0/5 * * * * ? ")public void process(){System.out.println("我是定时任务!");}
}
Redis sorted set
Redis数据结构为Zset,同样可以实现延迟队列的效果,利用score属性,redis通过score类为集合中的成员从小到大进行排序。通过zadd命令,队列中delayqueue中添加元素,并设置score值表示元素过期时间,然后分别添加三个order1,order2,order3,然后判断什么时候过期
zadd delayqueue 3 order3
消费端获取,轮询的结果,然后进行对比
/*** 消费消息*/public void pollOrderQueue() {while (true) {Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);String value = ((Tuple) set.toArray()[0]).getElement();int score = (int) ((Tuple) set.toArray()[0]).getScore();Calendar cal = Calendar.getInstance();int nowSecond = (int) (cal.getTimeInMillis() / 1000);if (nowSecond >= score) {jedis.zrem(DELAY_QUEUE, value);System.out.println(sdf.format(new Date()) + " removed key:" + value);}if (jedis.zcard(DELAY_QUEUE) <= 0) {System.out.println(sdf.format(new Date()) + " zset empty ");return;}Thread.sleep(1000);}}
查看是否符合预期
2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty
Redis 过期回调
Redis 中的key的过期回调事件,也能达到延迟队列的效果。开启监听key是否过期的事件,如果过期,将会触发callback事件。修改redis.conf文件,开启
notify-keyspace-events Ex
Redis 监听配置,注入Bean的RedisMessageListenerContainer
@Configuration
public class RedisListenerConfig {@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}
编写Redis过期回调监听方法
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overridepublic void onMessage(Message message, byte[] pattern) {String expiredKey = message.toString();System.out.println("监听到key:" + expiredKey + "已过期");}
}
测试
添加key,设置3s过期
set xiaofu 123 ex 3
成功监听到了
监听到过期的key为:xiaofu
RabbitMq 延迟队列
利用RabbitMq做延迟队列,通过TTL和DXL两个属性实现。
TTL
消息过期时间
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。
DLX
DLX 为死信交换机,绑定在死信交换机上的队列,rabbitmq,可以配置两个参数,一旦出现死信,吃屎将会路由到另外一个交换机,消息重新消费。
逻辑
下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。
代码
发送消息指定延迟时间
public void send(String delayTimes) {amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {// 设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;});}
}
设置延迟队列出现死信后的转发规则
/*** 延时队列*/@Bean(name = "order.delay.queue")public Queue getMessageQueue() {return QueueBuilder.durable(RabbitConstant.DEAD_LETTER_QUEUE)// 配置到期后转发的交换.withArgument("x-dead-letter-exchange", "order.close.exchange")// 配置到期后转发的路由键.withArgument("x-dead-letter-routing-key", "order.close.queue").build();}
时间轮
基本概念
wheel
时间轮,图中的圆盘可以看做钟表的刻度,一圈round长度为24秒,刻度为8,每一个刻度为3秒,时间精度为3秒。添加一个定时,延时任务A,如果延时25s后自行,时间轮会根据长度,和刻度得到一个圈数和指针的位置,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的round和 index信息。当round=0,index=0 ,指针指向0格子 任务A并不会执行,因为 round=0不满足要求。所以每一个格子代表的是一些时间,比如1秒和25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。
下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。
ThreadFactory :表示用于生成工作线程,一般采用线程池;tickDuration和unit:每格的时间间隔,默认100ms;ticksPerWheel:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算。
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);}
TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。
public class NettyDelayQueue {public static void main(String[] args) {final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);//定时任务TimerTask task1 = new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order1 5s 后执行 ");timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册}};timer.newTimeout(task1, 5, TimeUnit.SECONDS);TimerTask task2 = new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order2 10s 后执行");timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册}};timer.newTimeout(task2, 10, TimeUnit.SECONDS);//延迟任务timer.newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println("order3 15s 后执行一次");}}, 15, TimeUnit.SECONDS);}
}
执行结果
order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行
小明菜市场
推荐阅读
● 实战 | Element UI 父子组件传值与事件绑定(逆向)
● 实战 | Element UI 父子组件传值与事件绑定(正向)
● 实战 | Vue + Element UI 表格组件二次封装
● 应用 | Redis实现 主从,单例,集群,哨兵,配置应用
● 了解 | 你必须了解的Mysql 三大日志
理论 | 六种延迟队列的实现方案相关推荐
- redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列(续)
背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场景是没问题的,毕竟J ...
- 你真的知道怎么实现一个延迟队列吗 ?
作者:xiewang,腾讯 IEG 运营开发工程师 前言 延迟队列是我们日常开发过程中,经常接触并需要使用到的一种技术方案.前些时间在开发业务需求时,我也遇到了一个需要使用到延迟消息队列的需求场景,因 ...
- python延时队列_如何通过Python实现RabbitMQ延迟队列
最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用rabbitmq做异步处理任务的中间件,所 ...
- 电商项目订单取消(Redis 延迟队列)--1
现功能时的选择很重要,如果你的系统所处理的数据量不是很大,我觉得队列和缓存很适合你,这样你可以对消息的传递更加了解,但你使用MQ,kafka的中间件时,你会发现使用起来更加轻松,但对于数据量大的系统来 ...
- 你真的知道怎么实现一个延迟队列吗?
原文地址:https://mp.weixin.qq.com/s/jL8_23pjYWV74rsjoWNPWg 目录 前言 延迟队列定义 应用场景 实现方案 Redis zset TimeWheel 时 ...
- 履约系统接单和制作流转方案优化-基于JDK延迟队列
目录 改进思路 履约系统新的设计方案 创建制作单后加入待接单延迟队列
- haddler处理队列 netty_如何实现延迟队列
延迟队列的需求各位应该在日常开发的场景中经常碰到.比如: 用户登录之后5分钟给用户做分类推送: 用户多少天未登录给用户做召回推送: 定期检查用户当前退款账单是否被商家处理等等场景. 一般这种场景和定时 ...
- 高可用延迟队列设计与实现
延迟队列:一种带有 延迟功能 的消息队列 延时 → 未来一个不确定的时间 mq → 消费行为具有顺序性 这样解释,整个设计就清楚了.你的目的是 延时,承载容器是 mq. 背景 列举一下我日常业务中可能 ...
- 【RabbitMQ】一文带你搞定RabbitMQ延迟队列
本文口味:鱼香肉丝 预计阅读:10分钟 0|1一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经 ...
最新文章
- Unity3D是怎么提升游戏运行效率的?
- vscode wecode的配置_使用体验神似VS Code?三步带你了解华为云CloudIDE前世今生
- 线程同步——内核对象实现线程同步——等待函数
- mysql 运维 最佳实践_Mysql 开发最佳实践
- jdk 安装_Jdk 安装使用教程
- 从Java直接输出到flume_js生成日志信息及实现java直接调用flume
- CCF NOI1046 打印方阵
- intel fortran免费版安装
- OPPO Find X5系列领衔OPPO春季新品发布会,多款产品亮相
- mysql数据库迁移工具_MysqlToMsSql(数据库迁移工具)
- ios 程序中安装 描述文件
- STM32三种BOOT模式
- 火车售票系统/C语言
- 改变命运的21个黄金法则
- 由于找不到d3dx9_42.dll,无法继续执行代码。
- android笔记:长按APP图标弹出快捷方式(shortcuts)
- 【转】安卓知道:手机IMEI是什么?有什么作用?如何检查它
- python计算圆的体积_[宜配屋]听图阁
- 8路抢答器proteus仿真 2种电路图
- UG模具设计干货!内滑块设计细节
热门文章
- 终于等到了scilab 5.1.1
- mysql 主主+keepalive
- 发布一个mmap的trie_midrmm02_新浪博客
- 湖北文理学院数学与计算机科学学院,数学与计算机科学学院计算机协会十一月总结会议...
- android 480p分辨率,[RK3399][Android7.1] HDMI显示屏(副屏)调试记录小结
- (43) 讨论和通知
- 我的Android进阶之旅------gt;Android Studio 快捷键整理分享
- [LeetCode]238.Product of Array Except Self
- 为什么使用 Redis及其产品定位
- Git学习系列(六)解决分支冲突及分支管理策略