在我们的各个项目中,经常会有这样的需求。

  • 订单模块:在订单下单后30分钟如果没有付款,就自动取消订单,

  • 短信模块:在下单成功后60s给用户发送短信通知

  • 支付模块:在微信/支付宝支付成功后,1分钟后去调用上游接口检查订单有没有支付成功

实现这种需求的方式有几种。

一种比较笨的方式是采用定时任务,轮训数据库,方法简单好用,但性能低下,在高并发情况下容易弄死数据库,间隔时间不好设置,时间过大,影响精度,过小影响性能,而且做不到按超时的时间顺序处理。

还有一种是利用redis监听过期key,给key设置过期时间,在key失效以后处理失效后的逻辑实现上述需求。这种实现有一个问题就是当项目是单机项目,在项目部署的时候,刚好redis的key失效了,就会存在消息丢失。

rabbitmq实现延时队列:

`RabbitMQ队列本身是没有直接实现支持延迟队列的功能,但可以通过它的Time-To-Live Extensions 与 Dead Letter Exchange 的特性模拟出延迟队列的功能。

Time-To-Live Extensions

RabbitMQ支持为队列或者消息设置TTL(time to live 存活时间)。TTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后死亡成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

Dead Letter Exchange

死信交换机,上文中提到设置了 TTL 的消息或队列最终会成为Dead Letter。如果为队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新发送到Dead Letter Exchange中,然后通过Dead Letter Exchange路由到其他队列,即可实现延迟队列的功能。

SpringBoot中实现RabbitMQ延时队列

1.导入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

2.配置文件

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

3.队列配置类

@Configuration
@Slf4j
public class DelayRabbitConfig {/*** 延迟队列 TTL 名称*/private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";/*** DLX,dead letter发送到的exchange* 延时消息就是发送到该交换机的*/public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";/*** routing key 名称* 具体消息发送在该 routingKey 的*/public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
​public static final String ORDER_QUEUE_NAME = "user.order.queue";public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";public static final String ORDER_ROUTING_KEY = "order";
​/*** 延迟队列配置* <p>* 1、params.put("x-message-ttl", 5 * 1000);* 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)* 2、rabbitTemplate.convertAndSend(book, message -> {* message.getMessageProperties().setExpiration(2 * 1000 + "");* return message;* });* 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制**/@Beanpublic Queue delayOrderQueue() {Map<String, Object> params = new HashMap<>(2);// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);}
​/*** 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。* 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,* 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。** @return DirectExchange*/@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);}
​@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);}
​@Beanpublic Queue orderQueue() {return new Queue(ORDER_QUEUE_NAME, true);}
​/*** 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。* 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。* 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。**/@Beanpublic TopicExchange orderTopicExchange() {return new TopicExchange(ORDER_EXCHANGE_NAME);}
​@Beanpublic Binding orderBinding() {// TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);}
​
}

4.订单实体

@Data
public class Order implements Serializable {private String orderId;private String name;/*** 订单状态:0-未支付 1-已支付 2-订单已取消*/private Integer orderStatus;
}
​

5.消息发送者

@Component
@Slf4j
public class DelaySender {
​@Autowiredprivate AmqpTemplate amqpTemplate;
​public void sendDelay(Order order) {log.info("【订单生成时间】" + new Date().toString() + "【1分钟后检查订单是否已经支付】" + order.toString());this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {// 如果配置了 params.put("x-message-ttl", 5 * 1000);// 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间message.getMessageProperties().setExpiration(1000 * 60 + "");return message;});}
}
​

6.消息接收者

@Component
@Slf4j
public class DelayReceiver {
​@RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})public void orderDelayQueue(Order order, Message message, Channel channel) {log.info("---------------------------------------------");log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]", new Date(), order.toString());if (order.getOrderStatus() == 0) {order.setOrderStatus(2);log.info("【该订单未支付,取消订单】" + order.toString());} else if (order.getOrderStatus() == 1) {log.info("【该订单已完成支付】");} else if (order.getOrderStatus() == 2) {log.info("【该订单已取消】");}log.info("---------------------------------------------");
​}
}

7.测试

@RestController
public class TestController {@Autowiredprivate DelaySender delaySender;
​@GetMapping("/sendDelay")public Object sendDelay() {Order order1 = new Order();order1.setOrderStatus(0);order1.setOrderId("2019110874129");order1.setName("switch");
​Order order2 = new Order();order2.setOrderStatus(1);order2.setOrderId("2019110874215");order2.setName("PS4 pro");
​delaySender.sendDelay(order1);delaySender.sendDelay(order2);return "success";}
​
}

测试结果如下:

2019-11-08 14:29:01.544  INFO 44447 --- [nio-8080-exec-1] c.y.study.rabbitmqdeadqueue.DelaySender  : 【订单生成时间】Fri Nov 08 14:29:01 CST 2019【1分钟后检查订单是否已经支付】Order(orderId=2019110874129, orderStatus=0, orderName=switch)
2019-11-08 14:29:01.689  INFO 44447 --- [nio-8080-exec-1] c.y.study.rabbitmqdeadqueue.DelaySender  : 【订单生成时间】Fri Nov 08 14:29:01 CST 2019【1分钟后检查订单是否已经支付】Order(orderId=2019110874215, orderStatus=1, orderName=PS4 pro)
2019-11-08 14:30:01.723  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : ---------------------------------------------
2019-11-08 14:30:01.724  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : 【orderDelayQueue 监听的消息】 - 【消费时间】 - [Fri Nov 08 14:30:01 CST 2019]- 【订单内容】 - [Order(orderId=2019110874129, orderStatus=0, orderName=switch)]
2019-11-08 14:30:01.727  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : 【该订单未支付,取消订单】Order(orderId=2019110874129, orderStatus=2, orderName=switch)
2019-11-08 14:30:01.727  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : ---------------------------------------------
2019-11-08 14:30:01.728  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : ---------------------------------------------
2019-11-08 14:30:01.729  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : 【orderDelayQueue 监听的消息】 - 【消费时间】 - [Fri Nov 08 14:30:01 CST 2019]- 【订单内容】 - [Order(orderId=2019110874215, orderStatus=1, orderName=PS4 pro)]
2019-11-08 14:30:01.729  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : 【该订单已完成支付】
2019-11-08 14:30:01.729  INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver    : ---------------------------------------------

SpringBoot之使用RabbitMQ实现延迟队列相关推荐

  1. 【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

    死信队列实现篇,参考文章:[SpringBoot]60.SpringBoot中整合RabbitMQ实现延时队列(死信队列篇) 一.介绍 1.什么是延时队列? 延时队列即就是放置在该队列里面的消息是不需 ...

  2. SpringBoot+RabbitMQ之延迟队列

    一.前言 延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景: 网上商城下订单后30分 ...

  3. rabbitMq实现延迟队列

    文章目录 业务场景: 1 安装rabbitMq 2 添加maven依赖 3 在application.properties配置 4 具体的实现 4.1 Dead Letter Exchanges 4. ...

  4. docker rabbitmq 安装 延迟队列 rabbitmq_delayed_message_exchange 插件

    以  官方的 rabbitmq:3.9.20-management 为例,默认开启的插件有4个,如下所示 2022-07-09 21:31:55.624125+08:00 [info] <0.8 ...

  5. RabbitMQ实现延迟队列的方式

    1.背景 最近在做类似拍卖系统的上架功能,卖家上架物品以后,例如到期时间24小时或者48小时,如果无竞拍者或者购买者,则物品自动下架到用户的邮件中.诸如电商用户下单,30分钟未支付,则自动取消订单,归 ...

  6. 【不是拷贝】rabbitmq安装延迟队列插件rabbitmq_delayed_message_exchange

    1.查看当前rabbitmq已安装的插件 查看当前的rabbitmq 安装了哪些插件: rabbitmq-plugins list [root@yq-test1 ~]# rabbitmq-plugin ...

  7. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  8. RabbitMQ —— 延迟队列

    RabbitMQ实现延迟队列一:在队列上设置TTL Publish --> delaysync.exchange --> delay.5m.queue(延迟队列) --> delay ...

  9. RabbitMQ如何实现延迟队列?

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

最新文章

  1. R语言使用magick包的image_annotate函数在图片中添加文本标签信息、自定义文本标签内容的位置、色彩(Text annotations)
  2. [FZSZOJ 1029] 观察者加强版
  3. 基于深度学习的目标检测算法:SSD——常见的目标检测算法
  4. webuploader 怎么在react中_另辟蹊径搭建阅读React源码调试环境支持所有React版本细分文件断点调试...
  5. LiveVideoStackCon讲师热身分享 ( 七 ) —— 视频编码器的对比与选择
  6. 1.0jpa 2.0_EasyCriteria 2.0 – JPA标准应该很容易
  7. 2021年面试前端岗位需要注意什么?
  8. 自创本派高考理数试题集现在发布
  9. 贺利坚老师汇编课程37笔记:运用栈加两层循环之把六个字符串里的字母都改写成大写字母
  10. spring mvc---controller返回值
  11. Hibernate原生SQL查询
  12. Python实现桌面程序:PyQt5 + QtDesigner -- 界面设计与逻辑编写
  13. WindowsXP自带小工具(转)
  14. c语言选择题题及答案,c语言选择题
  15. c++ 调用meshlab程序慢_MeshLab中插件的添加过程
  16. CDISC SDTM AE domain学习笔记 - 1
  17. 卸载Windows的引导界面中的变色龙选项
  18. c语言考场排座系统,具才考场座次编排系统
  19. python通过ssh链接sql(python通过阿里跳板机链接阿里数据库)
  20. 单页面SPA和多页面MPA应用的区别

热门文章

  1. javaWeb_JSP 动态指令 forward 的程序
  2. React中的路由react-router
  3. 【漫天烟花】绚烂烟花点亮夜空也太美了叭、某程序员携带烟花秀给大家拜年啦~
  4. 程序人生 Hello‘s P2P
  5. Cascade EF-GAN: Progressive Facial Expression Editing with Local Focuses 论文解读
  6. (20)PDE_PTE属性(U/S PS A D 有效位)
  7. 检测代码区校验和实现简易反调试
  8. CVE-2021-35211: SolarWinds Serv-U SSH 漏洞分析
  9. 【安全】从mimikatz学习Windows安全之访问控制模型
  10. 160个Crackme019