1. 本篇概要

其实,还有1种场景需要考虑:当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了?,比如用户下单,订单中心发送了1个消息到RabbitMQ里的队列,积分中心收到这个消息,准备给这个下单的用户增加20积分,但积分还没增加成功呢,积分中心自己挂掉了,导致数据出现问题。

那么如何解决这种问题呢?

为了保证消息被消费者成功的消费,RabbitMQ提供了消息确认机制(message acknowledgement),本文主要讲解RabbitMQ中,如何使用消息确认机制来保证消息被消费者成功的消费,避免因为消费者突然宕机而引起的消息丢失。

2. 开启显式Ack模式

我们开启一个消费者的代码是这样的:

// 创建队列消费者

com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("Received Message '" + message + "'");

}

};

channel.basicConsume(QUEUE_NAME, true, consumer);

这里的重点是channel.basicConsume(QUEUE_NAME, true, consumer);方法的第2个参数,让我们先看下basicConsume()的源码:

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {

return this.basicConsume(queue, autoAck, "", callback);

}

这里的autoAck参数指的是是否自动确认,如果设置为ture,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者接收到消息是否处理成功;如果设置为false,RabbitMQ会等待消费者显式的回复确认信号后才会从内存(或者磁盘)中删除。

建议将autoAck设置为false,这样消费者就有足够的时间处理消息,不用担心处理消息过程中消费者宕机造成消息丢失。

此时,队列里的消息就分成了2个部分:

等待投递给消费者的消息(下图中的Ready部分)

已经投递给消费者,但是还没有收到消费者确认信号的消息(下图中的Unacked部分)

RabbitMQ如何保证队列里的消息99.99%被消费?

如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

为了便于理解,我们举个具体的例子,生产者的话的我们延用上文中的DurableProducer:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class DurableProducer {

private final static String EXCHANGE_NAME = "durable-exchange";

private final static String QUEUE_NAME = "durable-queue";

public static void main(String[] args) throws IOException, TimeoutException {

// 创建连接

ConnectionFactory factory = new ConnectionFactory();

// 设置 RabbitMQ 的主机名

factory.setHost("localhost");

// 创建一个连接

Connection connection = factory.newConnection();

// 创建一个通道

Channel channel = connection.createChannel();

// 创建一个Exchange

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 发送消息

String message = "durable exchange test";

AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();

channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

// 关闭频道和连接

channel.close();

connection.close();

}

}

然后新建一个消费者AckConsumer类:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class AckConsumer {

private final static String QUEUE_NAME = "durable-queue";

public static void main(String[] args) throws IOException, TimeoutException {

// 创建连接

ConnectionFactory factory = new ConnectionFactory();

// 设置 RabbitMQ 的主机名

factory.setHost("localhost");

// 创建一个连接

Connection connection = factory.newConnection();

// 创建一个通道

Channel channel = connection.createChannel();

// 创建队列消费者

com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

int result = 1 / 0;

System.out.println("Received Message '" + message + "'");

}

};

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

我们先将autoAck参数设置为ture,即自动确认,并在消费消息时故意写个异常,然后先运行生产者客户端将消息写入队列中,然后运行消费者客户端,发现消息未消费成功但是却消失了:

RabbitMQ如何保证队列里的消息99.99%被消费?

RabbitMQ如何保证队列里的消息99.99%被消费?

然后我们将autoAck设置为false:

channel.basicConsume(QUEUE_NAME, false, consumer);

再次运行生产者客户端将消息写入队列中,然后运行消费者客户端,此时虽然消费者客户端仍然代码异常,但是消息仍然在队列中:

RabbitMQ如何保证队列里的消息99.99%被消费?

然后我们删除掉消费者客户端中的异常代码,重新启动消费者客户端,发现消息消费成功了,但是消息一直未Ack:

RabbitMQ如何保证队列里的消息99.99%被消费?

RabbitMQ如何保证队列里的消息99.99%被消费?

手动停掉消费者客户端,发现消息又到了Ready状态,准备重新投递:

RabbitMQ如何保证队列里的消息99.99%被消费?

之所以消费掉消息,却一直还是Unacked状态,是因为我们没在代码中添加显式的Ack代码:

String message = new String(body, "UTF-8");

//int result = 1 / 0;

System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();

channel.basicAck(deliveryTag, false);

deliveryTag可以看做消息的编号,它是一个64位的长×××值。

此时运行消费者客户端,发现消息消费成功,并且在队列中被移除:

RabbitMQ如何保证队列里的消息99.99%被消费?

RabbitMQ如何保证队列里的消息99.99%被消费?

文末彩蛋

[Java学习、面试;文档、视频资源免费获取]

加QQ群:219571750

java如何保证mq一定被消费,RabbitMQ如何保证队列里的消息99.99%被消费?相关推荐

  1. rocketmq python 某个队列不消费_消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?...

    关于 消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?的搜索结果 回答 2021一月拼团已有400余人拼团成功最低一折 点击进入:一月新人专场 服务器配置时间价格1核2G1年84元1核 ...

  2. 利用rabbitMq的死信队列实现延时消息

    前言 使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期.mq会检测头部10s是否过期,10s不过期的情况下,2s就算过 ...

  3. rabbitmq 如何删除队列中的消息

    对于rabbitmq删除队列消息,有下面几种方式: 1. rabbitmqctl --node rabbit@node --vhost testVHost purge_queue testQueue ...

  4. SpringBoot+RabbitMQ ,保证消息100%投递成功并被消费(附源码)

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 来源:rrd.me/f2cxz 一.先扔一张图 说明: 本文涵盖了 ...

  5. SpringBoot + RabbitMQ (保证消息100%投递成功并被消费)

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | jianshu.com/p/dca01aad6 ...

  6. 消费流程图_SpringBoot+RabbitMQ ,保证消息100%投递成功并被消费(附源码)

    来自:简书,作者:wangzaiplus 链接:https://www.jianshu.com/p/dca01aad6bc8 一.先扔一张图 说明: 本文涵盖了关于RabbitMQ很多方面的知识点, ...

  7. rabbitmq消费固定个数消息_SpringBoot+RabbitMQ (保证消息100%投递成功并被消费)

    作者:wangzaiplus https://www.jianshu.com/p/dca01aad6bc8 一.先扔一张图 说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如: 消息发送确认机 ...

  8. springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费)

    前言: RabbitMQ相关知识请参考: https://www.jianshu.com/p/cc3d2017e7b3 Linux安装RabbitMQ请参考: https://www.jianshu. ...

  9. java 如何判定消息已在队列_Java面试—消息队列

    消息队列面试题 题目来自于中华石杉,解决方案根据自己的思路来总结而得.题目主要如下: 1. 为什么要引入消息队列? 消息队列的引入可以解决3个核心问题: 解耦 异步 削峰 解耦 在一个项目中,如果一个 ...

最新文章

  1. 《OpenMP编译原理及实现技术》摘录
  2. 今日arXiv精选 | 9篇ICCV 2021最新论文
  3. 高性能编程:三级缓存(LLC)访问优化
  4. SAP OData Service group - get entity set
  5. [BZOJ4815][CQOI2017]小Q的表格 数论+分块
  6. 20145217 《信息安全系统设计基础》第0周学习总结
  7. 2021年Q2全球智能手机销量小米升至第二,苹果降至第三
  8. 数据的实操与测试|附代码(全了)
  9. [Winodows Phone 7控件详解]Silverlight toolkit for Windows Phone 7.1控件-5
  10. 【免费下载】2021年11月热门报告盘点(附热门报告列表及下载链接)
  11. python基础知识-Python语言基础知识总结
  12. 字节跳动面试经验 php,双指针算法:字节跳动初级面试题 PHP
  13. nginx apache tomcat 相关收藏
  14. SSH中各个框架的作用以及Spring AOP,IOC,DI详解
  15. codeproject 的精彩文章----MFC系列
  16. cnvd与cnnvd区别_漏洞都是怎么编号的CVE/CAN/BUGTRAQ/CNCVE/CNVD/CNNVD
  17. OXY OPENCART 商城自适应主题模板 ABC-0020-01
  18. 0基础能学“软件测试”吗?好学吗?怎么学?
  19. RA-L期刊投稿相关内容
  20. Java 复制PPT幻灯片

热门文章

  1. C# 实现Excel单元格画边框
  2. 爱博精电亮相四川省节能环保品牌推广全川行——乐山站
  3. 实用selenium+python实现web自动化测试
  4. 代码测试,调试与优化小结
  5. iNFTnews|日本即时通讯软件LINE推出NFT市场
  6. win7防火墙入站规则
  7. 怎么添加扫描仪到计算机快捷键,Win7系统添加扫描仪快捷方式的方法
  8. svg常用元素和属性
  9. 计算企业发放的奖金根据利润提成
  10. 聚类分析(cluster analysis)