0.背景

最近在我们的业务系统中遇到一个问题,

publisher行为:convertAndSend然后打日志。

consumer行为:@RabbitListener接到消息立刻打日志。

问题是,publisher打出了发送消息的日志,consumer没打出收到消息的日志。

基于这种情况,准备启用rabbitmq java clientReturnCallbackConfirmCallback机制,先确认消息是否成功发到了正确的queue里面。

之前没有用Callback,因为对于我们的场景,Rabbitmq还是非常稳定的,即使极少出现的异常情况,我们也有办法把丢掉的消息补发,因此没必要浪费Channel资源去让rabbitmq server给发送确认信息,也不想平白增加系统复杂性。

1.代码实现

一般我们使用rabbitmq可能会配置下面几个bean(不论通过何种方式,xml@Configuretion,或者spring bootautoconfigure),在此基础上,添加一些属性设置:

@Configuration
public class MqConfig {@Value("${rabbitmq.enableConfirm}")private boolean enableConfirm;@Value("${rabbitmq.enableReturn}")private boolean enableReturn;@Value("${rabbitmq.enableMessageCorrelation}")private boolean enableMessageCorrelation;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();//省略其它属性设置...//根据配置决定是否开启 Confirm 机制connectionFactory.setPublisherConfirms(enableConfirm);//根据配置决定是否开启 Return 机制connectionFactory.setPublisherReturns(enableReturn);return connectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate() throws Exception {//根据配置决定使用哪种 RabbitTemplateRabbitTemplate template = enableMessageCorrelation ?new CorrelationRabbitTemplate(connectionFactory()) : new RabbitTemplate(connectionFactory());//省略其它属性设置...//如果启用 Confirm 机制,设置 ConfirmCallbackif (enableConfirm) {template.setConfirmCallback(confirmCallback());}//如果启用 Return 机制,设置 ReturnCallback,及打开 Mandatoryif (enableReturn) {template.setReturnCallback(returnCallback());template.setMandatory(true);}return template;}
}

对于Publisher而言,以上两个bean足以。

下面是 RabbitTemplate中需要的ConfirmCallbackReturnCallback

    @Bean@ConditionalOnMissingBean(value = RabbitTemplate.ConfirmCallback.class)public RabbitTemplate.ConfirmCallback confirmCallback() {return new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// do something ...};}@Bean@ConditionalOnMissingBean(value = RabbitTemplate.ReturnCallback.class)public RabbitTemplate.ReturnCallback returnCallback() {return new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// do something ...}};}

ok,关于这两个Callback

  • ConfirmCallback:每一条发到rabbitmq server的消息都会调一次confirm方法。

    • 如果消息成功到达exchange,则ack参数为true,反之为false
    • cause参数是错误信息;
    • CorrelationData可以理解为context,在发送消息时传入的这个参数,此时会拿到。
  • ReturnCallback:成功到达exchange,但routing不到任何queue时会调用。
    • 可以看到这里能直接拿到messageexchangeroutingKey信息。

这里会遇到一个问题,当ConfirmCallback被调用,且ack参数为false时,意味着这条消息可能发送失败了,那我可能想把这条消息在这里保存下来,比如打条日志,以免消息丢失,但对于ConfirmCallback,是不能像ReturnCallback一样直接拿到message的。

所以,我们需要CorrelationData这个参数,我们可以把message塞到这个参数里面。

我们扩展一下CorrelationData类:

public class MessageCorrelationData extends CorrelationData {//...private Message message;public Message getMessage() {return message;}public void setMessage(Message message) {this.message = message;}
}

现在,我们需要在发送消息时,把message设置到MessageCorrelationData中随message一起发出去。

在上面RabbitTemplate bean的配置中,可以看到这行代码:

RabbitTemplate template = enableMessageCorrelation ?new CorrelationRabbitTemplate(connectionFactory()) : new RabbitTemplate(connectionFactory());

这个是因为不想改老代码,所以对RabbitTemplate类做一下扩展:

public class CorrelationRabbitTemplate extends RabbitTemplate {//...@Overridepublic void send(final String exchange, final String routingKey,final Message message, final CorrelationData correlationData)throws AmqpException {super.send(exchange, routingKey, message, correlationData == null ? new MessageCorrelationData(message) : correlationData);}
}

不管调用RabbitTemplate的哪个方法发送消息,最终都是调用某个send方法,所以,我们重写这个方法,把MessageCorrelationData给塞进去。

ok,这样老代码不需要做任何改动,改一下rabbitmq的配置文件,就能在ConfirmCallback中拿到发出去但可能发送失败的message,拿到了message,那么,为所欲为吧。

2.为什么ConfirmCallback中不能直接拿到message

为了能在ConfirmCallback中拿到message,拐了好大一个弯,为什么不直接给我呢?这是一个很自然的问题。

简单追一下源码,看ConfirmCallbackReturnCallback分别是在哪里被调用的。

可以在com.rabbitmq.client.impl.AMQChannel类中找到这个方法:

public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream.  If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further.  It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.RpcContinuation nextOutstandingRpc = nextOutstandingRpc();// the outstanding RPC can be null when calling Channel#asyncRpcif(nextOutstandingRpc != null) {nextOutstandingRpc.handleCommand(command);markRpcFinished();}}}

大部分rabbitmq java clientrabbitmq server的交互都会走到这里来,除了heartbeat等一些特殊的交互。

追踪到processAsync(command)这个方法:

@Override public boolean processAsync(Command command) throws IOException {Method method = command.getMethod();if (method instanceof Channel.Close) {asyncShutdown(command);return true;}if (isOpen()) {if (method instanceof Basic.Deliver) {processDelivery(command, (Basic.Deliver) method);return true;} else if (method instanceof Basic.Return) {callReturnListeners(command, (Basic.Return) method);return true;} else if (method instanceof Channel.Flow) {// ...return true;} else if (method instanceof Basic.Ack) {Basic.Ack ack = (Basic.Ack) method;callConfirmListeners(command, ack);handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);return true;} else if (method instanceof Basic.Nack) {Basic.Nack nack = (Basic.Nack) method;callConfirmListeners(command, nack);handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);return true;} else if (method instanceof Basic.RecoverOk) {// ...return false;} else if (method instanceof Basic.Cancel) {// ...return true;} else {return false;}} else {if (method instanceof Channel.CloseOk) {return false;} else {return true;}}}

command中包含了method信息,根据method的不同,做相应的处理。

可以看到:

  • ReturnCallback在收到Basic.Return时调用;
  • ConfirmCallback在收到Basic.AckBasic.Nack时调用,根据debugChannel.Close也会调。

再去官网看一下basic.ack这些是什么东西。

根据官网描述,个人对callback过程中clientserver的交互做如下总结,如有错误,欢迎指正:

client: confirm.select; // 如果收到消息并处理完毕请通知我
server: confirm.select-ok; // 好的,我已经给这个 channel 开通了 confirm 模式,你用这个 channel 发消息给我,我还用这个 channel 通知你
... // 可能还有一些其它对话
client: basic.publish; // 给你发了一条消息,注意查收
server: basic.nack; // 不好意思,我这边处理出了点问题,你重新发一次
client: basic.publish; // 好吧
server: basic.nack; // 你这个 exchange 不存在
... // 可能伴随着 channel shutdown 和 reset
client: basic.publish; // ok,我的错,我已纠正,应该没问题了
server: basic.return; // 抱歉,你这个 exchange 上没这个 routingKey
server: basic.ack; // 虽然 routingKey 没有,但这个 exchange 没问题
client: basic.publish; // ok,我改了
server: basic.ack; // 收到,所有的 queue 都已落盘,没问题

回到代码,找一下,两个callback的那些参数都是在哪里设置进去的。

对于ReturnCallback,找到com.rabbitmq.client.impl.ChannelN类的这个方法:

private void callReturnListeners(Command command, Basic.Return basicReturn) {try {for (ReturnListener l : this.returnListeners) {l.handleReturn(basicReturn.getReplyCode(),basicReturn.getReplyText(),basicReturn.getExchange(),basicReturn.getRoutingKey(),(BasicProperties) command.getContentHeader(),command.getContentBody());}} catch (Throwable ex) {getConnection().getExceptionHandler().handleReturnListenerException(this, ex);}}

可以看到,消息体就是command.getContentBody()这里拿到的,所以是basic.return附带的。

对于ConfirmCallback,找到org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl类的processAck方法:

private synchronized void processAck(long seq, boolean ack, boolean multiple, boolean remove) {if (multiple) {// ...}else {Listener listener = this.listenerForSeq.remove(seq);if (listener != null) {SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);PendingConfirm pendingConfirm;if (remove) {pendingConfirm = confirmsForListener.remove(seq);}else {pendingConfirm = confirmsForListener.get(seq);}if (pendingConfirm != null) {doHandleConfirm(ack, listener, pendingConfirm);}}else {if (this.logger.isDebugEnabled()) {this.logger.debug(this.delegate.toString() + " No listener for seq:" + seq);}}}}

可以看到,basic.ack给了一个seq参数,这个参数是server给确认的时候在delivery-tag里面带过来的,所以,我们看到server并没有给CorrelationData数据,那么CorrelationData就应该是放在本地内存里的。

进一步查看源码可以知道,这个seq参数是对当前channel发送的消息的一个序列号,发送消息的时候,CorrelationData放在本地与seq关联,serverconfirm的时候会给一个delivery-tag,在这种场景下用来指明这是这个channel中第几条消息的confirm,再从本地内存中取出相应的CorrelationData交给ConfirmCallback

这样来看,ConfirmCallback是有能力直接提供message信息的,只是java client没有这么实现。

3.CorrelationData的id与MessageProperties的correlationId

从上面可以知道CorrelationData其实不依赖它的id属性来区分,这个id属性完全可以不设置,实际上我上面的示例中就没有设置,并不影响功能。

那么这个id是干嘛用的呢?看了下源码,只在org.springframework.amqp.rabbit.AsyncRabbitTemplate类里面用到了,具体没有深究。

MessageProperties类里面有一个CorrelationId属性,这个官网有一个做RPC的示例。

都是用来做reply确认的,除此以外,好像没啥关联?

rabbitmq:publisher confirms相关推荐

  1. RabbitMQ(三)发布确认 Publisher Confirms

    代码仓库:github:https://github.com/stopping5/RabbitMq-Operation-Record.git 本代码示例需要引入rabbitmq依赖 <!-- r ...

  2. 官方 RabbitMQ 教程 - 7 Publisher Confirms

    文章目录 发布者确认 概述 在一个通道上开启发布者确认 策略1:单个的发布消息 策略2:批量的发布消息 策略3:异步确认发布消息 综述 把它们放一起 官方文档地址: 7 Publisher Confi ...

  3. rabbit mq Consumer Acknowledgements and Publisher Confirms 翻译

    原文地址 Consumer Acknowledgements and Publisher Confirms 消费者消息的Ack机制和生产者发布消息的确认机制 Overview This guide c ...

  4. Consumer Acknowledgements and Publisher Confirms

    点击打开链接 Acknowledging Multiple Deliveries at Once Manual acknowledgements can be batched to reduce ne ...

  5. RabbitMQ:The channelMax limit is reached. Try later.

    RabbitMQ:The channelMax limit is reached. Try later. ​ 这个问题是我当初写项目时遇到的,因为用RabbitMQ做削峰处理,高并发情况下,chann ...

  6. RabbitMQ:惰性队列

    RabbitMQ [第一章]RabbitMQ:从认识MQ到安装,学习消息模型等 [第二章]RabbitMQ:常见消息模型 [第三章]RabbitMQ:生产者消息确认.消息持久化.消费者消息确认.消费失 ...

  7. docker RabbitMQ:修改Channel limit

    RabbitMQ:The channelMax limit is reached. Try later. rabbitmq 默认 最大链接默认是2047访问量过大会导致数据丢失 复制 docker 容 ...

  8. RabbitMQ:死信队列

    ✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...

  9. RabbitMQ:订阅模型-消息订阅模式

    订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息. RabbitMQ 单 ...

  10. 别人家的团队怎么用RabbitMQ:我总结的5点规范

    大概从 2013 年开始,我就开始了自己和 RabbitMQ 的接触,到现在已经有七年多了. 在这七年中,既有一些对 RabbitMQ 的深度体验,更有无数的血泪史. 而根据我这么多年的使用经验,我将 ...

最新文章

  1. jsTree 插件Ajax数据
  2. PHP获取 当前页面名称、主机名、URL完整地址、URL参数、获取IP
  3. webParts与Web部件
  4. Python外(1)--try-expect
  5. python中的reduce函数用法
  6. AttributeError: module 'pip' has no attribute 'main'
  7. webbrowser控件 加载为空白_OpenLayers教程五:地图控件之坐标拾取控件和鹰眼控件...
  8. SetWindowLong 和SetClassLong区别
  9. 163 镜像源 linux,网易163的Ubuntu apt镜像源
  10. 漫谈iOS程序的证书和签名机制
  11. ElasticSearch8.1.2 ik分词器
  12. raytrace 算法理论与实践
  13. 百度竞价账户能否多少词?
  14. 用JavaScript实现烟花效果
  15. 股市投资必修课二十三--增长的导向
  16. IOS端 vux中scroll滚动自动回弹到顶部或者左侧的解决办法
  17. 【解决】Failure to find com.xxx:xxx-target:pom:1.0-SNAPSHOT in https://xxxx/snapshot was cached in the
  18. 计算机兼容,兼容条件
  19. STM32笔记 (七)中断系统与NVIC嵌套向量中断控制器
  20. Win10电脑如何进行内存诊断?教程来了

热门文章

  1. 上传多张图片到oss服务器
  2. 行人属性识别 PETA数据集
  3. 目标框选之单阶段与两阶段目标检测区别
  4. 专利与论文-1:为什么要写专利?专利有什么好处?
  5. 2020 春节集五福最详细收集攻略
  6. android下拉水波纹,RecyclerView实现水波纹点击效果
  7. 不同内核浏览器的差异以及浏览器渲染简介(转)
  8. C语言怎么把int类型转为char,c++ 如何把一个int转为char*
  9. java界面添加动态背景图片
  10. FPGA之旅设计99例之第九例-----驱动0.96寸OLED屏