大家好,我是一个喜欢诗词的java研发赛亚人,感谢您的关注~ ┗( ▔, ▔ )┛。微信搜索【程序猿卡卡罗特】,后续有更多硬核文章哦~

今日诗词:少年恃险若平地,独倚长剑凌清秋。 – [唐·顾况]《行路难三首》

今天我们来扒一扒RocketMQ重试机制的底裤,内容比较硬核,建议一键三联。哦不,走错片场了,建议点赞 + 收藏。

好嘞,咱们这就上车~

以下只设计Consumer的重试机制,Producer比较简单,只是单纯的重发(当然还有故障转移机制啦),暂不讨论…

设计知识点

  • ACK 重试机制原理
  • 死信队列(DLQ队列)

几个问题

  • 消息重试是什么意思?
  • Consumer 消费消息分为集群模式(Cluster)、广播模式(Broadcast),两种模式都会进行消息重试吗?
  • 消息重试的策略是什么?
  • 消息重试的延迟时间规则?
  • 什么叫死信队列?有什么特点?
  • Msg加入死信队列的条件是什么?

知识背景

我们知道Consumer拉取消息、消费消息时分开的,分别由两个类去实现:

  • 拉取消息:PullMessageService
  • 消费消息:ConsumeMessageConcurrentlyService

消息消费流程

下面只展示关键代码

1、假设我们拉取到消息,准备提交到 ConsumeMessageConcurrentlyService 中进行消费,会调如下代码:

// ConsumeMessageConcurrentlyService
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();// 假设未分页if (msgs.size() <= consumeBatchSize) {// 消息封装到里面ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 丢线程池消费this.consumeExecutor.submit(consumeRequest);}}
}

2、ConsumeRequest 内部代码

@Override
public void run() {// 1、Consumer 中设计的回调方法MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {// 2、回调 Consumer 中的监听回调方法status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {hasException = true;}// 3、如果status 返回null,设置为 RECONSUME_LATER 类型if (null == status) {status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 4、对返回的 status 结果进行处理ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}

什么?Consumer 中的监听回调方法是什么意思?

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_topic");// .... 省略部分代码// 1、设置监听回调方法
consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {System.out.println(result);// 2、返回成功表示消费成功,不会进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (UnsupportedEncodingException e) {e.printStackTrace();// 3、返回 RECONSUME_LATER 表示消息需要重试(返回NULL也是一样)// RECONSUME_LATER:通过单词我们知道是 稍后重新消费的意思,即重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}
});

回调方法就是上面你写的那个匿名类嘛。我猜您肯定知道的啦,真谦虚 (ー̀дー́)

3、根据返回的 status 判断是否需要重试

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {int ackIndex = context.getAckIndex();switch (status) {// 1、消费成功case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}break;// 2、消费延迟case RECONSUME_LATER:ackIndex = -1;break;default:break;}// 3、针对不同的消息模式做不同的处理switch (this.defaultMQPushConsumer.getMessageModel()) {// 4、广播模式:如果消费是爱 ackIndex 为-1就会执行循环,可以看到只是打印日志,没有其它多余的操作case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;// 5、集群模式case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());// 6、RECONSUME_LATER 时,ackIndex 为-1,执行循环。CONSUME_SUCCESS 时不会执行循环for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 7、能到这里说明是 RECONSUME_LATER 状态:回退Msg到Broker,也就是ACK(重试)boolean result = this.sendMessageBack(msg, context);// 8、ACK 可能会失败,需要记录失败的ACKif (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 9、存在ACK 失败的消息,将消息丢到线程池延迟 5s 重新消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 10、更新消费的偏移量:注意这里 CONSUME_SUCCESS 和 RECONSUME_LATER 都会更新long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}

根据上面源码我们可以得出以下结论:

1、由第4步我们可知:广播模式 就算消费者消费失败,也不会进行重试,只是打印警告日志。

2、只有消费失败(没有返回 CONSUME_SUCCESS 都成为失败)的消息才需要发送ACK重试

3、如果ACK失败,(总感觉这里ACK叫起来怪怪的,《RocketMQ技术内幕》中成为ACK失败),我们叫重试失败吧。

​ 如果重试失败,就会继续被延迟5s重新消费(又会回调到Consumer中的回调方法)

4、消息被消费成功、失败,都会更新Consumer 的偏移量

4、ConsumeMessageConcurrentlyService.sendMessageBack:准备请求Broker

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {// 1、注意这里:默认为0,其实一直都是0,其它地方没有修改。这表示RocketMQ延迟消息的 延迟级别int delayLevel = context.getDelayLevelWhenNextConsume();try {// 2、发送给Brokerthis.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

什么?你不知道RocketMQ延迟消息的 延迟级别是啥意思? T_T"

RocketMQ官网延迟例子

我们知道RocketMQ延迟级别分为18级,delayLevel从1-18,每个数字对应一个延迟的时间。

延迟时间如下:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

比如:delayLevel=1,表示延迟1s

​ 那 delayLevel=4,就是延迟30s的意思? 呀,你还学会了抢答,就是这个意思。你真聪明。 (o゚▽゚)o

Broker端对重试的处理

以下代码设计到Broker的源码,读者需要下载RocketMQ源码才看得到。

这个方法就是处理Consumer的重试请求的代码,方法中代码比较长。主要做了以下几个事:

  1. 更消息的 Topic 为 "%RETRY%"+ group,计算queueId(重试队列,队列数为1)
  2. 如果消息重试 >= 16次(默认)。继续更改消息的Topic 为死信队列的Topic:"%DLQ%" + group,消费队列为1(死信队列只有一个消费队列)
  3. 如果没有变成死信,计算消息的延迟级别
  4. 复制原来Msg,重新生成一个Msg,将新Msg丢给BrokerController中,然后存到CommitLog中进行存储(什么?你不知道什么是CommitLog? 下期写一篇RocketMQ内部存储结构)
    • 新的Msg 会有新的messageId
    • 非死信:该消息以新的Topic名:"%RETRY%"+ group 存到CommitLog中作为延迟消息
    • 死信:以"%DLQ%" + group为Topic名,存到CommitLog中:存到死信队列中的消息不会被Consumer消费了
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request){// 1、新的Topic名:"%RETRY%"+ groupString newTopic = MixAll.getRetryTopic(requestHeader.getGroup());// 重试队列数为1int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();// 2、都是为0int delayLevel = requestHeader.getDelayLevel();// 3、消息重试次数:重试几次这里存的就是低几次int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();// 4、如果超过最大重试次数(默认为16)if (msgExt.getReconsumeTimes() >= maxReconsumeTimes|| delayLevel < 0) {// 5、更改Topic 名为死信队列名:"%DLQ%" + groupnewTopic = MixAll.getDLQTopic(requestHeader.getGroup());// 6、默认死信队列数为1queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;} else {// 7、delayLevel 其实都为0,所以这里就相当于是重试次数 +3if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}// 8、新建消息,准备存到CommitLog中作为新消息MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setQueueId(queueIdInt);// 8-1、重试次数+1。新消息被消费者消费时就会传上来,到第4步进行比较msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 9、作为新消息存到CommitLog中PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

什么是死信队列(DLQ队列)?

参考博客,就不造轮子了

总结就是:

  • Broker中单独的一个队列(DLQ),该队列存储了Consumer端重试16次后都没成功消费的消息
  • 该队列:只有写权限,没有读权限。所以是不能被Consumer重新消费的,只能进行人工干预,重新投递(Rocket-MQ-Console 中可以操作)
  • DLQ队列中,该消息的TOPIC 重新被命名为: "%DLQ%" + groupName
  • DLQ队列其实就是(consumequeue文件夹的"%DLQ%" + groupName 命名的Topic文件夹下的队列)

什么?consumequeue 文件夹是什么鬼?等我…,马上就写一起RocketMQ消息存储结构你就清楚了

重试消息延时机制

我们说重试消息发到Broker后,被作为一个新的延迟消息存到了CommitLog中,当该消息到了消费时间点是会被Consumer重新消费的。

消息重试16次才会被丢到 死信队列中,才不会被消费了。

那其余15次消息每次延迟是延迟多久呢?

我们在上面的源码其实可以看得出:消息的延迟级别是受重试次数(reconsumeTimes)影响的。重试次数越大,延迟越久。

delayLevel = 3 + msgExt.getReconsumeTimes();

具体的重试延迟时间如下:图片来自阿里云

总结

我们回到我们刚开始的几个问题:

  • 消息重试是什么意思?
  • Consumer 消费消息分为集群模式(Cluster)、广播模式(Broadcast),两种模式都会进行消息重试吗?
  • 消息重试的策略是什么?
  • 消息重试的延迟时间规则?
  • 什么叫死信队列?有什么特点?
  • Msg加入死信队列的条件是什么?

消息重试是什么意思?

RocketMQ为了保证高可用,如果Consumer消费消息失败(回调函数没有返回 CONSUME_SUCCESS)就需要重新让消费者消费该条消息。

Consumer 消费消息分为集群模式(Cluster)、广播模式(Broadcast),两种模式都会进行消息重试吗?

广播模式只会以警告日志的形式记录消费失败的消息,并不会重试

集群模式才会执行消息的重试机制。

消息重试的策略是什么?

Broker 端采用延迟消息的方式,供Consumer再次消费。

消息重试的延迟时间规则?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xRNGr03y-1628575923634)(imgs\消息重试机制\msg重试时间.png)]

Msg加入死信队列的条件是什么?

消息重试16次后,Consumer 还未消费成功。

最后

若有错误,欢迎各位爷在评论区不吝赐教。后续会继续更新RocektMQ相关文章,欢迎大家在评论区留言呐~

后续最新的文章会先更新到微信上,欢迎来骚扰鸭~ (ノ゚▽゚)ノ

RocketMQ重试机制(ACK确认机制)相关推荐

  1. TCP的状态:SYN, FIN, ACK, PSH, RST, URG 简介及 ACK确认机制

    1.TCP的状态FLAGS字段状态 在TCP层,有个FLAGS字段,这个字段有以下几个标识:SYN, FIN, ACK, PSH, RST, URG. 对于我们日常的分析有用的就是前面的五个字段:它们 ...

  2. rabbitmq消息ACK确认机制及发送失败处理

    rabbitmq为确保消息发送和接收成功,采用ack机制. (1)生产者producter发送消息到mq时,mq会发送ack给producter告知消息是否投递成功: (2)消费者consumer接收 ...

  3. 以修改注册表的方式避免ACK确认机制带来的延时现象

    TCP本身属面向链接的通讯协议.通讯双方的每一个收发动作,需要以通讯链路正常为前提.因此TCP协议内部提供了默认的ACK验证机制. 假定A.B之间存在一条TCP通讯链路,某一时刻A第一次向B发送数据, ...

  4. rabbitmq消息队列 ack机制(消息确认机制)和消息补偿机制

    参考:https://blog.csdn.net/pan_junbiao/article/details/112956537 ack 机制就是消息在 生产者在发布消息以后,消息存在内存中,如果消息被确 ...

  5. Storm编程入门API系列之Storm的可靠性的ACK消息确认机制

    概念,见博客 Storm概念学习系列之storm的可靠性  什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...

  6. RocketMQ 重试机制

    2019独角兽企业重金招聘Python工程师标准>>> RocketMQ 重试机制 消息重试分为2种:Producer端重试和Consumer端重试. Producer端重试 生产者 ...

  7. TCP的ACK原理和延迟确认机制

    原文地址:https://blog.csdn.net/gamekit/article/details/53898802 一.ACK定义 TCP协议中,接收方成功接收到数据后,会回复一个ACK数据包,表 ...

  8. Java短信确认机制_JAVA 消息确认机制之 ACK 模式

    JAVA 消息确认机制之 ACK 模式 CLIENT_ACKNOWLEDGE : 客户端手动确认, 这就意味着 AcitveMQ 将不会 "自作主张" 的为你 ACK 任何消息, ...

  9. RabbitMQ ACK消息确认机制 快速入门

    RabbitMQ 消息确认机制ACK ack机制保证的是broker和消费者之间的可靠性 ack表示的是消费端收到消息后的确认方式,有三种确认方式 自动确认:acknowledge="non ...

最新文章

  1. wince工业平板电脑_如何防止工业平板电脑温度过高?
  2. 【转载】分布式事务 介绍
  3. 7月1日 cf总结
  4. 如何安全使用计算机,如何安全的使用计算机
  5. android同步方法和对象的区别是什么,(4.1.10.8)Android Handler之同步屏障机制(sync barrier)...
  6. 资源过于硬核,8h删!这波福利....请笑纳~
  7. Django框架基础之session
  8. python商业爬虫学徒计划_(教程)下载:麻瓜编程Python商业爬虫学徒计划麻瓜编程的视频python办公自动化麻瓜...
  9. 图书管理系统数据库SQL设计思路
  10. js 实现删除确认提示框
  11. 氚云SaaS介绍文档
  12. 设计师的“通天塔”—浅谈设计沟通
  13. 服务器是Windows Server 2003 出现svchost.exe错误对话框的问题解决
  14. 预报精准的天气查询APP开发原理是什么
  15. 摩拜+小程序,让单车变得“触手可骑”
  16. Python3版本Django实现免费手机验证码注册
  17. android 相册分组,Android获取相册路径
  18. 当BERT遇上搜索引擎
  19. Javaweb之核心技术(绘话技术)
  20. 航天广电广播系统服务器调试,广播系统应该做哪些调试

热门文章

  1. 第八届山东省赛题 I Parity check 【找规律】
  2. Vue-cli 打包CSS、JS找不到路径问题,解决方案
  3. 什么是跨域?一次性带你理解透
  4. Go test 命令行参数
  5. storm the 少儿英语_storm是什么意思_storm在线翻译_英语_读音_用法_例句_海词词典...
  6. java实现table斜线,诸位大神 ,怎样在table的 td中添加 一条斜线
  7. 小说作者推荐:忘却的悠合集
  8. 电子厂计算机常用英语,电子厂常用英语
  9. Java 随机点名器
  10. 【USACO10HOL】 Cow Politics