6 RabbitMQ之死信队列
文章目录
- 1. 案例一:消息TTL过期
- 2. 案例二:队列达到最大长度
- 3. 案例三:消息被拒
死信就是无法被消费的消息成为死信。正常情况下,生产者生产的消息投递到交换机,交换机根据routingKey路由到队列中,消费者从队列中消费消息。但由于某些特定原因导致队列中消息无法被消费者正常消费后,消息就变成了死信消息,存放死信消息的队列就成为死信队列。
死信队列目的就是把未能正常消费的消息保存到死信队列中,保证消息不丢失。
造成消息死信的原因主要有以下3点:
- 消息TTL过期,TTL指Time To Live,指消息在队列中存活的时间;
- 队列达到最大长度,队列已经满,无法再继续添加消息;
- 消息被拒接,消息被消费者消费后,消费者发送了basic.reject拒绝指令或者basic.nack否定指令。
下面通过代码演示三种原因下死信队列的处理,如下图所示,首先生产者生产消息投递到normal_exchange交换机,然后路由到normal_queue中,队列中消息可以被Consumer1正常消费的消息会从队列中删除掉,但由于上述三种原因导致消息不能正常被消费的消息就会被重新投递到dead_exchange死信交换机,然后被路由到dead_queue死信队列中,最后死信消息被Consumer2处理。
1. 案例一:消息TTL过期
首先创建一个Consumer1消费者类,Consumer1类中创建了一个normal_exchange正常交换机,一个dead_exchange死信交换机,创建了一个normal_queue正常队列,一个dead_queue死信队列,并且正常交换机通过routingKey=normal绑定了正常交换机,通过死信交换机通过routingKey=dead绑定了死信队列。然后声明normal_queue队列时还关联了私信交换机,当消息消费失败后消息会重新投递到死信交换机,然后被路由到死信队列中,最后被Consumer2消费。normal_queue队列中可正常消费的消息会被Consumer1进行消费。
public class Consumer1 {private static final String NORMAL_EXCHANGE = "normal_exchange"; /*正常交换机*/private static final String DEAD_EXCHANGE = "dead_exchange"; /*死信交换机*/private static final String NORMAL_QUEUE = "normal_queue"; /*正常队列*/private static final String DEAD_QUEUE = "dead_queue"; /*死信队列*/public static void main(String[] args) throws IOException, TimeoutException {/*获取信道*/Channel channel = RabbitmqUtil.getChannel();/*声明死信交换机和普通交换机*/channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);/*声明死信队列*/channel.queueDeclare(DEAD_QUEUE, false, false, false, null);/*死信队列绑定死信交换机*/channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");/*声明正常队列, 在声明队列前设置队列的属性*/Map<String, Object> parms = new HashMap<>();/*设置消息过期时间10s,超过10s后消息就会被投递到死信交换机,此参数可以在生产端设置*///parms.put("x-message-ttl", 10000);/*正常队列关联死信交换机, 关联key是固定的, 关联后正常队列中消息无法被消费者消费后就会把消息投递到死信交换机*/parms.put("x-dead-letter-exchange", DEAD_EXCHANGE);/*为正常队列设置消息失败后投递到死信交换机绑定的routingKey, 这样消息消费后就会被死信交换机通过"dead"这个routingKey路由到死信队列中*/parms.put("x-dead-letter-routing-key", "dead");channel.queueDeclare(NORMAL_QUEUE, false, false, false, parms);/*正常队列绑定正常交换机*/channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(message);};CancelCallback cancelCallback = consumerTag -> {System.out.println("消费失败");};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);}
}
下面创建Consumer2类用于消费死信消息
import com.lzj.rabbitmq.RabbitmqUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {private static final String DEAD_QUEUE = "dead_queue"; /*死信队列*/public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtil.getChannel();DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(message);};CancelCallback cancelCallback = consumerTag -> {System.out.println("消费失败");};channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);}
}
最后创建Producer生产者用于生产消息,并对每个消息都设置了在队列中的过期时间10s。
public class Producer {private static final String NORMAL_CHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {/*获取信道*/Channel channel = RabbitmqUtil.getChannel();/*设置消息在队列中存活时间,10s后消息过期*/AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();/*发布消息*/String message = null;for (int i=1; i<5; i++){message = "commodity" + i;channel.basicPublish(NORMAL_CHANGE, "normal", properties, message.getBytes());System.out.println("生产者发布消息:" + message);}}
}
下面进行测试,首先运行Consumer1进行正常交换机、死信交换机和正常队列、死信队列的已经绑定关系的创建,运行完Consumer1后从RabbitMQ浏览器管理端可以查看交换机以及队列创建情况,其中normal_queue队列中含有两个属性DLK和DLX,DLX表示死信交换机,DLK表示死信交换机的routingKey。
下面启动生产者Producer,为模拟生产者生产的消息经过10s后都没有被Consumer1消费而进入死信队列,在启动生产者之前先关闭Consumer1,这样生产者生产的消息无法被Consumer1消费从而进入死信队列。运行Producer后,输出如下4个消息,表示生产者生产了下面4个消息
生产者发布消息:commodity1
生产者发布消息:commodity2
生产者发布消息:commodity3
生产者发布消息:commodity4
从浏览器管理端可以看到normal_queue队列中也正好有4条待处理的消息
经过10s后再看浏览器管理端,可以发现normal_queue队列中的4条消息都进到了dead_queue死信队列中
下面启动Consumer2消费者消费掉死信队列中消息,输出下面4条消息,从而验证此4条消息就是生产者生产的4条消息,此4条消息过期后进入的死信队列中。
commodity1
commodity2
commodity3
commodity4
2. 案例二:队列达到最大长度
设定normal_queue队列最大长度为4,当队列中消息条数达到4条时进入死信队列。
首先从浏览器管理端删除掉案例一中创建的normal_queue队列,重新创建normal_queue队列,在创建队列过程中通过x-max-length
参数指定队列的最大长度
parms.put("x-max-length", 4);
修改后的Consumer1程序如下所示
public class Consumer1 {private static final String NORMAL_EXCHANGE = "normal_exchange"; /*正常交换机*/private static final String DEAD_EXCHANGE = "dead_exchange"; /*死信交换机*/private static final String NORMAL_QUEUE = "normal_queue"; /*正常队列*/private static final String DEAD_QUEUE = "dead_queue"; /*死信队列*/public static void main(String[] args) throws IOException, TimeoutException {/*获取信道*/Channel channel = RabbitmqUtil.getChannel();/*声明死信交换机和普通交换机*/channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);/*声明死信队列*/channel.queueDeclare(DEAD_QUEUE, false, false, false, null);/*死信队列绑定死信交换机*/channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");/*声明正常队列, 在声明队列前设置队列的属性*/Map<String, Object> parms = new HashMap<>();/*设置消息过期时间10s,超过10s后消息就会被投递到死信交换机*///parms.put("x-message-ttl", 10000);/*正常队列关联死信交换机, 关联key是固定的, 关联后正常队列中消息无法被消费者消费后就会把消息投递到死信交换机*/parms.put("x-dead-letter-exchange", DEAD_EXCHANGE);/*为正常队列设置消息失败后投递到死信交换机绑定的routingKey, 这样消息消费后就会被死信交换机通过"dead"这个routingKey路由到死信队列中*/parms.put("x-dead-letter-routing-key", "dead");/*设定队列的最大长度限制为4, 表示正常队列中最多同时存放4条消息,继续投递过来的消息只能进死信队列*/parms.put("x-max-length", 4);channel.queueDeclare(NORMAL_QUEUE, false, false, false, parms);/*正常队列绑定正常交换机*/channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(message);};CancelCallback cancelCallback = consumerTag -> {System.out.println("消费失败");};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);}}
Producer代码中注释掉设置的消息过期时间,并且设置发布7条消息,修改后代码如下所示
public class Producer {private static final String NORMAL_CHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {/*获取信道*/Channel channel = RabbitmqUtil.getChannel();/*设置消息在队列中存活时间*///AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();/*发布消息*/String message = null;for (int i=1; i<8; i++){message = "commodity" + i;channel.basicPublish(NORMAL_CHANGE, "normal", null, message.getBytes());System.out.println("生产者发布消息:" + message);}}
}
Consumer2代码无任何修改。
下面重新进行验证,首先启动Conumer1程序,主要是为先创建normal_queue队列,从浏览器管理端可以查看创建的normal_queue队列,其中该队列有3个属性,Lim表示队列设置了最大长度。
下面为了验证正常队列中4条消息满了后其余消息就会进入死信队列,首先关闭Consumer1,然后启动Producer,输出如下7条消息,表示生产者生产了下述7条消息
生产者发布消息:commodity1
生产者发布消息:commodity2
生产者发布消息:commodity3
生产者发布消息:commodity4
生产者发布消息:commodity5
生产者发布消息:commodity6
生产者发布消息:commodity7
查看浏览器管理端如下所示,正常队列中有4条消息,正好是正常队列的最大限度,而死信队列中是3条记录。
下面启动Consumer2消费掉死信队列中的消息,发现是下面3条消息进入了死信队列
commodity1
commodity2
commodity3
然后再重新启动Consumer1消费掉正常队列中的消息,发现是下面4条消息进入的正常队列。
commodity4
commodity5
commodity6
commodity7
3. 案例三:消息被拒
还是以上图所示案例,当生产者生产的消息被Consumer1拒绝消费时,被拒绝的消息就会投递到死信队列中。
假设生产者生产commodity1~commodity7之间的消息,但只有commodity3被拒绝消费。
下面修改Consumer1代码,只需要在添加一句向RabbitMQ服务器发送basicReject的拒绝指令即可,修改后代码如下
public class Consumer1 {private static final String NORMAL_EXCHANGE = "normal_exchange"; /*正常交换机*/private static final String DEAD_EXCHANGE = "dead_exchange"; /*死信交换机*/private static final String NORMAL_QUEUE = "normal_queue"; /*正常队列*/private static final String DEAD_QUEUE = "dead_queue"; /*死信队列*/public static void main(String[] args) throws IOException, TimeoutException {/*获取信道*/Channel channel = RabbitmqUtil.getChannel();/*声明死信交换机和普通交换机*/channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);/*声明死信队列*/channel.queueDeclare(DEAD_QUEUE, false, false, false, null);/*死信队列绑定死信交换机*/channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");/*声明正常队列, 在声明队列前设置队列的属性*/Map<String, Object> parms = new HashMap<>();/*设置消息过期时间10s,超过10s后消息就会被投递到死信交换机*///parms.put("x-message-ttl", 10000);/*正常队列关联死信交换机, 关联key是固定的, 关联后正常队列中消息无法被消费者消费后就会把消息投递到死信交换机*/parms.put("x-dead-letter-exchange", DEAD_EXCHANGE);/*为正常队列设置消息失败后投递到死信交换机绑定的routingKey, 这样消息消费后就会被死信交换机通过"dead"这个routingKey路由到死信队列中*/parms.put("x-dead-letter-routing-key", "dead");/*设定队列的最大长度限制为4, 表示正常队列中最多同时存放4条消息,继续投递过来的消息只能进死信队列*///parms.put("x-max-length", 4);channel.queueDeclare(NORMAL_QUEUE, false, false, false, parms);/*正常队列绑定正常交换机*/channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");if (message.equals("commodity3")){System.out.println("Consumer1拒绝处理消息:" + message);channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); //false表示被拒绝的消息不再放入队列中}else {System.out.println(message + "消费成功");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};CancelCallback cancelCallback = consumerTag -> {System.out.println("消费失败");};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);}
}
Producer类和Consumer2类继续采用案例二中的
下面进行测试,在进行测试前首先删除normal_queue队列,然后重新启动Consumer1类,然后再启动Producer进行生产消息,生产者输出如下,表示生产了7条消息
生产者发布消息:commodity1
生产者发布消息:commodity2
生产者发布消息:commodity3
生产者发布消息:commodity4
生产者发布消息:commodity5
生产者发布消息:commodity6
生产者发布消息:commodity7
再看Consumer1的输出如下所示,表示除了commodity3消息被拒绝处理,其他6个消息都被正常消费了
ommodity1消费成功
commodity2消费成功
Consumer1拒绝处理消息:commodity3
commodity4消费成功
commodity5消费成功
commodity6消费成功
commodity7消费成功
再看我们监控端,显示有一个消息被投递到了dead_queue死信队列中
为了确定死信队列中的一个消息是否是被拒绝的commodity3消息,下面启动Consumer2消费死信队列中消息,输出如下所示,表示死信队列中的消息正是被Consumer1拒绝处理的commodity3消息。
commodity3
6 RabbitMQ之死信队列相关推荐
- RabbitMq(五) -- 死信队列和延迟队列
1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...
- RabbitMQ:死信队列
✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...
- rabbitMQ学习-死信队列
死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致qu ...
- RabbitMQ 之死信队列
文章目录 什么是死信队列 如何配置死信队列 死信消息的变化 死信队列应用场景 总结 什么是死信队列 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将 ...
- rabbitmq利用死信队列+TTL 实现延迟队列
2019独角兽企业重金招聘Python工程师标准>>> 适用场景:订单超时未支付,倘若适用定时器的话,那么数据量大的话,轮询查询数据,首先IO开销大,其次任务时间要求高,扫描越频繁性 ...
- RabbitMQ的死信队列的应用
强烈推荐一个大神的人工智能的教程:http://www.captainbed.net/zhanghan [前言] 最近在项目中用到了RabbitMQ来做异步处理,自己将这块儿系统的搞了搞,下面主要记录 ...
- 消息中间件之rabbitMQ实战-死信队列
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,集成spring Boot,provider消息推送实例,consumer消息消费实例,Direct(直连类型交换机).Fanout(广 ...
- RabbitMQ实现死信队列
目录 死信队列是什么 怎样实现一个死信队列 说明 实现过程 导入依赖 添加配置 编写mq配置类 添加业务队列的消费者 添加死信队列的消费者 添加消息发送者 添加消息测试类 测试 死信队列的应用场景 总 ...
- RabbitMQ高级特性(五):RabbitMQ之死信队列DLX
一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...
- RabbitMQ的死信队列
什么是死信 在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现. 死信就是消息在特定场景下的一种表现形式,这些场景包括: 消息被拒绝访问,即 RabbitMQ返回 nack ...
最新文章
- mysql怎么查询排第几名并列_MySQL并列排名和顺序排名查询
- 文件输入输出和string流
- ABAP业务涉及到的相关数据库表 .
- 【matlab-7】Matlab与线性代数(三)
- “彪悍人生”罗永浩被法院限制消费,网友:期待王者归来
- 字体选择_Word文档中的字体批量选择与更改,查找替换功能必杀技
- ASP.NET MVC传送参数至服务端
- 入门必学 | R语言程序包的安装与使用指南
- matlab画图的参数,matlab画图参数
- 求PIFA天线相关介绍
- java程序员怎么创建自己的网站:第四章:做个网站引入广告赚点小钱
- 关于虚拟机中IPI中断的思考
- 数据处理之缺失值填充
- 计算机禁用打印驱动服务器,设备: 防止用户安装打印机驱动程序
- logback 日志脱敏 隐藏PII信息
- 团队作业第六次——团队Github实战训练
- 全国计算机应用水平考试图像处理,全国计算机应用水平考试图像处理考试大纲(2019年版).PDF...
- Spark的conf目录下没有slaves文件,spark3.1.2解压以后conf目录下没有slaves,spark找不到slaves怎么办,解决slaves问题
- 爬虫终于找到了知乎/B站 Top100大V,关注!
- 【算法-面试】区间专题