RocketMQ之消息重试。
RocketMQ使用过程中,如何进行消息重试。
首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重试的。
集群消费模式下,当消息消费失败,RocketMQ会通过消息重试机制重新投递消息,努力使该消息消费成功。
当消费者消费该重试消息后,需要返回结果给broker,告知broker消费成功(ConsumeConcurrentlyStatus.CONSUME_SUCCESS)或者需要重新消费(ConsumeConcurrentlyStatus.RECONSUME_LATER)。
这里有个问题,如果消费者业务本身故障导致某条消息一直无法消费成功,难道要一直重试下去吗?
答案是显而易见的,并不会一直重试。
事实上,对于一直无法消费成功的消息,RocketMQ会在达到最大重试次数之后,将该消息投递至死信队列。然后我们需要关注死信队列,并对死信队列中的消息做人工的业务补偿操作。
那如何返回消息消费失败呢?
RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。
业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
业务消费方返回null
业务消费方主动/被动抛出异常前两种情况较容易理解,当返回ConsumeConcurrentlyStatus.RECONSUME_LATER或者null时,broker会知道消费失败,后续就会发起消息重试,重新投递该消息。
注意 对于抛出异常的情况,只要我们在业务逻辑中显式抛出异常或者非显式抛出异常,broker也会重新投递消息,如果业务对异常做了捕获,那么该消息将不会发起重试。因此对于需要重试的业务,消费方在捕获异常时要注意返回ConsumeConcurrentlyStatus.RECONSUME_LATER或null,输出日志并打印当前重试次数。推荐返回ConsumeConcurrentlyStatus.RECONSUME_LATER。
RocketMQ重试时间窗
这里介绍一下Apache RocketMQ的重试时间窗,当消息需要重试时,会按照该规则进行重试。
我们可以在RocketMQ源码的MessageStoreConfig.java中找到消息重试配置:
1MessageStoreConfig.java2// 消息延迟级别定义3private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
默认重试次数与重试时间间隔对应关系表如下:
可以看到,RocketMQ采用了“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小。
源码分析
在RocketMQ的客户端源码DefaultMQPushConsumerImpl.java中,对重试机制做了说明,源码如下:
1private int getMaxReconsumeTimes() {2 // default reconsume times: 163 if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {4 return 16;5 } else {6 return this.defaultMQPushConsumer.getMaxReconsumeTimes();7 }8}
解释下,首先判断消费端有没有显式设置最大重试次数 MaxReconsumeTimes, 如果没有,则设置默认重试次数为16,否则以设置的最大重试次数为准。
当消息消费失败,服务端会发起消费重试,具体逻辑在broker源码的 SendMessageProcessor.java 中的consumerSendMsgBack方法中涉及,源码如下:
1if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 2 || delayLevel < 0) { 3 newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); 4 queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; 5 6 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, 7 DLQ_NUMS_PER_GROUP, 8 PermName.PERM_WRITE, 0 9 );10 if (null == topicConfig) {11 response.setCode(ResponseCode.SYSTEM_ERROR);12 response.setRemark("topic[" + newTopic + "] not exist");13 return response;14 }15} else {16 if (0 == delayLevel) {17 delayLevel = 3 + msgExt.getReconsumeTimes();18 }1920 msgExt.setDelayTimeLevel(delayLevel);21}
判断消息当前重试次数是否大于等于最大重试次数,如果达到最大重试次数,或者配置的重试级别小于0,则获取死信队列的Topic,后续将超时的消息send到死信队列中。
正常的消息会进入else分支,对于首次重试的消息,默认的delayLevel是0,rocketMQ会将给该level + 3,也就是加到3,这就是说,如果没有显示的配置延时级别,消息消费重试首次,是延迟了第三个级别发起的重试,从表格中看也就是距离首次发送10s后重试。
当延时级别设置完成,刷新消息的重试次数为当前次数加1,broker将该消息刷盘,逻辑如下:
1MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); 2msgInner.setTopic(newTopic); 3msgInner.setBody(msgExt.getBody()); 4msgInner.setFlag(msgExt.getFlag()); 5MessageAccessor.setProperties(msgInner, msgExt.getProperties()); 6msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); 7msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags())); 8 9msgInner.setQueueId(queueIdInt);10msgInner.setSysFlag(msgExt.getSysFlag());11msgInner.setBornTimestamp(msgExt.getBornTimestamp());12msgInner.setBornHost(msgExt.getBornHost());13msgInner.setStoreHost(this.getStoreHost());14// 刷新消息的重试次数为当前次数加115msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);1617String originMsgId = MessageAccessor.getOriginMessageId(msgExt);18MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);19// 消息刷盘20PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
那么什么是msgInner呢,即:MessageExtBrokerInner,也就是对重试的消息,rocketMQ会创建一个新的 MessageExtBrokerInner 对象,它实际上是继承了MessageExt。
我们继续进入消息刷盘逻辑,即putMessage(msgInner)方法,实现类为:DefaultMessageStore.java, 核心代码如下:
1long beginTime = this.getSystemClock().now();2PutMessageResult result = this.commitLog.putMessage(msg);
主要关注 this.commitLog.putMessage(msg); 这句代码,通过commitLog我们可以认为这里是真实刷盘操作,也就是消息被持久化了。
我们继续进入commitLog的putMessage方法,看到如下核心代码段:
1if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE 2 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 3 // 处理延时级别 4 if (msg.getDelayTimeLevel() > 0) { 5 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 6 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 7 } 8 // 更换Topic 9 topic = ScheduleMessageService.SCHEDULE_TOPIC;10 // 队列ID为延迟级别-111 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());1213 // Backup real topic, queueId14 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());15 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));16 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));1718 // 重置topic及queueId19 msg.setTopic(topic);20 msg.setQueueId(queueId);21 }22}2324ScheduleMessageService.java25public static int delayLevel2QueueId(final int delayLevel) {26 return delayLevel - 1;27}
可以看到,如果是重试消息,在进行延时级别判断时候,返回true,则进入分支逻辑,通过这段逻辑我们可以知道,对于重试的消息,rocketMQ并不会从原队列中获取消息,而是创建了一个新的Topic进行消息存储的。也就是代码中的SCHEDULE_TOPIC,看一下具体是什么内容:
1public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
这个名字比较有意思,就叫 SCHEDULE_TOPIC_XXXX。
到这里我们可以得到一个结论:
对于所有消费者消费失败的消息,rocketMQ都会把重试的消息 重新new出来(即上文提到的MessageExtBrokerInner对象),然后投递到主题 SCHEDULE_TOPIC_XXXX 下的队列中,然后由定时任务进行调度重试,而重试的周期符合我们在上文中提到的delayLevel周期,也就是:
1MessageStoreConfig.java2// 消息延迟级别定义3private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
同时为了保证消息可被找到,也会将原先的topic存储到properties中,也就是如下这段代码
1// Backup real topic, queueId2MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());3MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));4msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
这里将原先的topic和队列id做了备份。
死信的业务处理方式
默认的处理机制中,如果我们只对消息做重复消费,达到最大重试次数之后消息就进入死信队列了。
我们也可以根据业务的需要,定义消费的最大重试次数,每次消费的时候判断当前消费次数是否等于最大重试次数的阈值。
如:重试三次就认为当前业务存在异常,继续重试下去也没有意义了,那么我们就可以将当前的这条消息进行提交,返回broker状态ConsumeConcurrentlyStatus.CONSUME_SUCCES,让消息不再重发,同时将该消息存入我们业务自定义的死信消息表,将业务参数入库,相关的运营通过查询死信表来进行对应的业务补偿操作。
RocketMQ 的处理方式为将达到最大重试次数(16次)的消息标记为死信消息,将该死信消息投递到 DLQ 死信队列中,业务需要进行人工干预。实现的逻辑在 SendMessageProcessor 的 consumerSendMsgBack 方法中,大致思路为首先判断重试次数是否超过16或者消息发送延时级别是否小于0,如果已经超过16或者发送延时级别小于0,则将消息设置为新的死信。死信 topic 为:%DLQ%+consumerGroup。
我们接着看一下死信的源码实现机制。
死信源码分析
1private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) 2 throws RemotingCommandException { 3 final RemotingCommand response = RemotingCommand.createResponseCommand(null); 4 final ConsumerSendMsgBackRequestHeader requestHeader = 5 (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); 6 7 ...... 8 9 // 0.首先判断重试次数是否大于等于16,或者消息延迟级别是否小于010 if (msgExt.getReconsumeTimes() >= maxReconsumeTimes11 || delayLevel < 0) {12 // 1. 如果满足判断条件,设置死信队列topic= %DLQ%+consumerGroup13 newTopic = MixAll.getDLQTopic(requestHeader.getGroup());14 queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;1516 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,17 DLQ_NUMS_PER_GROUP,18 PermName.PERM_WRITE, 019 );20 if (null == topicConfig) {21 response.setCode(ResponseCode.SYSTEM_ERROR);22 response.setRemark("topic[" + newTopic + "] not exist");23 return response;24 }25 } else {26 // 如果延迟级别为0,则设置下一次延迟级别为3+当前重试消费次数,达到时间衰减效果27 if (0 == delayLevel) {28 delayLevel = 3 + msgExt.getReconsumeTimes();29 }3031 msgExt.setDelayTimeLevel(delayLevel);32 }3334 MessageExtBrokerInner msgInner = new MessageExtBrokerInner();35 msgInner.setTopic(newTopic);36 msgInner.setBody(msgExt.getBody());37 msgInner.setFlag(msgExt.getFlag());38 MessageAccessor.setProperties(msgInner, msgExt.getProperties());39 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));40 msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));4142 msgInner.setQueueId(queueIdInt);43 msgInner.setSysFlag(msgExt.getSysFlag());44 msgInner.setBornTimestamp(msgExt.getBornTimestamp());45 msgInner.setBornHost(msgExt.getBornHost());46 msgInner.setStoreHost(this.getStoreHost());47 msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);4849 String originMsgId = MessageAccessor.getOriginMessageId(msgExt);50 MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);5152 // 3.死信消息投递到死信队列中并落盘53 PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);54 ......55 return response;56 }
我们总结一下死信的处理逻辑:
首先判断消息当前重试次数是否大于等于16,或者消息延迟级别是否小于0
只要满足上述的任意一个条件,设置新的topic(死信topic)为:%DLQ%+consumerGroup
进行前置属性的添加
将死信消息投递到上述步骤2建立的死信topic对应的死信队列中并落盘,使消息持久化。
发送失败如何重试
当发生网络抖动等异常情况,Producer生产者侧往broker发送消息失败,即:生产者侧没收到broker返回的ACK,导致Consumer无法进行消息消费,这时RocketMQ会进行发送重试。
使用DefaultMQProducer进行普通消息发送时,我们可以设置消息发送失败后最大重试次数,并且能够灵活的配合超时时间进行业务重试逻辑的开发,使用的API如下:
1/**设置消息发送失败时最大重试次数*/ 2public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { 3 this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; 4} 5 6/**同步发送消息,并指定超时时间*/ 7public SendResult send(Message msg, 8 long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 9 return this.defaultMQProducerImpl.send(msg, timeout);10}
通过API可以看出,生产者侧的重试是比较简单的,例如:设置生产者在3s内没有发送成功则重试3次的代码如下:
1/**同步发送消息,如果3秒内没有发送成功,则重试3次*/2DefaultMQProducer producer = new DefaultMQProducer("DefaultProducerGroup");3producer.setRetryTimesWhenSendFailed(3);4producer.send(msg, 3000L);
小结
本文中,我们主要介绍了RocketMQ的消息重试机制,该机制能够最大限度的保证业务能够往我们期望的方向流转。
这里还需要注意,业务重试的时候我们的消息消费端需要保证消费的幂等性, 关于消息消费的幂等如何处理,可以查阅。
品略图书馆 http://www.pinlue.com/
RocketMQ之消息重试。相关推荐
- 【RocketMQ】消息重试、重试次数设置、死信队列
文章目录 1. 死信队列 1.1 死信特性 1.2 查看死信消息 2.重试次数参数 2.1 Producer端重试 2.2 Consumer端重试 3.1 异常重试 3.2 超时重试 参考 1. 死信 ...
- RocketMQ错误消息重试策略之Consumer的重试机制(Exception情况)
consumer端重试 消费者端的失败,分为2种情况,一个是exception,一个是timeout. exception 消息正常的到了消费者,结果消费者发生异常,处理失败了.例如反序列化失败,消息 ...
- RocketMQ错误消息重试策略之Consumer的重试机制(timeout情况)
timeout 比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止! 也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败, ...
- RocketMQ错误消息重试策略之重试情况的分析
- RocketMQ的消费者消息重试和生产者消息重投
详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分. 文章目录 1 生产者重试 2 消费者重试 2.1 异常重试 2.1.1 并发消费的重试 2 ...
- 深入理解RocketMQ延迟消息
延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...
- rocketmq广播消息为什么不能重试_几分钟带你看懂“消息队列和RocketMQ”的入门总结
消息队列扫盲 消息队列顾名思义就是存放消息的队列,队列我就不解释了,别告诉我你连队列都不知道似啥吧? 所以问题并不是消息队列是什么,而是 消息队列为什么会出现?消息队列能用来干什么?用它来干这些事会带 ...
- RocketMQ(四)——消息重试
文章目录 一. Producer端重试 二. Consumer端重试 1. Exception 2. Timeout 总结 对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因 ...
- RocketMQ源码 — 十一、 RocketMQ事务消息
分布式事务是一个复杂的问题,rmq实现了事务的最终一致性,rmq保证本地事务成功消息一定会发送成功并被成功消费,如果本地事务失败了,消息不会被发送. rmq事务消息的实现过程为: producer发送 ...
最新文章
- 三大阶段,四大领域,详解你不知道的AIoT!
- ViewPager,TabLayout,Fragment实现tabs滑动
- js 提交form表单,js更改form表单的action属性
- MAC如何查看某个端口的占用情况
- javascript之window对象
- L型代码结构案例:Link访问权限(上)
- C4C权限控制的一些特色功能和测试
- mirna富集分析_经验之谈丨生信分析文章套路原来这么简单!
- php选择nginx还是apache,浅谈apache和nginx的rewrite的区别
- Android中如何提取和生成mp4文件
- 深度学习2.08.tensorflow的高阶操作之张量排序
- 大一计算机理论总结,大一计算机理论基础总结论文.doc
- 微电子专业深度盘点:哪所大学芯片最强?强在哪?(第2弹)
- 微软模拟飞行2020服务器多少内存,《微软模拟飞行2020》到底有多大?我们的硬盘装得下吗?...
- 什么是主数据?什么是主数据管理系统?
- 银行c语言面试题,笔试题(商业银行方面)
- 两个重要极限 常用等价无穷下小 泰勒展开 麦克劳林公式
- onvif开发踩坑【二】鉴权失败
- 针对PVS方式的VDI部署趋势杀毒优化方法
- 物联网Wifi三大新主流势均力敌 SIP时代即将来袭
热门文章
- 计算机中的电子科学与技术论文,电子科学与技术优秀毕业论文.doc
- 芯片ECO的种类和修复方法介绍
- 有n个人围成一圈,从第1个人开始,1、2、3报数,报至3出局,余下的人继续从1、2、3报数,问:最后剩下的一人是原来的第几号?同时求出被淘汰编号的序列。(要求:用循环队列解决该问题。)
- centOS关机重启,保存内存中数据
- nonnegative matrix factorization (NMF)的R实现
- 成熟FOC电机控制代码 大厂成熟FOC电机控制图。 可用于电动自行车,滑板 车,电机FOC控制等
- 天翼云弹性计算服务器,获取天翼云Windows弹性云主机的密码操作步骤
- 如何快速从国外官网下载软件
- 测试路由和设备连接速度的软件,如何简单,快速地在家中测试无线路由器的速度和性能?...
- 大学生或者程序员上B站学习编程必备的几位优秀UP主