生产端 Confirm 消息确认机制

消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!

Confirm 确认机制流程图

如何实现Confirm确认消息?

第一步:在 channel 上开启确认模式: channel.confirmSelect()

第二步:在 channel 上添加监听: channel.addConfirmListener(ConfirmListener listener);, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConfirmListener;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class ConfirmProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";

String routingKey = "item.update";

//指定消息的投递模式:confirm 确认模式

channel.confirmSelect();

//发送

final long start = System.currentTimeMillis();

for (int i = 0; i < 5 ; i++) {

String msg = "this is confirm msg ";

channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

System.out.println("Send message : " + msg);

}

//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息

channel.addConfirmListener(new ConfirmListener() {

/**

* 返回成功的回调函数

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("succuss ack");

System.out.println(multiple);

System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");

}

/**

* 返回失败的回调函数

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.printf("defeat ack");

System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");

}

});

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConfirmConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";

String queueName = "test_confirm_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代码绑定,在管理界面手动绑定

channel.queueBind(queueName, exchangeName, routingKey);

//创建消费者并接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

//设置 Channel 消费者绑定队列

channel.basicConsume(queueName, true, consumer);

}

}

我们此处只关注生产端输出消息

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

succuss ack

true

耗时:3ms

succuss ack

true

耗时:4ms

注意事项

我们采用的是异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,由于现实场景中很少使用我们在此不做介绍,如有兴趣直接参考官方文档。

我们运行生产端会发现每次运行结果都不一样,会有多种情况出现,因为 Broker 会进行优化,有时会批量一次性 confirm ,有时会分开几条 confirm。

succuss ack

true

耗时:3ms

succuss ack

false

耗时:4ms

或者

succuss ack

true

耗时:3ms

Return 消息机制

Return Listener 用于处理一-些不可路 由的消息!

消息生产者,通过指定一个 Exchange 和 Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!

但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener !

在基础API中有一个关键的配置项:Mandatory:如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broker 端自动删除该消息!

Return 消息机制流程图

Return 消息示例

首先我们需要发送三条消息,并且故意将第 0 条消息的 routing Key设置为错误的,让他无法正常路由到消费端。

mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

最后添加监听即可监听到不可路由到消费端的消息channel.addReturnListener(ReturnListener r))

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_return_exchange";

String routingKey = "item.update";

String errRoutingKey = "error.update";

//指定消息的投递模式:confirm 确认模式

channel.confirmSelect();

//发送

for (int i = 0; i < 3 ; i++) {

String msg = "this is return——listening msg ";

//@param mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除

if (i == 0) {

channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

} else {

channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());

}

System.out.println("Send message : " + msg);

}

//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息

channel.addConfirmListener(new ConfirmListener() {

/**

* 返回成功的回调函数

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("succuss ack");

}

/**

* 返回失败的回调函数

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.printf("defeat ack");

}

});

//添加一个 return 监听

channel.addReturnListener(new ReturnListener() {

public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("return relyCode: " + replyCode);

System.out.println("return replyText: " + replyText);

System.out.println("return exchange: " + exchange);

System.out.println("return routingKey: " + routingKey);

System.out.println("return properties: " + properties);

System.out.println("return body: " + new String(body));

}

});

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningConsumer {

public static void main(String[] args) throws Exception {

//1. 创建一个 ConnectionFactory 并进行设置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

//2. 通过连接工厂来创建连接

Connection connection = factory.newConnection();

//3. 通过 Connection 来创建 Channel

Channel channel = connection.createChannel();

//4. 声明

String exchangeName = "test_return_exchange";

String queueName = "test_return_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代码绑定,在管理界面手动绑定

channel.queueBind(queueName, exchangeName, routingKey);

//5. 创建消费者并接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

//6. 设置 Channel 消费者绑定队列

channel.basicConsume(queueName, true, consumer);

}

}

我们只关注生产端结果,消费端只收到两条消息。

Send message : this is return——listening msg

Send message : this is return——listening msg

Send message : this is return——listening msg

return relyCode: 312

return replyText: NO_ROUTE

return exchange: test_return_exchange

return routingKey: error.update

return properties: #contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

return body: this is return——listening msg

succuss ack

succuss ack

succuss ack

消费端 Ack 和 Nack 机制

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!一般我们在实际应用中,都会关闭重回队列,也就是设置为False。

参考 api

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicAck(long deliveryTag, boolean multiple) throws IOException;

如何设置手动 Ack 、Nack 以及重回队列

首先我们发送五条消息,将每条消息对应的循环下标 i 放入消息的 properties 中作为标记,以便于我们在后面的回调方法中识别。

其次, 我们将消费端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck属性设置为 false,如果设置为true的话 将会正常输出五条消息。

我们通过 Thread.sleep(2000)来延时一秒,用以看清结果。我们获取到properties中的num之后,通过channel.basicNack(envelope.getDeliveryTag(), false, true);将 num为0的消息设置为 nack,即消费失败,并且将 requeue属性设置为true,即消费失败的消息重回队列末端。

import com.rabbitmq.client.*;

import java.util.HashMap;

import java.util.Map;

public class AckAndNackProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_ack_exchange";

String routingKey = "item.update";

String msg = "this is ack msg";

for (int i = 0; i < 5; i++) {

Map headers = new HashMap();

headers.put("num" ,i);

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()

.deliveryMode(2)

.headers(headers)

.build();

String tem = msg + ":" + i;

channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());

System.out.println("Send message : " + msg);

}

channel.close();

connection.close();

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class AckAndNackConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

String exchangeName = "test_ack_exchange";

String queueName = "test_ack_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代码绑定,在管理界面手动绑定

channel.queueBind(queueName, exchangeName, routingKey);

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

if ((Integer) properties.getHeaders().get("num") == 0) {

channel.basicNack(envelope.getDeliveryTag(), false, true);

} else {

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

};

//6. 设置 Channel 消费者绑定队列

channel.basicConsume(queueName, false, consumer);

}

}

我们此处只关心消费端输出,可以看到第 0 条消费失败重新回到队列尾部消费。

[x] Received 'this is ack msg:1'

[x] Received 'this is ack msg:2'

[x] Received 'this is ack msg:3'

[x] Received 'this is ack msg:4'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

java确认rabbitmq_RabbitMQ 消息确认机制相关推荐

  1. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了

    最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...

  2. RabbitMQ—发布消息确认和消费消息确认

    目录 序言 消息发布流程 发布消息确认 一.事务使用 二.Confirm发送方确认模式 方式一:普通Confirm模式 方式二:批量Confirm模式 方式三:异步Confirm模式 扩展知识 消费消 ...

  3. Java笔记-RabbitMQ的消息确认机制(事务)

    目录 基本概念 代码与实例 基本概念 消息应答与消息持久化,如下代码: boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAc ...

  4. Java短信确认机制_JAVA 消息确认机制之 ACK 模式

    JAVA 消息确认机制之 ACK 模式 CLIENT_ACKNOWLEDGE : 客户端手动确认, 这就意味着 AcitveMQ 将不会 "自作主张" 的为你 ACK 任何消息, ...

  5. activemq 消息阻塞优化和消息确认机制优化

    一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...

  6. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  7. Storm编程入门API系列之Storm的可靠性的ACK消息确认机制

    概念,见博客 Storm概念学习系列之storm的可靠性  什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...

  8. RabbitMQ消息确认机制

    文章目录 1. 事务机制 2. Confirm模式 2.1 生产者 2.1.1 普通Confirm模式 2.1.2 批量Confirm模式 2.1.3 异步Confirm模式 2.2 消费者 3. 其 ...

  9. RabbitMQ 基本消息模型和消息确认机制

    ​01 前言 关于 RabbitMQ 服务器的安装,本章节不做介绍,请培养个人动手能力,自行百度解决.RabbitMQ 成功安装后(win 版),浏览器输入:localhost:15672,则可以进入 ...

最新文章

  1. HDU2544(Dijstra算法)
  2. 初学flex时候搞得一个大头贴工具(开源)
  3. python拆堆和堆叠的操作_python - 如何合并不同的DFS并堆叠值? - 堆栈内存溢出
  4. android beta项目官方页面,安卓7.0开发者预览版如何安装?Android Beta项目正式上线...
  5. linux ping 虚拟网卡_虚拟机中Linux系统网卡的配置
  6. 输入控件控制输入限制
  7. 【书海泛舟】伤心咖啡馆之歌
  8. postman使用之二:数据同步和创建测试集
  9. Pytest学习-如何在用例代码中调用fixtrue时传入参数
  10. python使用-如何在Windows上使用Python进行开发
  11. java证书 查看cacer_R 语言关于 SSL 证书异常处理笔记
  12. python基于paramiko模块实现远程连接Linux虚拟机(服务器)并执行指定命令返回输出结果
  13. web_submit_data详解
  14. iOS16 系统更新教程,测试版描述文件下载
  15. php实现金币提现,PHP调用支付宝转账接口实现支付宝提现
  16. Python数据分析之智联招聘职位分析完整项目(数据爬取,数据分析,数据可视化)
  17. 随机过程基础1--随机过程与宽平稳
  18. ie地址栏不能识别中文参数(google浏览器是正常的)
  19. win10pin不可用进不去系统_人脸识别门禁控制系统+安检通道
  20. mysql查询同名同姓重名人数,查全国同名同姓人数,姓名重名查询系统全国

热门文章

  1. 具有ESB,API管理和Now ..服务网格的应用程序网络功能。
  2. Java EE与Java SE:Oracle是否放弃了企业软件?
  3. jframe透明_使JFrame透明
  4. 使用tinylog 1.1改进您在Java EE应用程序中的登录
  5. restful web_RESTful Web服务可发现性,第4部分
  6. RIP GlassFish –感谢所有的鱼。
  7. JavaFX 2.0布局窗格– HBox和VBox
  8. 我如何向团队解释依赖注入
  9. Tomcat 7上具有RESTeasy JAX-RS的RESTful Web服务-Eclipse和Maven项目
  10. php注册页面模板,选项卡式WordPress登陆注册模板