消息消费流程:

1.生产端发送消息到RabbitMQ;

2.RabbitMQ发送消息到消费端;

3.消费端消费消息;

以上3个步骤每个步骤都可能导致消息丢失,消息丢失并不可怕,可怕的是丢失了我们还不知道,所以要有一系列措施来保证我们系统的可靠性。

生产端可靠性投递

一、事务消息机制

事务消息机制由于会严重降低性能,所以一般不采用这种方法。

二、confirm消息确认机制

生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

通过下面这句代码来开启确认模式:

channel.confirmSelect();// 开启发送方确认模式

然后异步监听确认和未确认的消息:

channel.addConfirmListener(new ConfirmListener() {//消息正确到达broker@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("已收到消息");//做一些其他处理}//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未确认消息,标识:" + deliveryTag);//做一些其他处理,比如消息重发等}
});

三、消息持久化

RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。

message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。

所有需要给exchange、queue和message都进行持久化:

exchange持久化:

//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

queue持久化:

//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

message持久化:

//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

四、消息入库

首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认;在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以给个时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。

消费端消息不丢失

默认情况下,以下3种情况会导致消息丢失:

  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;

  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;

  • 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

 一、自动ack机制改为手动ack机制

因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。

消费端手动确认消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {//接收到消息,做处理//手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {//出错处理,这里可以让消息重回队列重新发送或直接丢弃消息}
};
//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

二、保证消息幂等性

让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:

  1. 消费者获取到消息后先根据id去查询redis/db是否存在该消息
  2. 如果不存在,则正常消费,消费完毕后写入redis/db
  3. 如果存在,则证明消息被消费过,直接丢弃。
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {@RabbitHandlerpublic void receiveMessage(Message message) throws Exception {Jedis jedis = new Jedis("localhost", 6379);String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");System.out.println("接收到的消息为:"+msg+"==消息id为:"+messageId);String messageIdRedis = jedis.get("messageId");if(messageId == messageIdRedis){return;}JSONObject jsonObject = JSONObject.parseObject(msg);String email = jsonObject.getString("message");jedis.set("messageId",messageId);}
}

RabbitMQ消息100%不丢失?相关推荐

  1. 使用MQ的时候,怎么确保消息100%不丢失?

    面试官在面试候选人时,如果发现候选人的简历中写了在项目中使用了 MQ 技术(如 Kafka.RabbitMQ.RocketMQ),基本都会抛出一个问题:在使用 MQ 的时候,怎么确保消息 100% 不 ...

  2. 消息队列探秘-RabbitMQ消息队列介绍 侵立删

    1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有 ...

  3. 消息队列探秘 – RabbitMQ 消息队列工作原理

    1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有 ...

  4. RabbitMQ宕机后,消息100%不会丢失吗

    V-xin:ruyuanhadeng获得600+页原创精品文章汇总PDF 这篇文章,给不太熟悉MQ技术的同学,介绍一个生产环境中可能会遇到的问题. 目前为止,你的RabbitMQ部署在线上服务器了,对 ...

  5. SpringBoot+RabbitMQ ,保证消息100%投递成功并被消费(附源码)

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 来源:rrd.me/f2cxz 一.先扔一张图 说明: 本文涵盖了 ...

  6. 面试官:引入RabbitMQ后,你如何保证全链路数据100%不丢失?

    作者:指尖凉 来源:blog.csdn.net/hsz2568952354/article/details/86559470 我们都知道,消息从生产端到消费端消费要经过3个步骤: 生产端发送消息到Ra ...

  7. rabbitmq消费固定个数消息_SpringBoot+RabbitMQ (保证消息100%投递成功并被消费)

    作者:wangzaiplus https://www.jianshu.com/p/dca01aad6bc8 一.先扔一张图 说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如: 消息发送确认机 ...

  8. SpringBoot + RabbitMQ (保证消息100%投递成功并被消费)

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | jianshu.com/p/dca01aad6 ...

  9. springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费)

    前言: RabbitMQ相关知识请参考: https://www.jianshu.com/p/cc3d2017e7b3 Linux安装RabbitMQ请参考: https://www.jianshu. ...

最新文章

  1. 用看板工具leangoo做人事招聘
  2. python django mysql安装_Django+Nginx+uWSGI+Mysql搭建Python Web服务器
  3. MySQL - Explain深度剖析
  4. 用 Python 和 OpenCV 检测图片上的条形码Detecting Barcodes in Images with Python and OpenCV
  5. [PHP] 通用网关接口CGI 的运行原理
  6. 牛客contest897 D-Bamboo Rat(二分+黑白染色+最小割)
  7. 数值计算算法-多项式插值算法的实现与分析
  8. 终于要来了!华为P50将提供两个版本:国内仅有鸿蒙
  9. STM32F103高级定时器使用
  10. python逐行比较两个csv_python内两个CSV文件数据比较。。。求大神解答!!
  11. 使用java将String类型的json转为json对象并进行取出响应的值
  12. [LeetCode] 860. 柠檬水找零 lemonade-change(贪心算法)
  13. kmz文件转为arcgis的图层(制作底图)
  14. vue3 使用echarts
  15. 杂谈(1)--人生必知的78种经典效应
  16. 网络攻击更难预料,IoT到底是福是祸?
  17. 设有n个人围坐一圈并按顺时针方向从1到n编号
  18. 电脑连不上网故障排查思路
  19. 六、cocos2dx-效果(Effect)
  20. 多渠道归因分析(Attribution):传统归因(一)

热门文章

  1. android加固之后出问题,Android 应用加固
  2. 三种食物会让肿瘤疯长
  3. org.quartz.JobPersistenceException: Couldn‘t store job:
  4. matlab 符号微积分
  5. 综述:视频和图像去雾算法以及相关的图像恢复和增强研究
  6. Linux--firewalld防火墙基础(firewalld和iptables的关系,四表五链,netfilter与iptables的关系,iptables语法与参数,firewalld网络区域)
  7. 万能手机usb内窥镜软件下载_万能证件生成器手机版-万能证件生成器手机版下载 v1.0 免费版...
  8. 【网络-实验】恐怖的网络环路
  9. Android中list常用方法,Android中的常用控件及其基本用法
  10. python能做界面吗_如何使用pyQT做pythonGUI界面|