目录

1.生产者发消息到交换机时候的消息确认

2.交换机给队列发消息时候的消息确认

3.备用队列

3.消费者手动ack

rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。

消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队列,然后消费者监听队列消费消息

但是如果生产者发送的消息,交换机收不到呢,又或者交换机通过路由键给对应的队列发消息时,路由键不存在呢,这些就是消息发布确认所要解决的问题

消息的发布确认分别有:

  • 生产者发消息到交换机时候的消息确认
  • 以及交换机发消息给队列的消息确认

先在application.properties配置文件中加上以下代码:

# 确认消息已发送到交换机(Exchange)
spring.rabbitmq.publisher-confirm-type= correlated
# 确认消息已发送到队列
spring.rabbitmq.publisher-returns= true

# 确认消息已发送到交换机(Exchange)
spring.rabbitmq.publisher-confirm-type= correlated

这个意思是开启confirm模式,这样的话,当生产者发送消息的时候,无论交换机是否收到,都会触发回调方法

1.生产者发消息到交换机时候的消息确认

 写一个容器:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;// ConfirmCallback:消息只要发出,无论交换机有没有接到消息,都会触发ConfirmCallback类的confirm方法
// ConfirmCallback是有个内部类@Component
public class messageConfirm implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/**** @param correlationData correlationData是发送消息时候携带的消息* @param ack 如果为true,表示交换机接收到消息了* @param message 异常消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String message) {if (ack){System.out.println("交换机收到消息成功:" + correlationData.getId());}else {System.out.println("交换机收到消息失败:" + correlationData.getId() + "原因:" + message);}}}

RabbitTemplate.ConfirmCallback是一个内部接口类,只要生产者往交换机发送消息,都会该触发ConfirmCallback类的confirm方法

注意:
        因为RabbitTemplate.ConfirmCallback是一个内部类,所以我们要通过    @PostConstruct注解,把当前类赋值给ConfirmCallback

配置类:

package com.example.rabbitmq.发布确认;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class messageConfrimConfig {@Beanpublic DirectExchange getConfrimTopic(){// 创建一个直接交换机return ExchangeBuilder.directExchange("ljl-ConfrimTopic").build();}@Beanpublic Queue getConfrimQueue(){return new Queue("ljl-ConfrimQueue");}@Beanpublic Binding TopicConfrimBinding(){return BindingBuilder.bind(getConfrimQueue()).to(getConfrimTopic()).with("messageConfirm");}}

消费者:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Component
public class clientConfirm {@RabbitListener(queues = "ljl-ConfrimQueue")@RabbitHandlerpublic void ConfrimQueue(Message message) {System.out.println("正常队列正常接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}}

生产者:

@RestController
public class testConfirmController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping(value = "/sendMessageConfirm")public String sendMessageConfirm(){HashMap<String, Object> mapExchange = new HashMap<>();mapExchange.put("message","测试交换机的发布确认消息");// 关联数据的一个类,交换机无论有没有收到生产者发送的消息,都会返回这个对象CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());// 这个是正常发送的,交换机的名称,跟路由键的名称都是存在的rabbitTemplate.convertAndSend("ljl-ConfrimTopic","messageConfirm",JSONObject.toJSONString(mapExchange),correlationData);return "成功";}}

直接运行项目代码:http://localhost:8080/sendMessageConfirm

可以看到消息正常发送,正常消费,然后交换机回调方法

当交换机不存在的时候:

一样会触发回调方法,然后打印错误消息

2.交换机给队列发消息时候的消息确认

        写一个容器,实现 RabbitTemplate.ReturnCallback 接口,重写 returnedMessage 方法,这个方法是当交换机推送消息给队列的时候,路由键不存在就触发的方法
        注意:
                因为RabbitTemplate.ReturnCallback是一个内部类,所以我们要通过    @PostConstruct注解,把当前类赋值给ReturnCallback

写一个容器类:

package com.example.rabbitmq.发布确认;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;// ConfirmCallback:消息只要发出,无论交换机有没有接到消息,都会触发ConfirmCallback类的confirm方法
// ConfirmCallback是个内部类// ReturnCallback是个内部类
// ReturnCallback:但不可路由的时候,触发回调方法
@Component
public class messageConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/**** @param correlationData correlationData是发送消息时候携带的消息* @param ack 如果为true,表示交换机接收到消息了* @param message 异常消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String message) {if (ack){System.out.println("交换机收到消息成功:" + correlationData.getId());}else {System.out.println("交换机收到消息失败:" + correlationData.getId() + "原因:" + message);}}// 当routingkey不存在的时候,会触发该方法/**** @param message 消息主体* @param code 错误码* @param text 错误消息* @param exchange 推送该消息的交换机* @param routingkey 推送消息时的routingkey*/@Overridepublic void returnedMessage(Message message, int code, String text, String exchange, String routingkey) {System.out.println("交换机推送消息到队列失败,推送的消息是:" + new String(message.getBody()) + "错误原因:" + text);}
}

生产者:

       // 这个是正常发送到交换机的,但是路由建的名称不存在rabbitTemplate.convertAndSend("ljl-ConfrimTopic","messageConfirmAnomaly",JSONObject.toJSONString(mapExchange),correlationData);

运行代码看效果:

3.备用队列

当消息不可路由的时候,mq会触发returncallback接口的回调方法,把不可路由的消息回调回来,但是这有个问题,就是消息虽然回调过来了,但是并没有消费者去把不可路由的消息给消费掉,所以这个时候就要加一个备用队列和一个报警队列,报警队列的作用是用来通知管理员,有什么消息被回退了....然后备用队列是把消息给保存起来,需要的时候就从备用队列中取数据出来使用
        注意:当我们设置了备用队列的时候,returncallback接口的回调方法将不会被触发,

但是当消息不可路由,而且备用队列也不能使用的时候,才会触发returncallback接口的回调方法,也就是说,触发回调方法在最终条件是消息无法被任何一个队列接受,在mq丢弃前才会触发回调方法

配置类(加入备用交换机,备用队列,报警队列,然后使用的是扇形交换机):

alternate-exchange 参数:设置备用交换机,当消息不可路由的时候就会把消息推送到该交换机上
@Configuration
public class messageConfrimConfig {@Beanpublic DirectExchange getConfrimTopic(){// 创建一个直接交换机
//        return ExchangeBuilder.directExchange("ljl-ConfrimTopic").build();//       alternate-exchange 参数:设置备用交换机,当消息不可路由的时候就会把消息推送到该交换机上return ExchangeBuilder.directExchange("ljl-ConfrimTopic").withArgument("alternate-exchange","ljl-standbyFanoutExchange").build();}@Beanpublic Queue getConfrimQueue(){return new Queue("ljl-ConfrimQueue");}@Beanpublic Binding TopicConfrimBinding(){return BindingBuilder.bind(getConfrimQueue()).to(getConfrimTopic()).with("messageConfirm");}// 备用交换机,备用队列,报警队列@Beanpublic FanoutExchange standbyFanoutExchange(){// 备用交换机return new FanoutExchange("ljl-standbyFanoutExchange");}@Beanpublic Queue getstandbyQueue(){// 备用队列return new Queue("ljl-standbyQueue");}@Beanpublic Queue getalarmQueue(){// 报警队列return new Queue("ljl-alarmQueue");}// 设置备用队列和备用交换机的绑定关系@Beanpublic Binding standbyExchagneBinding(){return BindingBuilder.bind(getstandbyQueue()).to(standbyFanoutExchange());}// 设置报警队列和备用交换机的绑定关系@Beanpublic Binding alarmExchagneBinding(){return BindingBuilder.bind(getalarmQueue()).to(standbyFanoutExchange());}}

在消费者上:

@Component
public class clientConfirm {@RabbitListener(queues = "ljl-ConfrimQueue")@RabbitHandlerpublic void ConfrimQueue(Message message) {System.out.println("正常队列正常接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}@RabbitListener(queues = "ljl-alarmQueue")@RabbitHandlerpublic void alarmQueue(Message message) {System.out.println("报警队列接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}}

执行代码看效果(此时的生产者发送给mq的路由键还是不存在的):

这个时候会发现我们设置的备用交换机没有起到效果,这是因为我们在修改参数的时候

在mq中并没有起到效果,在是因为原本‘ljl-ConfrimTopic' 交换机已经存在,写的参数并不会覆盖之前的,我们需要把这个交换机给删掉,然后再执行一起看下效果:

报警交换机的作用生效了,不可路由的时候不会触发 returncallback接口的回调方

3.消费者手动ack

在配置文件中加入:

#开启手动ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#设置消费者每一次拉取的条数
spring.rabbitmq.listener.simple.prefetch= 5

消费者:

在消费者的方法上,加上这个类(Channel channel),然后这个类有几个方法:

1.消费者正常消费完成该消息,手动返回ack,然后队列把消息移除掉:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
参数1:message.getMessageProperties().getDeliveryTag()表示的是这条消息在队列中的一个标志,删除的时候也是根据这个标志来进行删除
参数2:是否要批量确认,这个意思是:是否把小于等于message.getMessageProperties().getDeliveryTag()值的消息批量确认

2.消费者在消费消息的过程中,出现了异常,那么就可以使用channel.basicNack方法

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
参数1:标志
参数2:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
参数3:是否重新进入队列 出现异常的时候,可以的用参数3来指定该条消息是否重新入队,然后参数2来控制这个操作是否批量操作

对于手动ack以及消息阻塞的一些总结:
        假设生产者发送了一百条消息
        现在只有一个消费者,然后设置消费者每一次拉取10条消息来消费(默认好像200多条),这个时候的正常流程就是消费者拉取一批消息,然后正常消费,通过返回ack,接着拉取消息来进行下一批消费,假如出现异常那就需要使用basicNack方法来判断是否要重新入队,但是异常消息入队后,被消费者重新消费,还是会出现异常,这个时候就会一直循环,造成消息堆积
        两个消费者:假设其中一个消费者A可以正常消费消息并正常返回ack,而另外一个消费者B会中会出现异常,使用basicNack方法让消息重新入队,然后重新入队的消息有可能会被消费者A获取,然后正常消费并正常手动返回ack

        面试题:如何rabbitmq确保消息不丢失/消息的可靠性
        在生产者生成消息的时候,去开启confirm模式,写一个容器类去实行confirmcallback接口,这样交换机是否成功收到消息都会触发回调方法,然后在声明交换机,声明队列,以及发送消息的时候,做持久化处理,然后开启消息回退模式,写一个容器类去实现returncallback接口,这样当交换机推送消息给队列时,如果失败会触发回调方法,在消费者这边,开启手动ack模式,确保消息正常执行完毕,然后还可以去配置备用队列跟死信队列,这样就可以基本上确保mq的消息不会丢失了

以上就是总体的解答思路,大家用自己的话来总结就行

springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失相关推荐

  1. SpringBoot整合redis实现发布订阅模式

    Redis的发布订阅模式 发布订阅(Pub/Sub):目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接 ...

  2. springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列

    1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...

  3. RabbitMQ异步发布确认

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是 ...

  4. RabbitMQ单个发布确认

    这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布, waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候 ...

  5. SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNOWLEDGE)为什么失效啊?

    今天在家隔离办公,不太忙,然后就琢磨起来消息队列activeMQ的消息事务来解决分布式事务,但是奈何在SpringBoot整合activeMQ时,其消费者手动签收消息时出现了问题-->当acti ...

  6. RabbitMQ 消费者回执和发布确认

    为了保证数据安全,消费者和生产者的回执(ack)都是非常重要的. 由于我们无法保证消息都能像我们期望的那样,正常到达另一端或者被 Consumer 消费成功.因此,publisher 和 consum ...

  7. 十、RabbitMQ发布确认高级

    RabbitMQ发布确认高级 发布确认SpringBoot版本 发布确认Springboot版本 简单的发布确认机制在应答与签收已经介绍,本内容将介绍整合了 SpringBoot 的发布确认机制. 介 ...

  8. RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等

    RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...

  9. Springboot 整合 RabbitMQ(二)

    承接上文 Springboot 整合RabbitMQ_Maplelhw的博客-CSDN博客 开启消息手动确认 yaml文件: spring:rabbitmq:host: 服务器地址port: 5672 ...

最新文章

  1. 做向量召回 All You Need is 双塔
  2. Centos7-firewall-cmd
  3. RabbitMQ有5种工作模式
  4. 深入浅出TCPIP之实战篇—用c++开发一个http服务器(二十一)
  5. 留守女孩携笔从戎,被录取为空军飞行员
  6. 深入研究 C++中的 STL Deque 容器
  7. 使用Kendo上传控件实现ASP.NET Core的“批处理模式”
  8. powershell自动化操作AD域、Exchange邮箱系列(4)—批量导入邮箱/域账号(文件)
  9. 网络抓包,不能使用路由器和交换机,必须是具有镜像功能的HUB(集线器)
  10. Windows系统口令扫描之——使用Tscrack扫描3389口令
  11. litepal更新数据失败
  12. Msql自定义函数和存储过程
  13. 小米html查看器打开,小米文档查看器APP
  14. python:使用 PythonMagick 生成 icon 图标
  15. camus执行任务,偶发性异常
  16. 用对象流把对象存到文件中,再从文件中读取出来打印。
  17. 手机回收价格—换换回收让用户低成本换机
  18. 项目管理中-采购管理
  19. Replika Software完成从LVMH和欧莱雅的A轮融资
  20. 飞桨创意赛火热进行中,总有一款AI时代Chatbot适合你

热门文章

  1. 2020年产品经理职业发展路径
  2. python代码画樱花带图片_python编程——pygame画樱花树
  3. DSR评分对 搜索权重 、转化率、淘宝活动、是否成为金牌卖家的影响
  4. Excel无法响应,挂起,冻结或停止工作
  5. 离职了半年了,大家觉得我为啥离职呢?
  6. 百草味荣获“食品安全诚信单位“奖 食品安全质量获行业肯定
  7. python中Pandas之DataFrame索引、选取数据
  8. 川土微电子|推出带隔离电源的双通道数字隔离器
  9. 《北风那个吹》大结局看完了
  10. 10大开源的快速开发平台