RabbitMQ高级特性(五):RabbitMQ之死信队列DLX
一、死信队列简介
(1)死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
(2)死信条件
消息成为死信(Dead message)的三种情况:
- 队列消息长度到达限制。
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
- 原队列存在消息过期设置,消息到达超时时间未被消费。
(3)普通队列与死信交换机关联
普通队列需绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
二、生产者工程
(1)RabbitMQ配置文件(rabbitmq.properties)
rabbitmq.host=192.168.116.161
rabbitmq.port=5672
rabbitmq.username=xiao
rabbitmq.password=xiao
rabbitmq.virtual-host=/myhost
(2)声明队列、交换机、交换机绑定队列(spring-rabbitmq-producer.xml)
① 声明死信队列(queue_dlx)和死信交换机(exchange_dlx);
② 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx);
③ 正常队列绑定死信交换机:
设置两个参数:
(1)x-dead-letter-exchange:死信交换机名称
(2)x-dead-letter-routing-key:发送给死信交换机的routingkey
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载rabbitmq配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"publisher-confirms="true"publisher-returns="true"/><!--定义管理交换机、队列--><rabbit:admin connection-factory="connectionFactory"/><!--定义rabbitTemplate对象操作可以在代码中方便发送消息--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/><!-- ===========测试死信队列================================================================= --><!--1. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)2. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)正常队列绑定死信交换机,设置两个参数:(1)x-dead-letter-exchange:死信交换机名称(2)x-dead-letter-routing-key:发送给死信交换机的routingkey--><!-- 1. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx),两者绑定 --><rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue><rabbit:topic-exchange name="exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!-- 2. 声明普通队列(test_queue_dlx)和普通交换机(test_exchange_dlx),两者绑定 --><rabbit:queue name="test_queue_dlx" id="test_queue_dlx"><rabbit:queue-arguments><!-- 普通队列绑定死信交换机--><entry key="x-dead-letter-exchange" value="exchange_dlx" /><!-- 设置发送给死信交换机的routingkey--><entry key="x-dead-letter-routing-key" value="dlx.zhangxiaohu" /><!-- 设置队列的过期时间ttl(10秒)--><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /><!-- 设置队列的长度限制max-length --><entry key="x-max-length" value="10" value-type="java.lang.Integer" /></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="test_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!-- ============================================================================ --></beans>
(3)测试类(测试因过期时间成为死信消息
)
package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试死信队列*/@Testpublic void testDlx(){//1. 发送死信消息(因过期时间成为死信消息)rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}}
运行结果:
注: spring-rabbitmq-producer.xml中已经设置过期时间为10秒。
(3)修改测试类(测试因队列长度限制成为死信消息
)
注释掉spring-rabbitmq-consumer.xml中的过期时间设置:
<!-- 2. 声明普通队列(test_queue_dlx)和普通交换机(test_exchange_dlx),两者绑定 -->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx"><rabbit:queue-arguments>......<!-- 设置队列的过期时间ttl(10秒)--><!--<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />-->......</rabbit:queue-arguments>
</rabbit:queue>
修改测试类代码:
package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试死信队列*/@Testpublic void testDlx(){//发送死信消息(因队列长度限制成为死信消息)for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}}}
运行结果:
注:
spring-rabbitmq-producer.xml中已经设置队列最大长度为10,当发送20条消息时,会有10条消息重新路由到死信队列中(如下图)
。
三、搭建消费者工程(目的:测试因消费者拒签而成为死信
)
(1)RabbitMQ配置文件(rabbitmq.properties)
rabbitmq.host=192.168.116.161
rabbitmq.port=5672
rabbitmq.username=xiao
rabbitmq.password=xiao
rabbitmq.virtual-host=/myhost
(2)自定义死信队列监听器
package net.xiaof.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** @author zhangxh* @Description: 消费端,定义死信队列监听器* @date 2020-12-26*/
@Component
public class DlxListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(1200);//防止控制台打印过快long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("DlxListener的onMessage方法执行了");try {//1.接收并转换消息System.out.println(new String(message.getBody()));//2.处理业务逻辑System.out.println("处理业务逻辑...");int i = 3/0;//出现错误,在catch中拒绝签收//3.手动签收channel.basicAck(deliveryTag,true);} catch (Exception e) {//e.printStackTrace();//4.拒绝签收,不重回队列 requeue=false/*** long deliveryTag:该消息的index* boolean multiple:是否批量。true:将一次性拒绝所有小于deliveryTag的消息。* boolean requeue:被拒绝的是否重新入队列*/System.out.println("出现异常,拒绝签收...");channel.basicNack(deliveryTag,true,false);}}}
(3)开启手动确认模式、声明DLX监听器(spring-rabbitmq-consumer.xml)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><!-- 组件扫描 --><context:component-scan base-package="net.xiaof.listener" /><!--定义监听器容器(设置手动签收模式:acknowledge="manual",prefetch预处理数量)--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" ><!-- 声明DLX监听器,监听队列test_queue_dlx--><rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/></rabbit:listener-container></beans>
(4)消费者测试类(死循环)
package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/*** @author zhangxh* @Description: 测试* @date 2020-12-25*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {/*** 使用死循环开启消费端*/@Testpublic void testConsumer() throws InterruptedException {while (true){}}}
(5)修改生产者测试类(测试因消费者拒收消息(且不做重回队列处理)成为死信消息
)
package net.xiaof.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试死信队列*/@Testpublic void testDlx(){//发送死信消息(因消费者拒收消息(且不做重回队列处理)成为死信消息)rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}}
(6)测试死信队列(因消费者拒收消息(且不做重回队列处理)成为死信消息
)
(1)运行生产者工程测试类,发送消息到“test_exchange_dlx”普通队列中。
(2)运行消费者工程测试类(死循环),DLX监听器中进行了手动拒签(且不重回队列),消息拒签后转为死信,重新路由到了queue_dlx死信队列(如下图):
五、死信队列DLX总结
1. 死信交换机和死信队列和普通的没有区别。
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。
3. 消息成为死信的三种情况:
(1)队列消息长度到达限制;
(2)消费者拒接消费消息,并且不重回队列;
(3)原队列存在消息过期设置,消息到达超时时间未被消费;
RabbitMQ高级特性(五):RabbitMQ之死信队列DLX相关推荐
- RabbitMQ(二):RabbitMQ高级特性
RabbitMQ(二):RabbitMQ高级特性 RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用.作为一名合格的开发者,有必要了解一下相关知识,RabbitM ...
- RabbitMQ 高级特性(吐血猝死整理篇)
文章目录 RabbitMQ 高级特性 消息可靠性投递(可靠性发送) 事务机制 代码实现 发送方确认机制 为什么比事务性能好 示例代码 测试一下QPS 持久化存储 TTL 队列 死信队列(DLX) 延迟 ...
- RabbitMQ高级特性——死信队列DLX以及代码测试
大伙可以到我的RabbitMQ专栏获取更多信息 demo示例这里拿 概述 死信队列,缩写DLX(dead letter exchange 死信交换机),当消息称为dead message之后,会被重新 ...
- 3 RabbitMQ高级特性 3
主要为大家讲解RabbitMQ的高级特性和实际场景应用, 包括消息如何保障 100% 的投递成功 ? 幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题? Confirm确认消息. ...
- RabbitMQ高级特性
文章目录 1. 简述 2. 特性示例: 2.1 消息可靠性投递 2.2 Consumer Ack 2.3 消费端限流 2.4 TTL 2.5 死信队列 2.6 延迟队列 1. 简述 在rabbitMQ ...
- RabbitMQ 死信队列DLX
死信队列的简单介绍 利用dlx,当消息在一个队列中变成死信之后,它能被重新publish到另一个exchange,这个exchange就是dlx 消息变成死信的以下几种情况 消息被拒绝,并且reque ...
- 【消息中间件】RabbitMQ 高级特性与应用问题
消息的可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 r ...
- RabbitMQ - 死信队列(DLX)
RabbitMQ - 死信队列(DLX) 配置死信队列 方式1 - RabbitMQ 管理后台配置死信队列 方式2 - 代码创建死信队列 验证 满足死信队列的条件 死信队列只是一个概念,本质就是普通的 ...
- 1-8 (4). RabbitMQ高级特性-消费端ACK
Consumer ACK 指Acknowledge,确认 有三种方式: (1)自动确认:acknowledge="none"(默认) (2)手动确认:acknowledge=&qu ...
最新文章
- 中消协发布2018年春节消费提示
- 【★】Web精彩实战之智能迷宫
- 【2015沈阳现场A】
- Hbase2.1.0-CDH6.3.2 Region in Transition (永久RIT) 异常解决
- JQuery中的.attr()与.removeAttr()
- sqlserver建表语句_重新认识MySQL中的COUNT语句
- 上班族吐槽大集合:那些发生在公司的傻X奇遇
- array_chunk_PHP array_chunk()函数与示例
- mysql update多个表_mysql update 多表 (复制)
- 冻结和只读取当前对象的属性,不读取对象原型的属性
- jquery load 事件用法
- 【Foreign】朗格拉日计数 [暴力]
- JavaScript中大数相加的解法
- 对Photoshop高斯模糊滤镜的算法总结
- 嵌入式linux地图,基于嵌入式Linux的MapInfo格式地图显示
- 分苹果(C语言实现)
- verilog实现格雷码与二进制码的互换
- 如何使用project制定项目计划?(附详细步骤截图)
- 如何去掉抖音短视频水印----全网最好用的去抖音视频水印方法
- 扫二维码 下载app