一、场景

当涉及到需要延时处理的业务,比如订单30分钟后过期,2小时后操作业务数据等操作,这里选择用MQ的延时队列+插件来处理,本文记录具体代码实现供参考。

二、代码配置

统一集成在common包中,供各服务集成调用。

1.  mq   config配置

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;/*** 描述:RabbitMQ配置类** @author: winy_work* @date: 2022-08-24 9:50*/
@Configuration
public class RabbitMQConfig {/*** 模板配置* @param connectionFactory* @return*/@Bean/** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;/** 如果要进行消息回调,则这里必须要设置为true */cachingConnectionFactory.setPublisherConfirms(true);//rabbitmq心跳20scachingConnectionFactory.setRequestedHeartBeat(20);// 设置自动恢复cachingConnectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);// 设置 每10s ,重试一次cachingConnectionFactory.getRabbitConnectionFactory().setNetworkRecoveryInterval(10);// 设置不重新声明交换器,队列等信息。cachingConnectionFactory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(false);cachingConnectionFactory.setChannelCacheSize(50);RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);return template;}
}

2.  交换机 exchange 声明 和 队列声明,绑定

import com.bossien.common.constants.RabbitMQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 描述:RabbitMQ 延时队列声明集合类** @author: winy_work* @date: 2023-08-24 9:50*/
@Configuration
public class RabbitMQDelayedDeclare {/*** 声明交换机** @return*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>(2);// 交换机类型args.put("x-delayed-type", "direct");return new CustomExchange(RabbitMQConstant.DELAYED_EXCHANGE,"x-delayed-message",true,false,args);}/*** 声明队列** @return*/@Beanpublic Queue delayedQueue() {Queue queue = new Queue(RabbitMQConstant.DELAYED_QUEUE, true, false, false);return queue;}/*** 绑定交换机** @param delayedQueue* @param delayedExchange* @return*/@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitMQConstant.DELAYED_QUEUE).noargs();}}

3. 发送消息生产(工具)类

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** 消息发送服务接口** @author winy_work*/
@Component
public class RabbitMQProducer {private final static Logger logger = LoggerFactory.getLogger(RabbitMQProducer.class);@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送延时消息到队列(定时需求的才用)** @param message     消息内容* @param delayedTime 延时时间 单位毫秒*/public void sendDelayMsg(Object message, Integer delayedTime) {String correlationDataId = UUID.randomUUID().toString();String msgStr = JSONObject.toJSONString(message);logger.info("messageId is " + correlationDataId + ",消息内容=" + msgStr);if (msgStr == null) {logger.error("messageId is " + correlationDataId + ",send message failed: message is null");return;}Message messageObj = MessageBuilder.withBody(msgStr.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();// 消息发送rabbitTemplate.convertAndSend(RabbitMQConstant.DELAYED_EXCHANGE,RabbitMQConstant.DELAYED_QUEUE,messageObj,(msg) -> {msg.getMessageProperties().setDelay(delayedTime);return msg;});}
}

4. 注入bean, 交给spring管理, 支持bean注入调用

common resource下新建文件夹 META-INF,  创建文件  spring.factories, 添加内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.bossien.common.core.rabbitmq.RabbitMQConfig,\com.bossien.common.core.rabbitmq.RabbitMQProducer,\com.bossien.common.core.rabbitmq.RabbitMQDelayedDeclare

5. 上面队列常量类涉及常量  RabbitMQConstant

    /*** 延时队列专用交换机*/public static final String DELAYED_EXCHANGE = "delayed.exchange";/*** 延时队列*/public static final String DELAYED_QUEUE = "delayed.queue";

三、消息生产者测试类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** 描述:mq测试控制类** @author: winy_work* @date: 2022-08-22 14:12*/
@RestController
@RequestMapping("/rabbitmq")
public class TestRabbitmqController {@Autowiredprivate RabbitMQProducer producer;/*** 测试延时队列场景* @return*/@GetMapping("/testRabbitmqDelayed")public Response testRabbitmqDelayed(){// 如果正常情况,那么消费者会根据时间从小到大依次消费执行producer.sendDelayMsg("测试延时队列-5s",5000);producer.sendDelayMsg("测试延时队列-30s",30000);producer.sendDelayMsg("测试延时队列-20s",20000);producer.sendDelayMsg("测试延时队列-15s",15000);producer.sendDelayMsg("测试延时队列-2s",2000);return new Response().success("发送成功!");}}

四、消息消费者测试类

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 测试mq监听类** @author winy_work* @desc*/
@Component
public class TestListenter {private final static Logger logger = LoggerFactory.getLogger(TestListenter.class);/*** 消费测试延时队列* @param message*/@RabbitListener(queues = RabbitMQConstant.DELAYED_QUEUE)public void handleDelayedMessage(Object obj) {logger.info("mq接收到信息:message={}", obj);}
}

五、注意:上面四步配置完成后工程是启动不起来的,因为声明延时队列的 类型 

x-delayed-messageMQ中还没有。需要安装该插件。

插件下载地址:这里需要根据自己安装的mq版本号来下载对应的插件(.ez 文件)。如下图是3.10.2版本的插件。

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

六、插件安装

下面的地址根据自己的安装路径替换。试了不重启也可以。可以不重启。

# RabbitMQ 的安装目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
# 将下载的插件放到 RabbitMQ 安装目录的 plugins 目录下
cp /usr/local/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启 RabbitMQ
systemctl restart rabbitmq-server

安装成功后登录MQ控制台,点击exchange  tab页面,查看是否多了如下图选择项,如有,则安装成功。

七、启动程序,调用rest接口测试   localhost:8080/rabbitmq/testRabbitmqDelayed

可以看到控制台打印日志,是按照时间从小到大依次执行的。2s  5s  15s  20s 30s

至此,延时队列就可以成功使用了。

Springcloud集成 RabbitMQ延时队列相关推荐

  1. docker安装rabbitmq延时队列插件

    docker安装rabbitMQ延时队列插件(delayed_message_exchange) 1. 查找Docker容器中的RabbitMQ镜像 docker ps -a [root@linux ...

  2. springboot集成rabbitmq死信队列的延时队列使用

    目录         1.自动分列延时队列 2.应答失败自动转储延时再通知机制 ------------------------------------------------------------ ...

  3. RabbitMQ延时队列原理讲解

    RabbitMQ延时消息队列 延时队列介绍 延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟 ...

  4. java实现rabbitMQ延时队列详解以及spring-rabbit整合教程

    在实际的业务中我们会遇见生产者产生的消息,不立即消费,而是延时一段时间在消费.RabbitMQ本身没有直接支持延迟队列功能,但是我们可以根据其特性Per-Queue Message TTL和 Dead ...

  5. SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门

    1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...

  6. RabbitMQ延时队列

    1.检查是否安装延时插件 : rabbitmq_delayed_message_exchange rabbitmq-plugins list 若没有就去下载 https://www.rabbitmq. ...

  7. 详解SpringCloud中RabbitMQ消息队列原理及配置,一篇就够!

    作者:kosamino cnblogs.com/jing99/p/11679426.html 一.MQ用途 1.同步变异步消息 场景:用户下单完成后,发送邮件和短信通知. 运用消息队列之后,用户下单完 ...

  8. RabbitMQ第二话 -- Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现

    本文主要分享RabbitMQ exchange类型的功能和使用.RabbitMQ延时队列.一个springboot服务发送消息到多虚拟主机 1.RabbitMQ exchange exchange交换 ...

  9. Docker安装RabbitMQ并安装延时队列插件

    一.RabbitMQ简介 RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消 ...

最新文章

  1. mysql 集群实践_MySQL Cluster集群探索与实践
  2. python mp4提取音频加入另一段视频_使用 PHP-FFMpeg 操作视频/音频文件
  3. 【Mysql】Mysql root 权限下无法创建数据库
  4. 【Android 逆向】ptrace 函数 ( C 标准库 ptrace 函数简介 | ptrace 函数真实作用 )
  5. 在deepin系统中制作桌面快捷方式
  6. boost::multi_array模块确保 multi_arrays 与 STL 容器一起使用
  7. java date 最小值_java – Datepicker和timepicker – 设置最大值和最小值
  8. python编程选股_用Python选一个自己的股票池2
  9. javascript中对象的运用
  10. 李彦宏说互联网思维已过时,AI可以根本上变革交通、城市、农业和医疗
  11. pandas小记:pandas索引和选择
  12. [JarvisOj][XMAN]lTell Me Something
  13. 数据库读写分离这个坑,你应该踩过吧?
  14. 零基础Python知识点回顾(三)
  15. 理顺8个版本vue的区别
  16. Unity3D视频教程,Unity3D从入门到精通视频教程
  17. 经验正交函数分析(EOF)或主成分分析(PCA)在matlab上的实现及实例
  18. 二元函数对xy同时求导_呆哥数学每日一题 ——求多元函数最小值
  19. thoughtworks业务需求分析师面试总结
  20. 数据处理与机器学习(大致进行了解学习)

热门文章

  1. C# binding textbox 获取编辑后的 设置mode=twoway
  2. java 城市多音字处理
  3. python基于PHP+MySQL的汽车零配件生产企业ERP生产管理子系统
  4. idea中摸鱼插件_idea中那些好用到飞起的插件,偷懒神器
  5. uni-app学习日记3
  6. 绿色数治开采工艺: 3D 可视化智慧矿山
  7. WORD里面插入图片只能显示下面很小一部分
  8. 抖音评论功能简单实现 (小程序版)
  9. 信捷plc和台达变频器通信程序通过信捷xc3的modbus通信控制台达vfd-m变频器的正转
  10. WinServer 2019 组策略 开启远程桌面