使用 Redis 如何实现延迟队列?
延迟消息队列在我们的日常工作中经常会被用到,比如支付系统中超过 30 分钟未支付的订单,将会被取消,这样就可以保证此商品库存可以释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖,等等诸如此类的业务场景都需要使用到延迟消息队列,又因为它在业务中比较常见,因此这个知识点在面试中也会经常被问到。
我们本文的面试题是,使用 Redis 如何实现延迟消息队列?
典型回答
延迟消息队列的常见实现方式是通过 ZSet 的存储于查询来实现,它的核心思想是在程序中开启一个一直循环的延迟任务的检测器,用于检测和调用延迟任务的执行,如下图所示: ZSet 实现延迟任务的方式有两种,第一种是利用 zrangebyscore
查询符合条件的所有待处理任务,循环执行队列任务;第二种实现方式是每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。
方式一:zrangebyscore 查询所有任务 此实现方式是一次性查询出所有的延迟任务,然后再进行执行,实现代码如下:
import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue(jedis);}/*** 延迟队列消费* @param jedis Redis 客户端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 当前时间Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间long nowSecond = nowInstant.getEpochSecond();// 查询当前时间的所有任务Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消费任务System.out.println("消费:" + item);}// 删除已经执行的任务jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒轮询一次}}
}
以上程序执行结果如下:
消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1
方式二:判断最早的任务 此实现方式是每次查询最早的一条任务,再与当前时间进行判断,如果任务执行时间大于当前时间则表示应该立即执行延迟任务,实现代码如下:
import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue2(jedis);}/*** 延迟队列消费(方式 2)* @param jedis Redis 客户端*/public static void doDelayQueue2(Jedis jedis) throws InterruptedException {while (true) {// 当前时间long nowSecond = Instant.now().getEpochSecond();// 每次查询一条消息,判断此消息的执行时间Set<String> data = jedis.zrange(_KEY, 0, 0);if (data.size() == 1) {String firstValue = data.iterator().next();// 消息执行时间Double score = jedis.zscore(_KEY, firstValue);if (nowSecond >= score) {// 消费消息(业务功能处理)System.out.println("消费消息:" + firstValue);// 删除已经执行的任务jedis.zrem(_KEY, firstValue);}}Thread.sleep(100); // 执行间隔}}
}
以上程序执行结果和实现方式一相同,结果如下:
消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1
其中,执行间隔代码 Thread.sleep(100)
可根据实际的业务情况删减或配置。
考点分析
延迟消息队列的实现方法有很多种,不同的公司可能使用的技术也是不同的,我上面是从 Redis 的角度出发来实现了延迟消息队列,但一般面试官不会就此罢休,会借着这个问题来问关于更多的延迟消息队列的实现方法,因此除了 Redis 实现延迟消息队列的方式,我们还需要具备一些其他的常见的延迟队列的实现方法。
和此知识点相关的面试题还有以下这些:
- 使用 Java 语言如何实现一个延迟消息队列?
- 你还知道哪些实现延迟消息队列的方法?
知识扩展
Java 中的延迟消息队列
我们可以使用 Java 语言中自带的 DelayQueue 数据类型来实现一个延迟消息队列,实现代码如下:
public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("开始时间:" + DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){System.out.println(delayQueue.take());}System.out.println("结束时间:" + DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延迟截止时间(单面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 获取剩余时间public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 队列里元素的排序依据public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}}
}
以上程序执行的结果如下:
开始时间:2019-6-13 20:40:38 2019-6-13 20:40:39 2019-6-13 20:40:41 2019-6-13 20:40:43 结束时间:2019-6-13 20:40:43
此实现方式的优点是开发比较方便,可以直接在代码中使用,实现代码也比较简单,但它缺点是数据保存在内存中,因此可能存在数据丢失的风险,最大的问题是它无法支持分布式系统。
使用 MQ 实现延迟消息队列
我们使用主流的 MQ 中间件也可以方便的实现延迟消息队列的功能,比如 RabbitMQ,我们可以通过它的 rabbitmq-delayed-message-exchange 插件来实现延迟队列。
首先我们需要配置并开启 rabbitmq-delayed-message-exchange 插件,然后再通过以下代码来实现延迟消息队列。
配置消息队列:
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默认的交换机@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//参数二为类型:必须是 x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 绑定队列到交换器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();}
}
发送者实现代码如下:
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("发送时间:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});}
}
从上述代码我们可以看出,我们配置 3s 之后再进行任务执行。
消费者实现代码如下:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收时间:" + sdf.format(new Date()));System.out.println("消息内容:" + msg);}
}
测试代码如下:
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat;
import java.util.Date;@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试}
}
以上程序的执行结果为:
发送时间:2020-06-11 20:47:51 接收时间:2018-06-11 20:47:54 消息内容:Hi Admin.
从上述结果中可以看出,当消息进入延迟队列 3s 之后才被正常消费,执行结果符合我的预期,RabbitMQ 成功的实现了延迟消息队列。
总结
本文我们讲了延迟消息队列的两种使用场景:支付系统中的超过 30 分钟未支付的订单,将会被自动取消,以此来保证此商品的库存可以正常释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖。并且我们讲了延迟队列的 4 种实现方式,使用 ZSet 的 2 种实现方式,以及 Java 语言中的 DelayQueue 的实现方式,还有 RabbitMQ 的插件 rabbitmq-delayed-message-exchange 的实现方式。
使用 Redis 如何实现延迟队列?相关推荐
- redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列(续)
背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场景是没问题的,毕竟J ...
- 你知道Redis可以实现延迟队列吗?
作者:_BKing 来源:www.cnblogs.com/xiaowei123/p/13222710.html 最近,又重新学习了下Redis,深深被Redis的魅力所折服,Redis不仅能快还能慢( ...
- 你知道 Redis 可以实现延迟队列吗?
点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:硬刚一周,3W字总结,一年的经验告诉你如何准备校招! 个人原创100W+访问量博客:点击前往,查看更多 作者:_ ...
- redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列
一.延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. 二.Redisson Delayed Queue 如果你项目中使 ...
- 你真的知道怎么实现一个延迟队列吗 ?
作者:xiewang,腾讯 IEG 运营开发工程师 前言 延迟队列是我们日常开发过程中,经常接触并需要使用到的一种技术方案.前些时间在开发业务需求时,我也遇到了一个需要使用到延迟消息队列的需求场景,因 ...
- 你真的知道怎么实现一个延迟队列吗?
原文地址:https://mp.weixin.qq.com/s/jL8_23pjYWV74rsjoWNPWg 目录 前言 延迟队列定义 应用场景 实现方案 Redis zset TimeWheel 时 ...
- Redis 实现延迟队列?深深被折服!!
作者:_BKing 地址:www.cnblogs.com/xiaowei123/p/13222710.html 最近,又重新学习了下Redis,深深被Redis的魅力所折服,Redis不仅能快还能慢( ...
- 如何用 Redis 实现延迟队列?
如何用 Redis 实现延迟队列? - 前言 - 我们都知道Redis是一种基于内存的单进程单线程数据库(Redis6.0开始之后支持多线程啦! ),处理速度都非常快. 那么为何Redis又能慢呢? ...
- redis延迟队列 实现_php使用redis的有序集合zset实现延迟队列
延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息. 延迟队列的应用场景: 1.新用户注册,10分钟后发送邮件或站内信. 2.用户下单后,30分钟未支付,订单自动作废. 我 ...
最新文章
- 详解BLE 空中包格式—兼BLE Link layer协议解析
- 爬取知乎“凡尔赛语录”话题下的所有回答,我知道点开看你的很帅气,但还是没我帅
- 计算机如何查找目标,如何使用命令行查找计算机地理位置? | MOS86
- 【HDU - 1045】Fire Net (dfs 或二分图)
- 一、bootstrap4基础(布局系统、栅格系统、显示与隐藏、对齐与排列、内容排版、代码与图文、表格样式、颜色和边框、工具类)
- python元组为什么不可变_为什么python字符串和元组是不可变的?
- Visio2007 与Microsoft Studio 2008不兼容
- 2 环境设置_用友U8V10.1安装(Windows 7环境)
- 图片预加载插件 preLoad.js
- java功能模块_Java 14功能
- 四数之和 leetcode
- 第五十五节,IO多路复用select模块加socket模块,伪多线并发
- php源码 gd,php 源码安装没有gd库
- Android安装应用后点击打开(Open)带来的问题及解决方式
- c++ windows console 快速编辑模式 关闭
- 360,一场阴谋的制造者
- 给定一个整型变量a,写两段代码,第一个设置a的bit 3,第二个清除a 的bit 3。在以上两个操作中,要保持其它位不变。
- ubuntu开启键盘背光灯
- 用Python做单变量数据集的异常点分析
- 一些不错的GI的资料链接