RocketMQ源码解析之延迟消息实现原理
原创不易,转载请注明出处
文章目录
- 前言
- 1.延时消息的demo
- 2.实现的原理
前言
今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场景,我这里说几个,比如我们购物下单,然后多长时间没有付款就会取消,或者我们下单打车,多长时间没有调度到车订单取消,不知道大家怎样实现这些业务场景,比如说我一个线程,然后一直扫订单表,扫出过期的订单啥的,但是这种方式会有一个问题,那就是订单量大的时候会给数据库带来很大的压力,而且很low,这种场景我们完全可以交给RocketMQ的延迟消息来处理。本文将从RocketMQ的延迟消息demo讲起,然后解释下其实现原理。
1.延时消息的demo
这里我们直接写一个RocketMQ消息生产者发送延时消息的一个demo
public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}
其实这个demo非常简单,与发送普通消息区别不大,唯一区别的就是需要你在Message里面设置延迟等级,默认的延迟等级与时间对应关系如下
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
也就是延迟等级1级对应的是1s,2级是5s,以此类推。如果你想改变这个等级的话,你可以设置broker的messageDelayLevel
参数,s代表秒,m是分钟,h是小时,这里有点要注意的是,你消息消费失败重试也是要走延迟消息的,这块你需要注意下,然后别改变里延迟消息规则,然后造成失败重试有问题,默认失败重试规则就是3+失败重试次数的延时等级
2.实现的原理
实现原理其实挺简单的,在消息生产者端,你设置这延迟消息等级,其实就是往消息的property设置一个DELAY的key与value值,key就是DELAY,然后value 就是你延迟的等级,这就是消息生产者端要做的事情,接下来介绍下broker端的实现原理。
在broker 端,收到一个消息,在往commitlog进行putMessage的时候,也就是追加写入消息的时候,会检查这个消息的延迟等级,也就是下面这端代码
会将你的topic设置成SCHEDULE_TOPIC_XXXX
,然后queueId设置成你延时等级-1 ,其实你有没有发现,他的实现与事务消息的差不多,都是修改topic 与queueId。
同时,在broker启动的时候,它会启动一个定时任务,定义一个timer,设置几个定时任务(定时任务的数量与延迟等级是有关系的,一个任务对应一个定时任务)不断的扫SCHEDULE_TOPIC_XXXX
这个topic的消息。
接着我们看下这scheduleMessageService 的start方法的实现
其实这个start方法就是遍历这延时等级表,然后往timer里面添加延时任务,最后创建一个定时任务,每10s执行一次持久化任务。这个持久化任务其实不用担心,就是往磁盘里面写下,延时等级对应的offset。
接着我们看下DeliverDelayedMessageTimerTask
这个任务的 实现
可以看到run方法里面执行了executeOnTimeup
这个方法
/// 取出consumeQueueConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {/// 取出SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();///遍历取for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);// 消息存入时间long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);// 计算交付时间tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();// 计算正确的交付时间戳long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;if (countdown <= 0) {// 拿出消息MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {///转成MessageExtBrokerInnerMessageExtBrokerInner msgInner = this.messageTimeup(msgExt);/// 写到 commitlog中PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify me 失败log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());// 这么做主要是为了防止os cache 繁忙ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,/// 延迟一段时间 10000L = 10snextOffset), DELAY_FOR_A_PERIOD);/// 更新 这个等级的offsetScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {// 重新添加任务ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);// 更新offsetScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of fornextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(//100msthis.delayLevel, nextOffset), DELAY_FOR_A_WHILE);// 更新ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {// 找出最小的long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);
这个方法首先是获取对应延迟等级的consumeQueue这个队列,取出offset往后的消息,进行遍历,找出每个unit对应的commitlog真实offset,然后通过commitlog offset 从commitlog获取到真实的那个消息,根据它的存储实现与延迟时间,算出与真实交付时间的差值,如果是小于等于0的话,说明延迟时间到了,这个时候就要暴露给消息消费者了,它就会将topic与queueId转成之前的那个topic queueId,然后重新扔到commitlog中,这个时候通过reput线程的dispatch处理,消息消费者就能发现这个消息并消费,如果这个差值还不够的话,重新创建调度任务,然后延迟执行时间是这个差值,扔到timer中。
RocketMQ源码解析之延迟消息实现原理相关推荐
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- RocketMQ源码解析-PullConsumer取消息(1)
PullConsumer取消息需要自己手动调用Consumer的pull方法主动拉取消息.需要的参数有具体的消息队列(调用消费者的fetchSubscibeMessageQueue()可以得到相应to ...
- RocketMQ源码解析-PullConsumer取消息(2)
如果在调用DefaultMQPullConsumer的pull方法的时候添加了pullcallback参数,那么就会调用DefaultMQPullConsumerImpl的pullAsyncImpl( ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- 4、Eureka 源码解析 之 Eureka Client 启动原理分析
在前面的一篇文章 3.Eureka 源码解析 之 Eureka Server 启动原理分析当中我们分析了一下 Eureka Server 的启动.在集群环境下 Eureka Server 相互之前需要 ...
- RocketMQ源码解析:Filtersrv
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
最新文章
- 看完秒懂大数据用户画像!
- mysql商品管理系统总结_Mysql管理总结
- html table 筛选记录,JS实现table表格内针对某列内容进行即时搜索筛选功能
- [css] 用CSS画出一个任意角度的扇形,可以写多种实现的方法
- bootstrapt学习指南_TensorFlow 2.0深度强化学习指南
- 【算法1-2】排序(今天刷洛谷了嘛)
- 软件测试中python实用技巧,精选22个Python实用技巧,秀技能必备这份技术列表!...
- SQL 字段保留下划线后部分
- PPT设计制作与美化
- 123457123457#0#----com.MC.konglongtianse222----前拼后广--恐龙填色mc-mc1111
- TI - MCU - MSP430使用指南1 - MSP430简介及选型指南
- 华强北airpods三代连接安卓手机没声音_安卓手机体验华强北的顶配AirPods,“翻车”还是真香?...
- Markdown文档数学公式的使用
- 用互联网思想武装自己---雷军
- 2010年《杨卫华谈微博架构》视频摘抄
- 洛谷P1080 [NOIP2012 提高组] 国王游戏
- android打地鼠设计报告,android开发中利用handler制作一个打地鼠小游戏
- linux,rpm, tar, gz, bz, bz2, rar, zip, lha, deb, 解压
- 最新WEB前端从入门到资深专家全套项目实战(完整)
- vim使用技巧-如何暂时返回终端
热门文章
- python picklable
- 微信html5页面开发教程,微信网页开发,如何在H5页面中设置分享的标题,内容以及缩略图...
- 《倍增商业成功宝典》全新升级上线!炙夏新品,久等终至!
- 寂静之地百度云在线观看迅雷下载A Quiet Place高清BT下载
- matlab数学实验报告syms,MATLAB验练习题(计算机) 南邮 MATLAB 数学实验大作业答案
- 多元线性回归分析学习笔记
- 基于STM32F103RC的OneNET云端智能家居环境控制系统
- 学习php的步骤是什么?
- 【车载开发系列】UDS诊断---链接控制服务($0x87)
- 基于 SpringBoot + Vue 的智能停车场管理平台