rabbitmq自动及手动ACK
mq的ack 主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除。
而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要Ack不自动,而是手动,这样做的好处,就是使得我们开发人员更加人性化或者灵活的来处理我们的业务罗杰代码,更加方便的处理异常的问题以及数据的返回处理等。下面是通话机制的四条原则:
Basic.Ack 发回给 RabbitMQ 以告知,可以将相应 message 从 RabbitMQ 的消息缓存中移除。
Basic.Ack 未被 consumer 发回给 RabbitMQ 前出现了异常,RabbitMQ 发现与该 consumer 对应的连接被断开,之后将该 message 以轮询方式发送给其他 consumer (假设存在多个 consumer 订阅同一个 queue)。
在 no_ack=true 的情况下,RabbitMQ 认为 message 一旦被 deliver 出去了,就已被确认了,所以会立即将缓存中的 message 删除。所以在 consumer 异常时会导致消息丢失。
来自 consumer 侧的 Basic.Ack 与 发送给 Producer 侧的 Basic.Ack 没有直接关系。
正题部分(配置手动Ack,实现异常消息回滚)
A. 在消费者端的mq配置文件上添加,配置 关键代码为 acknowledeg = "manual",意为表示该消费者的ack方式为手动(此时的queue已经和生产者的exchange通过某个routeKey绑定了)
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="queue_xxx" ref="MqConsumer"/> <rabbit:listener queues="queue_xxx" ref="MqConsumer2"/> </rabbit:listener-container>
B. 新建一个类 MqConsumer ,并实现接口 ChannelAwareMessageListener ,实现onMessage方法,不需要指定方法。
springAMQP中已经实现了一个功能,如果该监听器已经实现了下面2个接口,则直接调用onMessage方法
C. 关键点在实现了ChannelAwareMessageListener的onMessage方法后,会有2个参数。
一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。
消息监听接口实现
1.MessageListener消费者消息监听(自动进行任务完成确认)
基于实现MessageListener的消费者监听消息时,如果xml里没有配置acknowledge,则是默认如同xml配置acknowledge="auto" ,是自动确认消费者完成任务(消息ack), 如果此时消费者抛出异常 ,消息会返回该队列并发送给其他消费者 ,如没有其他消费者 则会继续发到该消费者
如果xml配置中acknowledge="manual",则无法收到消息。该消息会停留在服务器,然后会发给可以收到消息的消费者。
2.ChannelAwareMessageListener消费者消息监听(手动进行任务完成确认)
基于实现ChannelAwareMessageListener的消费者监听消息时,xml配置中acknowledge="auto"或不配置acknowledge时,调用方法进行消费者任务完成确认时会报如下异常(com.rabbitmq.client.ShutdownSignalException: channel error;)
所以若要实现手动消费则任务完成确认,xml的监听标签中需要配置acknowledge="manual" 手动确认消费者任务完成(消息ack)
消息确认 如未调用如下方法确认,则消息不再发到该消费者(如有其它的消费者,则轮询到其他的消费者,否则返回队列保留在服务器),multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
(1) 消息确认 如未确认则消息不再发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
(2) 消息确认并返回队列 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有 consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
(3) 拒绝消息 requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
同样的,如果要Nack或者拒绝消息(reject)的时候,也是调用channel里面的basicXXX方法就可以了(当然要制定tagId)。注意如果抛异常或Nack(并且requeue为true),消息会一直重新入队列,一不小心就会重复一大堆消息不断出现~。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
D. 针对上面所描述的情况,我们在搭建一个消息队列时候,我们的思路应该是这样的,首先,我们要启动ack的手动方式,紧接着,我们处理代码逻辑,如果发生了异常信息,我们首先通知到ack,我已经表示接受到这条数据了,你可以进行删除了,不需要让他自动的重新进入队列中,然后,我们启用一个错误处理,手动将其重新插入队列中,在此之前,有几个类和Api一起来看一下。
1. SimpleMessageListenerContainer
这个是我们的基础监听,他的作用就是队列的总监听,可以为其配置ack模式,异常处理类等。。
2. org.springframework.amqp.support.converter.SimpleMessageConverter
这个类和下面的Converter类很容易搞混淆,这个类的作用是可以解析队列中的 message 转 obj
3. org.springframework.amqp.rabbit.retry.MessageRecoverer
这个接口,需要我们开发者自定义实现,其中的一个方法recover(Message message, Throwable cause),就可以看出来他是干嘛的,就是说在监听出错,也就是没有抓取的异常而是抛出的异常会触发该方法,我们就会在这个接口的实现中,将消息重新入队列
4. org.springframework.util.ErrorHandler
这个接口也是在出现异常时候,会触发他的方法
案例。。。。。。。。。。。。。。。。。。。
生产者
import java.io.IOException;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer{
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
@Resource(name="amqpTemplate2")
private AmqpTemplate amqpTemplate2;
public void sendMessage(Object message) throws IOException {
logger.info("to send message:{}", message);
amqpTemplate.convertAndSend("queue.Test.admin", message);
// implements ConfirmCallback
// Message re = (Message)amqpTemplate.convertSendAndReceive("queue.Test.admin", message);
// amqpTemplate2.convertAndSend("queue.Test.admin", message);
}
消费者 1 及其配置文件.xml
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("admin MessageConsumer consumer receive message------->:{}", message);
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
xml配置中acknowledge="auto" 时 是自动确认ack 如果此时消费者抛出异常 消息会发到该队列其他消费者 如没有其他消费者 则会一直发到该消费者
// throw new NullPointerException(".....admin.....消费者异常。。。。。。。。");
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/"
username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
<!--配置connection-factory,指定连接rabbit server参数
<rabbit:connection-factory id="connectionFactory" virtual-host="hymn"
username="hy" password="hy2018627" host="120.25.212.10" port="5672" />
-->
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" >
</rabbit:queue>
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest"/>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 acknowledge="manual" -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage"/>
</rabbit:listener-container>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
<!--定义queue -->
<rabbit:queue name="queueTest2" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTopic"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
<rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queueTest2" ref="messageConsumer" method="onMessage"/>
</rabbit:listener-container>
</beans>
消费者 2 及其配置文件.xml(另外一个项目)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer implements ChannelAwareMessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// TODO Auto-generated method stub
// 消息监听接口实现
// 1.MessageListener消费者消息监听(自动进行任务完成确认)
// 基于实现MessageListener的消费者监听消息时,如果xml里没有配置acknowledge,则是默认如同xml配置acknowledge="auto" ,是自动确认消费者完成任务(消息ack), 如果此时消费者抛出异常 ,消息会返回该队列并发送给其他消费者 ,如没有其他消费者 则会继续发到该消费者
// 如果xml配置中acknowledge="manual",则无法收到消息。该消息会停留在服务器,然后会发给可以收到消息的消费者。
//
// 2.ChannelAwareMessageListener消费者消息监听(手动进行任务完成确认)
// 基于实现ChannelAwareMessageListener的消费者监听消息时,xml配置中acknowledge="auto"或不配置acknowledge时,调用方法进行消费者任务完成确认时会报如下异常(com.rabbitmq.client.ShutdownSignalException: channel error;)
// 所以若要实现手动消费则任务完成确认,xml的监听标签中需要配置acknowledge="manual" 手动确认消费者任务完成(消息ack)
//
// 消息确认 如未调用如下方法确认,则消息不再发到该消费者(如有其它的消费者,则轮询到其他的消费者),multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
// 消息确认 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
// (1)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 消息确认并返回队列 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
// (2)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝消息 requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
// (3)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//..........手动消息确认。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
xml配置中acknowledge="auto" 时 是自动确认ack 如果此时消费者抛出异常 消息会发到该队列其他消费者 如没有其他消费者 则会一直发到该消费者
// if(true){
// throw new NullPointerException(".....admin.....消费者异常。。。。。。。。");
// }
logger.error("收到");
//消息确认 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
logger.info("business-admin MessageConsumer receive message 出现异常 并将该消息重新入队列------->:{}", message);
logger.info("messageid:"+message.getMessageProperties().getDeliveryTag()+" ...messageBody:"+message.getBody());
//消息确认并返回队列 如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息 requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//........................手动通知消息生产者。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/"
username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
<!--配置connection-factory,指定连接rabbit server参数
<rabbit:connection-factory id="connectionFactory2" virtual-host="hymn"
username="hy" password="hy2018627" host="120.25.212.10" port="5672" publisher-confirms="true"/>
-->
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<rabbit:queue name="queueTest3" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 acknowledge="manual"-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage" />
</rabbit:listener-container>
<!-- .............................................................................. -->
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
<!--定义queue -->
<rabbit:queue name="queueTest2" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTopic" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
<rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queueTest2" ref="topicConsumer" method="onMessage"/>
</rabbit:listener-container>
</beans>
rabbitmq自动及手动ACK相关推荐
- SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压
1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...
- SpringCloudStream——RabbitMQ 手动ACK,Channel 参数为空?
问题描述 使用SpringCloudStream 集成RabbitMQ的过程中,一直无法使用手动ACK功能. SpringCloud版本:Hoxton.RELEASE SpringBoot 版本:2. ...
- RabbitMQ总结(一)--消息队列RabbitMQ应答模式(自动、手动)
原文链接 消息队列RabbitMQ应答模式(自动.手动) 为了确保消息不会丢失,RabbitMQ支持消息应答.消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了.RabbitM ...
- Springboot整合RabbitMQ手动ACK
消息应答 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务但是只完成了部分突然它挂掉了,会发生什么情况?RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除.在 ...
- rabbitmq设置手动ack报错:Channel closed; cannot ack/nack
记一次rabbitmq设置手动ack报错:Channel closed; cannot ack/nack 报错内容 前置条件 出现错误情况 出现问题原因 解决办法 报错内容 java.lang.Ill ...
- spring-amqp生产者手动ACK
2019独角兽企业重金招聘Python工程师标准>>> <!--加上publisher-confirms=true;channel-cache-size设置为100可以减少由于 ...
- RabbitMQ的消息确认ACK机制
1.什么是消息确认ACK. 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失.为了确保数据不会丢失,RabbitMQ支持消 ...
- Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例
目录 1. 单记录消费listener.type=single 1.1 单记录消费 - 自动确认 1.2 单记录消费 - 手动确认 2. 批量消费listener.type=batch 2.1 批量消 ...
- RabbitMQ自动扩展消费者源码分析
1 前言 在 RabbitMQ异常监控及动态控制队列消费的解决方案 中,提供了一种在线动态修改消费者数量的方法,但使用该方法需要及时的监控队列消息的堆积情况,不能做到自动扩展(增加或减少)消费者数量, ...
最新文章
- CS131-专题7:图像特征(SIFT算法)
- python执行mysql多个sql语句_mysql -- 一次执行多条sql语句
- python实现简单的api接口-使用Python编写API接口和使用API接口
- java 中美时间_求教用java编写一个程序要求给定一个日期值,计算若干天后的日期值,和给定两个日期计算它们之间相距的天...
- springBoot(20):使用Spring Session实现集群-redis
- matlab信号内插,基于VC++和Matlab的数字信号内插处理系统
- OpenCV python Calibration
- Android输入法
- Joyoshare UltFix快速修复iPhone黑屏死机的问题
- Python_pycharm调试模式+使用pycharm给python传递参数
- python 将txt文件转换为excel_Python实现读取txt文件并转换为excel的方法示例
- 华为自带浏览器无法使用
- Qt之撤销命令视图的使用(QUndoView)
- 小米2miui适配android6,MIUI官方声明:小米2/2S确定升级MIUI6
- oracle查询最近十天日期,ORACLE中距离某日期最近的记录的查询
- FFMPEG NVIDIA硬件加速总结
- 怎么把几个pdf文件合并到一起?
- 惠普计算机启动后不显示桌面为何故,惠普电脑开机屏幕不亮怎么办
- utf-8,gbk,gb2312区别
- 粤嵌gec6818开发板轮流显示颜色
热门文章
- 颉一软件查理:数据变现,始于流通
- MySQL使用ALTER USER修改密码
- CSS3:overflow属性详解
- 狂暴者 pat basic 练习二十五 反转链表
- 解决IOS的h5唤起键盘导致页面元素错位
- lego ev3 matlab,科学网—[转载]【源码】乐高MINDSTORMS EV3硬件的MATLAB支持包 - 刘春静的博文...
- python字典键值唯一_python字典操作详解
- 关于Out of the box翻译【待整理】
- 风车签名管理 for Mac版 - 让签名后的APP可以完全管控和实时监测
- wowpve服务器优势,服务器选择:PVP还是PVE?