RocketMQ使用过程中,如何进行消息重试。

首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重试的。

集群消费模式下,当消息消费失败,RocketMQ会通过消息重试机制重新投递消息,努力使该消息消费成功。

当消费者消费该重试消息后,需要返回结果给broker,告知broker消费成功(ConsumeConcurrentlyStatus.CONSUME_SUCCESS)或者需要重新消费(ConsumeConcurrentlyStatus.RECONSUME_LATER)。

这里有个问题,如果消费者业务本身故障导致某条消息一直无法消费成功,难道要一直重试下去吗?

答案是显而易见的,并不会一直重试。

事实上,对于一直无法消费成功的消息,RocketMQ会在达到最大重试次数之后,将该消息投递至死信队列。然后我们需要关注死信队列,并对死信队列中的消息做人工的业务补偿操作。

那如何返回消息消费失败呢?

RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

  1. 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER

  2. 业务消费方返回null

  3. 业务消费方主动/被动抛出异常前两种情况较容易理解,当返回ConsumeConcurrentlyStatus.RECONSUME_LATER或者null时,broker会知道消费失败,后续就会发起消息重试,重新投递该消息。

注意 对于抛出异常的情况,只要我们在业务逻辑中显式抛出异常或者非显式抛出异常,broker也会重新投递消息,如果业务对异常做了捕获,那么该消息将不会发起重试。因此对于需要重试的业务,消费方在捕获异常时要注意返回ConsumeConcurrentlyStatus.RECONSUME_LATER或null,输出日志并打印当前重试次数。推荐返回ConsumeConcurrentlyStatus.RECONSUME_LATER。

RocketMQ重试时间窗


这里介绍一下Apache RocketMQ的重试时间窗,当消息需要重试时,会按照该规则进行重试。

我们可以在RocketMQ源码的MessageStoreConfig.java中找到消息重试配置:

1MessageStoreConfig.java2// 消息延迟级别定义3private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

默认重试次数与重试时间间隔对应关系表如下:

可以看到,RocketMQ采用了“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小。

源码分析

在RocketMQ的客户端源码DefaultMQPushConsumerImpl.java中,对重试机制做了说明,源码如下:

1private int getMaxReconsumeTimes() {2    // default reconsume times: 163    if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {4        return 16;5    } else {6        return this.defaultMQPushConsumer.getMaxReconsumeTimes();7    }8}

解释下,首先判断消费端有没有显式设置最大重试次数 MaxReconsumeTimes, 如果没有,则设置默认重试次数为16,否则以设置的最大重试次数为准。

当消息消费失败,服务端会发起消费重试,具体逻辑在broker源码的 SendMessageProcessor.java 中的consumerSendMsgBack方法中涉及,源码如下:

 1if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 2    || delayLevel < 0) { 3    newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); 4    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; 5 6    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, 7        DLQ_NUMS_PER_GROUP, 8        PermName.PERM_WRITE, 0 9    );10    if (null == topicConfig) {11        response.setCode(ResponseCode.SYSTEM_ERROR);12        response.setRemark("topic[" + newTopic + "] not exist");13        return response;14    }15} else {16    if (0 == delayLevel) {17        delayLevel = 3 + msgExt.getReconsumeTimes();18    }1920    msgExt.setDelayTimeLevel(delayLevel);21}

判断消息当前重试次数是否大于等于最大重试次数,如果达到最大重试次数,或者配置的重试级别小于0,则获取死信队列的Topic,后续将超时的消息send到死信队列中。

正常的消息会进入else分支,对于首次重试的消息,默认的delayLevel是0,rocketMQ会将给该level + 3,也就是加到3,这就是说,如果没有显示的配置延时级别,消息消费重试首次,是延迟了第三个级别发起的重试,从表格中看也就是距离首次发送10s后重试。

当延时级别设置完成,刷新消息的重试次数为当前次数加1,broker将该消息刷盘,逻辑如下:

 1MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); 2msgInner.setTopic(newTopic); 3msgInner.setBody(msgExt.getBody()); 4msgInner.setFlag(msgExt.getFlag()); 5MessageAccessor.setProperties(msgInner, msgExt.getProperties()); 6msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); 7msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags())); 8 9msgInner.setQueueId(queueIdInt);10msgInner.setSysFlag(msgExt.getSysFlag());11msgInner.setBornTimestamp(msgExt.getBornTimestamp());12msgInner.setBornHost(msgExt.getBornHost());13msgInner.setStoreHost(this.getStoreHost());14// 刷新消息的重试次数为当前次数加115msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);1617String originMsgId = MessageAccessor.getOriginMessageId(msgExt);18MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);19// 消息刷盘20PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

那么什么是msgInner呢,即:MessageExtBrokerInner,也就是对重试的消息,rocketMQ会创建一个新的 MessageExtBrokerInner 对象,它实际上是继承了MessageExt。

我们继续进入消息刷盘逻辑,即putMessage(msgInner)方法,实现类为:DefaultMessageStore.java, 核心代码如下:

1long beginTime = this.getSystemClock().now();2PutMessageResult result = this.commitLog.putMessage(msg);

主要关注 this.commitLog.putMessage(msg); 这句代码,通过commitLog我们可以认为这里是真实刷盘操作,也就是消息被持久化了。

我们继续进入commitLog的putMessage方法,看到如下核心代码段:

 1if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE 2    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 3    // 处理延时级别 4    if (msg.getDelayTimeLevel() > 0) { 5        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 6            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 7        } 8        // 更换Topic 9        topic = ScheduleMessageService.SCHEDULE_TOPIC;10        // 队列ID为延迟级别-111        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());1213        // Backup real topic, queueId14        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());15        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));16        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));1718        // 重置topic及queueId19        msg.setTopic(topic);20        msg.setQueueId(queueId);21    }22}2324ScheduleMessageService.java25public static int delayLevel2QueueId(final int delayLevel) {26    return delayLevel - 1;27}  

可以看到,如果是重试消息,在进行延时级别判断时候,返回true,则进入分支逻辑,通过这段逻辑我们可以知道,对于重试的消息,rocketMQ并不会从原队列中获取消息,而是创建了一个新的Topic进行消息存储的。也就是代码中的SCHEDULE_TOPIC,看一下具体是什么内容:

1public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

这个名字比较有意思,就叫 SCHEDULE_TOPIC_XXXX。

到这里我们可以得到一个结论:

对于所有消费者消费失败的消息,rocketMQ都会把重试的消息 重新new出来(即上文提到的MessageExtBrokerInner对象),然后投递到主题 SCHEDULE_TOPIC_XXXX 下的队列中,然后由定时任务进行调度重试,而重试的周期符合我们在上文中提到的delayLevel周期,也就是:

1MessageStoreConfig.java2// 消息延迟级别定义3private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

同时为了保证消息可被找到,也会将原先的topic存储到properties中,也就是如下这段代码

1// Backup real topic, queueId2MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());3MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));4msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));  

这里将原先的topic和队列id做了备份。

死信的业务处理方式


默认的处理机制中,如果我们只对消息做重复消费,达到最大重试次数之后消息就进入死信队列了。

我们也可以根据业务的需要,定义消费的最大重试次数,每次消费的时候判断当前消费次数是否等于最大重试次数的阈值。

如:重试三次就认为当前业务存在异常,继续重试下去也没有意义了,那么我们就可以将当前的这条消息进行提交,返回broker状态ConsumeConcurrentlyStatus.CONSUME_SUCCES,让消息不再重发,同时将该消息存入我们业务自定义的死信消息表,将业务参数入库,相关的运营通过查询死信表来进行对应的业务补偿操作。

RocketMQ 的处理方式为将达到最大重试次数(16次)的消息标记为死信消息,将该死信消息投递到 DLQ 死信队列中,业务需要进行人工干预。实现的逻辑在 SendMessageProcessor 的 consumerSendMsgBack 方法中,大致思路为首先判断重试次数是否超过16或者消息发送延时级别是否小于0,如果已经超过16或者发送延时级别小于0,则将消息设置为新的死信。死信 topic 为:%DLQ%+consumerGroup。

我们接着看一下死信的源码实现机制。

死信源码分析

 1private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) 2        throws RemotingCommandException { 3        final RemotingCommand response = RemotingCommand.createResponseCommand(null); 4        final ConsumerSendMsgBackRequestHeader requestHeader = 5            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); 6 7        ...... 8 9        // 0.首先判断重试次数是否大于等于16,或者消息延迟级别是否小于010        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes11            || delayLevel < 0) {12            // 1. 如果满足判断条件,设置死信队列topic= %DLQ%+consumerGroup13            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());14            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;1516            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,17                DLQ_NUMS_PER_GROUP,18                PermName.PERM_WRITE, 019            );20            if (null == topicConfig) {21                response.setCode(ResponseCode.SYSTEM_ERROR);22                response.setRemark("topic[" + newTopic + "] not exist");23                return response;24            }25        } else {26            // 如果延迟级别为0,则设置下一次延迟级别为3+当前重试消费次数,达到时间衰减效果27            if (0 == delayLevel) {28                delayLevel = 3 + msgExt.getReconsumeTimes();29            }3031            msgExt.setDelayTimeLevel(delayLevel);32        }3334        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();35        msgInner.setTopic(newTopic);36        msgInner.setBody(msgExt.getBody());37        msgInner.setFlag(msgExt.getFlag());38        MessageAccessor.setProperties(msgInner, msgExt.getProperties());39        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));40        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));4142        msgInner.setQueueId(queueIdInt);43        msgInner.setSysFlag(msgExt.getSysFlag());44        msgInner.setBornTimestamp(msgExt.getBornTimestamp());45        msgInner.setBornHost(msgExt.getBornHost());46        msgInner.setStoreHost(this.getStoreHost());47        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);4849        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);50        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);5152        // 3.死信消息投递到死信队列中并落盘53        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);54        ......55        return response;56    }

我们总结一下死信的处理逻辑:

  1. 首先判断消息当前重试次数是否大于等于16,或者消息延迟级别是否小于0

  2. 只要满足上述的任意一个条件,设置新的topic(死信topic)为:%DLQ%+consumerGroup

  3. 进行前置属性的添加

  4. 将死信消息投递到上述步骤2建立的死信topic对应的死信队列中并落盘,使消息持久化。

    发送失败如何重试


当发生网络抖动等异常情况,Producer生产者侧往broker发送消息失败,即:生产者侧没收到broker返回的ACK,导致Consumer无法进行消息消费,这时RocketMQ会进行发送重试。

使用DefaultMQProducer进行普通消息发送时,我们可以设置消息发送失败后最大重试次数,并且能够灵活的配合超时时间进行业务重试逻辑的开发,使用的API如下:

 1/**设置消息发送失败时最大重试次数*/ 2public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { 3    this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; 4} 5 6/**同步发送消息,并指定超时时间*/ 7public SendResult send(Message msg, 8                    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 9    return this.defaultMQProducerImpl.send(msg, timeout);10}

通过API可以看出,生产者侧的重试是比较简单的,例如:设置生产者在3s内没有发送成功则重试3次的代码如下:

1/**同步发送消息,如果3秒内没有发送成功,则重试3次*/2DefaultMQProducer producer = new DefaultMQProducer("DefaultProducerGroup");3producer.setRetryTimesWhenSendFailed(3);4producer.send(msg, 3000L);

小结


本文中,我们主要介绍了RocketMQ的消息重试机制,该机制能够最大限度的保证业务能够往我们期望的方向流转。

这里还需要注意,业务重试的时候我们的消息消费端需要保证消费的幂等性, 关于消息消费的幂等如何处理,可以查阅。

品略图书馆 http://www.pinlue.com/

RocketMQ之消息重试。相关推荐

  1. 【RocketMQ】消息重试、重试次数设置、死信队列

    文章目录 1. 死信队列 1.1 死信特性 1.2 查看死信消息 2.重试次数参数 2.1 Producer端重试 2.2 Consumer端重试 3.1 异常重试 3.2 超时重试 参考 1. 死信 ...

  2. RocketMQ错误消息重试策略之Consumer的重试机制(Exception情况)

    consumer端重试 消费者端的失败,分为2种情况,一个是exception,一个是timeout. exception 消息正常的到了消费者,结果消费者发生异常,处理失败了.例如反序列化失败,消息 ...

  3. RocketMQ错误消息重试策略之Consumer的重试机制(timeout情况)

    timeout 比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止! 也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败, ...

  4. RocketMQ错误消息重试策略之重试情况的分析

  5. RocketMQ的消费者消息重试和生产者消息重投

    详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分. 文章目录 1 生产者重试 2 消费者重试 2.1 异常重试 2.1.1 并发消费的重试 2 ...

  6. 深入理解RocketMQ延迟消息

    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...

  7. rocketmq广播消息为什么不能重试_几分钟带你看懂“消息队列和RocketMQ”的入门总结

    消息队列扫盲 消息队列顾名思义就是存放消息的队列,队列我就不解释了,别告诉我你连队列都不知道似啥吧? 所以问题并不是消息队列是什么,而是 消息队列为什么会出现?消息队列能用来干什么?用它来干这些事会带 ...

  8. RocketMQ(四)——消息重试

    文章目录 一. Producer端重试 二. Consumer端重试 1. Exception 2. Timeout 总结 对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因 ...

  9. RocketMQ源码 — 十一、 RocketMQ事务消息

    分布式事务是一个复杂的问题,rmq实现了事务的最终一致性,rmq保证本地事务成功消息一定会发送成功并被成功消费,如果本地事务失败了,消息不会被发送. rmq事务消息的实现过程为: producer发送 ...

最新文章

  1. 三大阶段,四大领域,详解你不知道的AIoT!
  2. ViewPager,TabLayout,Fragment实现tabs滑动
  3. js 提交form表单,js更改form表单的action属性
  4. MAC如何查看某个端口的占用情况
  5. javascript之window对象
  6. L型代码结构案例:Link访问权限(上)
  7. C4C权限控制的一些特色功能和测试
  8. mirna富集分析_经验之谈丨生信分析文章套路原来这么简单!
  9. php选择nginx还是apache,浅谈apache和nginx的rewrite的区别
  10. Android中如何提取和生成mp4文件
  11. 深度学习2.08.tensorflow的高阶操作之张量排序
  12. 大一计算机理论总结,大一计算机理论基础总结论文.doc
  13. 微电子专业深度盘点:哪所大学芯片最强?强在哪?(第2弹)
  14. 微软模拟飞行2020服务器多少内存,《微软模拟飞行2020》到底有多大?我们的硬盘装得下吗?...
  15. 什么是主数据?什么是主数据管理系统?
  16. 银行c语言面试题,笔试题(商业银行方面)
  17. 两个重要极限 常用等价无穷下小 泰勒展开 麦克劳林公式
  18. onvif开发踩坑【二】鉴权失败
  19. 针对PVS方式的VDI部署趋势杀毒优化方法
  20. 物联网Wifi三大新主流势均力敌 SIP时代即将来袭

热门文章

  1. 计算机中的电子科学与技术论文,电子科学与技术优秀毕业论文.doc
  2. 芯片ECO的种类和修复方法介绍
  3. 有n个人围成一圈,从第1个人开始,1、2、3报数,报至3出局,余下的人继续从1、2、3报数,问:最后剩下的一人是原来的第几号?同时求出被淘汰编号的序列。(要求:用循环队列解决该问题。)
  4. centOS关机重启,保存内存中数据
  5. nonnegative matrix factorization (NMF)的R实现
  6. 成熟FOC电机控制代码 大厂成熟FOC电机控制图。 可用于电动自行车,滑板 车,电机FOC控制等
  7. 天翼云弹性计算服务器,获取天翼云Windows弹性云主机的密码操作步骤
  8. 如何快速从国外官网下载软件
  9. 测试路由和设备连接速度的软件,如何简单,快速地在家中测试无线路由器的速度和性能?...
  10. 大学生或者程序员上B站学习编程必备的几位优秀UP主