第四十六章:SpringBoot RabbitMQ完成消息延迟消费
在2018-3-1
日SpringBoot
官方发版了2.0.0.RELEASE
最新版本,新版本完全基于Spring5.0
来构建,JDK
最低支持也从原来的1.6
也改成了1.8
,不再兼容1.8
以下的版本,更多新特性请查看官方文档。
本章目标
基于SpringBoot
整合RabbitMQ
完成消息延迟消费。
构建项目
注意前言
由于
SpringBoot
的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common
模块内的配置可以被SpringBoot
扫描到,否则不会自动创建队列,控制台会输出404的错误信息。
SpringBoot 企业级核心技术学习专题
专题 | 专题名称 | 专题描述 |
---|---|---|
001 | Spring Boot 核心技术 | 讲解SpringBoot一些企业级层面的核心组件 |
002 | Spring Boot 核心技术章节源码 | Spring Boot 核心技术简书每一篇文章码云对应源码 |
003 | Spring Cloud 核心技术 | 对Spring Cloud核心技术全面讲解 |
004 | Spring Cloud 核心技术章节源码 | Spring Cloud 核心技术简书每一篇文章对应源码 |
005 | QueryDSL 核心技术 | 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA |
006 | SpringDataJPA 核心技术 | 全面讲解SpringDataJPA核心技术 |
007 | SpringBoot核心技术学习目录 | SpringBoot系统的学习目录,敬请关注点赞!!! |
我们本章采用2.0.0.RELEASE
版本的SpringBoot
,添加相关的依赖如下所示:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies><!--rabbbitMQ相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--web相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--lombok依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--spring boot tester--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--fast json依赖--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.40</version></dependency></dependencies>
......
复制代码
我们仍然采用多模块的方式来测试队列的Provider
以及Consumer
。
队列公共模块
我们先来创建一个名为rabbitmq-common
公共依赖模块(Create New Maven Module) 在公共模块内添加一个QueueEnum
队列枚举配置,该枚举内配置队列的Exchange
、QueueName
、RouteKey
等相关内容,如下所示:
package com.hengyu.rabbitmq.lazy.enums;import lombok.Getter;/*** 消息队列枚举配置** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:33* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),/*** 消息通知ttl队列*/MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");/*** 交换名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}
复制代码
可以看到MESSAGE_QUEUE
队列配置跟我们之前章节的配置一样,而我们另外新创建了一个后缀为ttl
的消息队列配置。我们采用的这种方式是RabbitMQ
消息队列其中一种的延迟消费模块,通过配置队列消息过期后转发的形式。
这种模式比较简单,我们需要将消息先发送到
ttl
延迟队列内,当消息到达过期时间后会自动转发到ttl
队列内配置的转发Exchange
以及RouteKey
绑定的队列内完成消息消费。
下面我们来模拟消息通知
的延迟消费场景,先来创建一个名为MessageRabbitMqConfiguration
的队列配置类,该配置类内添加消息通知队列
配置以及消息通过延迟队列
配置,如下所示:
/*** 消息通知 - 消息队列配置信息** @author:恒宇少年 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:32* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Configuration
public class MessageRabbitMqConfiguration {/*** 消息中心实际消费队列交换配置** @return*/@BeanDirectExchange messageDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.MESSAGE_QUEUE.getExchange()).durable(true).build();}/*** 消息中心延迟消费交换配置** @return*/@BeanDirectExchange messageTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange()).durable(true).build();}/*** 消息中心实际消费队列配置** @return*/@Beanpublic Queue messageQueue() {return new Queue(QueueEnum.MESSAGE_QUEUE.getName());}/*** 消息中心TTL队列** @return*/@BeanQueue messageTtlQueue() {return QueueBuilder.durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())// 配置到期后转发的交换.withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())// 配置到期后转发的路由键.withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey()).build();}/*** 消息中心实际消息交换与队列绑定** @param messageDirect 消息中心交换配置* @param messageQueue 消息中心队列* @return*/@BeanBinding messageBinding(DirectExchange messageDirect, Queue messageQueue) {return BindingBuilder.bind(messageQueue).to(messageDirect).with(QueueEnum.MESSAGE_QUEUE.getRouteKey());}/*** 消息中心TTL绑定实际消息中心实际消费交换机** @param messageTtlQueue* @param messageTtlDirect* @return*/@Beanpublic Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {return BindingBuilder.bind(messageTtlQueue).to(messageTtlDirect).with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());}
}
复制代码
我们声明了消息通知队列
的相关Exchange
、Queue
、Binding
等配置,将message.center.create
队列通过路由键message.center.create
绑定到了message.center.direct
交换上。
除此之外,我们还添加了消息通知延迟队列
的Exchange
、Queue
、Binding
等配置,将message.center.create.ttl
队列通过message.center.create.ttl
路由键绑定到了message.center.topic.ttl
交换上。
我们仔细来看看messageTtlQueue
延迟队列的配置,跟messageQueue
队列配置不同的地方这里多出了x-dead-letter-exchange
、x-dead-letter-routing-key
两个参数,而这两个参数就是配置延迟队列过期后转发的Exchange
、RouteKey
,只要在创建队列时对应添加了这两个参数,在RabbitMQ
管理平台看到的队列配置就不仅是单纯的Direct
类型的队列类型,如下图所示:
在上图内我们可以看到message.center.create.ttl
队列多出了DLX
、DLK
的配置,这就是RabbitMQ
内死信交换
的标志。 满足死信交换
的条件,在官方文档中表示:
Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:
The message is rejected (basic.reject or basic.nack) with requeue=false, The TTL for the message expires; or The queue length limit is exceeded.
- 该消息被拒绝(basic.reject或 basic.nack),requeue = false
- 消息的TTL过期
- 队列长度限制已超出 官方文档地址
我们需要满足上面的其中一种方式就可以了,我们采用满足第二个条件,采用过期的方式。
队列消息提供者
我们再来创建一个名为rabbitmq-lazy-provider
的模块(Create New Maven Module),并且在pom.xml
配置文件内添加rabbitmq-common
模块的依赖,如下所示:
<!--添加公共模块依赖-->
<dependency><groupId>com.hengyu</groupId><artifactId>rabbitmq-common</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency>
复制代码
配置队列
在resource
下创建一个名为application.yml
的配置文件,在该配置文件内添加如下配置信息:
spring:#rabbitmq消息队列配置信息rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /hengboypublisher-confirms: true
复制代码
消息提供者类
接下来我们来创建名为MessageProvider
消息提供者类,用来发送消息内容到消息通知延迟队列,代码如下所示:
/*** 消息通知 - 提供者** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:40* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Component
public class MessageProvider {/*** logger instance*/static Logger logger = LoggerFactory.getLogger(MessageProvider.class);/*** RabbitMQ 模版消息实现类*/@Autowiredprivate AmqpTemplate rabbitMqTemplate;/*** 发送延迟消息** @param messageContent 消息内容* @param exchange 队列交换* @param routerKey 队列交换绑定的路由键* @param delayTimes 延迟时长,单位:毫秒*/public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {if (!StringUtils.isEmpty(exchange)) {logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));// 执行发送消息到指定队列rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {// 设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;});} else {logger.error("未找到队列消息:{},所属的交换机", exchange);}}
}
复制代码
由于我们在 pom.xml
配置文件内添加了RabbitMQ
相关的依赖并且在上面application.yml
文件内添加了对应的配置,SpringBoot
为我们自动实例化了AmqpTemplate
,该实例可以发送任何类型的消息到指定队列。 我们采用convertAndSend
方法,将消息内容发送到指定Exchange
、RouterKey
队列,并且通过setExpiration
方法设置过期时间,单位:毫秒。
编写发送测试
我们在test
目录下创建一个测试类,如下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {/*** 消息队列提供者*/@Autowiredprivate MessageProvider messageProvider;/*** 测试延迟消息消费*/@Testpublic void testLazy() {// 测试延迟10秒messageProvider.sendMessage("测试延迟消费,写入时间:" + new Date(),QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),10000);}
}
复制代码
注意:
@SpringBootTest
注解内添加了classes
入口类的配置,因为我们是模块创建的项目并不是默认创建的SpringBoot
项目,这里需要配置入口程序类才可以运行测试。
在测试类我们注入了MessageProvider
消息提供者,调用sendMessage
方法发送消息到消息通知延迟队列
,并且设置延迟的时间为10秒
,这里衡量发送到指定队列的标准是要看MessageRabbitMqConfiguration
配置类内的相关Binding
配置,通过Exchange
、RouterKey
值进行发送到指定的队列。
到目前为止我们的rabbitmq-lazy-provider
消息提供模块已经编写完成了,下面我们来看看消息消费者模块。
队列消息消费者
我们再来创建一个名为rabbitmq-lazy-consumer
的模块(Create New Maven Module),同样需要在pom.xml
配置文件内添加rabbitmq-common
模块的依赖,如下所示:
<!--添加公共模块依赖-->
<dependency><groupId>com.hengyu</groupId><artifactId>rabbitmq-common</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency>
复制代码
当然同样需要在resource
下创建application.yml
并添加消息队列的相关配置,代码就不贴出来了,可以直接从rabbitmq-lazy-provider
模块中复制application.yml
文件到当前模块内。
消息消费者类
接下来创建一个名为MessageConsumer
的消费者类,该类需要监听消息通知队列
,代码如下所示:
/*** 消息通知 - 消费者** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午5:00* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {/*** logger instance*/static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);@RabbitHandlerpublic void handler(String content) {logger.info("消费内容:{}", content);}
}
复制代码
在@RabbitListener
注解内配置了监听的队列,这里配置内容是QueueEnum
枚举内的queueName
属性值,当然如果你采用常量的方式在注解属性上是直接可以使用的,枚举不支持这种配置,这里只能把QueueName
字符串配置到queues
属性上了。 由于我们在消息发送时采用字符串的形式发送消息内容,这里在@RabbitHandler
处理方法的参数内要保持数据类型一致!
消费者入口类
我们为消费者模块添加一个入口程序类,用于启动消费者,代码如下所示:
/*** 【第四十六章:SpringBoot & RabbitMQ完成消息延迟消费】* 队列消费者模块 - 入口程序类** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:55* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {public static void main(String[] args) {SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);}
}
复制代码
测试
我们的代码已经编写完毕,下面来测试下是否完成了我们预想的效果,步骤如下所示:
1. 启动消费者模块
2. 执行RabbitMqLazyProviderApplicationTests.testLazy()方法进行发送消息到通知延迟队列
3. 查看消费者模块控制台输出内容
复制代码
我们可以在消费者模块控制台看到输出内容:
2018-03-04 10:10:34.765 INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer : 消费内容:测试延迟消费,写入时间:Sun Mar 04 10:10:24 CST 2018
复制代码
我们在提供者测试方法发送消息的时间为10:10:24
,而真正消费的时间则为10:10:34
,与我们预计的一样,消息延迟了10秒
后去执行消费。
总结
终上所述我们完成了消息队列的延迟消费
,采用死信
方式,通过消息过期方式触发,在实际项目研发过程中,延迟消费还是很有必要的,可以省去一些定时任务的配置。
本章源码已经上传到码云: SpringBoot配套源码地址:gitee.com/hengboy/spr… SpringCloud配套源码地址:gitee.com/hengboy/spr… SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读!
第四十六章:SpringBoot RabbitMQ完成消息延迟消费相关推荐
- pdfstamper生成pdf无法显示汉字_正点原子STM32F4/F7水星开发板资料连载第四十六章 汉字显示实验...
1)实验平台:正点原子水星 STM32F4/F7 开发板 2)摘自<STM32F7 开发指南(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 3)全套实验源码+手册+视频下载 ...
- 第四十六章 SQL函数 DAY
文章目录 第四十六章 SQL函数 DAY 大纲 参数 描述 第四十六章 SQL函数 DAY 返回日期表达式的月份日期的日期函数. 大纲 DAY(date-expression){fn DAY(date ...
- 【正点原子STM32连载】第四十六章 FATFS实验 摘自【正点原子】MiniPro STM32H750 开发指南_V1.1
1)实验平台:正点原子MiniPro H750开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id=677017430560 3)全套实验源码+手册+视频 ...
- 第四十六章 Caché 变量大全 ^$GLOBAL 变量
文章目录 第四十六章 Caché 变量大全 ^$GLOBAL 变量 大纲 参数 描述 进程私有全局变量 参数 nspace global_name 示例 作为`$DATA`的参数 作为`$ORDER` ...
- 光盘显示0字节可用_正点原子STM32F4/F7水星开发板资料连载第四十六章 汉字显示实验
1)实验平台:正点原子水星 STM32F4/F7 开发板 2)摘自<STM32F7 开发指南(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 3)全套实验源码+手册+视频下载 ...
- 第四十六章 使用 ^SystemPerformance 监视性能 - 生成 ^SystemPerformance 性能报告
文章目录 第四十六章 使用 ^SystemPerformance 监视性能 - 生成 ^SystemPerformance 性能报告 生成 `^SystemPerformance` 性能报告 使用任务 ...
- 【正点原子FPGA连载】第四十六章SD卡读写测试实验 -摘自【正点原子】新起点之FPGA开发指南_V2.1
1)实验平台:正点原子新起点V2开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id=609758951113 2)全套实验源码+手册+视频下载地址:ht ...
- 嵌入式linux系统蜂鸣器实验,「正点原子Linux连载」第四十六章Linux蜂鸣器实验
1)实验平台:正点原子Linux开发板 2)摘自<正点原子I.MX6U嵌入式Linux驱动开发指南> 关注官方微信号公众号,获取更多资料:正点原子 大家将imx35_gpio_hwdata ...
- rabbitmq 手动提交_第四章----SpringBoot+RabbitMQ发送确认和消费手动确认机制
1. 配置RabbitMQ # 发送确认 spring.rabbitmq.publisher-confirms=true # 发送回调 spring.rabbitmq.publisher-return ...
最新文章
- 2019年智能手机AI要被深度开发,这五项技术将是重点
- eclipse 快捷键_eclipse两种注释的快捷键
- spring三: 装配bean( 在xml中进行显式配置, 在java中进行显式配置)
- python中的set函数、列表的操作
- dev c++运行没有结果_「C/C++」一行注释也能影响运行结果?
- 1.4最基本的使用--POM.xml文件
- PXC集群常见错误(一)
- ES9新特性_ES9扩展运算符与rest参数---JavaScript_ECMAScript_ES6-ES11新特性工作笔记053
- Caffe100数据集使用
- ​对于边界值中有不确定字符串时该怎么处理
- SQL中GROUP BY用法示例
- java循环遍历map集合_Java中遍历Map集合的四种方法
- 基于python物流管理系统毕业设计-python实现快递价格查询系统
- RISC-V MCU+病房系统
- sis 最新_传统SIS系统面临考验——“可持续的安全仪表系统”来袭
- jQuery中的get和post请求
- CPU架构解析:ARM和x86大比拼
- 【tool】动态注释LOG_NDEBUG宏定义
- 11届蓝桥杯青少年组C++全国赛高级组
- Math 的 ceil、floor、round方法详解及示例
热门文章
- gatk过滤_重测序2--看了不后悔的gatk-变异检测
- java tempfile read_Java资源作为文件
- apiclod 上传图片_Apicloud——关于上传图片、视频(二)
- python坐标定位_Python_元素定位浏览器坐标定位
- 虚拟服务器ip是什么意思,虚拟主机独立ip是什么意思
- 报错,Unknown custom element: <DeviceVendorStatistics> - did you register the component correctly? For
- 大数据学习笔记02:在私有云上创建与配置虚拟机
- 专业英语笔记:Install and Use Python
- flutter 动画展开菜单_蒲公英 · JELLY技术周刊 Vol.34: 芜湖~ Flutter
- python索引字符串_Python:通过索引删除子字符串