前言

使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期。mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信.

mq基本的消息模型

mq死信队列的消息模型

简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。然后监听这个死信队列的消费.

一般死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息过期时间)

通常死信队列由“面向生产者的基本交换机+基本路由”绑定而成,所以生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中.

maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

配置普通队列和死信队列
yml文件

spring:application:name: ttl-queuerabbitmq:host: 110.40.181.73port: 35672username: rootpassword: 10086virtual-host: /fchanconnection-timeout: 15000# 发送确认#publisher-confirms: true# 路由失败回调publisher-returns: truetemplate:# 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃mandatory: truelistener:simple:# 每次从RabbitMQ获取的消息数量prefetch: 1default-requeue-rejected: false# 每个队列启动的消费者数量concurrency: 1# 每个队列最大的消费者数量max-concurrency: 1# 签收模式为手动签收-那么需要在代码中手动ACKacknowledge-mode: manual
package com.fchan.mq.mqDelay.dlx;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MyRabConfig {//定义普通任务队列(其实就是一个普通队列,只是没有消费者)@Bean("delayQue")public Queue delayQue(){//普通队列绑定死信交换机及路由key//delayQueue延时队列return QueueBuilder.durable("delayQueue")//指定这个延时队列下的死信交换机//如果消息过时则会被投递到当前对应的my-dlx-exchange.withArgument("x-dead-letter-exchange", "my-dlx-exchange")//死信交换机绑定的路由key//如果消息过时,my-dlx-exchange会根据routing-key-delay投递消息到对应的队列//mq发送消息的时候一般指定的是exchange交换机+这个routingKey来找到队列,这个应该是出于有时候需要让mq的交换机将一个消息发送到多个队列所采用的方式.withArgument("x-dead-letter-routing-key", "routing-key-delay").build();}//定义普通队列的交换机@Bean("delayExchange")public Exchange delayExchange(){return ExchangeBuilder.directExchange("delayExchange").build();}//绑定普通队列的交换机@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();}//定义死信队列@Bean("dlxQueue")public Queue dlxQueue(){return QueueBuilder.durable("my-dlx-queue").build();}//定义死信交换机,这里的死信交换机要和上面普通队列所绑定的交换机名称一致@Bean("dlxExchange")public Exchange dlxExchange(){return ExchangeBuilder.directExchange("my-dlx-exchange").build();}//绑定交换机与死信队列@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){//这儿这个routingKey要和上面正常队列中所绑定的死信routingKey一致return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();}}

死信队列消费者

package com.fchan.mq.mqDelay;import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class MyRabbitConsume {Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);@RabbitListener(queues = {"my-dlx-queue"})public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {log.info("收到死信信息:{}",map);log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");//手动确认消息        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

发送消息测试

/*** 发送消息到延时队列 delayQueue* 消息5秒后会被投递到死信队列* @return*/
@GetMapping("sendMsgToDelay")
public String sendMsgToDelay(String msg){Map<String,Object> map = new HashMap<>();map.put("msg", msg);rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息过期时间,5秒过期message.getMessageProperties().setExpiration("5000");return message;}});return msg;
}

测试成功

利用rabbitMq的死信队列实现延时消息相关推荐

  1. RabbitMQ死信队列,延时队列

    死信队列 消息被消费方否定确认,使用channel.basicNack或channel.basicReject, 并且此时requeue属性被设置为false. 消息在队列的存活时间超过设置的TTL时 ...

  2. RabbitMq(五) -- 死信队列和延迟队列

    1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...

  3. RabbitMQ 之死信队列

    文章目录 什么是死信队列 如何配置死信队列 死信消息的变化 死信队列应用场景 总结 什么是死信队列 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将 ...

  4. RabbitMQ高级特性(五):RabbitMQ之死信队列DLX

    一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...

  5. RabbitMQ:死信队列

    ✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...

  6. 6 RabbitMQ之死信队列

    文章目录 1. 案例一:消息TTL过期 2. 案例二:队列达到最大长度 3. 案例三:消息被拒 死信就是无法被消费的消息成为死信.正常情况下,生产者生产的消息投递到交换机,交换机根据routingKe ...

  7. rabbitMQ学习-死信队列

    死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致qu ...

  8. 消息队列之延时消息应用解析及实践

    简介:消息队列常用于实现业务需要的异步.解耦以及削峰功能.但在某些特殊的业务场景中,还需要消息队列服务本身支持一些特殊的消息类型,比如常见的延时消息.本次直播为您深入剖析延时消息的特性.应用场景,对比 ...

  9. java如何保证mq一定被消费,RabbitMQ如何保证队列里的消息99.99%被消费?

    1. 本篇概要 其实,还有1种场景需要考虑:当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了?,比如用户下单,订单中心发送了1个消息到RabbitMQ里的队列,积分中心收到这个 ...

最新文章

  1. Access快速连接SQL Server的方法(VB代码为例)
  2. linux 文件的打包和解压
  3. mysql unix 安装教程_在UNIX系统下安装MySQL_MySQL
  4. nginx支持php解析,upstream模块
  5. 一文详解最常用的10个「激活函数」
  6. wordpress模板
  7. 第一台计算机 采用工 作原理,第1讲计算机工作原理模版课件.ppt
  8. python定期自动运行_令人惊叹的8个Python新手工具
  9. QPSK调制与解调(matlab,详细介绍仿真方案的设计、结果及结论、完整代码及注释)
  10. 输入一个三位数,分别输出他的个位十位百位
  11. AI理论知识整理(4)-期望与方差以及联合概率分布
  12. python连连看_Python-连连看
  13. 如何把1个pdf拆分几个pdf
  14. Windows下部署ubuntu16.04+anaconda2.7+tensorflow
  15. 学习记录646@python求解有效年利率
  16. 一个在线测试正则表达式的网站推荐
  17. java 多线程 数据重复,java 多线程 出现数据重复调用有关问题
  18. java实训答辩ppt_实训项目答辩.ppt
  19. Reflex WMS系统里的Team Code
  20. jQuery mobile插件基础知识笔记

热门文章

  1. 模拟算法-桶排-信号塔问题详解+代码——zzx的博客
  2. POJ 1679 解题报告
  3. Win7上的sql server2005安装教程
  4. DolphinScheduler×T3出行 | 打造车联网一站式数据应用交互体验
  5. scratch寄存器作用说明
  6. CorelDRAW 2018 快捷键大全
  7. unity 声音AudioSystem(二)
  8. ATR5179单刀双掷开关芯片替代AS179-92LF
  9. RK3399国产自主4K智能会议终端解决方案
  10. 考研-数学-等差数列公式