目录

前言:

常见问题及解决思路

一、消息防丢方案

二、消息防堆积方案

三、消息发送失败补偿方案

3.1 消息发送失败处理方案

3.2 消息发送失败补偿方案

3.3 confirm方案对比

四、 消息消费失败处理方案

4.1.消息消费失败处理方案

4.2死信队列补偿机制

五、日志与监控

5.1关键节点日志记录

5.2 重点指标监控

前言:

本文主要描述在MQ使用过程中遇见的一些异常情况,并对异常情况,我们有什么思路去解决问题。

常见问题及解决思路

一、消息防丢方案

  • 生产者开启Confirm确认机制。
  • MQ消息默认设置为持久化,为所有队列设置镜像队列。
  • 消费者默认设置手动确认autoAck=false,并设置死信队列。

二、消息防堆积方案

  • 加强对不合理使用MQ的审批。
  • 监控消费能力(耗时<300ms),及时预警。
  • 框架层实现发送方限流。(默认值:100条/s)
  • 设置消息TTL。
  • 使用惰性队列。

三、消息发送失败补偿方案

3.1 消息发送失败处理方案

  1. 场景一:消息找不到队列导致消息发送失败。
  • 方案一:设置备用交换机(AE)

所有无法正确路由的消息都发往这个备份交换器中,可以为所有的交换器设置同一个AE,不过这里需要提前确保的是AE已经正确的绑定了队列,最好类型也是fanout的。

  • 方案二:设置mandatory=true

当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。

这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。

总结:建议使用方案二,错误消息记录Warn级别日志,进行监控。

  1. 场景二:生产者客户端发送出去之后可以发生网络丢包、网络故障等造成消息丢失。
  • 方案一:开启MQ的事务机制

在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。

缺点:只有消息成功被RabbitMQ接收,事务才能提交成功,否则我们便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制的话会“吸干”RabbitMQ的性能。

  • 方案二:生产者将信道设置成confirm(确认)模式

一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。

总结:建议使用方案二,避免开启事务后的性能消耗。

3.2 消息发送失败补偿方案

方案一:当消息发送失败后,框架层将错误信息写入文件(在容器中,需要挂载日志文件),然后生产者起定时任务,读取文件中的信息,进行重发。

方案二:

当消息发送失败后,结合MQ配置,对消息进行重试并记录error日志,达到重试次数后,将处理结果通过回调接口的方式告诉生产者,生产者去进行额外的补偿机制。

总结:建议使用方案二,在方案一中,首先写文件是一件比较重的设计,涉及IO,其次异常发生时,对生产者不透明。生产者不知道何时消息能从文件中进行补发。

3.3 confirm方案对比

客户端实现生产者confirm有三种方式:

  • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
  • 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
  • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法
  • 第一种:

普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

关键代码如下:

  • 第二种:

批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

  • 第三种:

异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

总结:建议使用方案一,发送方目前没有性能瓶颈。使用方案一进行可靠性确认。

四、 消息消费失败处理方案

4.1.消息消费失败处理方案

由于网络波动等原因,导致消息消费失败

  • 方案一:设置死信队列

当消息发送失败后,设置requeue=false消息进入死信队列,并获取死信队列的长度,设置重新发送到正常队列的重试时间和重试间隔,重新发送到正常队列。

监控死信队列长度,日志记录及时预警。

  • 方案二:将消息存入本地客户端,进行重发。

将消息失败的信息、存入redis进行重发,或者将消息持久化到消费端的数据库表。

  • 方案三:设置回调队列,生产者重发消息

RPC方案,远程调用方案,生产者通过回调队列,对消费失败的消息,进行重发。

总结:死信队列能很好的处理消息消费失败的场景,使用MQ原生的支持,避免过重的设计,使用方案一即可。

4.2死信队列补偿机制

  • 方案一:当消息消费失败后,进入死信队列,运维同事监控死信队列,及时预警。当评估不是因为消息数据的原因,导致消费失败。邮件告知领导审批后,运维同事在管理平台,手动将消息迁移到正常队列。

  • 方案二:当消息消费失败后,进入死信队列,框架层实现逻辑,获取对应死信队列的消息长度,当大于0时并判断是否超过重试次数并达到重试间隔。当没有超过重试次数时,自动将消息从死信队列迁移到正常队列。

总结:为了实现自动恢复机制,框架层应该支持方案二的重试机制,避免人为参与产生的错误。

五、日志与监控

5.1关键节点日志记录

  • MQ成功接受消息时。(info)
  • 生产者消息发送失败时。(error)
  • 生产者confrim确认失败时。(error)
  • 生产消息量过大,限流时。(error)
  • 生产者连接MQ超时时。 (error)
  • 消息大小大于10KB时。(error)
  • 消费者成功消费消息是。(info)
  • 消费者连接MQ超时时。(error)
  • 消费者消费失败时。(error)
  • 消费者进入死信队列时。(error)
  • 消费耗时低于300ms时。(error)

5.2 重点指标监控

指标

安全值范围

队列堆积数

<10000

队列未消费者绑定数

<0

Channel数量

<100

系统TPS

<3000

集群TPS

<8000

消息大小

<10KB

消息丢失

<0

异常日志

<0

                                  消费耗时

<300ms

死信队列

<0

CPU

<40%

内存

<35%

磁盘

>200MB

RabbitMQ的消息补偿机制相关推荐

  1. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

  2. 什么是消息补偿机制?

    为什么还要消息补偿机制呢? 难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,但是作为有追求的程序员来讲,要绝对保证我的系统的 ...

  3. WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能...

    最近的工作我在做一个有关于消息发送和接受封装工作.大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行DB落地.在发送前我会先进行DB的插入, ...

  4. WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能

    最近的工作我在做一个有关于消息发送和接受封装工作.大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行DB落地.在发送前我会先进行DB的插入, ...

  5. RabbitMQ的消息确认机制

    转:https://www.toutiao.com/a6583957771840913934/?tt_from=mobile_qq&utm_campaign=client_share& ...

  6. Java笔记-RabbitMQ的消息确认机制(事务)

    目录 基本概念 代码与实例 基本概念 消息应答与消息持久化,如下代码: boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAc ...

  7. RabbitMQ ACK消息确认机制 快速入门

    RabbitMQ 消息确认机制ACK ack机制保证的是broker和消费者之间的可靠性 ack表示的是消费端收到消息后的确认方式,有三种确认方式 自动确认:acknowledge="non ...

  8. RabbitMQ之消息重试机制

    1.消息重试机制 消费者消费消息的时候,发生异常情况,导致消息未确认,该消息会被重复消费(默认没有重复次数,即无限循环消费),但可以通过设置重试次数以及达到重试次数之后的消息处理 spring:rab ...

  9. rabbitmq消息队列 ack机制(消息确认机制)和消息补偿机制

    参考:https://blog.csdn.net/pan_junbiao/article/details/112956537 ack 机制就是消息在 生产者在发布消息以后,消息存在内存中,如果消息被确 ...

最新文章

  1. Docker创建 tomcat/weblogic 集群
  2. eclipse如何导出WAR包
  3. (Oracle学习笔记) Oracle概述
  4. 【实用】SAP MR8M校验增强
  5. 编制网站首页的基本原则
  6. as_hash ruby_Ruby中带有示例的Hash.delete_if方法
  7. Java项目课程06:系统实现-数据库
  8. Robot Framework操作
  9. 微信浏览器下载APK文件的实现方案
  10. google浏览器安装gliffy 安装方法及插件
  11. 西门子PLC s7-1200学习之路
  12. 浪潮服务器 虚拟光驱,玩转虚拟光驱:DAEMON TOOLS Pro
  13. 数据库信息泄漏 不可忽视的安全短板
  14. HDU-4747 Mex
  15. 2022年最佳WordPress企业主题
  16. [软件测试] - No.1 Fault Error Failure 区别
  17. 文献笔记04-大学生运动会报名系统的设计与研究
  18. 已知一个字典包含若干员工信息,姓请编写一个函数,删除性别为男的员工信息
  19. linux centos如何开启远程桌面,CentOS配置远程桌面
  20. springboot-cache + reds整合

热门文章

  1. 百度地图 JavaScript API ios不能使用情况
  2. ACMjava杨辉三角形与二项式定理递推实现与组合实现
  3. 百趣代谢组学分享:HSFB2b通过促进类黄酮生物合成赋予大豆耐盐能力
  4. 中国服装自主品牌行业市场环境与投资趋势分析报告
  5. MySQL索引底层实现原理 MyISAM非聚簇索引 vs. InnoDB聚簇索引
  6. 2019日历全年一张_2020年剑桥英语全年考试日历发布,参与活动还能赢取实体台历!...
  7. 统计文本大写字母和小写字母和数字和其他字符的个数
  8. matlab调用gams错误,求助:GAMS软件运行中错误报告
  9. thinkpad重装系统不引导_联想电脑重装系统无法引导原因及解决方法
  10. 安装与下载 Android studio(以下为处理器是Intel的安装方法,下载前请检查一下自己的CPU为AMD 还是Intel的)