延迟消息队列在我们的日常工作中经常会被用到,比如支付系统中超过 30 分钟未支付的订单,将会被取消,这样就可以保证此商品库存可以释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖,等等诸如此类的业务场景都需要使用到延迟消息队列,又因为它在业务中比较常见,因此这个知识点在面试中也会经常被问到。

我们本文的面试题是,使用 Redis 如何实现延迟消息队列?

典型回答

延迟消息队列的常见实现方式是通过 ZSet 的存储于查询来实现,它的核心思想是在程序中开启一个一直循环的延迟任务的检测器,用于检测和调用延迟任务的执行,如下图所示: ZSet 实现延迟任务的方式有两种,第一种是利用 zrangebyscore 查询符合条件的所有待处理任务,循环执行队列任务;第二种实现方式是每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。

方式一:zrangebyscore 查询所有任务 此实现方式是一次性查询出所有的延迟任务,然后再进行执行,实现代码如下:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue(jedis);}/*** 延迟队列消费* @param jedis Redis 客户端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 当前时间Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间long nowSecond = nowInstant.getEpochSecond();// 查询当前时间的所有任务Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消费任务System.out.println("消费:" + item);}// 删除已经执行的任务jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒轮询一次}}
}

以上程序执行结果如下:

消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1

方式二:判断最早的任务 此实现方式是每次查询最早的一条任务,再与当前时间进行判断,如果任务执行时间大于当前时间则表示应该立即执行延迟任务,实现代码如下:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue2(jedis);}/*** 延迟队列消费(方式 2)* @param jedis Redis 客户端*/public static void doDelayQueue2(Jedis jedis) throws InterruptedException {while (true) {// 当前时间long nowSecond = Instant.now().getEpochSecond();// 每次查询一条消息,判断此消息的执行时间Set<String> data = jedis.zrange(_KEY, 0, 0);if (data.size() == 1) {String firstValue = data.iterator().next();// 消息执行时间Double score = jedis.zscore(_KEY, firstValue);if (nowSecond >= score) {// 消费消息(业务功能处理)System.out.println("消费消息:" + firstValue);// 删除已经执行的任务jedis.zrem(_KEY, firstValue);}}Thread.sleep(100); // 执行间隔}}
}

以上程序执行结果和实现方式一相同,结果如下:

消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1

其中,执行间隔代码 Thread.sleep(100) 可根据实际的业务情况删减或配置。

考点分析

延迟消息队列的实现方法有很多种,不同的公司可能使用的技术也是不同的,我上面是从 Redis 的角度出发来实现了延迟消息队列,但一般面试官不会就此罢休,会借着这个问题来问关于更多的延迟消息队列的实现方法,因此除了 Redis 实现延迟消息队列的方式,我们还需要具备一些其他的常见的延迟队列的实现方法。

和此知识点相关的面试题还有以下这些:

  • 使用 Java 语言如何实现一个延迟消息队列?
  • 你还知道哪些实现延迟消息队列的方法?

知识扩展

Java 中的延迟消息队列

我们可以使用 Java 语言中自带的 DelayQueue 数据类型来实现一个延迟消息队列,实现代码如下:

public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("开始时间:" +  DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){System.out.println(delayQueue.take());}System.out.println("结束时间:" +  DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延迟截止时间(单面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 获取剩余时间public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 队列里元素的排序依据public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}}
}

以上程序执行的结果如下:

开始时间:2019-6-13 20:40:38 2019-6-13 20:40:39 2019-6-13 20:40:41 2019-6-13 20:40:43 结束时间:2019-6-13 20:40:43

此实现方式的优点是开发比较方便,可以直接在代码中使用,实现代码也比较简单,但它缺点是数据保存在内存中,因此可能存在数据丢失的风险,最大的问题是它无法支持分布式系统。

使用 MQ 实现延迟消息队列

我们使用主流的 MQ 中间件也可以方便的实现延迟消息队列的功能,比如 RabbitMQ,我们可以通过它的 rabbitmq-delayed-message-exchange 插件来实现延迟队列。

首先我们需要配置并开启 rabbitmq-delayed-message-exchange 插件,然后再通过以下代码来实现延迟消息队列。

配置消息队列:

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默认的交换机@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//参数二为类型:必须是 x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 绑定队列到交换器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();}
}

发送者实现代码如下:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("发送时间:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});}
}

从上述代码我们可以看出,我们配置 3s 之后再进行任务执行。

消费者实现代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收时间:" + sdf.format(new Date()));System.out.println("消息内容:" + msg);}
}

测试代码如下:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat;
import java.util.Date;@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试}
}

以上程序的执行结果为:

发送时间:2020-06-11 20:47:51 接收时间:2018-06-11 20:47:54 消息内容:Hi Admin.

从上述结果中可以看出,当消息进入延迟队列 3s 之后才被正常消费,执行结果符合我的预期,RabbitMQ 成功的实现了延迟消息队列。

总结

本文我们讲了延迟消息队列的两种使用场景:支付系统中的超过 30 分钟未支付的订单,将会被自动取消,以此来保证此商品的库存可以正常释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖。并且我们讲了延迟队列的 4 种实现方式,使用 ZSet 的 2 种实现方式,以及 Java 语言中的 DelayQueue 的实现方式,还有 RabbitMQ 的插件 rabbitmq-delayed-message-exchange 的实现方式。

使用 Redis 如何实现延迟队列?相关推荐

  1. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列(续)

    背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场景是没问题的,毕竟J ...

  2. 你知道Redis可以实现延迟队列吗?

    作者:_BKing 来源:www.cnblogs.com/xiaowei123/p/13222710.html 最近,又重新学习了下Redis,深深被Redis的魅力所折服,Redis不仅能快还能慢( ...

  3. 你知道 Redis 可以实现延迟队列吗?

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:硬刚一周,3W字总结,一年的经验告诉你如何准备校招! 个人原创100W+访问量博客:点击前往,查看更多 作者:_ ...

  4. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列

    一.延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. 二.Redisson Delayed Queue 如果你项目中使 ...

  5. 你真的知道怎么实现一个延迟队列吗 ?

    作者:xiewang,腾讯 IEG 运营开发工程师 前言 延迟队列是我们日常开发过程中,经常接触并需要使用到的一种技术方案.前些时间在开发业务需求时,我也遇到了一个需要使用到延迟消息队列的需求场景,因 ...

  6. 你真的知道怎么实现一个延迟队列吗?

    原文地址:https://mp.weixin.qq.com/s/jL8_23pjYWV74rsjoWNPWg 目录 前言 延迟队列定义 应用场景 实现方案 Redis zset TimeWheel 时 ...

  7. Redis 实现延迟队列?深深被折服!!

    作者:_BKing 地址:www.cnblogs.com/xiaowei123/p/13222710.html 最近,又重新学习了下Redis,深深被Redis的魅力所折服,Redis不仅能快还能慢( ...

  8. 如何用 Redis 实现延迟队列?

    如何用 Redis 实现延迟队列? - 前言 - 我们都知道Redis是一种基于内存的单进程单线程数据库(Redis6.0开始之后支持多线程啦! ),处理速度都非常快. 那么为何Redis又能慢呢? ...

  9. redis延迟队列 实现_php使用redis的有序集合zset实现延迟队列

    延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息. 延迟队列的应用场景: 1.新用户注册,10分钟后发送邮件或站内信. 2.用户下单后,30分钟未支付,订单自动作废. 我 ...

最新文章

  1. 详解BLE 空中包格式—兼BLE Link layer协议解析
  2. 爬取知乎“凡尔赛语录”话题下的所有回答,我知道点开看你的很帅气,但还是没我帅
  3. 计算机如何查找目标,如何使用命令行查找计算机地理位置? | MOS86
  4. 【HDU - 1045】Fire Net (dfs 或二分图)
  5. 一、bootstrap4基础(布局系统、栅格系统、显示与隐藏、对齐与排列、内容排版、代码与图文、表格样式、颜色和边框、工具类)
  6. python元组为什么不可变_为什么python字符串和元组是不可变的?
  7. Visio2007 与Microsoft Studio 2008不兼容
  8. 2 环境设置_用友U8V10.1安装(Windows 7环境)
  9. 图片预加载插件 preLoad.js
  10. java功能模块_Java 14功能
  11. 四数之和 leetcode
  12. 第五十五节,IO多路复用select模块加socket模块,伪多线并发
  13. php源码 gd,php 源码安装没有gd库
  14. Android安装应用后点击打开(Open)带来的问题及解决方式
  15. c++ windows console 快速编辑模式 关闭
  16. 360,一场阴谋的制造者
  17. 给定一个整型变量a,写两段代码,第一个设置a的bit 3,第二个清除a 的bit 3。在以上两个操作中,要保持其它位不变。
  18. ubuntu开启键盘背光灯
  19. 用Python做单变量数据集的异常点分析
  20. 一些不错的GI的资料链接

热门文章

  1. SQLPlus命令详细说明
  2. 日常问题——阿里云服务器ssh经常一段时间就断掉解决办法
  3. Windows2003如何安装IIS和ftp
  4. Javascript Proxy对象 简介
  5. J2EE基础之Web服务简介
  6. ntfs安全权限和共享权限的区别
  7. DataGridview动态添加列
  8. java(安全方便的从控制台读入数据)[对Scanner类进行封装,用正则表达式判断]...
  9. 业务单号自动增长的处理办法
  10. Mybatis源码阅读(一):Mybatis初始化1.1 解析properties、settings