文章目录

      • 基础案例环境搭建:
    • 环境:
    • 1. 生产者发送消息确认
      • 1.1 confirm 确认模式
      • 1.2 return 退回模式
        • 源代码
      • 1.1.3 小结
    • 2. 消费者签收消息(ACK)
      • 2.1 代码实现
        • 源代码
  • 总结

基础案例环境搭建:

基础环境搭建(手把手教你环境搭建和五种工作模式):https://blog.csdn.net/m0_48325361/article/details/123174843?spm=1001.2014.3001.5502

环境:

队列:

交换机:

交换机和队列进行绑定:

1. 生产者发送消息确认

如果保证消息的可靠性?需要解决如下问题

问题1:生产者能百分之百将消息发送给消息队列!

  • 两种意外情况:

    • 第一,消费者发送消息给MQ失败,消息丢失;
    • 第二,交换机路由到队列失败,路由键写错;

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

  • return 退回模式

rabbitmq 整个消息投递的路径为:

  • 消息从生产者(producer)发送消息到交换机(exchange),不论是否成功,都会执行一个确认回调方法confirmCallback 。

  • 消息从交换机(exchange)到消息队列( queue )投递失败则会执行一个返回回调方法 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

1.1 confirm 确认模式

目标:演示消息确认模式效果

生产者发布消息确认模式特点,不论消息是否进入交换机均执行回调方法

实现过程:

  1. 生产者配置文件中,开启生产者发布消息确认模式

    # 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
    spring.rabbitmq.publisher-confirm-type=correlated
    # 开启return退回模式
    spring.rabbitmq.publisher-returns=true
    
  2. 编写生产者确认回调方法

    //发送消息回调确认类,实现回调接口ConfirmCallback,重写其中confirm()方法
    @Component
    public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {/*** 投递到交换机,不论投递成功还是失败都回调次方法* @param correlationData 投递相关数据* @param ack 是否投递到交换机* @param cause 投递失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.println("消息进入交换机成功{}");} else {System.out.println("消息进入交换机失败{} , 失败原因:" + cause);}}
    }
    
  3. 在RabbitTemplate中,设置消息发布确认回调方法

    @Component
    public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法* 设置消息确认回调方法* 设置消息回退回调方法*/@PostConstructpublic void initRabbitTemplate(){//设置消息确认回调方法rabbitTemplate.setConfirmCallback(this::confirm);}/*** 投递到交换机,不论投递成功还是失败都回调次方法* @param correlationData 投递相关数据* @param ack 是否投递到交换机* @param cause 投递失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.println("消息进入交换机成功{}");} else {System.out.println("消息进入交换机失败{} , 失败原因:" + cause);}}
    }
    
  4. 测试:
    成功:

    失败:

1.2 return 退回模式

消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法
实现ReturnCallback接口

实现过程:

  1. 在配置文件中,开启生产者发布消息回退模式

    # 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
    spring.rabbitmq.publisher-returns=true
    
  2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback

    @Component
    public class RabbitConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {//..省略
    }
    
  3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法

    /*** 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方法* @param message 投递消息内容* @param replyCode 返回错误状态码* @param replyText 返回错误内容* @param exchange 交换机名称* @param routingKey 路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("交换机路由至消息队列出错:>>>>>>>");System.out.println("交换机:"+exchange);System.out.println("路由键:"+routingKey);System.out.println("错误状态码:"+replyCode);System.out.println("错误原因:"+replyText);System.out.println("发送消息内容:"+message.toString());System.out.println("<<<<<<<<");}
    
  4. 在RabbitTemplate中,设置消息发布回退回调方法

    @PostConstruct
    public void initRabbitTemplate(){//设置消息确认回调方法rabbitTemplate.setConfirmCallback(this::confirm);//设置消息回退回调方法rabbitTemplate.setReturnCallback(this::returnedMessage);
    }
    

测试:

成功:失败:

源代码

@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法* 设置消息确认回调方法* 设置消息回退回调方法*/@PostConstructpublic void initRabbitTemplate() {//设置消息确认回调方法rabbitTemplate.setConfirmCallback(this::confirm);//设置消息退回方法rabbitTemplate.setReturnsCallback(this::returnedMessage);}/*** 投递到交换机,不论投递成功还是失败都回调次方法** @param correlationData 投递相关数据* @param ack             是否投递到交换机* @param cause           投递失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("消息进入交换机成功{}");} else {System.out.println("消息进入交换机失败{} , 失败原因:" + cause);}}/*** 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessage方法*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("交换机路由至消息队列出错:>>>>>>>");System.out.println("错误原因:" + returnedMessage.getReplyText());System.out.println("发送消息内容:" + returnedMessage.getMessage());System.out.println("错误状态码:" + returnedMessage.getReplyCode());System.out.println("路由键:" + returnedMessage.getRoutingKey());System.out.println("交换机:" + returnedMessage.getExchange());}
}

1.1.3 小结

确认模式:

  • 设置publisher-confirms=“true” 开启 确认模式。
  • 实现RabbitTemplate.ConfirmCallback接口,重写confirm方法
  • 特点:不论消息是否成功投递至交换机,都回调confirm方法,只有在发送失败时需要写业务代码进行处理。

退回模式

  • 设置publisher-returns=“true” 开启 退回模式。
  • 实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法
  • 特点:消息进入交换机后,只有当从exchange路由到queue失败,才触发回调returnedMessage方法;

2. 消费者签收消息(ACK)

问题2:如何保证消费者能百分百接收到请求,且业务执行过程中还不能出错!

ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;

消息确认的三种类型:

  • 自动确认:acknowledge=“none

  • 手动确认:acknowledge=“manual

  • 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

2.1 代码实现

目标:演示消费者手动确认效果

  • 1.在消费者配置文件中开启ack机制
  • 2.在@RabbitListener消费者监听器方法中加入Message和Channel参数

实现过程:

消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现ChannelAwareMessageListener接口

#rabbitmq启动ack机制,手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

修改消费者监听器方法

测试成功案例:
发送请求:

消费者控制台打印:

测试失败案例:
修改topic_queue1队列的业务逻辑,让其抛出异常

在可视化界面也可以看到消息一直在队列中

  • 如果想手动清楚队列的消息
    点击队列

源代码

    @RabbitListener(queues = "topic_queue2")public void topic2Ack(String msg, Channel channel, Message message) throws IOException {System.out.println("=====routingInfo====>" + msg);/*** 手动拒绝签收* 参数1:当前消息的投递标签* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}//ack方式二@RabbitListener(queues = "topic_queue1")public void routingAck(String msg, Channel channel, Message message) throws Exception {System.out.println("=====routingAck====>" + msg);try {int i = 1 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);System.out.println("receiver success");} catch (Exception e) {System.out.println("业务逻辑产生异常" + e.getMessage());/*** 手动拒绝签收* 参数1:当前消息的投递标签* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息* 参数3:是否重回队列,true为重回队列,false为不重回*///消息重回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);System.out.println("receiver fail");}}

总结

后续还会更新TTL,死信队列,延迟队列等内容

RabbitMQ:消费者ACK机制、生产者消息确认相关推荐

  1. RabbitMQ 实战(四)消费者 ack 以及 生产者 confirms

    2019独角兽企业重金招聘Python工程师标准>>> 这篇文章主要讲 RabbitMQ 中 消费者 ack 以及 生产者 confirms. 如上图,生产者把消息发送到 Rabbi ...

  2. Java短信确认机制_JAVA 消息确认机制之 ACK 模式

    JAVA 消息确认机制之 ACK 模式 CLIENT_ACKNOWLEDGE : 客户端手动确认, 这就意味着 AcitveMQ 将不会 "自作主张" 的为你 ACK 任何消息, ...

  3. RabbitMQ的ack机制

    1.什么是消息确认ACK. 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失.为了确保数据不会丢失,RabbitMQ支持消 ...

  4. rabbitmq消息队列 ack机制(消息确认机制)和消息补偿机制

    参考:https://blog.csdn.net/pan_junbiao/article/details/112956537 ack 机制就是消息在 生产者在发布消息以后,消息存在内存中,如果消息被确 ...

  5. rabbitmq怎样确认是否已经消费了消息_【朝夕专刊】RabbitMQ生产者/消费者消息确认...

    欢迎大家阅读<朝夕Net社区技术专刊> 我们致力于.NetCore的推广和落地,为更好的帮助大家学习,方便分享干货,特创此刊!很高兴你能成为忠实读者,文末福利不要错过哦! 上篇文章介绍了R ...

  6. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

  7. RabbitMQ消息确认机制和消息重发机制

    一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 ->  队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...

  8. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  9. RabbitMq(十四)消息的事务支持及代码演示

    在rabbitmq中我们也有类似数据库的事务需求,及当程序运行过程中出现异常时,不能完整的执行一个流程时,为了保持功能完整性,我们需要将之前发送的消息也不让他发送出去,此时就需要使用到rabbitmq ...

最新文章

  1. Date, TimeZone, MongoDB, java中date的时区问题
  2. lxml安装_Beautiful Soup的安装和使用
  3. 软件项目经理新手上路9 - 谁是你的敌人?
  4. Python知识点7——类
  5. Linux_ServicesManagement_RHEL7
  6. 如何解决http封包中gzip编码的html
  7. 二叉树中如何求根节点到任意节点的路径?
  8. 【C++ grammar】引用
  9. 哈希表的构造和查找算法
  10. 在斯坦福,做 Manning 的 phd 要有多强?
  11. docker部署php站点,docker部署php
  12. 阿里巴巴 DevOps 工具体系
  13. Spring AOP 前置通知
  14. homebrew mac_借助Homebrew使从Mac到Linux的转换更加容易
  15. python3携程多任务_python3之携程yield及greenlet
  16. 通过标签向flash传递参数
  17. TCP/IP源码分析
  18. Logstash对nginx的access/error.log日志清洗并数据可视化监控设计
  19. 什么是m叉树_C#的λ表达式树(LambdaExpression)保姆级超详细简单入门教程
  20. wr703n刷openwrt离线下载及upnp共享播放

热门文章

  1. iOS开发--Core Graphics绘图
  2. 基于JavaScript的人物走路动画
  3. 10-253 B2-1查找订单数最多的员工信息
  4. iOS-记一些官网地址
  5. java 校验图片的大小、尺寸、比例
  6. 原厂对NPI安全稽核要求
  7. 杜兰特全部比赛录像合集【百度网盘高清分享】
  8. 计算机学院的学生起个昵称,查询“计算机系”学生的学号、姓名、学生所选课程的课程名和成绩,正确的命令是(..._考试资料网...
  9. Vivado仿真小技巧,让所有模块的波形都可以显示
  10. 归并排序(默认2路归并)