1.1 导入依赖

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>

1.2 修改配置文件

spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

1.3 队列ttl

1.3.1 队列架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

1.3.2 队列配置类

@Configuration
public class TtlQueueConfig {private static final String X_EXCHANGE = "X";private static final String QUEUE_A = "QA";private static final String QUEUE_B = "QB";private static final String Y_DEAD_LETTER_EXCHANGE = "Y";private static final String DEAD_LETTER_QUEUE = "QD";
​// 声明 xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}
​// 声明 xExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}
​//声明队列 A ttl 为 10s 并绑定到对应的死信交换机@Bean("queueA")public Queue queueA() {Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}
​// 声明队列 A 绑定 X 交换机@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}
​//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")public Queue queueB() {Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}
​//声明队列 B 绑定 X 交换机@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queue1B).to(xExchange).with("XB");}
​//声明死信队列 QD@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}
​//声明死信队列 QD 绑定关系@Beanpublic Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

1.3.3 生产者

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;
​@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);}
}

1.3.4 消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}

1.3.5 消息消费

发起一个请求 http://localhost:8080/ttl/sendMsg/hello

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

1.3.6 消费者延迟时间设置问题

     /*** 可以发送指定过期时间的消息* 不同过期时间指定不同的队列会造成队列过多* 我们可以不指定队列的过期时间,而是在生产者这边指定消息的过期时间** @param message* @param ttlTime*/@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);}

RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

1.3.7 延时队列总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

RabbitMQ(六)整合SpringBoot相关推荐

  1. Rabbitmq消息中间件整合Springboot

    首先在docker-compose中部署rabbitmq: docker-compose.yml version: '3' services:rabbitmq:image: rabbitmq:mana ...

  2. (RabbitMQ 二)Springboot项目中使用RabbitMQ的相关依赖

    (RabbitMQ 二)Springboot项目中使用RabbitMQ的相关依赖 RabbitMQ系列文章如下: (RabbitMQ 一[转载])windows10环境下的RabbitMQ安装步骤 h ...

  3. RabbitMQ消息队列(六):SpringBoot整合之通配符模式

    RabbitMQ消息队列(六):SpringBoot整合之通配符模式 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AeZQrNHS-1660220618697)(E: ...

  4. rabbitMq工作模式特性及整合springboot

    因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq mq(message queue)消息队列 官网:www.rabbitmq.com 使用消息队列的优点:1. ...

  5. 六、springboot整合swagger

    六.springboot整合swagger 简介 swagger 提供最强大,最易用的工具,以充分利用OpenAPI规范. 官网 : https://swagger.io/ 准备工作 pom.xml ...

  6. [RabbitMQ]整合SpringBoot

    整合SpringBoot 创建项目 引入依赖 <dependencies><!--RabbitMQ 依赖--><dependency><groupId> ...

  7. RabbitMQ教程_5 整合SpringBoot

    https://gitee.com/fakerlove/rabbitmq 文章目录 5. 整合SpringBoot 5.1 helloword 模型 引入依赖 创建生产者 创建消费者 目录结构 5.2 ...

  8. rabbitmq消息队列入门到整合springboot(篇幅较长内容详细)

    1.安装rabbitmq服务器 我们选择在linux下安装 安装的前提需要在虚拟机下安装docker docker pull rabbitmq:management(拉去镜像) docker run ...

  9. RabbitMQ原理及SpringBoot整合RabbitMQ

    RabbitMQ原理及SpringBoot整合RabbitMQ 1. RabbitMQ环境搭建 参考:https://blog.csdn.net/u013071014/article/details/ ...

  10. 第十六节 springboot 打包vue代码实现前后端统一部署

    svbadmin学习日志 本学习日志是使用Springboot和Vue来搭建的后台管理系统: 演示地址:http://118.31.68.110:8081/index.html 账号:root 密码: ...

最新文章

  1. Understanding Global Unicast IPv6 Addressing
  2. api接口参数加密_解决API接口开发安全性的四种方案
  3. php中获取网站访客来源的关键词方法
  4. 为啥led灯用一年后暗了很多_想把卤素灯换掉,选LED灯为什么比选氙气灯
  5. ajax将响应结果显示到iframe,JavaScript:iframe / Ajax / JSON
  6. 用Python在Excel里画出蒙娜丽莎
  7. kafka 集群_Kafka集群搭建
  8. Struts2 标签库讲解
  9. asp.net伪静态配置
  10. ARM开发7.3.2 基础实训( 2 ) 单个按键的输入系统设计( 2)--LPC21XX
  11. visio的一些用法
  12. font-family:常用中文字体的英文名称 (宋体 微软雅黑)
  13. 医疗卫生行业中的领域模型
  14. 刘汝佳小白书-最长回文字串
  15. 我的2020年终回顾:人生,海海,破浪前行
  16. 阿里云服务器哪个便宜?
  17. 团购幸存者:团购是个苦生意
  18. 云计算具有哪些特点,主要分为哪几大类型?
  19. [LOJ#3124][CTS2019]氪金手游(概率 + 树形 DP + 容斥)
  20. 高通SM4350平台指纹移植流程

热门文章

  1. cgo的几种使用方式
  2. python——常用的内置函数
  3. [ATF]-ARM级别/异常/状态切回时候的寄存器保存与恢复
  4. 2022-02-07
  5. mac包安装kafka
  6. (24)2-9-9-12分页(上)
  7. 详解虚函数的实现过程之菱形继承修罗场(6)
  8. 2020-11-27(下标寻址和指针寻址)
  9. 告诉你,初学网络安全应该怎样去学呢?安排的明明白白的
  10. Windows驱动开发学习笔记(七)—— 多核同步内核重载