目录

  • 死信队列是什么
  • 怎样实现一个死信队列
    • 说明
    • 实现过程
      • 导入依赖
      • 添加配置
      • 编写mq配置类
      • 添加业务队列的消费者
      • 添加死信队列的消费者
      • 添加消息发送者
      • 添加消息测试类
      • 测试
  • 死信队列的应用场景
  • 总结

死信队列是什么

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间超过设置的TTL时间。
  • 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

怎样实现一个死信队列

说明

配置死信队列大概可以分为三个步骤:

1.配置业务队列,绑定到业务交换机上

2.为业务队列配置死信交换机和路由key

3.为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

实现过程

导入依赖

        <!--RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

添加配置

spring:      #rabbitmqrabbitmq:host: 83.136.16.134password: guestusername: guestlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual

编写mq配置类

代码里面有详细说明,这里不在赘述。

package com.miaosha.study.mq;import com.sun.org.apache.regexp.internal.RE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @Author: laz* @CreateTime: 2023-02-27  09:16* @Version: 1.0*/
@Configuration
public class RabbitmqConfig {/*** 业务交换机*/public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";/*** 业务队列a*/public static final String BUSINESS_QUEUEA_NAME = "business.queue.a";/*** 业务交换机b*/public static final String BUSINESS_QUEUEB_NAME = "business.queue.b";/*** 死信交换机*/public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";/*** 死信队列a*/public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.queue.a";/*** 死信队列b*/public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.queue.b";/*** 死信队列路由键a*/public static final String DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME = "dead.letter.queue.a.rounting.key";/*** 死信队列路由键b*/public static final String DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME = "dead.letter.queue.b.rounting.key";/*** 申明业务交换机* @return*/@Beanpublic FanoutExchange businessExchange(){return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}/*** 申明死信交换机* @return*/@Beanpublic DirectExchange deadletterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);}/*** 申明业务队列a* @return*/@Beanpublic Queue queuea(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();}/*** 申明业务队列b* @return*/@Beanpublic Queue queueb(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();}/*** 申明死信队列a* @return*/@Beanpublic Queue deadletterQueuea(){return new Queue(DEAD_LETTER_QUEUEA_NAME);}/*** 申明死信队列b* @return*/@Beanpublic Queue deadletterQueueb(){return new Queue(DEAD_LETTER_QUEUEB_NAME);}/*** 队列a绑定到业务交换机* @return*/@Beanpublic Binding businessBindinga(){return BindingBuilder.bind(queuea()).to(businessExchange());}/*** 队列b绑定到业务交换机* @return*/@Beanpublic Binding businessBindingb(){return BindingBuilder.bind(queueb()).to(businessExchange());}/*** 死信队列a绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindinga(){return BindingBuilder.bind(deadletterQueuea()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);}/*** 死信队列b绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindingB(){return BindingBuilder.bind(deadletterQueueb()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);}
}

添加业务队列的消费者

package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEA_NAME;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEB_NAME;/*** @Author: laz* @CreateTime: 2023-02-27  09:53* @Version: 1.0*/
@Slf4j
@Component
public class RabbitmqReceiver {/*** 监听业务队列a* @param message*/@RabbitListener(queues = BUSINESS_QUEUEA_NAME)public void queuea(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("业务队列A接受到消息【{}】",msg);boolean ack = true;Exception exception = null;try {//这里模拟业务逻辑出现异常的情况if (msg.contains("fail")){throw new RuntimeException("dead letter exception");}} catch (Exception e){ack = false;exception = e;}//当ack为false时(业务逻辑出现异常),说明当前消息消费异常,这里直接放入死信队列if (!ack){log.error("业务队列A消费发生异常,error msg:{}", exception.getMessage());/*** void basicNack(long deliveryTag, boolean multiple, boolean requeue)* 参数一:当前消息的唯一id* 参数二:是否针对多条消息* 参数三:是否从新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = BUSINESS_QUEUEB_NAME)public void queueb(Message msg,Channel channel) throws Exception{String str = new String(msg.getBody());log.info("业务队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);}
}

添加死信队列的消费者

package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.*;/*** @Author: laz* @CreateTime: 2023-02-27  09:58* @Version: 1.0*/
@Slf4j
@Component
public class DeadLetterReceiver {/*** 监听业务队列a* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)public void queuea(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列A接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)public void queueb(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}
}

添加消息发送者

package com.miaosha.study.mq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_EXCHANGE_NAME;/*** @Author: laz* @CreateTime: 2023-02-27  09:49* @Version: 1.0*/
@Component
public class RabbitmqSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg){rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);}
}

添加消息测试类

package com.miaosha.study.controller;import com.miaosha.study.mq.RabbitmqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: laz* @CreateTime: 2023-02-27  09:59* @Version: 1.0*/
@RestController
@RequestMapping("mq")
public class TestController {@Autowiredprivate RabbitmqSender rabbitmqSender;@RequestMapping("testDeadLetterQueue/{msg}")public void testDeadLetterQueue(@PathVariable("msg")String msg){rabbitmqSender.sendMsg(msg);}
}

测试

运行项目,访问:http://localhost:8081/mq/testDeadLetterQueue/msg

可以看到,此时只有业务消费者消费了消息,死信队列并没有消费到消息。

然后根据消费者里面的逻辑,我们发送一条 ‘fail’的消息,再次测试

访问:http://localhost:8081/mq/testDeadLetterQueue/fail

可以看到,死信队列a已收到消息。到此实现死信队列的流程就通了。

注意:我们的死信消息MessageProperties中的内容比较多,代表的含义分别是:

字段名 含义
x-first-death-exchange 第一次被抛入的死信交换机的名称
x-first-death-reason 第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量
x-first-death-queue 第一次成为死信前所在队列名称
x-death 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新

死信队列的应用场景

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

总结

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

死信消息的生命周期:

  1. 业务消息被投入业务队列
  2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中
  4. 死信交换机将消息投入相应的死信队列
  5. 死信队列的消费者消费死信消息

本篇文章到此结束!希望对您有所帮助。

RabbitMQ实现死信队列相关推荐

  1. RabbitMq(五) -- 死信队列和延迟队列

    1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...

  2. RabbitMQ:死信队列

    ✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...

  3. rabbitMQ学习-死信队列

    死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致qu ...

  4. RabbitMQ 之死信队列

    文章目录 什么是死信队列 如何配置死信队列 死信消息的变化 死信队列应用场景 总结 什么是死信队列 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将 ...

  5. rabbitmq利用死信队列+TTL 实现延迟队列

    2019独角兽企业重金招聘Python工程师标准>>> 适用场景:订单超时未支付,倘若适用定时器的话,那么数据量大的话,轮询查询数据,首先IO开销大,其次任务时间要求高,扫描越频繁性 ...

  6. RabbitMQ的死信队列的应用

    强烈推荐一个大神的人工智能的教程:http://www.captainbed.net/zhanghan [前言] 最近在项目中用到了RabbitMQ来做异步处理,自己将这块儿系统的搞了搞,下面主要记录 ...

  7. 消息中间件之rabbitMQ实战-死信队列

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,集成spring Boot,provider消息推送实例,consumer消息消费实例,Direct(直连类型交换机).Fanout(广 ...

  8. RabbitMQ高级特性(五):RabbitMQ之死信队列DLX

    一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...

  9. RabbitMQ的死信队列

    什么是死信 在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现. 死信就是消息在特定场景下的一种表现形式,这些场景包括: 消息被拒绝访问,即 RabbitMQ返回 nack ...

最新文章

  1. windows下载anaconda3速度太慢怎么办
  2. 在windows中安装python
  3. java 线程状态_JAVA线程漫谈:线程状态与状态转换解析
  4. 【转】 Android ListView与Button的显示----不错不错
  5. 华为GaussDB相比PostgreSQL做了哪些内核优化?
  6. 高薪诚聘项目经理,架构师,高级工程师,工程师,网页设计师
  7. 在window7 64位下Oracle 10g 数据库中PLSQL Developer配置和使用
  8. 最大的LeftMax与rightMax之差绝对值
  9. Liunx 环境 docker-安装redis11
  10. R-CNN学习笔记2:Rich feature hierarchies for accurate object detection and semantic segmentation
  11. js便签笔记(5)——Dean Edwards大牛的跨浏览器AddEvent()设计(不知道是不是jQuery事件系统的原型)...
  12. Unity3D 场景编辑器扩展学习笔记-Editor
  13. 汪学明导师—商业模式创新与转型专家
  14. 0x0000006B蓝屏解决方法
  15. 增强型MOSFET导通条件
  16. FBReader阅读引擎支持的功能
  17. 终于懂得孤独是躲不开的单行道
  18. 生日祝福电子贺卡html5,电子生日祝福卡
  19. php readystate,ajax+php打造进度条 readyState各状态
  20. java的基本概念:进制、单位、编码、数据类型、变量声明、ASCII码

热门文章

  1. WINDOWS命令行查看内存、CPU
  2. 2017ios android比例,[图表]iOS与Android全球份额差距正越拉越大
  3. AES加密:PHP与Java互通,解密准确
  4. java 字符串截取的几种方式
  5. Spark操作Kudu
  6. 【AI人工智能】人工智能简介——AI 的发展是否会导致人类失去工作?
  7. 【高级】管理科学基础知识
  8. win7删除计算机 网络连接,win7系统怎么删除多余的本地连接【图文】
  9. APF抽屉柜有源电力滤波装置100A
  10. 爆肝万字,终于搞定这篇⛵神经网络搭建全全全流程!学不会你来找我~