【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战
目录
- 一、绪论
- 二、生产者
- 2.1事务机制
- 2.2confirm模式
- 串行模式
- 批量模式
- 异步模式
- 三、消费者
- 3.1手动ACK
一、绪论
上篇文章介绍了rabbitmq的基本知识、交换机类型实战《【消息队列之rabbitmq】学习RabbitMQ必备品之一》
这篇文章主要围绕着消息确认机制为中心,展开实战;接触过消息中间件的伙伴都知道,消息会存在以下问题:
1、消息丢失问题和可靠性投递问题;
2、消息如何保证顺序消费;
3、消息如何保证幂等性问题,即重复消费问题等等…
本文主要以Rabbitmq消息中间件解决问题一的实践,其他问题小编会重新写文章总结;
故从业务代码设计层面,我们需要保证生产者发送消息可靠性投递到MQ中间件中,其次保证消费者可以从MQ中获取消息并消费成功;
二、生产者
从生产者角度控制消息的可靠性投递实践;rabbitmq提供了以下方式:事务机制和confirm机制
;
其他的工具类等相关代码,请移步到《【消息队列之rabbitmq】学习RabbitMQ必备品之一》
2.1事务机制
基础知识:
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
声明启动事务模式:channel.txSelect()
提交事务:channel.txComment()
回滚事务:channel.txRollback()
实践代码
生产者端代码如下:
package com.itwx.mq.tx;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class ProviderTx {private static final String QUEUE_NAME = "test_tx_queue";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {// 获取到连接connection = ConnectionUtil.getConnection();// 获取通道channel = connection.createChannel();/*** 声明一个队列。* 参数一:队列名称* 参数二:是否持久化* 参数三:是否排外 如果排外则这个队列只允许有一个消费者* 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列* 参数五:队列的附加属性* 注意:* 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;* 2.队列名可以任意取值,但需要与消息接收者一致。* 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动事务,必须用txCommit()或者txRollback()回滚channel.txSelect();// 假设这里处理业务逻辑String message = "hello, tx message!";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());/*** 提交事务之前,如果生产者发生异常,则消息会被回滚;* 但是事务此种模式,无法解决broker宕机问题,导致生产者误以为消息已经发送成功;*///todo 测试异常int i = 1/ 0;// 提交事务channel.txCommit();} catch (Exception e) {e.printStackTrace();} finally {try {if (channel != null) {// 回滚。如果未异常会提交事务,此时回滚无影响channel.txRollback();channel.close();}if (connection != null) {connection.close();}} catch (Exception e) {}}}
}
代码中含有TODO
注释,大家可以结合rabbitmq管理界面,自测生产者事务是否生效等等;
1、业务异常产生,消息回滚测试;
2、生产者无异常产生,测试消息是否发送成功;
缺点:
开始事务属于同步操作,消息发送成功后,生产者端处于阻塞状态,需要等待消息中间件接收消息的响应,降低生产者的吞吐量和性能;
2.2confirm模式
confirm主要存在以下三种方式:
方式一:channel.waitForConfirms()普通发送方确认模式(串行模式);
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
使用confirm模式,大家可以考虑一下如果消息发送失败之后,如何处理补偿机制重新发送?redis+定时任务
串行模式
串行模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传;
1、启动生产者确认模式channel.confirmSelect();
2、等待消息中间件响应结果channel.waitForConfirms();
3、处理返回结果或者捕获异常,触发补偿任务;
- 生产者代码
package com.itwx.mq.confirm;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;public class ProviderConfirm {private static final String QUEUE_NAME = "test_one_confirm_queue";public static void main(String[] args) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动发送者确认模式channel.confirmSelect();String message = "hello,message! confirmSelect";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());// 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间,发送成功返回true,否则返回falseboolean sendResult = channel.waitForConfirms();if (sendResult) {System.out.print("发送成功");}/*** 存在异常情况,需要补偿机制:* 1、消息发送失败,即返回false;* 2、channel.waitForConfirms 可能返回超时异常* 解决方案:重试几次发送或者利用redis+定时任务来完成补发*/channel.close();connection.close();}
}
批量模式
批量模式:producer每发送一批消息后,调用waitForConfirmsOrDie()方法,而此种模式方法无返回值,只能根据异常进行判断。如果确认失败会抛出IOException和InterruptedException。源码如下:
void waitForConfirmsOrDie() throws IOException, InterruptedException;
此外注意,写测试demo时,由于存在消息延迟等现象,故发送消息结束之后,主线程休眠5000s或者更多,之后再关闭信道连接;
- 生产者代码
package com.itwx.mq.confirm;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.util.concurrent.TimeUnit;public class ProviderBatchConfirm {private static final String QUEUE_NAME = "test_batch_confirm_queue";public static void main(String[] args) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动发送者确认模式channel.confirmSelect();String message = "hello,message! confirmSelect";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/for (int i = 1; i<=5; i++) {channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发送第" + i +"条消息成功");}// 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。try {channel.waitForConfirmsOrDie();} catch (Exception e) {e.printStackTrace();}TimeUnit.SECONDS.sleep(5000);//TODO,补偿机制只能依赖于捕获超时异常进行消息补发;channel.close();connection.close();}
}
异步模式
异步模式,开发者可以定义ConfirmListener
实现类处理消息发送成功或者失败情况,重写handleNack
和handleAck
方法;
handleNack()
:消息接收失败的通知方法,开发者可以在这里重新投递消息;
handleAck()
:消息发送成功之前,需要把消息先存起来,比如用KV存储,接收到ack后删除;
- 生产者代码
package com.itwx.mq.confirm;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class ProviderAsyncConfirm {private static final String QUEUE_NAME = "test_async_confirm_queue";public static void main(String[] args) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动发送者确认模式channel.confirmSelect();String message = "hello,message! confirmSelect";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/for (int i = 1; i<=5; i++) {channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发送第" + i +"条消息成功");}//异步监听确认和未确认的消息channel.addConfirmListener(new ConfirmListener() {/*** 消息没有确认的回调方法* 参数一:没有确认的消息的编号* 参数二: 是否没有确认多个*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {//消息接收失败的通知方法,用户可以在这里重新投递消息System.out.println(String.format("未确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));}/*** 消息确认后回调* 参数一: 确认的消息的编号,从1开始递增* 参数二: 当前消息是否同时确认了多个* 消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认*/@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {//发送端投递消息前,需要把消息先存起来,比如用KV存储,接收到ack后删除System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));}});//主线程休眠,等待异步回调消息TimeUnit.SECONDS.sleep(10000);channel.close();connection.close();}}
三、消费者
3.1手动ACK
如果触发手动ACK机制,需要改动以下东西:
- 将自动ACK改为false;
/*** 1、设置成手动ACK,即使消费者已经获取了消息,但是未及时ACK回复生产者,然后消费者宕机,消息队列会认为该消费未被消息;故此种情况会存在重复消费的情况;* 2、设置成手动ACK,即使消费者发生异常或者宕机情况,保证消息不丢失;*//*** 参数明细:* 参数:String queue, boolean autoAck, Consumer callback* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_NAME, false, consumer);
考虑以下情况:
1、若未设置手动ACK,消费者获取消息后,发生异常,会发生什么情况?(消息丢失)
2、若设置手动ACK,消费者发生异常,会发生什么情况?(未消费状态)
3、设置手动ACK,消费者宕机,未即使发送ACK确认回调,会发生什么情况?(已消费,未确认)
(消息中间件会将消息标记为待确认状态,不会被重复消息);若再想消费该消息,重启消费者,消息中间件会将该消息标记为待消费状态(从unacked->ready)小编demo写了
TODO
测试用例,注意测试消费者代码
package com.itwx.mq.ack;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** @author wangxuan* @date 2021/12/15 11:44 上午* @describe*/
public class ConsumerACK {private final static String QUEUE_NAME = "wx_test_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列/*** 参数明细* 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用/*** 当接收到消息后此方法将被调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODO 手动抛异常,造成消息丢失现象//测试情况2
// int i= 1 / 0;//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("consumer receive message:" + msg + ",messageId:" + deliveryTag + ",exchange name:" + exchange);/*** 考虑以下情况:* 1、若未设置手动ACK,消费者获取消息后,发生异常,会发生什么情况?(消息丢失)* 2、若设置手动ACK,消费者发生异常,会发生什么情况?(未消费状态)* 3、设置手动ACK,消费者宕机,未即使发送ACK确认回调,会发生什么情况?(已消费,未确认)* (消息中间件会将消息标记为待确认状态,不会被重复消息);若再想消费该消息,重启消费者,消息中间件会将该消息标记为待消费状态(从unacked->ready)**///消息消费成功,手动ACK,//测试情况三,注释
// channel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列,第二个参数:是否自动进行消息确认。/*** 参数明细:* 参数:String queue, boolean autoAck, Consumer callback* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*///设置成手动ACK,避免重要消息丢失/*** 1、设置成手动ACK,即使消费者已经获取了消息,但是未及时ACK回复生产者,然后消费者宕机,消息队列会认为该消费未被消息;故此种情况会存在重复消费的情况;* 2、设置成手动ACK,即使消费者发生异常或者宕机情况,保证消息不丢失;*/channel.basicConsume(QUEUE_NAME, false, consumer);}
}
参考资料:
RabbitMQ系列(四)RabbitMQ事务和Confirm发送方消息确认——深入解读
【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战相关推荐
- RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列
搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...
- RabbitMQ:什么是消息队列MQ?为什么使用消息队列MQ?入门MQ先学哪种?(一)
0. 引言 MQ(Message Queue):消息队列,如今在各类业务场景中已经被广泛使用,特别在并发量日益增涨的业务和微服务架构中,消息队列能够帮助我们解决很多传统方式所不能解决的问题. 所以今天 ...
- 消息队列8:RabbitMq的QOS实验
环境: win10 rabbitmq-3.8.8 .net core 3.1 RabbitMQ.Client 6.2.1 vs2019 安装RabbitMq环境参照: window下安装rabbitm ...
- 消息队列的使用场景_消息队列MQ的特点、选型及应用场景
一.什么是消息队列 消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列. 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可 ...
- 浅谈消息队列及常见的分布式消息队列中间件
背景 分布式消息队列中间件是是大型分布式系统不可缺少的中间件,通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息.所以消息队列主要解决应用耦合.异步消 ...
- Java架构之消息队列 (一):消息队列的概述
消息队列系列分享大纲: 一.消息队列的概述 二.消息队列之RabbitMQ的使用 三.消息队列之Kafka的使用 四.消息队列之RabbitMQ的原理详解 五.消息队列之Kafka的原理详解 六.消息 ...
- springboot使用redis实现消息队列功能,redis使用list和stream实现消息队列功能,redis实现消息队列的风险点分析
文章目录 写在前面 基于list的消息队列解决方案 使用list基本实现消息队列 阻塞式消费,避免性能损失 替换while(true) 实现消息幂等 保证消息可靠性 基于stream的消息队列解决方案 ...
- 消息队列中间件之RabbitMQ(上)
文章目录 1.MQ引言 1.1 什么是MQ 1.2 主流MQ以及其特点 ActiveMQ Kafka RocketMQ RabbitMQ 1.3 MQ的作用 2.RabbitMQ 的引言 2.1 Ra ...
- 深入浅出消息队列---5、RabbitMQ公平性保证
RabbitMQ公平性保证 消息的可靠性传输可以保证秒杀业务的公平性.关于秒杀业务的公平性,我们还需要考虑一点:消息的顺序性(先进入队列的消息先进行处理) RabbitMQ消息顺序性 顺序性:消息的顺 ...
最新文章
- (C++)寻找1-100以内所有素数,复杂度为O(nsqrt(n))与O(nloglogn)的两种方法
- 【 Linux 】Vim的基本配置以及出现问题解决(su认证失败)
- 精通GridView(C#) (二)
- Windows Updateエラー 80072EE2
- Android 从AndroidManifest获取meta-data
- iis mysql5.7_手动配置网站环境 IIS 10+PHP 7.1+MySQL 5.7
- Ubuntu 8.04 Linux + Apache2 + MySQL5 + PHP + Tomcat5.5 整合安装
- watch取消配对怎么重新配对_一文看懂智慧LED灯杆屏怎么配对
- The engine “node“ is incompatible with this module
- 产品01]-产品经理初步认知-产品经理定义/职责/分类
- 数据库原理及应用教程课后习题参考答案
- HTTP报文格式详解
- 天玑720可以升级鸿蒙系统吗,华为高管:来岁年初就能用上鸿蒙体系,55部华为产物可升级鸿蒙...
- echarts折线颜色渐变
- 解决“无法访问。您可能没有权限使用网络资源。请与这台服务器的管理员联系以查明您是否有权限访问”的问题
- 职场新人收集的前辈经验
- 深入理解计算机系统_3e 第四章家庭作业(部分) CS:APP3e chapter 4 homework
- 我对锤子ROM 功能的看法——功能篇
- 数学规划模型总结(附MatLab代码)
- 没有预算的新媒体运营如何启动?