java确认rabbitmq_RabbitMQ 消息确认机制
生产端 Confirm 消息确认机制
消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!
Confirm 确认机制流程图
如何实现Confirm确认消息?
第一步:在 channel 上开启确认模式: channel.confirmSelect()
第二步:在 channel 上添加监听: channel.addConfirmListener(ConfirmListener listener);, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class ConfirmProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "item.update";
//指定消息的投递模式:confirm 确认模式
channel.confirmSelect();
//发送
final long start = System.currentTimeMillis();
for (int i = 0; i < 5 ; i++) {
String msg = "this is confirm msg ";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
System.out.println("Send message : " + msg);
}
//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
channel.addConfirmListener(new ConfirmListener() {
/**
* 返回成功的回调函数
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("succuss ack");
System.out.println(multiple);
System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
}
/**
* 返回失败的回调函数
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("defeat ack");
System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConfirmConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String queueName = "test_confirm_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
我们此处只关注生产端输出消息
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
succuss ack
true
耗时:3ms
succuss ack
true
耗时:4ms
注意事项
我们采用的是异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,由于现实场景中很少使用我们在此不做介绍,如有兴趣直接参考官方文档。
我们运行生产端会发现每次运行结果都不一样,会有多种情况出现,因为 Broker 会进行优化,有时会批量一次性 confirm ,有时会分开几条 confirm。
succuss ack
true
耗时:3ms
succuss ack
false
耗时:4ms
或者
succuss ack
true
耗时:3ms
Return 消息机制
Return Listener 用于处理一-些不可路 由的消息!
消息生产者,通过指定一个 Exchange 和 Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener !
在基础API中有一个关键的配置项:Mandatory:如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broker 端自动删除该消息!
Return 消息机制流程图
Return 消息示例
首先我们需要发送三条消息,并且故意将第 0 条消息的 routing Key设置为错误的,让他无法正常路由到消费端。
mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
最后添加监听即可监听到不可路由到消费端的消息channel.addReturnListener(ReturnListener r))
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReturnListeningProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "item.update";
String errRoutingKey = "error.update";
//指定消息的投递模式:confirm 确认模式
channel.confirmSelect();
//发送
for (int i = 0; i < 3 ; i++) {
String msg = "this is return——listening msg ";
//@param mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除
if (i == 0) {
channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
} else {
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
}
System.out.println("Send message : " + msg);
}
//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
channel.addConfirmListener(new ConfirmListener() {
/**
* 返回成功的回调函数
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("succuss ack");
}
/**
* 返回失败的回调函数
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("defeat ack");
}
});
//添加一个 return 监听
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("return relyCode: " + replyCode);
System.out.println("return replyText: " + replyText);
System.out.println("return exchange: " + exchange);
System.out.println("return routingKey: " + routingKey);
System.out.println("return properties: " + properties);
System.out.println("return body: " + new String(body));
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReturnListeningConsumer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_return_exchange";
String queueName = "test_return_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
我们只关注生产端结果,消费端只收到两条消息。
Send message : this is return——listening msg
Send message : this is return——listening msg
Send message : this is return——listening msg
return relyCode: 312
return replyText: NO_ROUTE
return exchange: test_return_exchange
return routingKey: error.update
return properties: #contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
return body: this is return——listening msg
succuss ack
succuss ack
succuss ack
消费端 Ack 和 Nack 机制
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!一般我们在实际应用中,都会关闭重回队列,也就是设置为False。
参考 api
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
如何设置手动 Ack 、Nack 以及重回队列
首先我们发送五条消息,将每条消息对应的循环下标 i 放入消息的 properties 中作为标记,以便于我们在后面的回调方法中识别。
其次, 我们将消费端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck属性设置为 false,如果设置为true的话 将会正常输出五条消息。
我们通过 Thread.sleep(2000)来延时一秒,用以看清结果。我们获取到properties中的num之后,通过channel.basicNack(envelope.getDeliveryTag(), false, true);将 num为0的消息设置为 nack,即消费失败,并且将 requeue属性设置为true,即消费失败的消息重回队列末端。
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class AckAndNackProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "item.update";
String msg = "this is ack msg";
for (int i = 0; i < 5; i++) {
Map headers = new HashMap();
headers.put("num" ,i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.headers(headers)
.build();
String tem = msg + ":" + i;
channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());
System.out.println("Send message : " + msg);
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class AckAndNackConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if ((Integer) properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, false, consumer);
}
}
我们此处只关心消费端输出,可以看到第 0 条消费失败重新回到队列尾部消费。
[x] Received 'this is ack msg:1'
[x] Received 'this is ack msg:2'
[x] Received 'this is ack msg:3'
[x] Received 'this is ack msg:4'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
java确认rabbitmq_RabbitMQ 消息确认机制相关推荐
- springboot + rabbitmq 用了消息确认机制,感觉掉坑里了
最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...
- RabbitMQ—发布消息确认和消费消息确认
目录 序言 消息发布流程 发布消息确认 一.事务使用 二.Confirm发送方确认模式 方式一:普通Confirm模式 方式二:批量Confirm模式 方式三:异步Confirm模式 扩展知识 消费消 ...
- Java笔记-RabbitMQ的消息确认机制(事务)
目录 基本概念 代码与实例 基本概念 消息应答与消息持久化,如下代码: boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAc ...
- Java短信确认机制_JAVA 消息确认机制之 ACK 模式
JAVA 消息确认机制之 ACK 模式 CLIENT_ACKNOWLEDGE : 客户端手动确认, 这就意味着 AcitveMQ 将不会 "自作主张" 的为你 ACK 任何消息, ...
- activemq 消息阻塞优化和消息确认机制优化
一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...
- RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)
说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...
- Storm编程入门API系列之Storm的可靠性的ACK消息确认机制
概念,见博客 Storm概念学习系列之storm的可靠性 什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...
- RabbitMQ消息确认机制
文章目录 1. 事务机制 2. Confirm模式 2.1 生产者 2.1.1 普通Confirm模式 2.1.2 批量Confirm模式 2.1.3 异步Confirm模式 2.2 消费者 3. 其 ...
- RabbitMQ 基本消息模型和消息确认机制
01 前言 关于 RabbitMQ 服务器的安装,本章节不做介绍,请培养个人动手能力,自行百度解决.RabbitMQ 成功安装后(win 版),浏览器输入:localhost:15672,则可以进入 ...
最新文章
- HDU2544(Dijstra算法)
- 初学flex时候搞得一个大头贴工具(开源)
- python拆堆和堆叠的操作_python - 如何合并不同的DFS并堆叠值? - 堆栈内存溢出
- android beta项目官方页面,安卓7.0开发者预览版如何安装?Android Beta项目正式上线...
- linux ping 虚拟网卡_虚拟机中Linux系统网卡的配置
- 输入控件控制输入限制
- 【书海泛舟】伤心咖啡馆之歌
- postman使用之二:数据同步和创建测试集
- Pytest学习-如何在用例代码中调用fixtrue时传入参数
- python使用-如何在Windows上使用Python进行开发
- java证书 查看cacer_R 语言关于 SSL 证书异常处理笔记
- python基于paramiko模块实现远程连接Linux虚拟机(服务器)并执行指定命令返回输出结果
- web_submit_data详解
- iOS16 系统更新教程,测试版描述文件下载
- php实现金币提现,PHP调用支付宝转账接口实现支付宝提现
- Python数据分析之智联招聘职位分析完整项目(数据爬取,数据分析,数据可视化)
- 随机过程基础1--随机过程与宽平稳
- ie地址栏不能识别中文参数(google浏览器是正常的)
- win10pin不可用进不去系统_人脸识别门禁控制系统+安检通道
- mysql查询同名同姓重名人数,查全国同名同姓人数,姓名重名查询系统全国
热门文章
- 具有ESB,API管理和Now ..服务网格的应用程序网络功能。
- Java EE与Java SE:Oracle是否放弃了企业软件?
- jframe透明_使JFrame透明
- 使用tinylog 1.1改进您在Java EE应用程序中的登录
- restful web_RESTful Web服务可发现性,第4部分
- RIP GlassFish –感谢所有的鱼。
- JavaFX 2.0布局窗格– HBox和VBox
- 我如何向团队解释依赖注入
- Tomcat 7上具有RESTeasy JAX-RS的RESTful Web服务-Eclipse和Maven项目
- php注册页面模板,选项卡式WordPress登陆注册模板