目录

  • 一、绪论
  • 二、生产者
    • 2.1事务机制
    • 2.2confirm模式
      • 串行模式
      • 批量模式
      • 异步模式
  • 三、消费者
    • 3.1手动ACK

一、绪论

上篇文章介绍了rabbitmq的基本知识、交换机类型实战《【消息队列之rabbitmq】学习RabbitMQ必备品之一》
这篇文章主要围绕着消息确认机制为中心,展开实战;接触过消息中间件的伙伴都知道,消息会存在以下问题:
1、消息丢失问题和可靠性投递问题;
2、消息如何保证顺序消费;
3、消息如何保证幂等性问题,即重复消费问题等等…
本文主要以Rabbitmq消息中间件解决问题一的实践,其他问题小编会重新写文章总结;

故从业务代码设计层面,我们需要保证生产者发送消息可靠性投递到MQ中间件中,其次保证消费者可以从MQ中获取消息并消费成功;

二、生产者

从生产者角度控制消息的可靠性投递实践;rabbitmq提供了以下方式:事务机制和confirm机制;
其他的工具类等相关代码,请移步到《【消息队列之rabbitmq】学习RabbitMQ必备品之一》

2.1事务机制

  • 基础知识:
    事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
    声明启动事务模式:channel.txSelect()
    提交事务:channel.txComment()
    回滚事务:channel.txRollback()

  • 实践代码
    生产者端代码如下:

package com.itwx.mq.tx;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class ProviderTx {private static final String QUEUE_NAME = "test_tx_queue";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {// 获取到连接connection = ConnectionUtil.getConnection();// 获取通道channel = connection.createChannel();/*** 声明一个队列。* 参数一:队列名称* 参数二:是否持久化* 参数三:是否排外  如果排外则这个队列只允许有一个消费者* 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列* 参数五:队列的附加属性* 注意:* 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;* 2.队列名可以任意取值,但需要与消息接收者一致。* 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动事务,必须用txCommit()或者txRollback()回滚channel.txSelect();// 假设这里处理业务逻辑String message = "hello, tx message!";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());/*** 提交事务之前,如果生产者发生异常,则消息会被回滚;* 但是事务此种模式,无法解决broker宕机问题,导致生产者误以为消息已经发送成功;*///todo 测试异常int i = 1/ 0;// 提交事务channel.txCommit();} catch (Exception e) {e.printStackTrace();} finally {try {if (channel != null) {// 回滚。如果未异常会提交事务,此时回滚无影响channel.txRollback();channel.close();}if (connection != null) {connection.close();}} catch (Exception e) {}}}
}

代码中含有TODO注释,大家可以结合rabbitmq管理界面,自测生产者事务是否生效等等;
1、业务异常产生,消息回滚测试;
2、生产者无异常产生,测试消息是否发送成功;
缺点
开始事务属于同步操作,消息发送成功后,生产者端处于阻塞状态,需要等待消息中间件接收消息的响应,降低生产者的吞吐量和性能;

2.2confirm模式

confirm主要存在以下三种方式:
方式一:channel.waitForConfirms()普通发送方确认模式(串行模式);
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
使用confirm模式,大家可以考虑一下如果消息发送失败之后,如何处理补偿机制重新发送?redis+定时任务

串行模式

串行模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传;

1、启动生产者确认模式channel.confirmSelect();
2、等待消息中间件响应结果channel.waitForConfirms();
3、处理返回结果或者捕获异常,触发补偿任务;

  • 生产者代码
package com.itwx.mq.confirm;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;public class ProviderConfirm {private static final String QUEUE_NAME = "test_one_confirm_queue";public static void main(String[] args) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动发送者确认模式channel.confirmSelect();String message = "hello,message! confirmSelect";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());// 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间,发送成功返回true,否则返回falseboolean sendResult = channel.waitForConfirms();if (sendResult) {System.out.print("发送成功");}/*** 存在异常情况,需要补偿机制:* 1、消息发送失败,即返回false;* 2、channel.waitForConfirms 可能返回超时异常* 解决方案:重试几次发送或者利用redis+定时任务来完成补发*/channel.close();connection.close();}
}

批量模式

批量模式:producer每发送一批消息后,调用waitForConfirmsOrDie()方法,而此种模式方法无返回值,只能根据异常进行判断。如果确认失败会抛出IOException和InterruptedException。源码如下:

void waitForConfirmsOrDie() throws IOException, InterruptedException;

此外注意,写测试demo时,由于存在消息延迟等现象,故发送消息结束之后,主线程休眠5000s或者更多,之后再关闭信道连接;

  • 生产者代码
package com.itwx.mq.confirm;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.util.concurrent.TimeUnit;public class ProviderBatchConfirm {private static final String QUEUE_NAME = "test_batch_confirm_queue";public static void main(String[] args) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动发送者确认模式channel.confirmSelect();String message = "hello,message! confirmSelect";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/for (int i = 1; i<=5; i++) {channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发送第" + i +"条消息成功");}// 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。try {channel.waitForConfirmsOrDie();} catch (Exception e) {e.printStackTrace();}TimeUnit.SECONDS.sleep(5000);//TODO,补偿机制只能依赖于捕获超时异常进行消息补发;channel.close();connection.close();}
}

异步模式

异步模式,开发者可以定义ConfirmListener实现类处理消息发送成功或者失败情况,重写handleNackhandleAck方法;
handleNack():消息接收失败的通知方法,开发者可以在这里重新投递消息;
handleAck():消息发送成功之前,需要把消息先存起来,比如用KV存储,接收到ack后删除;

  • 生产者代码
package com.itwx.mq.confirm;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class ProviderAsyncConfirm {private static final String QUEUE_NAME = "test_async_confirm_queue";public static void main(String[] args) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启动发送者确认模式channel.confirmSelect();String message = "hello,message! confirmSelect";/*** 发送消息到MQ* 参数一:交换机名称,为""表示不用交换机* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey* 参数三:消息的属性信息* 参数四:消息内容的字节数组*/for (int i = 1; i<=5; i++) {channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发送第" + i +"条消息成功");}//异步监听确认和未确认的消息channel.addConfirmListener(new ConfirmListener() {/*** 消息没有确认的回调方法* 参数一:没有确认的消息的编号* 参数二: 是否没有确认多个*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {//消息接收失败的通知方法,用户可以在这里重新投递消息System.out.println(String.format("未确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));}/*** 消息确认后回调* 参数一: 确认的消息的编号,从1开始递增* 参数二: 当前消息是否同时确认了多个* 消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认*/@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {//发送端投递消息前,需要把消息先存起来,比如用KV存储,接收到ack后删除System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));}});//主线程休眠,等待异步回调消息TimeUnit.SECONDS.sleep(10000);channel.close();connection.close();}}

三、消费者

3.1手动ACK

如果触发手动ACK机制,需要改动以下东西:

  • 将自动ACK改为false;
/*** 1、设置成手动ACK,即使消费者已经获取了消息,但是未及时ACK回复生产者,然后消费者宕机,消息队列会认为该消费未被消息;故此种情况会存在重复消费的情况;* 2、设置成手动ACK,即使消费者发生异常或者宕机情况,保证消息不丢失;*//*** 参数明细:* 参数:String queue, boolean autoAck, Consumer callback* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_NAME, false, consumer);
  • 考虑以下情况
    1、若未设置手动ACK,消费者获取消息后,发生异常,会发生什么情况?(消息丢失)
    2、若设置手动ACK,消费者发生异常,会发生什么情况?(未消费状态)
    3、设置手动ACK,消费者宕机,未即使发送ACK确认回调,会发生什么情况?(已消费,未确认)
    (消息中间件会将消息标记为待确认状态,不会被重复消息);若再想消费该消息,重启消费者,消息中间件会将该消息标记为待消费状态(从unacked->ready)

  • 小编demo写了TODO测试用例,注意测试

  • 消费者代码

package com.itwx.mq.ack;import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** @author wangxuan* @date 2021/12/15 11:44 上午* @describe*/
public class ConsumerACK {private final static String QUEUE_NAME = "wx_test_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列/*** 参数明细* 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODO 手动抛异常,造成消息丢失现象//测试情况2
//                int i= 1 / 0;//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("consumer receive message:" + msg + ",messageId:" + deliveryTag + ",exchange name:" + exchange);/*** 考虑以下情况:* 1、若未设置手动ACK,消费者获取消息后,发生异常,会发生什么情况?(消息丢失)* 2、若设置手动ACK,消费者发生异常,会发生什么情况?(未消费状态)* 3、设置手动ACK,消费者宕机,未即使发送ACK确认回调,会发生什么情况?(已消费,未确认)*          (消息中间件会将消息标记为待确认状态,不会被重复消息);若再想消费该消息,重启消费者,消息中间件会将该消息标记为待消费状态(从unacked->ready)**///消息消费成功,手动ACK,//测试情况三,注释
//                channel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列,第二个参数:是否自动进行消息确认。/*** 参数明细:* 参数:String queue, boolean autoAck, Consumer callback* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*///设置成手动ACK,避免重要消息丢失/*** 1、设置成手动ACK,即使消费者已经获取了消息,但是未及时ACK回复生产者,然后消费者宕机,消息队列会认为该消费未被消息;故此种情况会存在重复消费的情况;* 2、设置成手动ACK,即使消费者发生异常或者宕机情况,保证消息不丢失;*/channel.basicConsume(QUEUE_NAME, false, consumer);}
}

参考资料:
RabbitMQ系列(四)RabbitMQ事务和Confirm发送方消息确认——深入解读

【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战相关推荐

  1. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  2. RabbitMQ:什么是消息队列MQ?为什么使用消息队列MQ?入门MQ先学哪种?(一)

    0. 引言 MQ(Message Queue):消息队列,如今在各类业务场景中已经被广泛使用,特别在并发量日益增涨的业务和微服务架构中,消息队列能够帮助我们解决很多传统方式所不能解决的问题. 所以今天 ...

  3. 消息队列8:RabbitMq的QOS实验

    环境: win10 rabbitmq-3.8.8 .net core 3.1 RabbitMQ.Client 6.2.1 vs2019 安装RabbitMq环境参照: window下安装rabbitm ...

  4. 消息队列的使用场景_消息队列MQ的特点、选型及应用场景

    一.什么是消息队列 消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列. 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可 ...

  5. 浅谈消息队列及常见的分布式消息队列中间件

    背景 分布式消息队列中间件是是大型分布式系统不可缺少的中间件,通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息.所以消息队列主要解决应用耦合.异步消 ...

  6. Java架构之消息队列 (一):消息队列的概述

    消息队列系列分享大纲: 一.消息队列的概述 二.消息队列之RabbitMQ的使用 三.消息队列之Kafka的使用 四.消息队列之RabbitMQ的原理详解 五.消息队列之Kafka的原理详解 六.消息 ...

  7. springboot使用redis实现消息队列功能,redis使用list和stream实现消息队列功能,redis实现消息队列的风险点分析

    文章目录 写在前面 基于list的消息队列解决方案 使用list基本实现消息队列 阻塞式消费,避免性能损失 替换while(true) 实现消息幂等 保证消息可靠性 基于stream的消息队列解决方案 ...

  8. 消息队列中间件之RabbitMQ(上)

    文章目录 1.MQ引言 1.1 什么是MQ 1.2 主流MQ以及其特点 ActiveMQ Kafka RocketMQ RabbitMQ 1.3 MQ的作用 2.RabbitMQ 的引言 2.1 Ra ...

  9. 深入浅出消息队列---5、RabbitMQ公平性保证

    RabbitMQ公平性保证 消息的可靠性传输可以保证秒杀业务的公平性.关于秒杀业务的公平性,我们还需要考虑一点:消息的顺序性(先进入队列的消息先进行处理) RabbitMQ消息顺序性 顺序性:消息的顺 ...

最新文章

  1. (C++)寻找1-100以内所有素数,复杂度为O(nsqrt(n))与O(nloglogn)的两种方法
  2. 【 Linux 】Vim的基本配置以及出现问题解决(su认证失败)
  3. 精通GridView(C#) (二)
  4. Windows Updateエラー 80072EE2
  5. Android 从AndroidManifest获取meta-data
  6. iis mysql5.7_手动配置网站环境 IIS 10+PHP 7.1+MySQL 5.7
  7. Ubuntu 8.04 Linux + Apache2 + MySQL5 + PHP + Tomcat5.5 整合安装
  8. watch取消配对怎么重新配对_一文看懂智慧LED灯杆屏怎么配对
  9. The engine “node“ is incompatible with this module
  10. 产品01]-产品经理初步认知-产品经理定义/职责/分类
  11. 数据库原理及应用教程课后习题参考答案
  12. HTTP报文格式详解
  13. 天玑720可以升级鸿蒙系统吗,华为高管:来岁年初就能用上鸿蒙体系,55部华为产物可升级鸿蒙...
  14. echarts折线颜色渐变
  15. 解决“无法访问。您可能没有权限使用网络资源。请与这台服务器的管理员联系以查明您是否有权限访问”的问题
  16. 职场新人收集的前辈经验
  17. 深入理解计算机系统_3e 第四章家庭作业(部分) CS:APP3e chapter 4 homework
  18. 我对锤子ROM 功能的看法——功能篇
  19. 数学规划模型总结(附MatLab代码)
  20. 没有预算的新媒体运营如何启动?

热门文章

  1. 记一次centos 6 x64位系统修复过程
  2. 【转】DXErrorProvider与DxValidationProvider的使用
  3. SQL:清空数据库所有数据
  4. 利用 S3-tests 测试 S3 接口兼容性
  5. ZooKeeper(二) idea中使用Java操作zookeeper
  6. 绘制等压面图_等压面练习
  7. PHP三维数组变一维
  8. Nacos长连接诉求分析
  9. json转string工具_不要再重复造轮子了,这款开源工具类库贼好使!
  10. Python精通-Python列表操作