一、死信队列简介

(1)死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

(2)死信条件

消息成为死信(Dead message)的三种情况:

  1. 队列消息长度到达限制。
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
  3. 原队列存在消息过期设置,消息到达超时时间未被消费。

(3)普通队列与死信交换机关联

普通队列需绑定死信交换机:

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key


二、生产者工程

(1)RabbitMQ配置文件(rabbitmq.properties)

rabbitmq.host=192.168.116.161
rabbitmq.port=5672
rabbitmq.username=xiao
rabbitmq.password=xiao
rabbitmq.virtual-host=/myhost

(2)声明队列、交换机、交换机绑定队列(spring-rabbitmq-producer.xml)

① 声明死信队列(queue_dlx)和死信交换机(exchange_dlx);

② 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx);

③ 正常队列绑定死信交换机:

设置两个参数:
(1)x-dead-letter-exchange:死信交换机名称
(2)x-dead-letter-routing-key:发送给死信交换机的routingkey

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载rabbitmq配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"publisher-confirms="true"publisher-returns="true"/><!--定义管理交换机、队列--><rabbit:admin connection-factory="connectionFactory"/><!--定义rabbitTemplate对象操作可以在代码中方便发送消息--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/><!-- ===========测试死信队列================================================================= --><!--1. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)2. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)正常队列绑定死信交换机,设置两个参数:(1)x-dead-letter-exchange:死信交换机名称(2)x-dead-letter-routing-key:发送给死信交换机的routingkey--><!-- 1. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx),两者绑定 --><rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue><rabbit:topic-exchange name="exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!-- 2. 声明普通队列(test_queue_dlx)和普通交换机(test_exchange_dlx),两者绑定 --><rabbit:queue name="test_queue_dlx" id="test_queue_dlx"><rabbit:queue-arguments><!-- 普通队列绑定死信交换机--><entry key="x-dead-letter-exchange" value="exchange_dlx" /><!-- 设置发送给死信交换机的routingkey--><entry key="x-dead-letter-routing-key" value="dlx.zhangxiaohu" /><!-- 设置队列的过期时间ttl(10秒)--><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /><!-- 设置队列的长度限制max-length --><entry key="x-max-length" value="10" value-type="java.lang.Integer" /></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="test_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!-- ============================================================================ --></beans>

(3)测试类(测试因过期时间成为死信消息

package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试死信队列*/@Testpublic void testDlx(){//1. 发送死信消息(因过期时间成为死信消息)rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}}

运行结果:

注: spring-rabbitmq-producer.xml中已经设置过期时间为10秒。


(3)修改测试类(测试因队列长度限制成为死信消息

注释掉spring-rabbitmq-consumer.xml中的过期时间设置:

<!-- 2. 声明普通队列(test_queue_dlx)和普通交换机(test_exchange_dlx),两者绑定 -->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx"><rabbit:queue-arguments>......<!-- 设置队列的过期时间ttl(10秒)--><!--<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />-->......</rabbit:queue-arguments>
</rabbit:queue>

修改测试类代码:

package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试死信队列*/@Testpublic void testDlx(){//发送死信消息(因队列长度限制成为死信消息)for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}}}

运行结果:

注: spring-rabbitmq-producer.xml中已经设置队列最大长度为10,当发送20条消息时,会有10条消息重新路由到死信队列中(如下图)


三、搭建消费者工程(目的:测试因消费者拒签而成为死信

(1)RabbitMQ配置文件(rabbitmq.properties)

rabbitmq.host=192.168.116.161
rabbitmq.port=5672
rabbitmq.username=xiao
rabbitmq.password=xiao
rabbitmq.virtual-host=/myhost

(2)自定义死信队列监听器

package net.xiaof.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** @author zhangxh* @Description: 消费端,定义死信队列监听器* @date 2020-12-26*/
@Component
public class DlxListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(1200);//防止控制台打印过快long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("DlxListener的onMessage方法执行了");try {//1.接收并转换消息System.out.println(new String(message.getBody()));//2.处理业务逻辑System.out.println("处理业务逻辑...");int i = 3/0;//出现错误,在catch中拒绝签收//3.手动签收channel.basicAck(deliveryTag,true);} catch (Exception e) {//e.printStackTrace();//4.拒绝签收,不重回队列 requeue=false/*** long deliveryTag:该消息的index* boolean multiple:是否批量。true:将一次性拒绝所有小于deliveryTag的消息。* boolean requeue:被拒绝的是否重新入队列*/System.out.println("出现异常,拒绝签收...");channel.basicNack(deliveryTag,true,false);}}}

(3)开启手动确认模式、声明DLX监听器(spring-rabbitmq-consumer.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><!-- 组件扫描 --><context:component-scan base-package="net.xiaof.listener" /><!--定义监听器容器(设置手动签收模式:acknowledge="manual",prefetch预处理数量)--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" ><!-- 声明DLX监听器,监听队列test_queue_dlx--><rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/></rabbit:listener-container></beans>

(4)消费者测试类(死循环)

package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/*** @author zhangxh* @Description: 测试* @date 2020-12-25*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {/*** 使用死循环开启消费端*/@Testpublic void testConsumer() throws InterruptedException {while (true){}}}

(5)修改生产者测试类(测试因消费者拒收消息(且不做重回队列处理)成为死信消息

package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试死信队列*/@Testpublic void testDlx(){//发送死信消息(因消费者拒收消息(且不做重回队列处理)成为死信消息)rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}}

(6)测试死信队列(因消费者拒收消息(且不做重回队列处理)成为死信消息

(1)运行生产者工程测试类,发送消息到“test_exchange_dlx”普通队列中。

(2)运行消费者工程测试类(死循环),DLX监听器中进行了手动拒签(且不重回队列),消息拒签后转为死信,重新路由到了queue_dlx死信队列(如下图):


五、死信队列DLX总结

1. 死信交换机和死信队列和普通的没有区别。

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。

3. 消息成为死信的三种情况:

(1)队列消息长度到达限制;
(2)消费者拒接消费消息,并且不重回队列;
(3)原队列存在消息过期设置,消息到达超时时间未被消费;

RabbitMQ高级特性(五):RabbitMQ之死信队列DLX相关推荐

  1. RabbitMQ(二):RabbitMQ高级特性

    RabbitMQ(二):RabbitMQ高级特性 RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用.作为一名合格的开发者,有必要了解一下相关知识,RabbitM ...

  2. RabbitMQ 高级特性(吐血猝死整理篇)

    文章目录 RabbitMQ 高级特性 消息可靠性投递(可靠性发送) 事务机制 代码实现 发送方确认机制 为什么比事务性能好 示例代码 测试一下QPS 持久化存储 TTL 队列 死信队列(DLX) 延迟 ...

  3. RabbitMQ高级特性——死信队列DLX以及代码测试

    大伙可以到我的RabbitMQ专栏获取更多信息 demo示例这里拿 概述 死信队列,缩写DLX(dead letter exchange 死信交换机),当消息称为dead message之后,会被重新 ...

  4. 3 RabbitMQ高级特性 3

    主要为大家讲解RabbitMQ的高级特性和实际场景应用, 包括消息如何保障 100% 的投递成功 ? 幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题? Confirm确认消息. ...

  5. RabbitMQ高级特性

    文章目录 1. 简述 2. 特性示例: 2.1 消息可靠性投递 2.2 Consumer Ack 2.3 消费端限流 2.4 TTL 2.5 死信队列 2.6 延迟队列 1. 简述 在rabbitMQ ...

  6. RabbitMQ 死信队列DLX

    死信队列的简单介绍 利用dlx,当消息在一个队列中变成死信之后,它能被重新publish到另一个exchange,这个exchange就是dlx 消息变成死信的以下几种情况 消息被拒绝,并且reque ...

  7. 【消息中间件】RabbitMQ 高级特性与应用问题

    消息的可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 r ...

  8. RabbitMQ - 死信队列(DLX)

    RabbitMQ - 死信队列(DLX) 配置死信队列 方式1 - RabbitMQ 管理后台配置死信队列 方式2 - 代码创建死信队列 验证 满足死信队列的条件 死信队列只是一个概念,本质就是普通的 ...

  9. 1-8 (4). RabbitMQ高级特性-消费端ACK

    Consumer ACK 指Acknowledge,确认 有三种方式: (1)自动确认:acknowledge="none"(默认) (2)手动确认:acknowledge=&qu ...

最新文章

  1. 中消协发布2018年春节消费提示
  2. 【★】Web精彩实战之智能迷宫
  3. 【2015沈阳现场A】
  4. Hbase2.1.0-CDH6.3.2 Region in Transition (永久RIT) 异常解决
  5. JQuery中的.attr()与.removeAttr()
  6. sqlserver建表语句_重新认识MySQL中的COUNT语句
  7. 上班族吐槽大集合:那些发生在公司的傻X奇遇
  8. array_chunk_PHP array_chunk()函数与示例
  9. mysql update多个表_mysql update 多表 (复制)
  10. 冻结和只读取当前对象的属性,不读取对象原型的属性
  11. jquery load 事件用法
  12. 【Foreign】朗格拉日计数 [暴力]
  13. JavaScript中大数相加的解法
  14. 对Photoshop高斯模糊滤镜的算法总结
  15. 嵌入式linux地图,基于嵌入式Linux的MapInfo格式地图显示
  16. 分苹果(C语言实现)
  17. verilog实现格雷码与二进制码的互换
  18. 如何使用project制定项目计划?(附详细步骤截图)
  19. 如何去掉抖音短视频水印----全网最好用的去抖音视频水印方法
  20. 扫二维码 下载app

热门文章

  1. 直流3V电压升压到3.3V的PCB板(使用AD)
  2. USB(FS、HS、LS)
  3. linux yum安装ipmitool,centos安装ipmitool
  4. TIMEWAIT状态
  5. 通过RVM方式安装管理Ruby
  6. LeetCode-按顺序刷题备注1-50
  7. 一款超级好用的鼠标手势软件Mouselnc
  8. 中国国内银行==银行外汇理财产品
  9. 如何用 CSS 和 D3 创作一个用文字组成的心形图案
  10. 以太坊开发攻略五:以太坊的编程接口web3.js API的使用上:转账