RabbitMQ(五)死信队列和延迟队列
1.1 概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
1.2 死信的来源
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
1.3 死信实战
1.3.1 代码架构图
1.3.2 ttl过期
生产者代码
public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间AMQP.BasicProperties properties = newAMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,message.getBytes());System.out.println("生产者发送消息:" + message);}}}
}
消费者代码
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}
}
死信消费者
public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信队列消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Consumer02 接收死信队列的消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}
1.3.3 队列达到最大长度
1)生产者发送消息去掉ttl属性
2)普通消费者增加如下属性
params.put("x-max-length",6);
此时我们发送10条,就会有4条消息被发送到死信队列。
重启Consumer01,另外6条也会被消费。
1.3.4 消息被拒
public class Consumer03 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);if (message.equals("info5")) {System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println("Consumer01 接收到消息" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck = false;channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {});}
}
2. 延迟队列
2.1概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
2.2 使用场景
1.订单在十分钟之内未支付则自动取消。
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
RabbitMQ(五)死信队列和延迟队列相关推荐
- 【重难点】【RabbitMQ 01】消息队列的作用、主流的消息队列、RabbitMQ 基于什么传输消息、RabbitMQ 模型架构、死信队列和延迟队列
[重难点][RabbitMQ 01]消息队列的作用.主流的消息队列.RabbitMQ 基于什么传输消息.RabbitMQ 模型架构.死信队列和延迟队列 文章目录 [重难点][RabbitMQ 01]消 ...
- 死信队列和延迟队列_在实践中使用延迟队列
死信队列和延迟队列 通常,在某些情况下,当您有某种工作或作业队列时,有必要不立即处理每个工作项或作业,而是要延迟一些时间. 例如,如果用户单击一个按钮来触发要完成的某项工作,而一秒钟后,用户意识到他/ ...
- 【无标题】支付场景常见的死信队列+TTL延迟队列的实现
支付场景常见的死信队列+TTL延迟队列的实现 有道云笔记https://note.youdao.com/s/8ndbQ5NS 分析:可以发现 ,一个消息随着业务产生后 (这个消息伴随着死信队列发出)经 ...
- RabbitMq(五) -- 死信队列和延迟队列
1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...
- RabbitMq(十二) 借用死信交换机实现延迟队列
概述:延迟队列即在消息发送后延迟固定时间后再去接受处理,做相应的一些相应. 应用场景举例:在电商购物后,订单支付前发送消息信息,在三分钟之后检查订单是否支付成功,如果支付,则取消订单并库存数量恢复:或 ...
- Rabbitmq超级详细的笔记,包括安装,基本命令,rabbitmq的七种消息模式,以及死信队列,延迟队列,优先级队列和惰性队列的介绍
RabbitMQ 文章目录 RabbitMQ 1 RabbitMQ介绍 1.1 基本介绍 1.2 RabbitMQ的安装 1.2.1 ubuntu20.04 安装rabbitmq 1.2.2 cent ...
- redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池
参考资料 :分布式中间件实战:java版 (书籍), 多线程视频教程(视频)- 项目启动环境 导入依赖 <parent><groupId>org.springframework ...
- 消息队列之延迟队列超详细入门教程速看
一. 延迟队列的应用场景 1.具体应用 关于消息队列我们已经很熟悉了,我们知道在消息队列中可以实现延迟队列效果,那你知道延迟队列有哪些使用场景吗?这里我给大家总结了延迟队列的几个经典使用场景,看看你的 ...
- RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列
搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...
最新文章
- 单应性矩阵和仿射变换_单应矩阵 基本矩阵 本质矩阵的区别与联系
- Mysql安装和常用命令及问题汇总
- 聊透分布式系统一致性
- idea快捷键之记录
- 如何查看现有项目的struts和hibernate和spring版本
- flex invalidation 机制
- 单片机奇偶交替闪烁_自学单片机第十三篇中:单点交替
- linux锐捷代码_锐捷 for linux - 沈阳建筑大学
- PCWorld:IT界14对经典“生死冤家”
- Failed to install the following Android SDK packages as some licences have not been accepted.
- 手把手教你配置阿里云服务器搭建网站(图文教程)
- Windows使用shipyard
- mysql计算1000天后的日期_Mysql中常用的日期函数
- 第7章 Stata相关性分析
- 关于移动硬盘变成CD驱动器的非正常修复方法
- 十分详细的阳光十六法则
- 树莓派机器视觉环境搭建
- PyTorch 2.0 重磅发布:一行代码提速 30%
- Origin与OriginPro 版本一览图
- 使用python将freemind转化成excel