RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)
我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息到Exchange,Exchange根据路由键将消息路由到合适的Queue,Queue再将消息推(或消费者主动拉)给消费者。
在这个过程当中,Exchange根据路由键将消息路由到合适的Queue的过程,可能发生诸如
Exchange没有任何Queue与其绑定,
或者根据消息的路由键,没有任何一个合适的Queue来投递消息,
从而导致消息路由失败。对于这些路由失败的消息应该如何处理呢?有两种方式:
将消息返回给投递该条消息的生产者。
使用备份交换机 alternate-exchange(AE)。
方式1:将消息返回给投递该条消息的生产者
配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=true
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true
交换机定义与消息发送
@Slf4j
@Component
public class NoMatchQueue {/*** 交换机名称*/public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE";@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void send() {log.info("发送消息");Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());Message message = MessageBuilder.withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8)).setContentEncoding(StandardCharsets.UTF_8.displayName()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);}
}@Configuration
class ExchangeDeclare {/*** 只定义一个交换机,但是不绑定任何Queue,所以发送到该Exchange的消息都会路由失败** @return*/@Beanpublic Exchange noMatchQueueExchange() {return ExchangeBuilder.topicExchange(NoMatchQueue.EXCHANGE_NAME).durable(true).build();}
}
设置回调函数
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息被退回:{}", returnedMessage);}
});
消息被退回:且可以看到原因是无法路由
方式2:使用备份交换机
使用方式1需要我们在程序中进行编码设置回调函数监听,增加了生产者代码的复杂性,那么为了消息不丢失还有没有其他方式来处理路由失败的消息呢:答案是使用备份交换机。
相较于使用回调函数,使用备份交换机只需要给交换机绑定一个备份交换机即可,当消息路由失败之后,消息将投递到备份交换机,再由备份交换机路由消息到备份队列。这样我们只需要关注这个备份队列就能知道/获取到路由失败的消息。通常情况下备份交换的Type应该设置为
fanout
。配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=false
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=false
注意: 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。
正常业务交换机(不绑定队列,使得消息一定会路由失败)
/*** 业务交换机** @return*/
@Bean
public Exchange noMatchQueueExchange() {return ExchangeBuilder.topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME).durable(true)// 绑定备份交换机.alternate(X_ALTERNATE).build();
}
备份交换机/队列/绑定
/*** 备份队列** @return*/
@Bean
public Queue alternateQueue() {return QueueBuilder.durable("Q_ALTERNATE").build();
}/*** 备份交换机** @return*/
@Bean
public Exchange alternateExchange() {return ExchangeBuilder.fanoutExchange(X_ALTERNATE).durable(true).build();
}/*** 备份绑定** @param alternateExchange* @param alternateQueue* @return*/
@Bean
public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) {return BindingBuilder.bind(alternateQueue).to(alternateExchange).with("").noargs();
}
消息投递
/*** 正常业务交换机*/
public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE";@Autowired
private RabbitTemplate rabbitTemplate;/*** 发送消息*/
@PostConstruct
public void send() {log.info("发送消息");Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());Message message = MessageBuilder.withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8)).setContentEncoding(StandardCharsets.UTF_8.displayName()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
结果是消息被路由到备份交换机的备份队列
且:如果你同时使用了两种方式,即(mandatory为true+Listener监听)和(备份交换机AlternateExchange),消息将只会路由到备份交换机,不会Return回生产者。
# 在原生RabbitMQ-client中演示这一过程:
@Slf4j
public class AeTest {/*** 获取Channel*/private static final Channel CHANNEL = MqChannelUtils.getChannel();/*** 备份交换机*/private static final String X_AE = "X_AE";/*** 备份交换机绑定的队列*/private static final String Q_AE = "Q_AE";/*** 正常业务的交换机*/private static final String X_1 = "X_1";public static void main(String[] args) throws IOException {// 定义备份交换机-其实也是一个正常的交换机CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true);// 定义备份队列CHANNEL.queueDeclare(Q_AE, true, false, false, null);// 绑定备份CHANNEL.queueBind(Q_AE, X_AE, "");HashMap<String, Object> arguments = new HashMap<>();// 绑定的备份交换机arguments.put("alternate-exchange", X_AE);// 定义交换机CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments);// 添加监听器,看看是否还会return消息CHANNEL.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return returnMessage) {log.error("消息被退回{}", returnMessage);}});// 尝试向交换机发送消息(无法路由)- mandatory参数无效CHANNEL.basicPublish(X_1, "", false, false,new AMQP.BasicProperties(), "阿依古丽".getBytes(StandardCharsets.UTF_8));}
}
两个交换机,正常的交换机
X_1
和备份交换机X_AE
备份交换机绑定的队列已经接收到了路由失败的消息
其他要注意的点:
备份交换机的Type设置为fanout比较合适,这样可以忽略RoutingKey,避免备份交换机又路由失败。
被投递到备份交换机的RoutingKey为消息投递到MQ时的原始RoutingKey,不会变,这一点在其他场景下也是一样的。
使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。
# 源代码
https://gitee.com/FutaoSmile/tech-sharing-mq
特别推荐一个分享架构+算法的优质内容,还没关注的小伙伴,可以长按关注一下:
长按订阅更多精彩▼如有收获,点个在看,诚挚感谢
RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)相关推荐
- rabbitmq可靠性投递_解决RabbitMQ消息丢失问题和保证消息可靠性(一)
工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消 ...
- 保证RabbitMQ消息的可靠性总结
文章目录 一.关于消息的可靠性 二.生产者发送消息对象 三.将消息发送给交换机 四.将消息发送给队列 五.将消息发送给消费者 六.保证消息的幂等性 七.死信消息的补偿[存在问题,待完善] 一.关于消息 ...
- rabbitmq消息队列入门到整合springboot(篇幅较长内容详细)
1.安装rabbitmq服务器 我们选择在linux下安装 安装的前提需要在虚拟机下安装docker docker pull rabbitmq:management(拉去镜像) docker run ...
- rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关
转:https://blog.csdn.net/u014373554/article/details/92686063 项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败. ...
- RabbitMQ消息队列,发送消息失败、消息持久化、消费者失败处理方法和发送消息
项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败.消息持久化.消费者失败处理方法和发送消息解决方法及手动确认的模式 先引入pom.xml <!--rabbitmq- ...
- rabbitmq消息ACK确认机制及发送失败处理
rabbitmq为确保消息发送和接收成功,采用ack机制. (1)生产者producter发送消息到mq时,mq会发送ack给producter告知消息是否投递成功: (2)消费者consumer接收 ...
- BizTalk接收消息后路由失败
错误描述 The published message could not be routed because no subscribers were found. This error occurs ...
- 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?
微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...
- RabbitMQ(消息队列)浅记
消息队列 PS:大二下学习RabbitMQ的随手小记 一.什么是 MQ MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而 ...
最新文章
- debian+pxe+preseed.cfg 安装配置
- AppStore 拒绝审核原因:PLA 2.3
- 能源结构进入变革时代 光伏业趋于壮大转型
- python打开中文文本utf-8用不了_关于Python文档读取UTF-8编码文件问题
- 从TCP到RDMA网络最新技术|扩展技术视野
- 数据可视化(BI报表的开发)第三天
- ajax delete 传递参数,springMVC使用PUT、DELETE方法传递参数解决方案
- Linus Torvalds:未来 25 年,Linux 风采依旧!
- 用vs2008创建运行c++项目
- 市场调研-全球与中国天线测量系统市场现状及未来发展趋势
- 三维医学图像数据标注 3D Slicer
- word自带参考文献标注功能—以word2013为例
- 主成分分析(PCA)及动态主成分分析(Dynamic PCA)模型原理分析
- SQL语句(查询、新建表、删除表、更新表、新建视图)
- 软件开发程序员的“九阳神功”——设计模式
- Git添加远程子仓库
- 视频教程-【跟一夫学设计】从0基础到精通学全套coreldraw x7轻松掌握CDR基础加案例学习视频教程-CorelDraw
- 计算机 哈弗结构图,作为一个程序员,不知道什么是冯诺依曼体系结构?那肯定也不知道哈佛结构喽!...
- [苹果开发者账号]03 申请APPID、苹果开发者账号 常见问题整理
- Leetcode跳跃游戏
热门文章
- 简单介绍SQL中ISNULL函数使用方法
- 一名合格的运维工程师的历练之路
- poj3177(双联通分量)
- 2021-03-26习题4-7 最大公约数和最小公倍数 (15 分)
- poj3304(线段相交问题)
- BZOJ 2142 礼物(拓展Lucas,中国剩余定理)【BZOJ修复工程】
- Luogu P4336 [SHOI2016]黑暗前的幻想乡(容斥,矩阵树定理,子集反演)
- #6279. 数列分块入门 3(区间修改,查询权值前驱)
- Liunx下MySQL常用命令
- java session缓存_Java服务端采用Session的缓存oauth2.0授权用户信息