1、消息重试机制

由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。

RocketMQ为使用者封装了消息重试的处理流程,无需开发人员手动处理。RocketMQ支持了生产端和消费端两类重试机制。

1.1 生产端重试

生产端配置的有发送失败重试次数,默认为2。使用了set方法对外进行暴露,producer客户端可以改写这个默认值。

public DefaultMQProducer(String producerGroup, RPCHook rpcHook) {this.createTopicKey = "TBW102";this.defaultTopicQueueNums = 4;this.sendMsgTimeout = 3000;this.compressMsgBodyOverHowmuch = 4096;//发送失败,重试次数this.retryTimesWhenSendFailed = 2;this.retryAnotherBrokerWhenNotStoreOK = false;this.maxMessageSize = 131072;this.unitMode = false;this.producerGroup = producerGroup;this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);}

1.2 消费端重试

消费者消费消息后,需要给Broker返回消费状态。以MessageListenerConcurrently监听器为例,Consumer消费完成后需要返回ConsumeConcurrentlyStatus并发消费状态。查看源码,ConsumeConcurrentlyStatus是一个枚举,共有两种状态:

public enum ConsumeConcurrentlyStatus {//消费成功ConsumeConcurrentlyStatus,//消费失败,一段时间后重试RECONSUME_LATER;
}

RECONSUME_LATER代表因为某种原因,消费失败,稍后再试。后续会再次消费
官方文档介绍如下:

RocketMQ中的消息无法无限次重新消费,当然了,手动修改重试次数是可以的,不介入的话不行。当重试次数超过所有延迟级别之后。消息会进入死信,死信Topic的命名为:%DLQ% + Consumer组名。

进入死信之后的消息肯定不会再投递了,不过可以通过接口去查询当前RocketMQ中死信队列的消息。如果在上层实现自有命令,那么可以将消息从死信中移出并重新投递。

死信消息具有以下特性:

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

2、保证消息不丢失

分别从Producer发送机制、Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失

  • 从Producer的视角来看:如果消息未能正确的存储在MQ中,或者消费者未能正确的消费到这条消息,都是消息丢失。
  • 从Broker的视角来看:如果消息已经存在Broker里面了,如何保证不会丢失呢(宕机、磁盘崩溃)
  • 从Consumer的视角来看:如果消息已经完成持久化了,但是Consumer取了,但是未消费成功且没有反馈,就是消息丢失

从Producer分析:如何确保消息正确的发送到了Broker?

  • 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
  • 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
  • RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功

从Broker分析:如果确保接收到的消息不会丢失?

  • 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
  • Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
  • Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失

从Cunmser分析:如何确保拉取到的消息被成功消费?

  • 消费者可以根据自身的策略批量Pull消息
  • Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标
  • 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
  • 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作
  • 如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地

RocketMq重试及消息不丢失机制相关推荐

  1. RocketMQ如何保证消息不丢失? 如何快速处理积压消息?

    文章目录 1. 哪些环节会有丢消息的可能? 2. 消息生产阶段如何保证消息不丢失 2.1 同步发送 2.2 采用事务消息 3. Broker如何保证接收到的消息不会丢失 4. 消费者如何确保拉取到的消 ...

  2. RocketMq怎么保证消息不丢失

    目录 Producer发送消息阶段 手段一:提供SYNC的发送消息方式,等待broker处理结果. 手段二:发送消息如果失败或者超时,则重新发送. 手段三:broker提供多master模式 总结 B ...

  3. RocketMQ如何保证消息不丢失(消息可靠性)

    为什么说ROcketMQ更适用于业务型的消息中间件,因为它能够保证消息不丢失且带有事务消息. 先来看一张RocketMQ集群部署结构 其中Name Server主要是提供路由信息,这里暂时忽略,大致流 ...

  4. RocketMQ中的消息类型种类(二)

    消息种类 按照发送的特点分 同步消息 异步消息 单向消息 按照使用功能特点分 顺序消息 广播模式 延迟消息 批量消息 过滤消息 事务消息 按照发送的特点分 同步消息 同步发送是指消息发送方发出数据后, ...

  5. RocketMQ(九):rocketMQ设计的全链路消息零丢失方案?+rocketmq消息中间件事务消息机制的底层实现原理?+half是什么?+half消息是如何对消费者不可见的?

    前言: 目前rocketmq更新已经更新了11篇博客了,预计接下来的2-3篇是暂时的更新进度了,准备更新一下springboot或者是jvm,mysql相关的专题出来,后续更新完事后,再分享一些实战性 ...

  6. 深入探究 RocketMQ 事务机制的实现流程,为什么它能做到发送消息零丢失?

    本文转载自公众号:石杉的架构笔记 本文来自狸猫技术窝专栏<从零开始带你成为消息中间件实战高手>,是作者原子弹大侠开放的试读 1.解决消息丢失的第一个问题:订单系统推送消息领丢失 既然我们已 ...

  7. 如何基于RocketMQ设计一套全链路消息不丢失方案?

    我们使用MQ作为消息中间件,传输一些消息的时候,必须考虑到消息丢失的可能.因为有的时候消息丢失了,会产生很严重的后果,比如消息计费数据,跟钱有关的消息. 这篇文章我们以RocketMQ为例来讲解,如何 ...

  8. rocketmq怎么保证消息一致性_从入门到入土(三)RocketMQ 怎么保证的消息不丢失?...

    精彩推荐 一百期Java面试题汇总SpringBoot内容聚合IntelliJ IDEA内容聚合Mybatis内容聚合 接上一篇:RocketMQ入门到入土(二)事务消息&顺序消息 面试官常常 ...

  9. rocketmq怎么保证数据不会重复_rocketmq如何保证消息不丢失

    一.大体可以从三方面来说: 分别从Producer发送机制.Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失 从Producer的视角来看:如果消息未能正确的存储在MQ ...

最新文章

  1. tensorflow中的梯度弥散与梯度爆炸
  2. 三代数据组装软件canu
  3. 优化tableView性能(针对滑动时出现卡的现象)
  4. 全国计算机等级考试题库二级C操作题100套(第34套)
  5. [Leedcode][JAVA][第16题][最接近的三数之和][双指针][数组]
  6. 运行MYSQL数据库命令时connetion Timeout expired异常问题
  7. Linux学习笔记第二周第四次课(2月1日)
  8. mybatis插件的执行顺序
  9. ubuntu安装utorrent,以闪电的速度在六维空间下载东西!
  10. linux 管道 减号,linux shell环境减号”-”的用途
  11. r语言峰峦图ggplot2_如何使用ggplot2在R中创建区域图
  12. c++中多个线程使用同一个函数
  13. MySQL中用户订单复购率的计算
  14. 监控摄像头与云服务器
  15. 微信小程序 诡异的异步调用问题,函数执行结果与预想不一致
  16. 文学随笔:《错过独白》
  17. Android 渲染机制——SurfaceFlinger
  18. 6to4隧道实验配置
  19. php实现bigpipe
  20. 深入理解指针数组、数组指针、函数指针、函数指针数组、指向函数指针数组的指针

热门文章

  1. 我让代码生了个孩子继承了他爸爸谁知他爸爸继承了他爷爷(16)
  2. 尾调用优化 java_为什么JVM仍然不支持尾调用优化?
  3. js 月份加6个月_美国切削工具6月份订单较上月增加10.1
  4. c语言将十进制转化为二进制算法_base64算法初探即逆向分析
  5. 一元二次方程求根公式的花样变换,你看懂了吗?
  6. 兄弟,就你这智商就别出轨了吧?
  7. 《SAS编程与数据挖掘商业案例》学习笔记之十
  8. linux内核分析与移植,内核分析移植
  9. linux安装定制添加输入,Arch Linux--定制自己的Linux操作系統(乙-國際化桌面安裝篇)...
  10. uboot移植9个步骤_不知道具体的新房装修步骤?9个步骤教你吃透装修