2018-3-1SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更多新特性请查看官方文档。

本章目标

基于SpringBoot整合RabbitMQ完成消息延迟消费。

构建项目

注意前言

由于SpringBoot的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common模块内的配置可以被SpringBoot扫描到,否则不会自动创建队列,控制台会输出404的错误信息。

SpringBoot 企业级核心技术学习专题


专题 专题名称 专题描述
001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件
002 Spring Boot 核心技术章节源码 Spring Boot 核心技术简书每一篇文章码云对应源码
003 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解
004 Spring Cloud 核心技术章节源码 Spring Cloud 核心技术简书每一篇文章对应源码
005 QueryDSL 核心技术 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技术 全面讲解SpringDataJPA核心技术
007 SpringBoot核心技术学习目录 SpringBoot系统的学习目录,敬请关注点赞!!!

我们本章采用2.0.0.RELEASE版本的SpringBoot,添加相关的依赖如下所示:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies><!--rabbbitMQ相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--web相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--lombok依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--spring boot tester--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--fast json依赖--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.40</version></dependency></dependencies>
......
复制代码

我们仍然采用多模块的方式来测试队列的Provider以及Consumer

队列公共模块

我们先来创建一个名为rabbitmq-common公共依赖模块(Create New Maven Module) 在公共模块内添加一个QueueEnum队列枚举配置,该枚举内配置队列的ExchangeQueueNameRouteKey等相关内容,如下所示:

package com.hengyu.rabbitmq.lazy.enums;import lombok.Getter;/*** 消息队列枚举配置** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:33* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),/*** 消息通知ttl队列*/MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");/*** 交换名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}
复制代码

可以看到MESSAGE_QUEUE队列配置跟我们之前章节的配置一样,而我们另外新创建了一个后缀为ttl的消息队列配置。我们采用的这种方式是RabbitMQ消息队列其中一种的延迟消费模块,通过配置队列消息过期后转发的形式。

这种模式比较简单,我们需要将消息先发送到ttl延迟队列内,当消息到达过期时间后会自动转发到ttl队列内配置的转发Exchange以及RouteKey绑定的队列内完成消息消费。

下面我们来模拟消息通知的延迟消费场景,先来创建一个名为MessageRabbitMqConfiguration的队列配置类,该配置类内添加消息通知队列配置以及消息通过延迟队列配置,如下所示:

/*** 消息通知 - 消息队列配置信息** @author:恒宇少年 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:32* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Configuration
public class MessageRabbitMqConfiguration {/*** 消息中心实际消费队列交换配置** @return*/@BeanDirectExchange messageDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.MESSAGE_QUEUE.getExchange()).durable(true).build();}/*** 消息中心延迟消费交换配置** @return*/@BeanDirectExchange messageTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange()).durable(true).build();}/*** 消息中心实际消费队列配置** @return*/@Beanpublic Queue messageQueue() {return new Queue(QueueEnum.MESSAGE_QUEUE.getName());}/*** 消息中心TTL队列** @return*/@BeanQueue messageTtlQueue() {return QueueBuilder.durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())// 配置到期后转发的交换.withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())// 配置到期后转发的路由键.withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey()).build();}/*** 消息中心实际消息交换与队列绑定** @param messageDirect 消息中心交换配置* @param messageQueue  消息中心队列* @return*/@BeanBinding messageBinding(DirectExchange messageDirect, Queue messageQueue) {return BindingBuilder.bind(messageQueue).to(messageDirect).with(QueueEnum.MESSAGE_QUEUE.getRouteKey());}/*** 消息中心TTL绑定实际消息中心实际消费交换机** @param messageTtlQueue* @param messageTtlDirect* @return*/@Beanpublic Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {return BindingBuilder.bind(messageTtlQueue).to(messageTtlDirect).with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());}
}
复制代码

我们声明了消息通知队列的相关ExchangeQueueBinding等配置,将message.center.create队列通过路由键message.center.create绑定到了message.center.direct交换上。

除此之外,我们还添加了消息通知延迟队列ExchangeQueueBinding等配置,将message.center.create.ttl队列通过message.center.create.ttl路由键绑定到了message.center.topic.ttl交换上。

我们仔细来看看messageTtlQueue延迟队列的配置,跟messageQueue队列配置不同的地方这里多出了x-dead-letter-exchangex-dead-letter-routing-key两个参数,而这两个参数就是配置延迟队列过期后转发的ExchangeRouteKey,只要在创建队列时对应添加了这两个参数,在RabbitMQ管理平台看到的队列配置就不仅是单纯的Direct类型的队列类型,如下图所示:

在上图内我们可以看到message.center.create.ttl队列多出了DLXDLK的配置,这就是RabbitMQ死信交换的标志。 满足死信交换的条件,在官方文档中表示:

Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false, The TTL for the message expires; or The queue length limit is exceeded.

  • 该消息被拒绝(basic.reject或 basic.nack),requeue = false
  • 消息的TTL过期
  • 队列长度限制已超出 官方文档地址

我们需要满足上面的其中一种方式就可以了,我们采用满足第二个条件,采用过期的方式。

队列消息提供者

我们再来创建一个名为rabbitmq-lazy-provider的模块(Create New Maven Module),并且在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

<!--添加公共模块依赖-->
<dependency><groupId>com.hengyu</groupId><artifactId>rabbitmq-common</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency>
复制代码

配置队列

resource下创建一个名为application.yml的配置文件,在该配置文件内添加如下配置信息:

spring:#rabbitmq消息队列配置信息rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /hengboypublisher-confirms: true
复制代码

消息提供者类

接下来我们来创建名为MessageProvider消息提供者类,用来发送消息内容到消息通知延迟队列,代码如下所示:

/*** 消息通知 - 提供者** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:40* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Component
public class MessageProvider {/*** logger instance*/static Logger logger = LoggerFactory.getLogger(MessageProvider.class);/*** RabbitMQ 模版消息实现类*/@Autowiredprivate AmqpTemplate rabbitMqTemplate;/*** 发送延迟消息** @param messageContent 消息内容* @param exchange       队列交换* @param routerKey      队列交换绑定的路由键* @param delayTimes     延迟时长,单位:毫秒*/public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {if (!StringUtils.isEmpty(exchange)) {logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));// 执行发送消息到指定队列rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {// 设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;});} else {logger.error("未找到队列消息:{},所属的交换机", exchange);}}
}
复制代码

由于我们在 pom.xml配置文件内添加了RabbitMQ相关的依赖并且在上面application.yml文件内添加了对应的配置,SpringBoot为我们自动实例化了AmqpTemplate,该实例可以发送任何类型的消息到指定队列。 我们采用convertAndSend方法,将消息内容发送到指定ExchangeRouterKey队列,并且通过setExpiration方法设置过期时间,单位:毫秒。

编写发送测试

我们在test目录下创建一个测试类,如下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {/*** 消息队列提供者*/@Autowiredprivate MessageProvider messageProvider;/*** 测试延迟消息消费*/@Testpublic void testLazy() {// 测试延迟10秒messageProvider.sendMessage("测试延迟消费,写入时间:" + new Date(),QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),10000);}
}
复制代码

注意:@SpringBootTest注解内添加了classes入口类的配置,因为我们是模块创建的项目并不是默认创建的SpringBoot项目,这里需要配置入口程序类才可以运行测试。

在测试类我们注入了MessageProvider消息提供者,调用sendMessage方法发送消息到消息通知延迟队列,并且设置延迟的时间为10秒,这里衡量发送到指定队列的标准是要看MessageRabbitMqConfiguration配置类内的相关Binding配置,通过ExchangeRouterKey值进行发送到指定的队列。

到目前为止我们的rabbitmq-lazy-provider消息提供模块已经编写完成了,下面我们来看看消息消费者模块。

队列消息消费者

我们再来创建一个名为rabbitmq-lazy-consumer的模块(Create New Maven Module),同样需要在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

<!--添加公共模块依赖-->
<dependency><groupId>com.hengyu</groupId><artifactId>rabbitmq-common</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency>
复制代码

当然同样需要在resource下创建application.yml并添加消息队列的相关配置,代码就不贴出来了,可以直接从rabbitmq-lazy-provider模块中复制application.yml文件到当前模块内。

消息消费者类

接下来创建一个名为MessageConsumer的消费者类,该类需要监听消息通知队列,代码如下所示:

/*** 消息通知 - 消费者** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午5:00* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {/*** logger instance*/static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);@RabbitHandlerpublic void handler(String content) {logger.info("消费内容:{}", content);}
}
复制代码

@RabbitListener注解内配置了监听的队列,这里配置内容是QueueEnum枚举内的queueName属性值,当然如果你采用常量的方式在注解属性上是直接可以使用的,枚举不支持这种配置,这里只能把QueueName字符串配置到queues属性上了。 由于我们在消息发送时采用字符串的形式发送消息内容,这里在@RabbitHandler处理方法的参数内要保持数据类型一致!

消费者入口类

我们为消费者模块添加一个入口程序类,用于启动消费者,代码如下所示:

/*** 【第四十六章:SpringBoot & RabbitMQ完成消息延迟消费】* 队列消费者模块 - 入口程序类** @author:于起宇 <br/>* ===============================* Created with IDEA.* Date:2018/3/3* Time:下午4:55* 简书:http://www.jianshu.com/u/092df3f77bca* ================================*/
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {public static void main(String[] args) {SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);}
}
复制代码

测试

我们的代码已经编写完毕,下面来测试下是否完成了我们预想的效果,步骤如下所示:

1. 启动消费者模块
2. 执行RabbitMqLazyProviderApplicationTests.testLazy()方法进行发送消息到通知延迟队列
3. 查看消费者模块控制台输出内容
复制代码

我们可以在消费者模块控制台看到输出内容:

2018-03-04 10:10:34.765  INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer      : 消费内容:测试延迟消费,写入时间:Sun Mar 04 10:10:24 CST 2018
复制代码

我们在提供者测试方法发送消息的时间为10:10:24,而真正消费的时间则为10:10:34,与我们预计的一样,消息延迟了10秒后去执行消费。

总结

终上所述我们完成了消息队列的延迟消费,采用死信方式,通过消息过期方式触发,在实际项目研发过程中,延迟消费还是很有必要的,可以省去一些定时任务的配置。

本章源码已经上传到码云: SpringBoot配套源码地址:gitee.com/hengboy/spr… SpringCloud配套源码地址:gitee.com/hengboy/spr… SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读!

第四十六章:SpringBoot RabbitMQ完成消息延迟消费相关推荐

  1. pdfstamper生成pdf无法显示汉字_正点原子STM32F4/F7水星开发板资料连载第四十六章 汉字显示实验...

    1)实验平台:正点原子水星 STM32F4/F7 开发板 2)摘自<STM32F7 开发指南(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 3)全套实验源码+手册+视频下载 ...

  2. 第四十六章 SQL函数 DAY

    文章目录 第四十六章 SQL函数 DAY 大纲 参数 描述 第四十六章 SQL函数 DAY 返回日期表达式的月份日期的日期函数. 大纲 DAY(date-expression){fn DAY(date ...

  3. 【正点原子STM32连载】第四十六章 FATFS实验 摘自【正点原子】MiniPro STM32H750 开发指南_V1.1

    1)实验平台:正点原子MiniPro H750开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id=677017430560 3)全套实验源码+手册+视频 ...

  4. 第四十六章 Caché 变量大全 ^$GLOBAL 变量

    文章目录 第四十六章 Caché 变量大全 ^$GLOBAL 变量 大纲 参数 描述 进程私有全局变量 参数 nspace global_name 示例 作为`$DATA`的参数 作为`$ORDER` ...

  5. 光盘显示0字节可用_正点原子STM32F4/F7水星开发板资料连载第四十六章 汉字显示实验

    1)实验平台:正点原子水星 STM32F4/F7 开发板 2)摘自<STM32F7 开发指南(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 3)全套实验源码+手册+视频下载 ...

  6. 第四十六章 使用 ^SystemPerformance 监视性能 - 生成 ^SystemPerformance 性能报告

    文章目录 第四十六章 使用 ^SystemPerformance 监视性能 - 生成 ^SystemPerformance 性能报告 生成 `^SystemPerformance` 性能报告 使用任务 ...

  7. 【正点原子FPGA连载】第四十六章SD卡读写测试实验 -摘自【正点原子】新起点之FPGA开发指南_V2.1

    1)实验平台:正点原子新起点V2开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id=609758951113 2)全套实验源码+手册+视频下载地址:ht ...

  8. 嵌入式linux系统蜂鸣器实验,「正点原子Linux连载」第四十六章Linux蜂鸣器实验

    1)实验平台:正点原子Linux开发板 2)摘自<正点原子I.MX6U嵌入式Linux驱动开发指南> 关注官方微信号公众号,获取更多资料:正点原子 大家将imx35_gpio_hwdata ...

  9. rabbitmq 手动提交_第四章----SpringBoot+RabbitMQ发送确认和消费手动确认机制

    1. 配置RabbitMQ # 发送确认 spring.rabbitmq.publisher-confirms=true # 发送回调 spring.rabbitmq.publisher-return ...

最新文章

  1. 2019年智能手机AI要被深度开发,这五项技术将是重点
  2. eclipse 快捷键_eclipse两种注释的快捷键
  3. spring三: 装配bean( 在xml中进行显式配置, 在java中进行显式配置)
  4. python中的set函数、列表的操作
  5. dev c++运行没有结果_「C/C++」一行注释也能影响运行结果?
  6. 1.4最基本的使用--POM.xml文件
  7. PXC集群常见错误(一)
  8. ES9新特性_ES9扩展运算符与rest参数---JavaScript_ECMAScript_ES6-ES11新特性工作笔记053
  9. Caffe100数据集使用
  10. ​对于边界值中有不确定字符串时该怎么处理
  11. SQL中GROUP BY用法示例
  12. java循环遍历map集合_Java中遍历Map集合的四种方法
  13. 基于python物流管理系统毕业设计-python实现快递价格查询系统
  14. RISC-V MCU+病房系统
  15. sis 最新_传统SIS系统面临考验——“可持续的安全仪表系统”来袭
  16. jQuery中的get和post请求
  17. CPU架构解析:ARM和x86大比拼
  18. 【tool】动态注释LOG_NDEBUG宏定义
  19. 11届蓝桥杯青少年组C++全国赛高级组
  20. Math 的 ceil、floor、round方法详解及示例

热门文章

  1. gatk过滤_重测序2--看了不后悔的gatk-变异检测
  2. java tempfile read_Java资源作为文件
  3. apiclod 上传图片_Apicloud——关于上传图片、视频(二)
  4. python坐标定位_Python_元素定位浏览器坐标定位
  5. 虚拟服务器ip是什么意思,虚拟主机独立ip是什么意思
  6. 报错,Unknown custom element: <DeviceVendorStatistics> - did you register the component correctly? For
  7. 大数据学习笔记02:在私有云上创建与配置虚拟机
  8. 专业英语笔记:Install and Use Python
  9. flutter 动画展开菜单_蒲公英 · JELLY技术周刊 Vol.34: 芜湖~ Flutter
  10. python索引字符串_Python:通过索引删除子字符串