文章目录

  • 什么是死信队列
  • 如何配置死信队列
  • 死信消息的变化
  • 死信队列应用场景
  • 总结

什么是死信队列

为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。

死信是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,并且 channel.basicNack 或 channel.basicReject 参数里 requeue 属性被设置为false。表示不会被再次放在队列里,被其他消费者使用。
  • 消息在队列的存活时间超过设置的TTL时间。
  • 消息队列的消息数量已经超过最大队列长度。排在前面的消息会被丢弃或者扔到死信路由上。

那么该消息将成为“死信”。

死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

Dead Letter Exchange 其实就是一种普通exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。

手动ack&异常消息统一放在一个队列处理建议的两种方式:

  • catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费。
  • 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败。

注意死信队列不能有消费者。

如何配置死信队列

配置死信队列可以分为以下步骤:

  • 配置业务队列,绑定到业务交换机上
  • 为业务队列配置死信交换机和路由key
  • 为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

RabbitMQ配置:
声明了两个Exchange,一个是业务Exchange(FanoutExchange),另一个是死信Exchange(DirectExchange),业务Exchange下绑定了两个业务队列,业务队列都配置了同一个死信Exchange,并分别配置了路由key,在死信Exchange下绑定了两个死信队列,设置的路由key分别为业务队列里配置的路由key。

@Configuration
public class RabbitMQConfig {public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";// 声明业务Exchange@Bean("businessExchange")public FanoutExchange businessExchange() {return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}// 声明死信Exchange@Bean("deadLetterExchange")public DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 声明业务队列A@Bean("businessQueueA")public Queue businessQueueA() {Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();}// 声明业务队列B@Bean("businessQueueB")public Queue businessQueueB() {Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();}// 声明死信队列A@Bean("deadLetterQueueA")public Queue deadLetterQueueA() {return new Queue(DEAD_LETTER_QUEUEA_NAME);}// 声明死信队列B@Bean("deadLetterQueueB")public Queue deadLetterQueueB() {return new Queue(DEAD_LETTER_QUEUEB_NAME);}// 声明业务队列A绑定关系@Beanpublic Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,@Qualifier("businessExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}// 声明业务队列B绑定关系@Beanpublic Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,@Qualifier("businessExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}// 声明死信队列A绑定关系@Beanpublic Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);}// 声明死信队列B绑定关系@Beanpublic Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);}
}

yml配置:

spring:rabbitmq:host: localhostpassword: guestusername: guestvirtualHost: /listener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual

业务队列的消费者:

@Slf4j
@Component
public class BusinessMessageReceiver {@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)public void receiveA(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println("收到业务消息A:" + new String(message.getBody()));boolean ack = true;Exception exception = null;try {if (msg.contains("deadletter")) {throw new RuntimeException("dead letter exception");}} catch (Exception e) {ack = false;exception = e;}if (!ack) {log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEB_NAME)public void receiveB(Message message, Channel channel) throws IOException {System.out.println("收到业务消息B:" + new String(message.getBody()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

死信队列的消费者:

@Component
public class DeadLetterMessageReceiver {@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)public void receiveA(Message message, Channel channel) throws IOException {System.out.println("收到死信消息A:" + new String(message.getBody()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)public void receiveB(Message message, Channel channel) throws IOException {System.out.println("收到死信消息B:" + new String(message.getBody()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

消息生产者:

@RestController
public class BusinessController {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("send")public void sendMsg(String msg) {rabbitTemplate.convertSendAndReceive(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg);}
}

从RabbitMQ的管理后台中看到一共有四个Queue,两个Exchange。


测试nck的消息:http://localhost:8080/send?msg=deadletter
将会触发业务队列A的NCK,按照预期,消息被NCK后,会抛到死信队列中,因此死信队列将会出现这个消息,日志如下:

收到业务消息B:deadletter
收到业务消息A:deadletter
消息消费发生异常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
...收到死信消息A:deadletter

可以看到,死信队列的Consumer接受到了这个消息,所以流程到此为止就打通了。

死信消息的变化

那么“死信”被丢到死信队列中后,会发生什么变化呢?

如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。

举个例子:

如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由key testA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。

另外,由于被抛到了死信交换机,所以消息的Exchange Name也会被替换为死信交换机的名称。

消息的Header中,也会添加很多奇奇怪怪的字段,修改一下上面的代码,在死信队列的消费者中添加一行日志输出:log.info("死信消息properties:{}", message.getMessageProperties());

得到死信消息Header中被添加的信息:

死信消息properties:MessageProperties [headers={x-first-death-exchange=dead.letter.demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=dead.letter.demo.simple.business.exchange, time=Sun Jul 14 16:48:16 CST 2019, routing-keys=[], queue=dead.letter.demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=dead.letter.demo.simple.business.queuea}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAREVTS1RPUC1DUlZGUzBOAAAPQAAAAAAB.bLbsdR1DnuRSwiKKmtdOGw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.deadletter.exchange, receivedRoutingKey=dead.letter.demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-NSp18SUPoCNvQcoYoS2lPg, consumerQueue=dead.letter.demo.simple.deadletter.queuea]

Header中看起来有很多信息,实际上并不多,只是值比较长而已。下面就简单说明一下Header中的值:

死信队列应用场景

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。
通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

总结

总结一下死信消息的生命周期:

  • 业务消息被投入业务队列
  • 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  • 被nck或reject的消息由RabbitMQ投递到死信交换机中
  • 死信交换机将消息投入相应的死信队列
  • 死信队列的消费者消费死信消息

死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

【RabbitMQ】一文带你搞定RabbitMQ死信队列
【RabbitMQ】一文带你搞定RabbitMQ延迟队列
【RabbitMQ】如何进行消息可靠投递【上篇】
【RabbitMQ】如何进行消息可靠投递【下篇】

RabbitMQ 之死信队列相关推荐

  1. RabbitMq(五) -- 死信队列和延迟队列

    1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...

  2. RabbitMQ:死信队列

    ✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...

  3. rabbitMQ学习-死信队列

    死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致qu ...

  4. rabbitmq利用死信队列+TTL 实现延迟队列

    2019独角兽企业重金招聘Python工程师标准>>> 适用场景:订单超时未支付,倘若适用定时器的话,那么数据量大的话,轮询查询数据,首先IO开销大,其次任务时间要求高,扫描越频繁性 ...

  5. RabbitMQ的死信队列的应用

    强烈推荐一个大神的人工智能的教程:http://www.captainbed.net/zhanghan [前言] 最近在项目中用到了RabbitMQ来做异步处理,自己将这块儿系统的搞了搞,下面主要记录 ...

  6. 消息中间件之rabbitMQ实战-死信队列

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,集成spring Boot,provider消息推送实例,consumer消息消费实例,Direct(直连类型交换机).Fanout(广 ...

  7. RabbitMQ实现死信队列

    目录 死信队列是什么 怎样实现一个死信队列 说明 实现过程 导入依赖 添加配置 编写mq配置类 添加业务队列的消费者 添加死信队列的消费者 添加消息发送者 添加消息测试类 测试 死信队列的应用场景 总 ...

  8. RabbitMQ高级特性(五):RabbitMQ之死信队列DLX

    一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...

  9. RabbitMQ的死信队列

    什么是死信 在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现. 死信就是消息在特定场景下的一种表现形式,这些场景包括: 消息被拒绝访问,即 RabbitMQ返回 nack ...

最新文章

  1. shell与 .sh文件与 .bash文件
  2. 【组队学习】【28期】Datawhale组队学习内容介绍
  3. py文件 添加模块映射_Python模块的定义,模块的导入,__name__用法实例分析
  4. 达梦数据库DM8飞腾版本、芯版本获取地址,最新达梦数据库各国产化版本获取方法,达梦数据库DM8使用手册、产品文档获取
  5. 怎么复制远程服务器上的文件夹,Linux系统复制文件/文件夹到远程服务器
  6. for in for of区别_(for…in) VS (for…of)
  7. Flask项目--注册
  8. GEEK学习笔记— —程序员面试宝典笔记(四)
  9. 常见的网络协议和端口号
  10. 产品读书《自卑与超越》
  11. ORA-20001: Latest xml inventory is not loaded into table
  12. 哈夫曼树以及哈夫曼编码
  13. Vue倒计时动画效果
  14. 【转载】JavaScript进阶问题列表
  15. 隧道代理ip使用流程
  16. 购买亚马逊保险前,卖家须注意的问题值得你收藏!
  17. 6 款代码对比工具,你知道几个?
  18. 华为云首席架构师顾炯炯:敢为人先,探索架构创新之路如何走
  19. Jackson之JSON序列化和多态反序列化
  20. TCP是“第一个系统”

热门文章

  1. Linux接口驱动03 - SWIM(脱离ST烧录器,单线完美升级STM8的固件,附带总结经验和源代码)
  2. 图像类型的转换(matlab)
  3. Linux中VMware虚拟机打不开,方法总结
  4. Elasticsearch实战秘籍:GPT助你解锁高效搜索引擎的技巧
  5. 单片机三种烧录方式ISP、IAP和ICP有什么不同?
  6. 微软决定于2022年停止对 IE 浏览器的支持
  7. 仿B站简单版播放器带有弹幕,并支持解析
  8. 圈粉无数的B站美妆UP主@机智的党妹,她的涨粉秘诀是什么?
  9. 商品系统设计(四):商品属性设计之自定义属性
  10. 由于目标计算机积极拒绝,无法连接。 (10061)