rabbitmq 之 ack
2019独角兽企业重金招聘Python工程师标准>>>
场景1:对于消息处理失败,有可能有由于网络波动导致的数据处理异常,待网络稳定时消息就会正常处理,对于这种处理失败,应该继续尝试去处理消息
场景2:消息重复处理,例如我们通过消息队列向数据库中添加数据,由于数据库网络波动,导致数据库连接超时,而我们的系统认为消息处理失败,就会把消息回滚到消息队列,继续尝试处理,这时就会造成消息重复处理的现象,对于重要的消息,我们可以每处理一条消息,就记录一下,处理新的消息时,进行判断消息是否已经处理,如果已经处理,就丢弃消息 设置firstQueue队列为手动ack处理
@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer_one(){SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());simpleMessageListenerContainer.setExposeListenerChannel(true);simpleMessageListenerContainer.setMaxConcurrentConsumers(5);simpleMessageListenerContainer.setConcurrentConsumers(1);simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认simpleMessageListenerContainer.setMessageListener(firstConsumer);return simpleMessageListenerContainer;}
生产者
public void send(String uuid,Object message) {CorrelationData correlationId = new CorrelationData(uuid);for (int i = 0; i < 5; i++) {rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY1,(Object) (String.valueOf(message)+i), correlationId);}}
未确认ack的消费者
@Component
public class FirstConsumer implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());// 处理消息System.out.println("FirstConsumer {} handleMessage :"+msg);}
}
执行结果,发现只消费了一条(但是未ack)
由于未确认ack,故在rabbitmq的界面上看到的firstQueue队列的信息见下图
从图中看到队列中有5个消息,unacked(表示未ack确认的有一个),还有四个准备中,消息就算程序收到了 但是未确认ACK导致消息服务器以为他是未成功消费的 后续还会再发。
此时发现程序重启,有接受到了ss0的这条数据。
手动确认ack消费者
@Component
public class FirstConsumer implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {// 每次只接收一个信息channel.basicQos(1);String msg = null;try {msg = new String(message.getBody());//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);if (msg.equals("ss1")){channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),MessageProperties.PERSISTENT_TEXT_PLAIN, "重新放入队列中的数据".getBytes());}} catch (Exception e) {e.printStackTrace();System.out.println("FirstConsumer consumer fail");// 丢弃信息channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}// 处理消息System.out.println("FirstConsumer {} handleMessage :"+msg);}
}
打断点查看下
此时有断点,再次去rabbitmq的界面查看如图
此时会发现,队列中只有2个消息了,还有一个未ack(因为打断点了),故需要手动调用channel.basicAck告知服务器,消费者已经消费了可以从队列中删除了。
转载于:https://my.oschina.net/u/3370769/blog/3001446
rabbitmq 之 ack相关推荐
- RabbitMQ的ack机制
1.什么是消息确认ACK. 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失.为了确保数据不会丢失,RabbitMQ支持消 ...
- SpringCloudStream——RabbitMQ 手动ACK,Channel 参数为空?
问题描述 使用SpringCloudStream 集成RabbitMQ的过程中,一直无法使用手动ACK功能. SpringCloud版本:Hoxton.RELEASE SpringBoot 版本:2. ...
- rabbitmq消息ACK确认机制及发送失败处理
rabbitmq为确保消息发送和接收成功,采用ack机制. (1)生产者producter发送消息到mq时,mq会发送ack给producter告知消息是否投递成功: (2)消费者consumer接收 ...
- Springboot整合RabbitMQ手动ACK
消息应答 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务但是只完成了部分突然它挂掉了,会发生什么情况?RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除.在 ...
- RabbitMQ的消息确认ACK机制
1.什么是消息确认ACK. 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失.为了确保数据不会丢失,RabbitMQ支持消 ...
- 消息中间件--RabbitMQ ---高级特性之消费端ACK与重回队列
什么是消费端的ACK和重回队列? 消费端的手工ACK和NACK 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障 ...
- RabbitMQ:消费者ACK机制、生产者消息确认
文章目录 基础案例环境搭建: 环境: 1. 生产者发送消息确认 1.1 confirm 确认模式 1.2 return 退回模式 源代码 1.1.3 小结 2. 消费者签收消息(ACK) 2.1 代码 ...
- springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式
springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...
- RabbitMQ指南(上)
原文出处: Listen RabbitMQ是一个消息中间件,在一些需要异步处理.发布/订阅等场景的时候,使用RabbitMQ可以完成我们的需求. 下面是我在学习RabbitMQ的过程中的一些记录,内容 ...
最新文章
- 通过源代码研究ASP.NET MVC中的Controller和View(二)
- QuickTime 0day ***代码发布,可能允许执行任意代码
- openwrt安装蒲公英_不会OpenWRT、软路由也能双宽带叠加,蒲公英X6快速上手体验...
- mysql递归层次查询
- oracle中的日期查询在mybatis中写法
- c语言中栈堆,全程剖析C语言中堆和栈的区别
- DeFi收益聚合协议Pickle Finance与APY Vision达成合作
- JS与Jquery学习笔记(一)
- 如何查看mysql数据库的引擎/MySQL数据库引擎详解
- python培训班-Python培训机构有哪些值得推荐?
- 全网最详细的Windows里下载与安装Sublime Text *(图文详解)
- apache php过期,设置HTTP使用PHP和Apache过期头
- java12安装和配置_JDK12 安装和环境变量配置
- 山石网科Hillstone防火墙L2TP详细配置步骤(官方最新版)
- 使用gmediarender-resurrect搭建DLNA音箱
- 教你如何使用 python 制作一个简单的密码本
- Oracle数据库表空间不足 ORA-01653:unable to extend table 表名称 by 8192 in tablespace 表空间名称
- mysql logs_MySQL Logs
- 光纤入户后,光猫怎么放?
- express 搭建简易的本地服务器
热门文章
- mail linux 客户端,Nylas Mail: 一个 Linux 的免费邮件客户端
- oppoJava面试!mysql客户端安装包
- 如何减少mysql的连接时间_mysql连接的空闲时间超过8小时后 MySQL自动断开该连接解决方案...
- ie 传递给系统调用的数据区域太小。_RFID银行资产管理系统,智能,简便,易操作...
- 求标准体重Java题_Java习题
- post请求体 ajax,ajax的post请求 @RequestBody解析问题
- 出现红字是电脑问题吗_苹果12还会出现信号不好的问题吗?
- gini系数 决策树_决策树系列--ID3、C4.5、CART
- 在React的render方法中使用箭头函数
- 机箱一直反复开机熄火_小身材大容量,老炮九州风神魔方110机箱+DQ 650ST+玄冰400双刃装机体验...