Springboot整合RabbitMQ手动ACK
消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务但是只完成了部分突然它挂掉了,会发生什么情况?RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
手动应答
- Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
- Channel.basicNack (用于否定确认)
- Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。
生产者代码
properties
server.port=8081
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#配置虚拟机
spring.rabbitmq.virtual-host=demo
#开启发送确认机制,消息到达交换机后会有回调
spring.rabbitmq.publisher-confirm-type=correlated
#可以确保消息在未被队列接收时返回
spring.rabbitmq.publisher-returns=true##发送重试配置
#启用发送重试
#spring.rabbitmq.template.retry.enabled=true
#最大重试次数
#spring.rabbitmq.template.retry.max-attempts=5
#第一次和第二次尝试发布或传递消息之间的间隔
#spring.rabbitmq.template.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数 步长
#spring.rabbitmq.template.retry.multiplier=2
#最大重试时间间隔
#spring.rabbitmq.template.retry.max-interval=10000ms
pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency>
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublic void initRabbitTemplate(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** durable:是否持久化* exclusive:是否独享、排外的* autoDelete:是否自动删除* @return*/@BeanQueue addUserQueue(){return new Queue(RabbitConstant.QUEUE_ADD_USER,true,false,false);}/*** 消息成功到达交换机触发该方法* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){//消息成功到达交换机log.info("{}消息成功到达交换机",correlationData.getId());}else{log.error("{}消息未到达交换机,原因:{}",correlationData.getId(),cause);}}/*** 配置publisher-returns为true 消息未成功到达队列,会触发该方法* @param returned*/@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("{}消息未到达队列",returned.toString());}
}
这里我们直接用直连交换机,【DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key
相同的 Queue 上】
再写个常量类专门放队列名,交换机名啥的,写到配置文件也可以
RabbitConstant
public class RabbitConstant {/*** 简单消息队列*/public static final String QUEUE_HELLO_MSG = "hello_world_mq";/*** 队列*/public static final String QUEUE_ADD_USER = "queue.add.user";}
ProducerServiceImpl
@Service
public class ProducerServiceImpl implements ProducerService {@AutowiredRabbitTemplate rabbitTemplate;@Overridepublic Boolean addUser(User user) {//这里进行一些操作,然后把用户信息发送到消息队列String userStr = JSON.toJSONString(user);rabbitTemplate.convertAndSend(RabbitConstant.QUEUE_ADD_USER, (Object) userStr,new CorrelationData(UUID.randomUUID().toString()));return true;}
}
实体类User
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {private static final long serialVersionUID = 1809655848237434192L;private Integer id;private String userName;private String describe;
}
ProducerController
@RestController
public class ProducerController {@AutowiredProducerService producerService;@PostMapping("/addUser")public Boolean addUser(@RequestBody User user){return producerService.addUser(user);}}
消费者代码
properties
server.port=8082
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#配置虚拟机
spring.rabbitmq.virtual-host=demo
#设置消费端手动 ack none不确认 auto自动确认 manual手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
pom
pom就不贴了都一样的
ConsumerService
@Service
@Slf4j
public class ConsumerService {public static final String QUEUE_ADD_USER = "queue.add.user";@RabbitListener(queues =QUEUE_ADD_USER)@RabbitHandlerpublic void addUser(String userStr,Message message, Channel channel){long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//在这里做一些操作User user = JSONObject.parseObject(userStr,User.class);log.info(user.toString());//手动ack 第一个参数是消息的标记,第二个参数代表是false 代表仅仅确认当前消息,为true表示确认之前的所有消息channel.basicAck(deliveryTag,false);} catch (Exception e) {//告诉mq本条消息消费失败try {channel.basicNack(deliveryTag,false,true);} catch (IOException ex) {ex.printStackTrace();}}}
}
测试
发送后我们看消费者这边已经拿到了
来不及截图我又发送了一次,看一下RabbitMQ的控制台
再修改一下消费端代码,直接除零异常,看是否会出现Nack
好了我们再发送一次
而且该消息一直在再投递
下一篇具体讲讲如何处理这种情况
基本概念可以参考此篇博文rabbitmq入门
Springboot整合RabbitMQ手动ACK相关推荐
- SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压
1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...
- Springboot——整合Rabbitmq之Confirm和Return详解
文章目录 前言 为什么会有Confirm Springboot 整合 Mq 实现 Confirm 监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服 ...
- SpringBoot整合RabbitMQ(包含生产者和消费者)
生产者 创建一个SpringBoot项目springboot-producer,作为RabbitMQ的生产者. 在pom文件中引入相关的依赖坐标 <dependency><group ...
- Springboot 整合RabbitMq ,用心看完这一篇就够了
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct.Topic.Fanout的使用,消息回调.手动确认等. (但是 ...
- Springboot整合一之Springboot整合RabbitMQ
前言 目前,springboot已然成为了最热的java开发整合框架,主要是因其简单的配置,并且本身提供了很多与第三方框架的整合,甚至可以让我们在短短的几分钟里就可以搭建一个完整的项目架构.所以,博主 ...
- RabbitMQ原理及SpringBoot整合RabbitMQ
RabbitMQ原理及SpringBoot整合RabbitMQ 1. RabbitMQ环境搭建 参考:https://blog.csdn.net/u013071014/article/details/ ...
- 九、springboot整合rabbitMQ
springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...
- RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ
什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...
- SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka
1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...
最新文章
- 深度洞悉2017企业IT三大关注焦点
- makefile文件编写教程
- 【算法与数据结构】关于代码运行时间复杂度的计算方法
- buu Alice与Bob
- windows安装rsync
- ensp安装对电脑配置要求高吗_直线导轨有哪些安装方法?为什么直线导轨安装精度要求高?...
- Python 多线程基本步骤
- 《深入浅出Python机器学习》读书笔记 第一章 概述
- .axf文件_MDK 的编译过程及文件类型全解(一)
- Python微博项目
- 适合程序员学习的国外网站推荐
- 计算机更改为英语,win7如何修改语言 电脑语言改成英文的方法
- 江苏省对口单招分数线计算机,2021年江苏省对口单招分数线公布 江苏省对口单招省控线出炉...
- Linux Shell学习笔记:exit退出状态代码
- 读书笔记:《权力之治:人工智能时代的算法规制》
- 程序员数学(13)--轴对称与等腰三角形
- 将目录专为源码html,LiteOS移植笔记
- day1-计算机基础
- oracle 层层汇总,人在江湖漂,哪能不挨刀之ORACLE分区表(上)
- 2021年php面试题