默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除

如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除

消息发送确认

发送的消息怎么样才算失败或成功?如何确认?

  • 当消息无法路由到队列时,确认消息路由失败。消息成功路由时,当需要发送的队列都发送成功后,进行确认消息,对于持久化队列意味着写入磁盘,对于镜像队列意味着所有镜像接收成功

ConfirmCallback

  • 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中

@Componentpublic class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{    @Autowiredprivate RabbitTemplate rabbitTemplate;    @PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback}    @Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("消息唯一标识:"+correlationData);System.out.println("确认结果:"+ack);System.out.println("失败原因:"+cause);}
  • 还需要在配置文件添加配置

spring:rabbitmq:publisher-confirms: true

ReturnCallback

  • 通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调

@Componentpublic class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{    @Autowiredprivate RabbitTemplate rabbitTemplate;    @PostConstructpublic void init(){rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback}    @Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息主体 message : "+message);System.out.println("消息主体 message : "+replyCode);System.out.println("描述:"+replyText);System.out.println("消息使用的交换器 exchange : "+exchange);System.out.println("消息使用的路由键 routing : "+routingKey);}
}
  • 还需要在配置文件添加配置

spring:rabbitmq:publisher-returns: true

消息接收确认

消息消费者如何通知 Rabbit 消息消费成功?

  • 消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK

  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息

  • 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失

  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

确认消息(局部方法处理消息)

  • 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:rabbitmq:listener:simple:acknowledge-mode: manual
  • 或在 RabbitListenerContainerFactory 中进行开启手动 ack

@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ackreturn factory;
}
  • 确认消息

@RabbitHandlerpublic void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {System.out.println(message);    try {channel.basicAck(tag,false);            // 确认消息} catch (IOException e) {e.printStackTrace();}
}
  • 需要注意的 basicAck 方法需要传递两个参数

    • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

    • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

手动否认、拒绝消息

  • 发送一个 header 中包含 error 属性的消息

hducA.png

  • 消费者获取消息时检查到头部包含 error 则 nack 消息

@RabbitHandlerpublic void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {System.out.println(message);    if (map.get("error")!= null){System.out.println("错误的消息");        try {channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否认消息return;} catch (IOException e) {e.printStackTrace();}}    try {channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //确认消息} catch (IOException e) {e.printStackTrace();}
}
  • 此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费

hello
错误的消息
hello
错误的消息
hello
错误的消息
hello
错误的消息
  • 也可以拒绝该消息,消息会被丢弃,不会重回队列

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

确认消息(全局处理消息)

  • 自动确认涉及到一个问题就是如果在处理消息的时候抛出异常,消息处理失败,但是因为自动确认而导致 Rabbit 将该消息删除了,造成消息丢失

@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("consumer_queue");                 // 监听的队列container.setAcknowledgeMode(AcknowledgeMode.NONE);     // NONE 代表自动确认container.setMessageListener((MessageListener) message -> {         //消息监听处理System.out.println("====接收到消息=====");System.out.println(new String(message.getBody()));        //相当于自己的一些消费逻辑抛错误throw new NullPointerException("consumer fail");});    return container;
}
  • 手动确认消息

@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("consumer_queue");              // 监听的队列container.setAcknowledgeMode(AcknowledgeMode.MANUAL);        // 手动确认container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {      //消息处理System.out.println("====接收到消息=====");System.out.println(new String(message.getBody()));        if(message.getMessageProperties().getHeaders().get("error") == null){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息已经确认");}else {            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息拒绝");}});    return container;
}
  • AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO ,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)

    • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认

    • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)

    • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认

    • 其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置

@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("consumer_queue");              // 监听的队列container.setAcknowledgeMode(AcknowledgeMode.AUTO);     // 根据情况确认消息container.setMessageListener((MessageListener) (message) -> {System.out.println("====接收到消息=====");System.out.println(new String(message.getBody()));        //抛出NullPointerException异常则重新入队列//throw new NullPointerException("消息消费失败");//当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false//throw new AmqpRejectAndDontRequeueException("消息消费失败");//当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认throw new ImmediateAcknowledgeAmqpException("消息消费失败");});    return container;
}

消息可靠总结

  • 持久化

    • exchange要持久化

    • queue要持久化

    • message要持久化

  • 消息确认

    • 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)

    • 生产者和Server(broker)之间的消息确认

    • 消费者和Server(broker)之间的消息确认

RabbitMQ:消息发送确认 与 消息接收确认(ACK)相关推荐

  1. rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关

    转:https://blog.csdn.net/u014373554/article/details/92686063 项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败. ...

  2. im中的定位消息 ios_iOS消息发送失败和消息进度回调中msgId没有解包

    /// 重新发送消息 /// [msgId] 原消息的ID /// [receiver] 消息接收者的 userID, 如果是发送 C2C 单聊消息,只需要指定 receiver 即可. /// [g ...

  3. KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)

    文章目录 1. 技术选型 2. 导入依赖 3. kafka配置 4. 生产者(同步) 5. 生产者(异步) 6. 消费者 1. 技术选型 软件/框架 版本 jdk 1.8.0_202 springbo ...

  4. rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)

    默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除 如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则 ...

  5. RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

    文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...

  6. 整合rabbitmq+redis发送验证码消息

    文章目录 配置: 验证码工具类 处理发送验证码的消息 controller层 验证验证码是否正确 配置: server:port: 8084 #邮件相关配置 spring:application:na ...

  7. 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

    微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...

  8. akka 消息发送接收_Akka型演员:探索接收器模式

    akka 消息发送接收 在上一篇文章中,我们研究了Akka Typed提供的一些基本功能. 在本文和下一篇文章中,我们将更进一步地了解一些其他功能,并通过查看Akka Typed提供的两种不同模式来做 ...

  9. RabbitMQ 入门到应用 ( 六 ) 消息可靠性

    7.RabbitMQ可靠性投递 为了保证信息不丢失, 可靠抵达,引入确认机制 消息从生产者传递到消费者的过程中, 不同的阶段使用不同的确认方式. 7.0.准备请求 一次性发送10 个消息 通过 new ...

最新文章

  1. 调用webservice时提示对操作的回复消息正文进行反序列化时出错
  2. c语言 #define dpath .exe是什么意思,C语言宏定义#define
  3. Elasticsearch 集群中增加专用master节点
  4. 如果是你你会如何重新设计和定义维基百科(wikipedia)?
  5. pygame做的著名游戏_用Python和Pygame写游戏-从入门到放弃(1)
  6. spring如何下载源码和jar包
  7. 安卓网络连接全解:包括网络连接状态的监听、网络数据使用状态的监听、获取当前网络连接情况、启动wifi、获取当前连接wifi的网络情况、扫描wifi热点
  8. linux中查看网卡型号的命令
  9. LDN蓝牙双模键盘驱动和固件更新日志
  10. Python系列之Python-docx生成运行日报Word模板
  11. WebStorm 好用的插件推荐
  12. FGSM论文阅读笔记
  13. vuecli-脚手架,安装使用及目录详解
  14. 数字调制解调技术的MATLAB与FPGA实现(关盘资料源码)
  15. app installation failed 的问题的解决过程
  16. 【Python爬虫实战】使用Selenium爬取QQ音乐歌曲及评论信息
  17. ROS配置和使用“北通”无线手柄,主从机远程控制JP-Chassis底盘
  18. React + Redux + Express + Mongodb 零基础开发完整大型商城网站视频教程(97 个视频)
  19. 微服务注册中心怎么选?
  20. php培训月度总结,PHP常量,数组和分支语句小结--PHP培训十期线上班

热门文章

  1. ios项目中使用gcd的场景_iOS中超级超级详细介绍GCD
  2. java自行车火多重,多重继承的演变
  3. JAVA开发面试常问问题总结1
  4. Linux能ping主机,但ping不了网关以及外网,显示包全丢失解决方案
  5. ROS入门-8.发布者Publisher的编程实现
  6. 深度学习——卷积神经网络CNN
  7. 用JavaScript刷leetcode(刷题 第一天)
  8. matplotlib的学习
  9. Vue和MVVM的对应关系
  10. 【译】在设计表单的时候应该注意的八点