一、背景

延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景:

订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。
短信通知: 下单成功后 60s 之后给用户发送短信通知。
失败重试: 业务操作失败后,间隔一定的时间进行失败重试。

传统订单处理:

采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。

当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作。

但是通过定时任务来执行,实时性始终不是很好。

二、实现方案

方案一:通过死信队列实现

参考:https://www.cnblogs.com/xmf3628/p/12097101.html

方案二:通过延迟路由插件来实现rabbitmq-delayed-message-exchange

参考:https://www.cnblogs.com/wintercloud/p/10877399.html

这里演示通过插件来实现

三、插件安装

下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

//查看已安装的插件
rabbitmq-plugins list
//启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//重启服务
service rabbitmq-server restart
//再次查看,插件是否生效
rabbitmq-plugins list

说明插件安装成功,并成功启用。

四、机制解释

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

五、代码实战

1、依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.44</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2、队列连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

3、生产者代码

声明延迟队列和延迟队列的交换器,并建立绑定关系

@Configuration
public class TestDelayQueueConfig {@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");return new CustomExchange(ExchangeEnum.DELAY_EXCHANGE.getValue(), "x-delayed-message", true, false, args);}/*** 延迟消息队列* @return*/@Beanpublic Queue delayQueue() {return new Queue(QueueEnum.TEST_DELAY.getName(), true);}@Beanpublic Binding deplyBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(QueueEnum.TEST_DELAY.getRoutingKey()).noargs();}
}
@Getter
public enum ExchangeEnum {DELAY_EXCHANGE("test.deply.exchange");private String value;ExchangeEnum(String value) {this.value = value;}
}
@Getter
public enum QueueEnum {/*** delay*/TEST_DELAY("test.delay.queue", "delay");/*** 队列名称*/private String name;/*** 队列路由键*/private String routingKey;QueueEnum(String name, String routingKey) {this.name = name;this.routingKey = routingKey;}
}

延迟队列生产者服务:

@Component
@Slf4j
public class DeplyProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(String msg, int delayTime) {log.info("msg= " + msg + ".delayTime" + delayTime);MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(delayTime);Message message = new Message(msg.getBytes(), messageProperties);rabbitTemplate.send(ExchangeEnum.DELAY_EXCHANGE.getValue(), QueueEnum.TEST_DELAY.getRoutingKey(), message);}
}

单元测试:

@Test
public void sendDeplyMsgTest() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = sdf.format(new Date());log.info("发送测试消息的时间:" +currentTime );deplyProducer.send(currentTime + "发送一个测试消息,延迟10秒", 10000);//10秒deplyProducer.send(currentTime + "发送一个测试消息,延迟20秒", 20000);//2秒deplyProducer.send(currentTime + "发送一个测试消息,延迟30秒", 30000);//1秒
}

消费者:

@Component
@RabbitListener(queues = "test.delay.queue")
@Slf4j
public class DeplyConsumer {@RabbitHandlerpublic void onMessage(byte[] message,@Headers Map<String, Object> headers,Channel channel) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent.");log.info(sdf.format(new Date())+"接收到延时消息:" + new String(message));}
}

执行测试结果:

执行消息发送代码的时候,不会立刻把消息推送到对应的队列中。
只要到了对应的时候,才会将消息推送到队列里面。

可以在执行单元测试的时候,通过管理界面,观察队列中消息的增长:
每隔10s,增加1条。

更多精彩,关注我吧。

Spring boot + RabbitMQ延迟队列实战相关推荐

  1. spring boot + rabbitMq整合之死信队列(DL)

    rabbit mq 死信队列 什么是死信队列? DL-Dead Letter 死信队列 死信,在官网中对应的单词为"Dead Letter",可以看出翻译确实非常的简单粗暴.那么死 ...

  2. Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  3. rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  4. RabbitMQ 延迟队列-对于入门来说可以快速上手

    RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ​ ...

  5. 谷粒商城笔记+踩坑(22)——库存自动解锁。RabbitMQ延迟队列

    导航: 谷粒商城笔记+踩坑汇总篇 目录 1 业务流程,订单失败后自动回滚解锁库存 可靠消息+最终一致性方案 2[仓库服务]RabbitMQ环境准备 2.1 导入依赖 2.2 yml配置RabbitMQ ...

  6. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  7. Spring Boot 单元测试详解+实战教程

    转载自   Spring Boot 单元测试详解+实战教程 Spring Boot 的测试类库 Spring Boot 提供了许多实用工具和注解来帮助测试应用程序,主要包括以下两个模块. spring ...

  8. Spring boot Rabbitmq 示例

    Spring boot Rabbitmq 示例 简介     Spring boot RabbitMQ 简单程序示例 编写详情 RabbitMQ docker     避免麻烦,直接使用docker启 ...

  9. Spring Boot Vue Element入门实战(完结)

    最近给朋友做一个大学运动会管理系统,用作教学案例,正好自己也在自学VUE,决定用spring boot vue做一个简单的系统.vue这个前端框架很火,他和传统的Jquery 编程思路完全不一样,Jq ...

最新文章

  1. 计算机里面有鬼的恐怖游戏,2018年度十大最佳PC恐怖游戏
  2. 免费教材丨第56期:《深度学习导论及案例分析》、《谷歌黑板报-数学之美》
  3. MVC验证05-自定义验证规则、验证2个属性值不等
  4. 科学计算机要用的电池是几号,科学计算器的常识及注意事项
  5. php商品数量怎么用js,如何使用js统计页面标签数量
  6. 像烟灰一样松散(毕淑敏)
  7. [TJOI2010]阅读理解
  8. java 枚举 示例_Java枚举name()方法及示例
  9. gRPC服务注册发现及负载均衡的实现方案与源码解析
  10. idea中新建javaWeb项目
  11. 字符串的经典hash算法
  12. 广州.Net俱乐部第二次聚会报道
  13. 【现代编译器】语法分析——正则表达式,上下文无关文法,递归下降分析,分析树...
  14. vc中format用法以及c++中Format用法
  15. 系统集成项目管理工程师思维导图
  16. 工具说明书 - 滚动截屏和录屏软件ShareX
  17. Linux有问必答:如何在Linux命令行中刻录ISO或NRG镜像到DVD
  18. 复制神器Ditto使用方法详细说明
  19. 浅谈工程总承包项目WBS的重要性与创建方法
  20. Meta Learning在NLP领域的应用

热门文章

  1. 数学分析_导函数连续问题分析
  2. SAP HR 年休假调整:从统休制到实休制,在一个自然年度内分段式生成年休假
  3. 聚簇索引(Clustered Index)和非聚簇索引 (Non- Clustered Index)
  4. Matlab绘制频率特性
  5. c语言restrict,一个c语言关键字restrict例子的疑问
  6. python提取邮件内容和附件(草稿版)
  7. 如何在NVMe SSD上安装Win7?手把手教你
  8. 创建日志表和插日志信息的sp
  9. html中colspan属性含义,colspan属性, 要如何用
  10. uboot成功移植到STM32F103ZET6(一)