Spring boot + RabbitMQ延迟队列实战
一、背景
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景:
订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。
短信通知: 下单成功后 60s 之后给用户发送短信通知。
失败重试: 业务操作失败后,间隔一定的时间进行失败重试。
传统订单处理:
采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。
当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作。
但是通过定时任务来执行,实时性始终不是很好。
二、实现方案
方案一:通过死信队列实现
参考:https://www.cnblogs.com/xmf3628/p/12097101.html
方案二:通过延迟路由插件来实现rabbitmq-delayed-message-exchange
参考:https://www.cnblogs.com/wintercloud/p/10877399.html
这里演示通过插件来实现
三、插件安装
下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:
//查看已安装的插件
rabbitmq-plugins list
//启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//重启服务
service rabbitmq-server restart
//再次查看,插件是否生效
rabbitmq-plugins list
说明插件安装成功,并成功启用。
四、机制解释
安装插件后会生成新的Exchange类型x-delayed-message
,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia
(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type
类型标记的交换机类型投递至目标队列。
五、代码实战
1、依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.44</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
2、队列连接信息
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
3、生产者代码
声明延迟队列和延迟队列的交换器,并建立绑定关系
@Configuration
public class TestDelayQueueConfig {@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");return new CustomExchange(ExchangeEnum.DELAY_EXCHANGE.getValue(), "x-delayed-message", true, false, args);}/*** 延迟消息队列* @return*/@Beanpublic Queue delayQueue() {return new Queue(QueueEnum.TEST_DELAY.getName(), true);}@Beanpublic Binding deplyBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(QueueEnum.TEST_DELAY.getRoutingKey()).noargs();}
}
@Getter
public enum ExchangeEnum {DELAY_EXCHANGE("test.deply.exchange");private String value;ExchangeEnum(String value) {this.value = value;}
}
@Getter
public enum QueueEnum {/*** delay*/TEST_DELAY("test.delay.queue", "delay");/*** 队列名称*/private String name;/*** 队列路由键*/private String routingKey;QueueEnum(String name, String routingKey) {this.name = name;this.routingKey = routingKey;}
}
延迟队列生产者服务:
@Component
@Slf4j
public class DeplyProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(String msg, int delayTime) {log.info("msg= " + msg + ".delayTime" + delayTime);MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(delayTime);Message message = new Message(msg.getBytes(), messageProperties);rabbitTemplate.send(ExchangeEnum.DELAY_EXCHANGE.getValue(), QueueEnum.TEST_DELAY.getRoutingKey(), message);}
}
单元测试:
@Test
public void sendDeplyMsgTest() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = sdf.format(new Date());log.info("发送测试消息的时间:" +currentTime );deplyProducer.send(currentTime + "发送一个测试消息,延迟10秒", 10000);//10秒deplyProducer.send(currentTime + "发送一个测试消息,延迟20秒", 20000);//2秒deplyProducer.send(currentTime + "发送一个测试消息,延迟30秒", 30000);//1秒
}
消费者:
@Component
@RabbitListener(queues = "test.delay.queue")
@Slf4j
public class DeplyConsumer {@RabbitHandlerpublic void onMessage(byte[] message,@Headers Map<String, Object> headers,Channel channel) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent.");log.info(sdf.format(new Date())+"接收到延时消息:" + new String(message));}
}
执行测试结果:
执行消息发送代码的时候,不会立刻把消息推送到对应的队列中。
只要到了对应的时候,才会将消息推送到队列里面。
可以在执行单元测试的时候,通过管理界面,观察队列中消息的增长:
每隔10s,增加1条。
更多精彩,关注我吧。
Spring boot + RabbitMQ延迟队列实战相关推荐
- spring boot + rabbitMq整合之死信队列(DL)
rabbit mq 死信队列 什么是死信队列? DL-Dead Letter 死信队列 死信,在官网中对应的单词为"Dead Letter",可以看出翻译确实非常的简单粗暴.那么死 ...
- Delayed Message 插件实现 RabbitMQ 延迟队列
延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...
- rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列
延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...
- RabbitMQ 延迟队列-对于入门来说可以快速上手
RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ...
- 谷粒商城笔记+踩坑(22)——库存自动解锁。RabbitMQ延迟队列
导航: 谷粒商城笔记+踩坑汇总篇 目录 1 业务流程,订单失败后自动回滚解锁库存 可靠消息+最终一致性方案 2[仓库服务]RabbitMQ环境准备 2.1 导入依赖 2.2 yml配置RabbitMQ ...
- RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?
以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...
- Spring Boot 单元测试详解+实战教程
转载自 Spring Boot 单元测试详解+实战教程 Spring Boot 的测试类库 Spring Boot 提供了许多实用工具和注解来帮助测试应用程序,主要包括以下两个模块. spring ...
- Spring boot Rabbitmq 示例
Spring boot Rabbitmq 示例 简介 Spring boot RabbitMQ 简单程序示例 编写详情 RabbitMQ docker 避免麻烦,直接使用docker启 ...
- Spring Boot Vue Element入门实战(完结)
最近给朋友做一个大学运动会管理系统,用作教学案例,正好自己也在自学VUE,决定用spring boot vue做一个简单的系统.vue这个前端框架很火,他和传统的Jquery 编程思路完全不一样,Jq ...
最新文章
- 计算机里面有鬼的恐怖游戏,2018年度十大最佳PC恐怖游戏
- 免费教材丨第56期:《深度学习导论及案例分析》、《谷歌黑板报-数学之美》
- MVC验证05-自定义验证规则、验证2个属性值不等
- 科学计算机要用的电池是几号,科学计算器的常识及注意事项
- php商品数量怎么用js,如何使用js统计页面标签数量
- 像烟灰一样松散(毕淑敏)
- [TJOI2010]阅读理解
- java 枚举 示例_Java枚举name()方法及示例
- gRPC服务注册发现及负载均衡的实现方案与源码解析
- idea中新建javaWeb项目
- 字符串的经典hash算法
- 广州.Net俱乐部第二次聚会报道
- 【现代编译器】语法分析——正则表达式,上下文无关文法,递归下降分析,分析树...
- vc中format用法以及c++中Format用法
- 系统集成项目管理工程师思维导图
- 工具说明书 - 滚动截屏和录屏软件ShareX
- Linux有问必答:如何在Linux命令行中刻录ISO或NRG镜像到DVD
- 复制神器Ditto使用方法详细说明
- 浅谈工程总承包项目WBS的重要性与创建方法
- Meta Learning在NLP领域的应用
热门文章
- 数学分析_导函数连续问题分析
- SAP HR 年休假调整:从统休制到实休制,在一个自然年度内分段式生成年休假
- 聚簇索引(Clustered Index)和非聚簇索引 (Non- Clustered Index)
- Matlab绘制频率特性
- c语言restrict,一个c语言关键字restrict例子的疑问
- python提取邮件内容和附件(草稿版)
- 如何在NVMe SSD上安装Win7?手把手教你
- 创建日志表和插日志信息的sp
- html中colspan属性含义,colspan属性, 要如何用
- uboot成功移植到STM32F103ZET6(一)