rabbitmq:publisher confirms
0.背景
最近在我们的业务系统中遇到一个问题,
publisher
行为:convertAndSend
然后打日志。
consumer
行为:@RabbitListener
接到消息立刻打日志。
问题是,publisher
打出了发送消息的日志,consumer
没打出收到消息的日志。
基于这种情况,准备启用rabbitmq java client
的ReturnCallback
及ConfirmCallback
机制,先确认消息是否成功发到了正确的queue
里面。
之前没有用Callback
,因为对于我们的场景,Rabbitmq
还是非常稳定的,即使极少出现的异常情况,我们也有办法把丢掉的消息补发,因此没必要浪费Channel
资源去让rabbitmq server
给发送确认信息,也不想平白增加系统复杂性。
1.代码实现
一般我们使用rabbitmq
可能会配置下面几个bean
(不论通过何种方式,xml
,@Configuretion
,或者spring boot
的autoconfigure
),在此基础上,添加一些属性设置:
@Configuration
public class MqConfig {@Value("${rabbitmq.enableConfirm}")private boolean enableConfirm;@Value("${rabbitmq.enableReturn}")private boolean enableReturn;@Value("${rabbitmq.enableMessageCorrelation}")private boolean enableMessageCorrelation;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();//省略其它属性设置...//根据配置决定是否开启 Confirm 机制connectionFactory.setPublisherConfirms(enableConfirm);//根据配置决定是否开启 Return 机制connectionFactory.setPublisherReturns(enableReturn);return connectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate() throws Exception {//根据配置决定使用哪种 RabbitTemplateRabbitTemplate template = enableMessageCorrelation ?new CorrelationRabbitTemplate(connectionFactory()) : new RabbitTemplate(connectionFactory());//省略其它属性设置...//如果启用 Confirm 机制,设置 ConfirmCallbackif (enableConfirm) {template.setConfirmCallback(confirmCallback());}//如果启用 Return 机制,设置 ReturnCallback,及打开 Mandatoryif (enableReturn) {template.setReturnCallback(returnCallback());template.setMandatory(true);}return template;}
}
对于Publisher
而言,以上两个bean
足以。
下面是 RabbitTemplate
中需要的ConfirmCallback
和ReturnCallback
:
@Bean@ConditionalOnMissingBean(value = RabbitTemplate.ConfirmCallback.class)public RabbitTemplate.ConfirmCallback confirmCallback() {return new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// do something ...};}@Bean@ConditionalOnMissingBean(value = RabbitTemplate.ReturnCallback.class)public RabbitTemplate.ReturnCallback returnCallback() {return new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// do something ...}};}
ok,关于这两个Callback
:
ConfirmCallback
:每一条发到rabbitmq server
的消息都会调一次confirm
方法。- 如果消息成功到达
exchange
,则ack
参数为true
,反之为false
; cause
参数是错误信息;CorrelationData
可以理解为context
,在发送消息时传入的这个参数,此时会拿到。
- 如果消息成功到达
ReturnCallback
:成功到达exchange
,但routing
不到任何queue
时会调用。- 可以看到这里能直接拿到
message
,exchange
,routingKey
信息。
- 可以看到这里能直接拿到
这里会遇到一个问题,当ConfirmCallback
被调用,且ack
参数为false
时,意味着这条消息可能发送失败了,那我可能想把这条消息在这里保存下来,比如打条日志,以免消息丢失,但对于ConfirmCallback
,是不能像ReturnCallback
一样直接拿到message
的。
所以,我们需要CorrelationData
这个参数,我们可以把message
塞到这个参数里面。
我们扩展一下CorrelationData
类:
public class MessageCorrelationData extends CorrelationData {//...private Message message;public Message getMessage() {return message;}public void setMessage(Message message) {this.message = message;}
}
现在,我们需要在发送消息时,把message
设置到MessageCorrelationData
中随message
一起发出去。
在上面RabbitTemplate bean
的配置中,可以看到这行代码:
RabbitTemplate template = enableMessageCorrelation ?new CorrelationRabbitTemplate(connectionFactory()) : new RabbitTemplate(connectionFactory());
这个是因为不想改老代码,所以对RabbitTemplate
类做一下扩展:
public class CorrelationRabbitTemplate extends RabbitTemplate {//...@Overridepublic void send(final String exchange, final String routingKey,final Message message, final CorrelationData correlationData)throws AmqpException {super.send(exchange, routingKey, message, correlationData == null ? new MessageCorrelationData(message) : correlationData);}
}
不管调用RabbitTemplate
的哪个方法发送消息,最终都是调用某个send
方法,所以,我们重写这个方法,把MessageCorrelationData
给塞进去。
ok,这样老代码不需要做任何改动,改一下rabbitmq
的配置文件,就能在ConfirmCallback
中拿到发出去但可能发送失败的message
,拿到了message
,那么,为所欲为吧。
2.为什么ConfirmCallback中不能直接拿到message
为了能在ConfirmCallback
中拿到message
,拐了好大一个弯,为什么不直接给我呢?这是一个很自然的问题。
简单追一下源码,看ConfirmCallback
和ReturnCallback
分别是在哪里被调用的。
可以在com.rabbitmq.client.impl.AMQChannel
类中找到这个方法:
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream. If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further. It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.RpcContinuation nextOutstandingRpc = nextOutstandingRpc();// the outstanding RPC can be null when calling Channel#asyncRpcif(nextOutstandingRpc != null) {nextOutstandingRpc.handleCommand(command);markRpcFinished();}}}
大部分rabbitmq java client
与rabbitmq server
的交互都会走到这里来,除了heartbeat
等一些特殊的交互。
追踪到processAsync(command)
这个方法:
@Override public boolean processAsync(Command command) throws IOException {Method method = command.getMethod();if (method instanceof Channel.Close) {asyncShutdown(command);return true;}if (isOpen()) {if (method instanceof Basic.Deliver) {processDelivery(command, (Basic.Deliver) method);return true;} else if (method instanceof Basic.Return) {callReturnListeners(command, (Basic.Return) method);return true;} else if (method instanceof Channel.Flow) {// ...return true;} else if (method instanceof Basic.Ack) {Basic.Ack ack = (Basic.Ack) method;callConfirmListeners(command, ack);handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);return true;} else if (method instanceof Basic.Nack) {Basic.Nack nack = (Basic.Nack) method;callConfirmListeners(command, nack);handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);return true;} else if (method instanceof Basic.RecoverOk) {// ...return false;} else if (method instanceof Basic.Cancel) {// ...return true;} else {return false;}} else {if (method instanceof Channel.CloseOk) {return false;} else {return true;}}}
command
中包含了method
信息,根据method
的不同,做相应的处理。
可以看到:
ReturnCallback
在收到Basic.Return
时调用;ConfirmCallback
在收到Basic.Ack
或Basic.Nack
时调用,根据debug
,Channel.Close
也会调。
再去官网看一下basic.ack
这些是什么东西。
根据官网描述,个人对callback
过程中client
与server
的交互做如下总结,如有错误,欢迎指正:
client: confirm.select; // 如果收到消息并处理完毕请通知我
server: confirm.select-ok; // 好的,我已经给这个 channel 开通了 confirm 模式,你用这个 channel 发消息给我,我还用这个 channel 通知你
... // 可能还有一些其它对话
client: basic.publish; // 给你发了一条消息,注意查收
server: basic.nack; // 不好意思,我这边处理出了点问题,你重新发一次
client: basic.publish; // 好吧
server: basic.nack; // 你这个 exchange 不存在
... // 可能伴随着 channel shutdown 和 reset
client: basic.publish; // ok,我的错,我已纠正,应该没问题了
server: basic.return; // 抱歉,你这个 exchange 上没这个 routingKey
server: basic.ack; // 虽然 routingKey 没有,但这个 exchange 没问题
client: basic.publish; // ok,我改了
server: basic.ack; // 收到,所有的 queue 都已落盘,没问题
回到代码,找一下,两个callback
的那些参数都是在哪里设置进去的。
对于ReturnCallback
,找到com.rabbitmq.client.impl.ChannelN
类的这个方法:
private void callReturnListeners(Command command, Basic.Return basicReturn) {try {for (ReturnListener l : this.returnListeners) {l.handleReturn(basicReturn.getReplyCode(),basicReturn.getReplyText(),basicReturn.getExchange(),basicReturn.getRoutingKey(),(BasicProperties) command.getContentHeader(),command.getContentBody());}} catch (Throwable ex) {getConnection().getExceptionHandler().handleReturnListenerException(this, ex);}}
可以看到,消息体就是command.getContentBody()
这里拿到的,所以是basic.return
附带的。
对于ConfirmCallback
,找到org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl
类的processAck
方法:
private synchronized void processAck(long seq, boolean ack, boolean multiple, boolean remove) {if (multiple) {// ...}else {Listener listener = this.listenerForSeq.remove(seq);if (listener != null) {SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);PendingConfirm pendingConfirm;if (remove) {pendingConfirm = confirmsForListener.remove(seq);}else {pendingConfirm = confirmsForListener.get(seq);}if (pendingConfirm != null) {doHandleConfirm(ack, listener, pendingConfirm);}}else {if (this.logger.isDebugEnabled()) {this.logger.debug(this.delegate.toString() + " No listener for seq:" + seq);}}}}
可以看到,basic.ack
给了一个seq
参数,这个参数是server
给确认的时候在delivery-tag
里面带过来的,所以,我们看到server
并没有给CorrelationData
数据,那么CorrelationData
就应该是放在本地内存里的。
进一步查看源码可以知道,这个seq
参数是对当前channel
发送的消息的一个序列号,发送消息的时候,CorrelationData
放在本地与seq
关联,server
给confirm
的时候会给一个delivery-tag
,在这种场景下用来指明这是这个channel
中第几条消息的confirm
,再从本地内存中取出相应的CorrelationData
交给ConfirmCallback
。
这样来看,ConfirmCallback
是有能力直接提供message
信息的,只是java client
没有这么实现。
3.CorrelationData的id与MessageProperties的correlationId
从上面可以知道CorrelationData
其实不依赖它的id
属性来区分,这个id
属性完全可以不设置,实际上我上面的示例中就没有设置,并不影响功能。
那么这个id
是干嘛用的呢?看了下源码,只在org.springframework.amqp.rabbit.AsyncRabbitTemplate
类里面用到了,具体没有深究。
MessageProperties
类里面有一个CorrelationId
属性,这个官网有一个做RPC
的示例。
都是用来做reply
确认的,除此以外,好像没啥关联?
rabbitmq:publisher confirms相关推荐
- RabbitMQ(三)发布确认 Publisher Confirms
代码仓库:github:https://github.com/stopping5/RabbitMq-Operation-Record.git 本代码示例需要引入rabbitmq依赖 <!-- r ...
- 官方 RabbitMQ 教程 - 7 Publisher Confirms
文章目录 发布者确认 概述 在一个通道上开启发布者确认 策略1:单个的发布消息 策略2:批量的发布消息 策略3:异步确认发布消息 综述 把它们放一起 官方文档地址: 7 Publisher Confi ...
- rabbit mq Consumer Acknowledgements and Publisher Confirms 翻译
原文地址 Consumer Acknowledgements and Publisher Confirms 消费者消息的Ack机制和生产者发布消息的确认机制 Overview This guide c ...
- Consumer Acknowledgements and Publisher Confirms
点击打开链接 Acknowledging Multiple Deliveries at Once Manual acknowledgements can be batched to reduce ne ...
- RabbitMQ:The channelMax limit is reached. Try later.
RabbitMQ:The channelMax limit is reached. Try later. 这个问题是我当初写项目时遇到的,因为用RabbitMQ做削峰处理,高并发情况下,chann ...
- RabbitMQ:惰性队列
RabbitMQ [第一章]RabbitMQ:从认识MQ到安装,学习消息模型等 [第二章]RabbitMQ:常见消息模型 [第三章]RabbitMQ:生产者消息确认.消息持久化.消费者消息确认.消费失 ...
- docker RabbitMQ:修改Channel limit
RabbitMQ:The channelMax limit is reached. Try later. rabbitmq 默认 最大链接默认是2047访问量过大会导致数据丢失 复制 docker 容 ...
- RabbitMQ:死信队列
✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...
- RabbitMQ:订阅模型-消息订阅模式
订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息. RabbitMQ 单 ...
- 别人家的团队怎么用RabbitMQ:我总结的5点规范
大概从 2013 年开始,我就开始了自己和 RabbitMQ 的接触,到现在已经有七年多了. 在这七年中,既有一些对 RabbitMQ 的深度体验,更有无数的血泪史. 而根据我这么多年的使用经验,我将 ...
最新文章
- jsTree 插件Ajax数据
- PHP获取 当前页面名称、主机名、URL完整地址、URL参数、获取IP
- webParts与Web部件
- Python外(1)--try-expect
- python中的reduce函数用法
- AttributeError: module 'pip' has no attribute 'main'
- webbrowser控件 加载为空白_OpenLayers教程五:地图控件之坐标拾取控件和鹰眼控件...
- SetWindowLong 和SetClassLong区别
- 163 镜像源 linux,网易163的Ubuntu apt镜像源
- 漫谈iOS程序的证书和签名机制
- ElasticSearch8.1.2 ik分词器
- raytrace 算法理论与实践
- 百度竞价账户能否多少词?
- 用JavaScript实现烟花效果
- 股市投资必修课二十三--增长的导向
- IOS端 vux中scroll滚动自动回弹到顶部或者左侧的解决办法
- 【解决】Failure to find com.xxx:xxx-target:pom:1.0-SNAPSHOT in https://xxxx/snapshot was cached in the
- 计算机兼容,兼容条件
- STM32笔记 (七)中断系统与NVIC嵌套向量中断控制器
- Win10电脑如何进行内存诊断?教程来了