1 概述

常用的延迟消息实现方式有:

  • 利用 队列TTL + 死信队列 方式实现

  • 利用消息延迟插件实现

消息变成死信的原因有:​​​​

  • 消息过期。消息TTL或队列TTL

  • 消息被拒绝。消费者调用了 channel.basicNackchannel.basicReject ,并且设置 requeue=false

  • 队列满。

    当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面丢弃或 dead-letter 消息(即队列中最早的消息)。要修改这种行为,请使用下面描述的 overflow 设置

    overflow

    常见参数说明

2 队列TTL + 死信队列方式

这里直接贴出 rabbitConfig 代码,其他的代码参考该文章:RabbitMQ (三)消息重试

1 RabbitConfig

主要操作:

  1. 创建死信队列和交换器,并绑定

  2. 创建队列,同时设置队列的TTL、绑定死信队列;创建交换器,并绑定,

package com.fmi110.rabbitmq.config;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
import java.util.HashMap;
​
​
/*** @author fmi110* @description rabbitMQ 配置类* @date 2021/7/1 15:08*/
@Configuration
@Slf4j
public class RabbitConfig {
​String dlQueueName  = "my-queue-dl"; // 普通队列名称String dlExchangeName = "my-exchange-dl"; // 死信交换器名称String dlRoutingKey   = "rabbit.test";
​String queueName = "retry-queue";String exchangeName = "my-exchange"; // 普通交换器名称
​/*** 创建死信队列** @return*/@Beanpublic Queue queueDL() {
​return QueueBuilder.durable(dlQueueName) // 持久化队列.build();}
​/*** 创建死信交换机** @return*/@Beanpublic TopicExchange exchangeDL() {return new TopicExchange(dlExchangeName, true, false);}
​/*** 绑定操作*/@Beanpublic Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {log.info(">>>> 队列与交换器绑定");return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);}
​/*** 创建持久化队列,同时绑定死信交换器** @return*/@Beanpublic Queue queue() {log.info(">>>> 创建队列 retry-queue");HashMap<String, Object> params = new HashMap<>();params.put("x-dead-letter-exchange", dlExchangeName);params.put("x-dead-letter-routing-key", dlRoutingKey);
​params.put("x-message-ttl", 10 * 1000); // 队列过期时间 10s
​return QueueBuilder.durable(queueName) // 持久化队列.withArguments(params) // 关联死信交换器.build();}
​
​/*** 创建交换机** @return*/@Beanpublic TopicExchange exchange() {log.info(">>>> 创建交换器 my-exchange");boolean durable    = true; // 持久化boolean autoDelete = false; // 消费者全部解绑时不自动删除return new TopicExchange(exchangeName, durable, autoDelete);}
​/*** 绑定队列到交换机** @param queue* @param exchange* @return*/@Beanpublic Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {log.info(">>>> 队列与交换器绑定");return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");}
​
}

2 RabbitConsumer 消费者

延迟消息通过队列的TTL产生,所以这里不应该设置普通队列的消费者,让消息过期然后自动转入死信队列,此时再进行消费以此实现延迟消息

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
​
import java.util.concurrent.atomic.AtomicInteger;
​
​
/*** @author fmi110* @description 消息消费者* @date 2021/7/1 16:08*/
@Component
@Slf4j
public class RabbitConsumer {/*** 死信队列消费者* @param data* @param channel* @param tag* @throws Exception*/@RabbitListener(queues="my-queue-dl")public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");log.info(">>>> {} 死信队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);}
}

3 弊端

如上图所示实现了延迟10s的消息,但是如果需要实现延迟5s的消息,则需要新建一个TTL为5s的队列,所以如果延迟时间需要很多的话,就需要创建很多队列,实现起来比较麻烦。

再贴一段对消息设置TTL的代码:

   AtomicInteger aint = new AtomicInteger();public void send(String msg) {String exchangeName = "my-exchange";String routingKey   = "rabbit.test";// rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
​MessageProperties messageProperties = new MessageProperties();messageProperties.setCorrelationId(UUID.randomUUID().toString().replace("-", ""));// TTL 为5sint i = 9 * 1000;
​if (aint.incrementAndGet() % 2 == 0) {i = 5 * 1000;}msg = "message send at " + dateFormat.format(new Date()) +", expired at "+dateFormat.format(new Date().getTime()+i);messageProperties.setExpiration(String.valueOf(i)); // 设置过期时间Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.send(exchangeName, routingKey, message);}

可以看到消息的过期时间与期望的不一致。因为队列是先进后出的,只有在头部的消息,系统才对其进行过期检测。所以如果消息不再队列头部,即使时间已经过期,也不会导致消息进入死信队列!!!

当同时设置了消息的TTL和队列的TTL时,过期时间谁小谁生效(队列头部的消息才进行TTL检测)。

3 使用延迟插件实现

插件的安装参考 docker安装rabbitMQ

1 RabbitConfig

使用延迟插件实现,需要创建延迟交换器,使用 CustomExchange 类创建,同时指定交换器类型为 x-delayed-message ,此外还需要设置属性 x-delayed-type ,创建的交换器如下图所示

package com.fmi110.rabbitmq.config;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
import java.util.HashMap;
​
/*** @author fmi110* @description 配置交换器、队列* @date 2021/7/3 9:58*/
@Slf4j
@Configuration
public class RabbitConfig2 {
​String exchangeName = "delay-exchange";String queueName    = "delay-queue";String exchangeType = "x-delayed-message";
​@Beanpublic CustomExchange exchange() {
​HashMap<String, Object> args = new HashMap<>();args.put("x-delayed-type", "topic");return new CustomExchange(exchangeName, exchangeType, true, false, args);}
​@Beanpublic Queue queue() {return new Queue(queueName, true, false, false);}
​@Beanpublic Binding binding(CustomExchange exchange, Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("rabbit.delay").noargs();
​}
}

2 RabbitProducer

这里开启了消息投递失败回调。测试中发现,使用延迟插件,虽然消息正常投递了,但是始终会报 “NO_ROUTER” 提示路由失败。虽然不影响功能。运行截图见后文。目前不确定是我设置问题还是框架的问题...

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
​
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
​
/*** @author fmi110* @description 消息生产者* @date 2021/7/1 15:08*/
@Component
@Slf4j
public class RabbitProducer {@AutowiredRabbitTemplate rabbitTemplate;
​/*** 1 设置 confirm 回调,消息发送到 exchange 时回调* 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调* <p>* correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用*/@PostConstructpublic void enableConfirmCallback() {// #1/*** 连接不上 exchange或exchange不存在时回调*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {log.error("消息发送失败");// TODO 记录日志,发送通知等逻辑}});
​// #2/*** 消息投递到队列失败时,才会回调该方法* message:发送的消息* exchange:消息发往的交换器的名称* routingKey:消息携带的路由关键字信息*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error("{},exchange={},routingKey={}",replyText,exchange,routingKey);// TODO 路由失败后续处理逻辑});}
​public void sendDelayMsg(String delay) {int               delayInt          = StringUtils.isEmpty(delay) ? 0 : Integer.valueOf(delay);String            exchangeName      = "delay-exchange";String            routingKey        = "rabbit.delay";SimpleDateFormat  dateFormat        = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String            msg               = "message send at " + dateFormat.format(new Date()) + ", expired at " + dateFormat.format(new Date().getTime() + delayInt * 1000);
//        MessageProperties messageProperties = new MessageProperties();
//        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息
//        messageProperties.setDelay(delayInt * 1000);
//        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
//        rabbitTemplate.send(exchangeName,routingKey,message);
​rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, message ->{message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化message.getMessageProperties().setDelay(delayInt * 1000);   // 单位为毫秒return message;});
​}
}

3 RabbitConsumer

消费者,指定监听对应的消息队列即可。

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
​
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
​
​
/*** @author fmi110* @description 消息消费者* @date 2021/7/1 16:08*/
@Component
@Slf4j
public class RabbitConsumer {
​@RabbitListener(queues="delay-queue")public void consumeDelay(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");log.info(">>>> {} 延迟队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);}
}

4 controller

package com.fmi110.rabbitmq.controller;
​
​
import com.fmi110.rabbitmq.RabbitProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import sun.rmi.runtime.Log;
​
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
@Slf4j
@RestController
public class TestController {@AutowiredRabbitProducer rabbitProducer;
​@GetMapping("/delay")public Object delay(String delay) {rabbitProducer.sendDelayMsg(delay); // 发送消息HashMap<String, Object> result = new HashMap<>();result.put("code", 0);result.put("msg", "success");return result;}
}

5 依赖

    
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
​<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
​<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope></dependency>

6 运行截图

RabbitMQ (四)实现延迟消息相关推荐

  1. redis延迟消息队列不准时php,Redis实现延迟消息队列

    消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ. 如果只需要实现简单的消息队列,那么借助Redis即可. 如果对消息有着严格的可靠性等 ...

  2. 【重难点】【RabbitMQ 01】消息队列的作用、主流的消息队列、RabbitMQ 基于什么传输消息、RabbitMQ 模型架构、死信队列和延迟队列

    [重难点][RabbitMQ 01]消息队列的作用.主流的消息队列.RabbitMQ 基于什么传输消息.RabbitMQ 模型架构.死信队列和延迟队列 文章目录 [重难点][RabbitMQ 01]消 ...

  3. 17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列

    点击上方"方志朋",选择"置顶公众号" 技术文章第一时间送达! 作者:28cm不含头(来自:知乎) 原文链接: https://www.zhihu.com/qu ...

  4. RabbitMQ 延迟消息的极限是多少?

    点击蓝色"程序猿DD"关注我 回复"资源"获取独家整理的学习资料! 之前在写Spring Cloud Stream专题内容的时候,特地介绍了一下如何使用Rabb ...

  5. Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

    应用场景 我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时 ...

  6. rabbitmq 取消消息_SpringBoot整合RabbitMQ实现延迟消息

    ## RabbitMQ RabbitMQ是一个被广泛使用的开源消息队列.它是轻量级且易于部署的,它能支持多种消息协议.RabbitMQ可以部署在分布式和联合配置中,以满足高规模.高可用性的需求. R ...

  7. spring boot rabbitmq_Spring Boot+RabbitMQ 实现延迟消息实现完整版,实用!

    本文同步Java知音社区,专注于Java 作者:Sam哥哥http://blog.csdn.net/linsongbin1/article/details/80178122 概述 曾经去网易面试的时候 ...

  8. mall整合RabbitMQ实现延迟消息

    摘要 本文主要讲解mall整合RabbitMQ实现延迟消息的过程,以发送延迟消息取消超时订单为例.RabbitMQ是一个被广泛使用的开源消息队列.它是轻量级且易于部署的,它能支持多种消息协议.Rabb ...

  9. RabbitMQ延迟消息的极限是多少?

    之前在写Spring Cloud Stream专题内容的时候,特地介绍了一下如何使用RabbitMQ的延迟消息来实现定时任务.最近正好因为开发碰到了使用过程中发现,延迟消息没有效果,消息直接就被消费了 ...

最新文章

  1. ASP.NET中Web DataGrid的使用指南-转
  2. ini配置文件的读取
  3. break与continue
  4. java递归整数逆序,将一个整数逆序输出,分别给出递归和非递归算法 | 学步园...
  5. 转】R利剑NoSQL系列文章 之 Hive
  6. JSP的概念||原理||JSP的脚本||JSP的内置对象||response.getWriter()和out.write()的区别||案例:改造Cookie案例
  7. bzoj4563放棋子
  8. 工厂三兄弟之抽象工厂模式
  9. 头回遇见网上找不到的问题,“缺少实例ID,实例ID是必需的”
  10. 15、java中的集合(2)
  11. python中如何编写代码输入多个数据并把它们放在一个列表中去_10分钟学习函数式Python...
  12. 求一个数是几位数,并求每位数相加的和
  13. 软件的接口设计图_基于GJB 5000A的软件配置管理研究与系统实现
  14. springboot在启动jar由于配置hibernate的映射文件上classpath导致的!BOOT-INF/classes/!路径出现!号问题解决方法
  15. Spring Cloud Feign 1(声明式服务调用Feign 简介)
  16. Linux服务之cobbler批量部署篇
  17. java色_JavaSE是什么
  18. java课程心得_Java课程感想
  19. 【OFDM通信】基于块状导频的信道估计算法仿真含Matlab源码
  20. Bing搜索崩了“无法访问”解决方案

热门文章

  1. 长此以往的发展,以BCH为代表的数字货币终将会为自己正名
  2. linux/Docker
  3. 我的网站搭建 (第十七天) celery 定时刷新缓存
  4. python_bomb----函数高级特性(生成器)
  5. GP通过外部表装载数据时遇到ERROR:extra data after last expected column解决方法
  6. Linux 文件系统权限(一)
  7. 移动端与PHP服务端接口通信流程设计(基础版)
  8. 手把手教你如何建立自己的Linux系统(二)
  9. 谢文: 三网融合还是三网凑合(转一篇好文)
  10. 脊柱是导致身体生病的重要原因