死信队列实现篇,参考文章:【SpringBoot】60、SpringBoot中整合RabbitMQ实现延时队列(死信队列篇)

一、介绍

  • 1、什么是延时队列?
    延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费
  • 2、适用场景
    (1)商城订单超时未支付,取消订单
    (2)使用权限到期前十分钟提醒用户
    (3)收益项目,投入后一段时间后产生收益

二、实现方式

从以上场景中,我们可以看出,延时队列的主要功能就是在指定的时间之后做指定的事情,那么,我们思考有哪些工具我们可以使用?

  • 1、Redis 监听过期 Key

可以参考我的博客【SpringBoot】三十五、SpringBoot整合Redis监听Key过期事件

https://lizhou.blog.csdn.net/article/details/109238083
  • 2、RabbitMQ等实现延时队列

这也是本片文章中要讲的知识点,使用 RabbitMQ 实现延时队列有两种方式

(1)利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
(2)利用 RabbitMQ 中的插件 x-delay-message

本文主要讲解第二种方式,使用插件的方式

三、下载插件

RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列,我们可以从 这里 下载到它

https://www.rabbitmq.com/community-plugins.html

选择 rabbitmq_delayed_message_exchange 插件,如图所示


选择 .ez 格式的文件下载,下载后放置 RabbitMQ 的安装目录下的 plugins 目录下,如我的路径为

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.16\plugins

执行命令

cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.16\sbinrabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装插件完成

四、在SpringBoot整合RabbitMQ

1、引入 RabbitMQ 依赖

<!-- rabbitmq消息队列 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置 RabbitMQ 信息

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:# 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 noneacknowledge-mode: manual

3、RabbitMQ 常量类

package com.asurplus.common.rabbitmq;/*** rabbit常量类** @Author Lizhou*/
public final class RabbitConst {/*** 交换机*/public static final String DELAY_EXCHANGE = "delay_exchange";/*** 队列*/public static final String DELAY_QUEUE = "delay_queue";/*** 路由*/public static final String DELAY_KEY = "delay_key";}

4、RabbitMQ 配置类

package com.asurplus.common.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** rabbitmq配置类** @Author Lizhou*/
@Configuration
public class RabbitConfig {/*** 延时队列交换机** @return*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(RabbitConst.DELAY_EXCHANGE, "x-delayed-message", true, false, args);}/*** 延时队列** @return*/@Beanpublic Queue delayQueue() {return new Queue(RabbitConst.DELAY_QUEUE, true);}/*** 给延时队列绑定交换机** @return*/@Beanpublic Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConst.DELAY_KEY).noargs();}
}

5、RabbitMQ 生产者

package com.asurplus.common.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** rabbitMq生产者** @Author Lizhou*/
@Component
@Slf4j
public class RabbitProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息** @param object      发送对象* @param millisecond 延时(毫秒)*/public void sendDelayMessage(Object object, long millisecond) {this.rabbitTemplate.convertAndSend(RabbitConst.DELAY_EXCHANGE,RabbitConst.DELAY_KEY,object.toString(),message -> {message.getMessageProperties().setHeader("x-delay", millisecond);return message;});}
}

6、RabbitMQ 消费者

package com.asurplus.common.rabbitmq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** activeMq消费者** @Author Lizhou*/
@Component
@Slf4j
public class RabbitConsumer {/*** 接收消息** @param object 监听的内容*/@RabbitListener(queues = RabbitConst.DELAY_QUEUE)public void cfgUserReceiveDealy(Object object, Message message, Channel channel) throws IOException {// 通知 MQ 消息已被接收,可以ACK(从队列中删除)了channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);try {log.info("接受消息:{}", object.toString());} catch (Exception e) {log.error(e.getMessage());/*** basicRecover方法是进行补发操作,* 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,* 设置为false是只补发给当前的consumer*/channel.basicRecover(false);}}
}

五、测试

package com.asurplus;import com.asurplus.common.rabbitmq.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@SpringBootApplication
@RestController
public class RabbitmqApplication {@Autowiredprivate RabbitProducer product;@GetMapping("init")public void init() {String message1 = "这是第一条消息";String message2 = "这是第二条消息";product.sendDelayMessage(message1, 5000);product.sendDelayMessage(message2, 10000);}public static void main(String[] args) {SpringApplication.run(RabbitmqApplication.class, args);}}

通过测试,第一条消息在 5s后接收到,第二条消息在 10s后接收到,说明我们的延时队列已经成功

如您在阅读中发现不足,欢迎留言!!!

【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)相关推荐

  1. SpringAMQP整合RabbitMQ使用---不同队列模型的具体使用

    SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便. SpringAmqp的官方地址:https://spring.io/pr ...

  2. docker rabbitmq 安装 延迟队列 rabbitmq_delayed_message_exchange 插件

    以  官方的 rabbitmq:3.9.20-management 为例,默认开启的插件有4个,如下所示 2022-07-09 21:31:55.624125+08:00 [info] <0.8 ...

  3. RabbitMQ的6种工作模式的学习记录,普通MAVEN和springboot项目整合rabbitmq的API详解

    1.RabbitMQ后台管理页面 2.RabbitMQ 核心(自我理解) 3.RabbitMQ6种工作模式介绍 4. RabbitMQ的消息可靠性 5.RabbitMQ普通MAVEN项目使用 6.Sp ...

  4. RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ

    RabbitMQ [黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战] 文章目录 RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ 6.1 Sprin ...

  5. springboot集成rabbitmq死信队列的延时队列使用

    目录         1.自动分列延时队列 2.应答失败自动转储延时再通知机制 ------------------------------------------------------------ ...

  6. RabbitMQ自学之路(九)——RabbitMQ实现延时队列的两种方式

    一.什么是延时队列 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 二.延时队列应用于什么场景 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间 ...

  7. RabbitMQ通过TTL和DLX实现延时队列

    RabbitMQ实现延时队列 一.介绍 1.TTL 如何设置TTL(2种方式): 2.Dead Letter Exchanges 二.实现延时队列的思路 三.SpringBoot+RabbitMQ实现 ...

  8. Spring Boot中使用RabbitMQ

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Me ...

  9. rabbitmq使用_Spring Boot中使用RabbitMQ

    Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 ...

最新文章

  1. php mvc cms企业站,Phpcms V9程序目录结构及MVC简析
  2. php监听网页日志,如何用php程序监听一个不断增长的日志文件
  3. oracle12c ora 12547,Oracle 12c DBCA出现PRCR-1079 ORA-12547 CRS-5017
  4. 分布式事务模型--TCC
  5. node ajax validator,node/express 4:在ajax post上使用express-validator显示错误
  6. mysql 添加最高权限设置_mysql 添加用户并设置权限
  7. Linux(RedHat)下Weblogic 12C静默安装
  8. 智能DNS+双线机房
  9. 8086状态标志寄存器含义
  10. 使用UML工具分析类图与类的关系-bouml(java和C++)
  11. css3中3d旋转中rotatex,rotatey,rotatez的旋转正方向
  12. python sort sorted 排序详解
  13. 深度剖析redis缓存穿透,缓存击穿,缓存雪崩原因+解决方案
  14. 关于idea中springboot主启动类没有绿色启动的问题
  15. 【Mac】使用Karabiner 映射输入法快捷键
  16. linux 把进程调到前台,【如何将后台运行的程序转到前台来?】
  17. 循环辅助:continue和break
  18. ARCore之路-前言
  19. Keil软件常见配置
  20. 字节跳动人力资源体系 附下载

热门文章

  1. shell脚本编程笔记(九)—— 初识流编辑器 sed
  2. 大数据之路——阿里巴巴大数据实践:总述
  3. (一)ProxmoxVE 初识
  4. 怎么联系vue客服_Vue在线客服系统【开源项目】
  5. linux自解压执行程序,如何在Linux中使用shar创建自解压文件
  6. 关于centOS7在U盘安装时遇到的dracut-initqueue[]:Warning:dracut-inituenue timeout....查找不到文件无法安装系统的问题的解决办法。
  7. 基于Python+django的航班查询与推荐-计算机毕业设计
  8. sketchflow_使用SketchFlow进行快速原型制作
  9. AP计算机自学笔记:方法参数
  10. 2017 多校训练第二场 HDU 6047 Maximum Sequence(贪心+优先队列)