RabbitMQ学习笔记 - mandatory参数
参考:<<RabbitMQ实战指南>>
mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。
RabbitMQ 3.0版本开始去掉了对immediate参数的支持,这里就不在讨论该参数。
1. 发送消息api:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
参数说明:
- exchange:交换器名称,指定消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中。
- routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。
- mandatory:交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式
- true:RabbitMQ会调用Basic.Return命令将消息返回给生产者
- false:RabbitMQ会把消息直接丢弃
- immediate:设置true时,如果该消息关联的队列上有消费者,则立即投递,否则这条消息不存入队列;如果与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者
- props:消息属性集,包含14个属性成员,如持久化、优先级、投递模式、过期时间等等
- body:消息体,需要发送的消息
2.获取没有被正确路由的消息
mandatory设置为true之后,生产者通过调用channel.addReturnListener()方法来添加ReturnListener监听器,实现获取没有被正确路由到合适队列的消息。有以下几种情形:
- 交换器没有绑定任何队列
- 路由键错误,即交换器无法根据路由键投递消息到队列
注:如果是无法路由到交换器上,则不会触发Basic.Return命令,也就是监听器不会接收到无法路由的消息。
3.示例
3.1 原生api
// 获取Connection、创建Channel步骤略
// 声明交换器
String exchangeName = "direct.exchange.test.mandatory";
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare(exchangeName, "direct", true);
// 声明队列
String queueName = "direct.queue.test.mandatory";
Map<String, Object> arguments = new HashMap<>();
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
// 绑定交换器和队列
String routingKey = "direct.routing-key.test.mandatory";
channel.queueBind(queue, exchangeName, routingKey);// 正常路由的消息
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.TEXT_PLAIN, "Test Msg".getBytes("UTF-8"));// 不可路由的消息
channel.basicPublish(exchangeName, routingKey + "2", true, MessageProperties.TEXT_PLAIN, "Test Msg2".getBytes("UTF-8"));channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(message);}
});
3.2 springboot
(1)添加rabbitmq的starter
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)application.yml
通过spring.rabbitmq.publisher-returns=true
属性以及实现org.springframework.amqp.rabbit.core.RabbitTemplate.RabbitTemplate.ReturnCallback
接口来接收无法路由的消息。
实际上,最终还是利用channel.basicPublish()方法,将mandatory设置为true来实现。
spring:rabbitmq:host: dev.tss.comport: 5672username: adminpassword: njittss# 开启发送确认# publisher-confirms: true# 开启发送失败退回,或者通过rabbitTemplate.setMandatory(true);设置publisher-returns: truerabbitmq:direct:test:mandatory:exchangeName: direct.exchange.test.mandatoryqueueName: direct.queue.test.mandatoryroutingKey: direct.routing-key.test.mandatory
(3)添加监听器
@Component
public class RabbitCallback implements RabbitTemplate.ReturnCallback {private static final Logger LOGGER = LoggerFactory.getLogger(RabbitCallback.class);@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 也可以通过这种方式配置// rabbitTemplate.setMandatory(true);// 每个RabbitTemplate只能设置一个RabbitTemplate.ReturnCallbackrabbitTemplate.setReturnCallback(this);}/*** 交换机路由到队列失败才会回调*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {LOGGER.warn("return callback, receive message :" + message.toString() + ", " + replyText + ", " + exchange + ", " + routingKey);}
}
(4)发送消息测试
当调用sendAbnormalMessage()方法发送消息时,监听器会收到无法路由到队列的消息。
@Value("${rabbitmq.direct.test.mandatory.exchangeName}")
private String exchangeName;
@Value("${rabbitmq.direct.test.mandatory.routingKey}")
private String routingKey;@Autowired
private RabbitTemplate rabbitTemplate;// 测试发送可以路由的消息
public boolean sendNormalMessage() {String message = "test normal message";this.rabbitTemplate.convertAndSend(exchangeName, routingKey, message);return true;
}// 测试发送不可路由的消息
public boolean sendAbnormalMessage() {String message = "test abnormal message";this.rabbitTemplate.convertAndSend(exchangeName, routingKey + "2", message);return true;
}
最后扒一下spring发送消息设置mandatory逻辑:
- 通过
rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
方法发送消息,这是个重载方法。 - 继续debug,还是在RabbitTemplate类中,先调用了send()方法,再由匿名内部类调用doSend()方法,结合doSend参数来看,就是由
RabbitTemplate.this.returnCallback != null && RabbitTemplate.this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, message, Boolean.class)
这段逻辑确定了mandatory参数是true还是false。翻译过来就是:设置了ReturnCallback以及属性spring.rabbitmq.publisher-returns=true(或rabbitTemplate.setMandatory(true))
public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException {execute(new ChannelCallback<Object>() {@Overridepublic Object doInRabbit(Channel channel) throws Exception {doSend(channel, exchange, routingKey, message, RabbitTemplate.this.returnCallback != null && RabbitTemplate.this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, message, Boolean.class), correlationData);return null;}}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}protected void doSend(Channel channel, String exchange, String routingKey, Message message,boolean mandatory, CorrelationData correlationData) throws Exception {// ...Message messageToUse = message;MessageProperties messageProperties = messageToUse.getMessageProperties();if (mandatory) {messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION_KEY, this.uuid);}// ...// 发送消息channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());// ...
}
springboot-rabbitmq-demo测试代码:https://github.com/mytt-10566/springboot-rabbitmq-demo
RabbitMQ学习笔记 - mandatory参数相关推荐
- RabbitMQ 学习笔记
RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...
- Rabbitmq学习笔记(尚硅谷2021)
Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...
- Rabbitmq学习笔记教程-尚硅谷
Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...
- RabbitMQ学习笔记(3)----RabbitMQ Worker的使用
1. Woker队列结构图 这里表示一个生产者生产了消息发送到队列中,但是确有两个消费者在消费同一个队列中的消息. 2. 创建一个生产者 Producer如下: package com.wangx.r ...
- RabbitMQ学习笔记(高级篇)
RabbitMQ学习笔记(高级篇) 文章目录 RabbitMQ学习笔记(高级篇) RabbitMQ的高级特性 消息的可靠投递 生产者确认 -- confirm确认模式 生产者确认 -- return确 ...
- RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)
RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...
- RabbitMQ学习笔记
目录 一.MQ 的相关概念 MQ是什么? MQ三大优势 MQ的劣势 MQ 的产品 RabbitMQ核心 JMS 安装 二.HelloWorld 三.Work Queues(轮训) 消息应答 Rabbi ...
- 分布式消息中间件之RabbitMQ学习笔记[一]
写在前面 嗯,陆续的整理一些中间件的笔记 今天和小伙伴们分享RabbitMQ 相关笔记 博文偏理论,内容涉及: RabbitMQ的简单介绍 AMQP协议标准介绍 RabbitMQ Demo 食用方式: ...
- MVC缓存OutPutCache学习笔记 (一) 参数配置
OutPutCache 参数详解 Duration : 缓存时间,以秒为单位,这个除非你的Location=None,可以不添加此属性,其余时候都是必须的. Location : 缓存放置的位置; 该 ...
最新文章
- 搭建Ubuntu18.04+Anaconda3.x+Pycharm+SimpleITK(二)
- 响应接收窗口大小( ra-ResponseWindowSize)
- 工业4.0提出者孔翰宁详解工业4.0
- 【机器学习】关于机器学习模型可解释(XAI),再分享一招!
- 一篇文章把Self-Attention与Transformer讲明白
- nopcommerce商城系统--源代码结构和架构
- 彻底清除计算机远程桌面连接的历史记录
- php基础教程 第十一步 面向对象
- Pyhton随机生成测试数据模块faker
- 物联网核心安全系列——车载物联网的加密防盗版
- html单元格竖着排列,html表格,表头竖向固定,横向滚动的例子
- 深入解读Linux进程调度Schedule
- 微信小程序手动获取自己位置wx.chooseLocation
- ftp客户端flashfxp破解教程
- 数据库系统基础教程复习
- NN求解NS方程进一步探讨
- IT管理员喜欢OpManager的十大原因
- 善领声音编辑器,修改search.dat完美启动静音
- python体测成绩数据分析_Python+Excel数据分析实战:军事体能考核成绩评定(二)基本框架和年龄计算...
- python批量将png格式转换为jpg格式,并保存到新的文件夹
热门文章
- 数据质量问题剖析与解决锦囊
- redis实现轮询算法_白话分布式系统中的一致性哈希算法
- 淘宝API接口大纲,引领企业信息化
- 【正点原子STM32连载】第二十一章 通用定时器实验 摘自【正点原子】MiniPro STM32H750 开发指南_V1.1
- win7需要计算机管理员权限,关于告诉你win7系统提示“需要管理员权限”的修复办法...
- 什么是redis?redis如何使用?
- “无剑胜有剑”软件大师之路的一点探索
- 最有用的期货技术 — 无招胜有招
- Python——列表推导式
- Java中的数值计算