RocketMQ 延迟消息(定时消息)4.9.3 版本优化 异步投递支持
1. 概述
在 RocketMQ 4.9.3 版本中,@Git-Yang 对延迟消息做了很大的优化,大幅度提升了延迟消息的性能。
其中,PR#3287 将原先用来启动周期性任务的 Timer
改为使用 ScheduledExecutorService
,将多延迟等级下同时发送延迟消息的性能提升了 3+ 倍。
本文主要讲解的是另一个改动 PR#3458:支持延迟消息的异步投递。老版本中,延迟消息到期投递到 CommitLog 的动作是同步的,在 Dledger 模式下性能很差。新的改动将延迟消息的到期投递模式改为可配置,使用 BrokerConfig 的 enableScheduleAsyncDeliver
属性进行配置。改成异步投递后,在 Dledger 下的性能提升了 3 倍左右。
本文着重讲解定时消息异步投递的逻辑,老版本的延迟消息流程和源码解析可以看这篇文章:RocketMQ 延迟消息(定时消息)
2. 改动解析
2.1 将多延迟等级延迟消息扫描和投递的任务从单线程执行改为多线程
这个改动将延迟消息的任务调度器从 Timer
改为 ScheduledExecutorService
。
在老版本中,所有 18 个延迟等级的定时消息扫描和投递任务都是由一个 Timer
启动定时任务执行的。Timer
中所有定时任务都是由一个工作线程单线程处理的,如果某个任务处理慢了,后续有新的任务进来,会导致新的任务需要等待前一个任务执行结束。
改为 ScheduledExecutorService
线程池之后多线程处理任务,可以大幅度提高延迟消息处理速度,并且避免多延迟等级消息同时发送时造成的阻塞。
改动后的性能变化,出处:https://github.com/apache/rocketmq/issues/3286
改动前,同时向 4 个延迟等级发送延迟消息,TPS: 657
改动后,同时向4个延迟等级发送延迟消息,TPS: 2453
2.2 支持延迟消息异步投递,提升 Dledger 模式下的投递性能
原本的定时消息投递为单线程同步投递,在 DLedger 模式下存在性能瓶颈。
因为在 DLedger 模式下,主节点的角色会变为 SYNC_MASTER,同步复制。即需要足够多的从节点存储了该消息后,才会向主节点返回写入成功。
本次改动将延迟消息的写入改成可配置同步或异步写入,异步写入在 DLedger 模式下性能提升了 3 倍左右。
2.2.1 异步投递的注意点
异步投递的两个主要缺点是
- 无法保证消息投递的顺序
- 消息可能重复投递
异步投递的注意点
需要做流控,当写入 TPS 过高时,页缓存可能会繁忙;甚至节点内存会被打爆。
可能存在消息可能丢失的情况,比如投递时页缓存繁忙或者其他原因导致一次投递失败。这时候的处理是对失败消息进行重新投递,重试 3 次失败后,阻塞当前延迟等级对应的线程,直到重新投递成功。
2.2.2 异步投递逻辑
首先回顾一下同步投递的逻辑:每个延迟等级都分配一个线程,不断启动任务去扫描该等级对应的消费队列中是否有到期的消息。如果有则将到期的消息一个个同步投递,投递成功后更新该等级对应的 offset,下个任务从该 offset 开始扫描新的消息。
异步投递的逻辑相比于同步投递有一些不同:
异步投递采用了生产-消费模式,生产和消费的对象是异步投递的任务。生产者线程负责将到期的消息创建投递任务,消费者消费这些任务,根据任务的执行状态来更新 offset 或者重试。
这里引入了一个阻塞队列作为异步投递任务的容器,阻塞队列的大小可以配置,表示可以同时投递的消息数。当队列中投递任务满时触发流控。
将对应延迟等级的消息异步投递时,需要将异步投递的任务放入处理队列。此时,可能由于流控等原因,投递任务未能放入队列,那么等待一会后再次执行扫描-投递逻辑。
消息并不会直接投递成功,所以需要消费者线程从队列中消费并判断这些异步投递任务的状态。如果投递任务已完成,则更新 offset;如果投递异常,则等待一会后重新同步投递;投递成功则更新 offset,投递失败则继续重试。
3. 异步投递详解
延迟消息的投递逻辑全部在 ScheduleMessageService
类中。
下面以一个延迟等级的处理为例,用图展示一下消息投递线程和任务更新线程的工作流程。
左边是定时消息到期投递线程,右边是投递过程状态更新线程。
3.1 定时消息投递线程
延迟消息投递服务中维护了一个 offset 表offsetTable
,表示每个延迟等级当前投递的消息在 ConsumeQuque 中的逻辑 offset。
它用来在关机恢复时标明扫描开始位置,所以这个表会定期持久化到磁盘中,并且从节点会定期从主节点拉去该表的最新值。
延迟消息处理服务启动时会在 deliverExecutorService
线程池为每个延迟等级创建并执行一个 DeliverDelayedMessageTimerTask
任务,这个任务并不是周期性任务,而是在一个任务的末尾执行下一个任务。这个任务的 executeOnTimeup()
方法即消息投递的逻辑。上图展示的就是该方法中的逻辑。
- 获取该等级的 ConsumeQueue,依次扫描消息是否到期
- 如果消息到期,从 CommitLog 中查出该消息的完整信息,从属性中恢复它的真实 Topic 和 QueueId,然后投递。(根据配置同步或者异步投递,这里按异步讲解)
- 异步消息投递后,投递的过程被放入阻塞队列
deliverPendingTable
- 如果放入队列失败,表示此时出现流控或者阻塞,需要等待一会然后重新投递
- 如果全部投递成功,将 offset 更新为当前投递消息的 offset + 1,表示下一次从下一个 offset 开始扫描
3.2 投递过程状态更新线程
每个延迟等级在 handleExecutorService
线程池中启动了一个状态更新线程,每个线程执行 HandlePutResultTask
任务。同样,这个任务不是周期性任务,而是一个任务末尾启动一个新的任务。
HandlePutResultTask
任务不断从阻塞队列头部获取异步投递过程对象,判断其状态
- 如果投递成功,更新 offset 和统计数据,并从队列中移除投递任务
- 如果投递中,无动作
- 如果投递错误,根据是否配置自动重试来执行重试或者直接跳过
- 重试投递时采用同步投递,投递成功则更新 offset 和统计数据,然后移除;否则继续重新投递
全部任务扫描完毕后等待一会,执行新的HandlePutResultTask
任务。
4. 源码解析
4.1 定时消息投递任务
public void executeOnTimeup() {// 根据delayLevel查找对应的延迟消息ConsumeQueueConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}// 根据ConsumeQueue的有效延迟消息逻辑offset,获取所有有效的消息SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ == null) {long resetOffset;if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());} else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());} else {resetOffset = this.offset;}this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);return;}long nextOffset = this.offset;try {int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();// 遍历ConsumeQueue中的所有有效消息for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {// 获取ConsumeQueue索引的三个关键属性long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}// ConsumeQueue里面的tagsCode实际是一个时间点(投递时间点)long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);// 如果现在已经到了投递时间点,投递消息// 如果现在还没到投递时间点,继续创建一个定时任务,countdown秒之后执行long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt == null) {continue;}MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}// 重新投递消息到CommitLogboolean deliverSuc;if (ScheduleMessageService.this.enableAsyncDeliver) {// 异步投递deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);} else {// 同步投递deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);}// 投递失败(流控、阻塞、投递异常等原因),等待0.1s再次执行投递任务if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {bufferCQ.release();}// 该条ConsumeQueue索引对应的消息如果未到投递时间,那么创建一个定时任务,到投递时间时执行// 如果有还未投递的消息,创建定时任务后直接返回this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,int sizePy) {Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);//Flow Control 流控,如果阻塞队列中元素数量大于阈值则触发流控int currentPendingNum = processesQueue.size();int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().getScheduleAsyncDeliverMaxPendingLimit();if (currentPendingNum > maxPendingLimit) {log.warn("Asynchronous deliver triggers flow control, " +"currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);return false;}//Blocked 阻塞,如果有一个投递任务重试 3 次以上,阻塞该延迟等级的消息投递,直到该任务投递成功PutResultProcess firstProcess = processesQueue.peek();if (firstProcess != null && firstProcess.need2Blocked()) {log.warn("Asynchronous deliver block. info={}", firstProcess.toString());return false;}PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);processesQueue.add(resultProcess);return true;
}
4.2 异步投递过程状态更新任务
public void run() {LinkedBlockingQueue<PutResultProcess> pendingQueue =ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);PutResultProcess putResultProcess;// 循环获取队列中第一个投递任务,查看其执行状态并执行对应操作while ((putResultProcess = pendingQueue.peek()) != null) {try {switch (putResultProcess.getStatus()) {case SUCCESS:// 消息投递成功,从队列中移除该投递任务ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());pendingQueue.remove();break;case RUNNING:// 正在投递,不做操作break;case EXCEPTION:// 投递出错if (!isStarted()) {log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());return;}log.warn("putResultProcess error, info={}", putResultProcess.toString());// onException 方法执行重试putResultProcess.onException();break;case SKIP:// 跳过,直接从队列中移除log.warn("putResultProcess skip, info={}", putResultProcess.toString());pendingQueue.remove();break;}} catch (Exception e) {log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);putResultProcess.onException();}}// 等待0.01s,继续下一次扫描if (isStarted()) {ScheduleMessageService.this.handleExecutorService.schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);}
}
private void resend() {log.info("Resend message, info: {}", this.toString());// Gradually increase the resend interval.try {Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));} catch (InterruptedException e) {e.printStackTrace();}try {// 从 CommitLog 中查询消息完整信息MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);// 如果查询失败,检查重试次数,如果到达 6 次则打印日志并跳过该消息if (msgExt == null) {log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;return;}MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);// 同步投递PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);// 根据结果更新状态this.handleResult(result);if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {log.info("Resend message success, info: {}", this.toString());}} catch (Exception e) {this.status = ProcessStatus.EXCEPTION;log.error("Resend message error, info: {}", this.toString(), e);}
}
RocketMQ 延迟消息(定时消息)4.9.3 版本优化 异步投递支持相关推荐
- ActiveMQ—消息特性(延迟和定时消息投递)
ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article ...
- telegram定时消息_ActiveMQ(18):Message之延迟和定时消息投递
一.简介 延迟和定时消息投递(Delay and Schedule Message Delivery) 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消 ...
- rocketmq 重复消费_消息队列 RocketMQ
引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...
- 阿里云ONS / RocketMQ的定时消息 / 延时消息
考虑延时和定时消息,是因为遇到了一个业务场景: 前置任务完成时发送消息,但因为一些业务原因,不希望消息马上被消费,因此需要设置延时. 文章目录 几种解决思路 实现方案 ONS延迟消息 RocketMQ ...
- 深入理解RocketMQ延迟消息
延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...
- 记录Rocketmq定时消息不消费问题的排查过程
记录Rocketmq定时消息不消费问题的排查过程 写在前面 问题根源初步确认 问题根源再次确认. 了解rocketmq定时消息原理以及处理过程. 源码调试 结论求证 后记 写在前面 此本记录了一个项目 ...
- RocketMQ延迟消息
延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者.比如我们现在发送一条延时30秒的消息,消息发送后立即发送给服务器,但是服务器在30秒后才将该消息交给消费者. RocketM ...
- 面试常问Rocketmq延迟消息原理
延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现.本文将从源码层面来分析Rocketmq的延迟消息实现原理机制. 一.延迟消息的使用 ...
- 【RocketMQ工作原理】消息堆积与消费延迟
概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来 越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟. ...
- 厚积薄发--一文带您了解阿里云 RocketMQ 轻量版消息队列(MNS)
作者: 周新宇&陈涛&李凯 阿里云 RocketMQ 轻量版(MNS)消息队列是一个轻量.可靠.可扩展且完全托管的分布式消息队列服务.MNS 能够帮助应用开发者在他们应用的分布式组件上 ...
最新文章
- 辞去美国终身教职回国的帅教授,拟增列为顶尖985大学博导
- 你的简历写了 “熟悉” zookeeper ?那这些你会吗?
- 从洗牌算法谈起--Python的random.shuffle函数实现原理
- css涟漪光圈扩散_CSS动画实例:圆的涟漪扩散
- 信息系统项目管理师历年论文题目
- javascript --- 手写Promise、快排、冒泡、单例模式+观察者模式
- 11. IDEA 在同一工作空间创建多个项目
- node.js npm常用命令
- python编程考试_《Python程序设计》试题库
- MPLS ××× Carrier Supporting Carrier Option AB(二)
- Material Design基础
- 103-PHP定义一个类
- [网络安全自学篇] 八十八.基于机器学习的恶意代码检测技术详解
- 小红书用户画像分析_棋牌游戏如何做好用户画像分析?
- GBK 与GB2312 互查 区位码
- 用python画机器猫--哆啦A梦,开干!
- git推送不能完全退出错误
- UESTC ACM训练题二
- toAppendStream doesn‘t support consuming update and delete changes which is produced by node XXX
- 华为鸿蒙删除视频报错rm: local.mp4: Owner died