一、简介

死信交换机(Dead-Letter-Exchange):当消息在一个队列中由于过期、被拒绝等原因变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是死信交换机,绑定死信交换机的队列就称之为死信队列。

  • 判断一个消息是否是死信消息(Dead Message)的依据:
  • a. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false;
  • b. 消息过期; 消息过期时间设置主要有两种方式:
  • 1.设置队列的过期时间,这样该队列中所有的消息都存在相同的过期时间(在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒);
  • 2.单独设置某个消息的过期时间,每条消息的过期时间都不一样;(设置消息属性的 expiration 参数的值,单位为 毫秒);
  • 3.如果同时使用了两种方式设置过期时间,以两者之间较小的那个数值为准;
  • c. 队列已满(队列满了,无法再添加消息到mq中);
使用方法:申明队列的时候设置 x-dead-letter-exchange 参数
备份交换器(alternate-exchange):未被正确路由的消息将会经过此交换器
使用方法:申明交换器的时候设置 alternate-exchange 参数

二、使用方法

下面通过一个简单的示例说明在RabbitMQ中死信队列和备份交换机的使用方法。

【a】示意图:

【b】RabbitMQ配置信息,绑定交换器、队列、路由键设置

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** @Description: RabbitMQ配置信息,绑定交换器、队列、路由键设置* @author: weishihuai* @Date: 2019/6/27 15:38* <p>* 说明:* <p>* 死信交换机(Dead-Letter-Exchange): 当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上* <p>* 使用方法:申明队列的时候设置 x-dead-letter-exchange 参数* <p>* 判断一个消息是否是死信消息(Dead Message)的依据:* a. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false;* b. 消息过期; 消息过期时间设置主要有两种方式:*      1.设置队列的过期时间,这样该队列中所有的消息都存在相同的过期时间(在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒)*      2.单独设置某个消息的过期时间,每条消息的过期时间都不一样;(设置消息属性的 expiration 参数的值,单位为 毫秒)*      3.如果同时使用了两种方式设置过期时间,以两者之间较小的那个数值为准;* c. 队列已满(队列满了,无法再添加数据到mq中);* <p>* 备份交换器(alternate-exchange):未被正确路由的消息将会经过此交换器* 使用方法:申明交换器的时候设置 alternate-exchange 参数*/
@Component
public class RabbitMQConfig {private static final String MESSAGE_BAK_QUEUE_NAME = "un_routing_queue_name";private static final String MESSAGE_BAK_EXCHANGE_NAME = "un_routing_exchange_name";private static final String DEAD_LETTERS_QUEUE_NAME = "dead_letters_queue_name";private static final String DEAD_LETTERS_EXCHANGE_NAME = "dead_letters_exchange_name";private static final String QUEUE_NAME = "test_dlx_queue_name";private static final String EXCHANGE_NAME = "test_dlx_exchange_name";private static final String ROUTING_KEY = "user.add";/*** 声明备份队列、备份交换机、绑定队列到备份交换机* 建议使用FanoutExchange广播式交换机*/@Beanpublic Queue msgBakQueue() {return new Queue(MESSAGE_BAK_QUEUE_NAME);}@Beanpublic FanoutExchange msgBakExchange() {return new FanoutExchange(MESSAGE_BAK_EXCHANGE_NAME);}@Beanpublic Binding msgBakBinding() {return BindingBuilder.bind(msgBakQueue()).to(msgBakExchange());}/*** 声明死信队列、死信交换机、绑定队列到死信交换机* 建议使用FanoutExchange广播式交换机*/@Beanpublic Queue deadLettersQueue() {return new Queue(DEAD_LETTERS_QUEUE_NAME);}@Beanpublic FanoutExchange deadLettersExchange() {return new FanoutExchange(DEAD_LETTERS_EXCHANGE_NAME);}@Beanpublic Binding deadLettersBinding() {return BindingBuilder.bind(deadLettersQueue()).to(deadLettersExchange());}/*** 声明普通队列,并指定相应的备份交换机、死信交换机*/@Beanpublic Queue queue() {Map<String, Object> arguments = new HashMap<>(10);//指定死信发送的Exchangearguments.put("x-dead-letter-exchange", DEAD_LETTERS_EXCHANGE_NAME);return new Queue(QUEUE_NAME, true, false, false, arguments);}@Beanpublic Exchange exchange() {Map<String, Object> arguments = new HashMap<>(10);//声明备份交换机arguments.put("alternate-exchange", MESSAGE_BAK_EXCHANGE_NAME);return new DirectExchange(EXCHANGE_NAME, true, false, arguments);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs();}}

【c】生产者:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** @Description: 生产者* @author: weishihuai* @Date: 2019/6/27 15:59*/
@Component
public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);private static final String EXCHANGE_NAME = "test_dlx_exchange_name";private static final String ROUTING_KEY = "user.add";private static final String UN_ROUTING_KEY = "user.delete";@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage() {// 发送10条能够正确被路由的消息for (int i = 1; i <= 10; i++) {String message = "发送第" + i + "条消息.";rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, new CorrelationData(UUID.randomUUID().toString()));logger.info("【发送了一条能够正确被路由的消息】,exchange:[{}],routingKey:[{}],message:[{}]", EXCHANGE_NAME, ROUTING_KEY, message);}// 发送两条不能正确被路由的消息,该消息将会被转发到我们指定的备份交换器中for (int i = 1; i <= 2; i++) {String message = "不能正确被路由的消息" + i;rabbitTemplate.convertAndSend(EXCHANGE_NAME, UN_ROUTING_KEY, message, new CorrelationData(UUID.randomUUID().toString()));logger.info("【发送了第一条不能正确被路由的消息】,exchange:[{}],routingKey:[{}],message:[{}]", EXCHANGE_NAME, UN_ROUTING_KEY, message);}}}

【d】自定义消息发送确认的回调:

/*** @Description: 自定义消息发送确认的回调* @author: weishihuai* @Date: 2019/6/27 15:18*/
@Component
public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {private static final Logger logger = LoggerFactory.getLogger(CustomConfirmCallback.class);@Autowiredprivate RabbitTemplate rabbitTemplate;/*** PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.*/@PostConstructpublic void init() {//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {logger.info("(start)生产者消息确认=========================");if (!isSendSuccess) {logger.info("消息可能未到达rabbitmq服务器");}logger.info("(end)生产者消息确认=========================");}}

【e】消费者:

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Random;/*** @Description: 消费者* @author: weishihuai* @Date: 2019/6/27 16:07*/
@Component
public class Consumer {private static final Logger logger = LoggerFactory.getLogger(Consumer.class);@RabbitListener(queues = "test_dlx_queue_name")public void receiveMessage(String receiveMessage, Message message, Channel channel) {try {logger.info("【Consumer】接收到消息:[{}]", receiveMessage);//这里模拟随机拒绝一些消息到死信队列中if (new Random().nextInt(10) < 5) {logger.info("【Consumer】拒绝一条信息:[{}],该消息将会被转发到死信交换器中", receiveMessage);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {logger.info("【Consumer】接消息后的处理发生异常", e);try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e1) {logger.error("手动确认消息异常.", e1);}}}}

【f】配置文件:

server:port: 7777
spring:application:name: mq-dead-letter-exchangerabbitmq:host: 127.0.0.1virtual-host: /vhostusername: wshpassword: wshport: 5672#消息发送确认回调publisher-confirms: truelistener:simple:acknowledge-mode: manualretry:enabled: trueprefetch: 1auto-startup: truedefault-requeue-rejected: false#    publisher-returns: truetemplate:#当mandatory设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;#当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;#通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;mandatory: trueconnection-timeout: 10000

【g】测试用例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqDeadLetterExchangeApplicationTests {@Autowiredprivate Producer producer;@Testpublic void contextLoads() {producer.sendMessage();}}

【h】运行结果:

由上图可见,生产者发送了10条消息,有两条消息不能被正确路由到队列的,那么这两条消息应该被转发到备份交换机上所绑定的队列上面;其中也有3条消息被拒绝了,那么将会被转发到死信交换机对应的死信队列中,可以观察MQ管理控制台:

【I】总结:

  • arguments.put("x-dead-letter-exchange", DEAD_LETTERS_EXCHANGE_NAME);    //指定死信发送的Exchange
  • arguments.put("alternate-exchange", MESSAGE_BAK_EXCHANGE_NAME);     //声明备份交换机

三、应用场景

实际工作中,死信队列可以应用在许多场景中,例如常见的过期未支付订单自动取消 就可以通过 (死信队列 + 过期时间)来实现,就是当有一个队列 queue1,其 对应的死信交换机 为 deadEx1,deadEx1 绑定了一个队列 deadQueue1,
当队列 queue1 中有一条消息因过期(假设30分钟未支付就取消订单)或者其他原因成为死信的试试,消息就会被转发到死信队列上面,然后我们可以通过监听死信队列中的消息,同时可以加上判断订单的状态是否已经支付,如果已经支付那么不处理,如果未支付,那么可以更新订单状态为已取消。(也就相当于消费的是因过期产生的死信订单信息)。

对比未使用消息队列的时候的解决方案:

设置一个定时器,每秒轮询数据库查找超出过期时间且未支付的订单,然后修改状态,但是这种方式会占用很多资源

相比较而言,使用消息队列可以减少对数据库的压力,在高流量的情况下可以提高系统的响应速度。

四、总结

本文通过一个简单的示例说明了在RabbitMQ中如何使用死信队列和备份交换机,主要点就是声明队列的时候使用x-dead-letter-exchange参数指定死信交换机,声明交换机的时候使用alternate-exchange参数指定备份交换机是哪一个,这样死信消息就会被正确转发到死信交换机绑定的队列上,未被正确路由的消息会被转发到备份交换机对应的备份队列上,根据具体的业务场景,我们可通过监听死信队列或者备份队列进行进一步的处理工作。

RabbitMQ死信队列和备份交换器总结相关推荐

  1. RabbitMQ死信队列应用

    目录 1.什么是死信队列 2.死信队列应用场景 3.延迟消息概念 3.1 消息的TTL(Time To Live) 3.2 死信交换器 3.3 延时队列 4.死信队列使用流程图 5.死信队列应用(管理 ...

  2. Java秒杀系统实战系列~RabbitMQ死信队列处理超时未支付的订单(转)

    转自: https://juejin.cn/post/6844903903130042376 文末有源代码,非常棒 摘要: 本篇博文是"Java秒杀系统实战系列文章"的第十篇,本篇 ...

  3. RabbitMQ - 死信队列(DLX)

    RabbitMQ - 死信队列(DLX) 配置死信队列 方式1 - RabbitMQ 管理后台配置死信队列 方式2 - 代码创建死信队列 验证 满足死信队列的条件 死信队列只是一个概念,本质就是普通的 ...

  4. Rabbitmq死信队列

    目录 1.什么是死信队列 2.产生死信队列的原因 3.代码实现---直连交换机 3.1.导入依赖 3.2.配置rabbitmq连接信息 3.3.编写配置类 3.4.编写生产者 3.5.编写消费者 3. ...

  5. 【分布式】Rabbitmq死信队列模型、实战场景---订单延迟30min支付处理

    分布式 内容管理 死信队列 死信队列demo 死信队列消息模型 平台订单支付超时 --- 演示 业务分析 代码实现 RabbitMQ 死信队列/ 延迟队列 - 延迟业务逻辑 最近可能分布式进入Redi ...

  6. RabbitMq死信队列介绍

    一.官方地址信息 rabbitmq代码仓库:https://github.com/rabbitmq rabbitmq官网地址:https://rabbitmq.com rabbitmq死信交换机官网介 ...

  7. RabbitMQ死信队列管理

    概述 之前我们折腾了RabbitMQ的搭建,RabbitMQ的集群.所以,很快我就在公司封装好并推广了消息队列的使用.我们因为仅使用发布订阅和点对点的模型,所以很简单而单纯,就用起来了,但是,我们很快 ...

  8. rabbitmq死信队列详解与使用

    关注架构师高级俱乐部 开启架构之路 不定期福利发放哦~ 什么是死信队列 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到b ...

  9. springboot集成rabbitmq死信队列的延时队列使用

    目录         1.自动分列延时队列 2.应答失败自动转储延时再通知机制 ------------------------------------------------------------ ...

  10. RabbitMQ死信队列,延时队列

    死信队列 消息被消费方否定确认,使用channel.basicNack或channel.basicReject, 并且此时requeue属性被设置为false. 消息在队列的存活时间超过设置的TTL时 ...

最新文章

  1. 性能压测诡异的Requests/second 响应刺尖问题
  2. 一起谈.NET技术,获取ISAPI_Rewrite重写后的URL
  3. 首次公开:京东数科强一致、高性能分布式事务中间件 JDTX
  4. CRC32爆破解密脚本工具(三)
  5. centos进入单用户模式
  6. PHP框架和springboot区别,Spring和SpringBoot的区别
  7. SQL数据库对象的建立
  8. 详解Mysql中的JSON系列操作函数
  9. go语言io reader_Go语言中的io.Reader和io.Writer以及它们的实现
  10. Spring Boot基础学习笔记10:配置国际化页面
  11. 逆向生成的Dimac.JMail工程及测试项目
  12. 重磅发布!最新版《动手学深度学习》PDF版今天终于可以下载
  13. VisualStudio2022如何改为中文语言(vs2022汉化)
  14. KeyTool生成证书链及使用
  15. 超赞,1万字的后端面试题及面试经验分享!
  16. Transaction
  17. voyage java_GitHub - zhaoshiling1017/voyage: 采用Java实现的基于netty轻量的高性能分布式RPC服务框架...
  18. 【学习笔记】STM32hal库开发入门笔记
  19. ios微信公众号分享自定义分享无效
  20. Qt之鼠标滑过控件由箭头变成手型

热门文章

  1. kaggle 预测房价竞赛总结 动手学深度学习v2 pytorch
  2. mysql语句解析_MYSQL中SQL执行分析
  3. 316.去除重复字母
  4. 一个包含所有c++的头文件的头文件
  5. github第一次使用--创建hello-world
  6. 串口屏与6050_MPU6050 STM32控制 六轴传感器,可通过串口屏显示,还可连接匿名上位机 欧拉角 SCM 单片 发 267万源代码下载- www.pudn.com...
  7. KNN(k-nearest neighbor algorithm)--从原理到实现
  8. 《概率统计》知识点(持续更新……)
  9. vmware ubuntu 16.04 guest 修复不能桌面大小自动调整和从宿主机复制粘贴的问题
  10. 卡方分布的定义与概率密度的计算