原创不易,转载请注明出处

文章目录

  • 前言
  • 1.延时消息的demo
  • 2.实现的原理

前言

今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场景,我这里说几个,比如我们购物下单,然后多长时间没有付款就会取消,或者我们下单打车,多长时间没有调度到车订单取消,不知道大家怎样实现这些业务场景,比如说我一个线程,然后一直扫订单表,扫出过期的订单啥的,但是这种方式会有一个问题,那就是订单量大的时候会给数据库带来很大的压力,而且很low,这种场景我们完全可以交给RocketMQ的延迟消息来处理。本文将从RocketMQ的延迟消息demo讲起,然后解释下其实现原理。

1.延时消息的demo

这里我们直接写一个RocketMQ消息生产者发送延时消息的一个demo


public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}

其实这个demo非常简单,与发送普通消息区别不大,唯一区别的就是需要你在Message里面设置延迟等级,默认的延迟等级与时间对应关系如下

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

也就是延迟等级1级对应的是1s,2级是5s,以此类推。如果你想改变这个等级的话,你可以设置broker的messageDelayLevel参数,s代表秒,m是分钟,h是小时,这里有点要注意的是,你消息消费失败重试也是要走延迟消息的,这块你需要注意下,然后别改变里延迟消息规则,然后造成失败重试有问题,默认失败重试规则就是3+失败重试次数的延时等级

2.实现的原理

实现原理其实挺简单的,在消息生产者端,你设置这延迟消息等级,其实就是往消息的property设置一个DELAY的key与value值,key就是DELAY,然后value 就是你延迟的等级,这就是消息生产者端要做的事情,接下来介绍下broker端的实现原理。
在broker 端,收到一个消息,在往commitlog进行putMessage的时候,也就是追加写入消息的时候,会检查这个消息的延迟等级,也就是下面这端代码

会将你的topic设置成SCHEDULE_TOPIC_XXXX,然后queueId设置成你延时等级-1 ,其实你有没有发现,他的实现与事务消息的差不多,都是修改topic 与queueId。
同时,在broker启动的时候,它会启动一个定时任务,定义一个timer,设置几个定时任务(定时任务的数量与延迟等级是有关系的,一个任务对应一个定时任务)不断的扫SCHEDULE_TOPIC_XXXX这个topic的消息。

接着我们看下这scheduleMessageService 的start方法的实现

其实这个start方法就是遍历这延时等级表,然后往timer里面添加延时任务,最后创建一个定时任务,每10s执行一次持久化任务。这个持久化任务其实不用担心,就是往磁盘里面写下,延时等级对应的offset。
接着我们看下DeliverDelayedMessageTimerTask这个任务的 实现

可以看到run方法里面执行了executeOnTimeup这个方法

 /// 取出consumeQueueConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {/// 取出SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();///遍历取for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);// 消息存入时间long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);// 计算交付时间tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();// 计算正确的交付时间戳long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;if (countdown <= 0) {// 拿出消息MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {///转成MessageExtBrokerInnerMessageExtBrokerInner msgInner = this.messageTimeup(msgExt);/// 写到 commitlog中PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify me  失败log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());// 这么做主要是为了防止os cache 繁忙ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,/// 延迟一段时间  10000L  = 10snextOffset), DELAY_FOR_A_PERIOD);/// 更新 这个等级的offsetScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {// 重新添加任务ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);// 更新offsetScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of fornextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(//100msthis.delayLevel, nextOffset), DELAY_FOR_A_WHILE);// 更新ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {// 找出最小的long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);

这个方法首先是获取对应延迟等级的consumeQueue这个队列,取出offset往后的消息,进行遍历,找出每个unit对应的commitlog真实offset,然后通过commitlog offset 从commitlog获取到真实的那个消息,根据它的存储实现与延迟时间,算出与真实交付时间的差值,如果是小于等于0的话,说明延迟时间到了,这个时候就要暴露给消息消费者了,它就会将topic与queueId转成之前的那个topic queueId,然后重新扔到commitlog中,这个时候通过reput线程的dispatch处理,消息消费者就能发现这个消息并消费,如果这个差值还不够的话,重新创建调度任务,然后延迟执行时间是这个差值,扔到timer中。

RocketMQ源码解析之延迟消息实现原理相关推荐

  1. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  2. RocketMQ源码解析-PullConsumer取消息(1)

    PullConsumer取消息需要自己手动调用Consumer的pull方法主动拉取消息.需要的参数有具体的消息队列(调用消费者的fetchSubscibeMessageQueue()可以得到相应to ...

  3. RocketMQ源码解析-PullConsumer取消息(2)

    如果在调用DefaultMQPullConsumer的pull方法的时候添加了pullcallback参数,那么就会调用DefaultMQPullConsumerImpl的pullAsyncImpl( ...

  4. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  5. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  6. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  7. 4、Eureka 源码解析 之 Eureka Client 启动原理分析

    在前面的一篇文章 3.Eureka 源码解析 之 Eureka Server 启动原理分析当中我们分析了一下 Eureka Server 的启动.在集群环境下 Eureka Server 相互之前需要 ...

  8. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  9. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

最新文章

  1. 看完秒懂大数据用户画像!
  2. mysql商品管理系统总结_Mysql管理总结
  3. html table 筛选记录,JS实现table表格内针对某列内容进行即时搜索筛选功能
  4. [css] 用CSS画出一个任意角度的扇形,可以写多种实现的方法
  5. bootstrapt学习指南_TensorFlow 2.0深度强化学习指南
  6. 【算法1-2】排序(今天刷洛谷了嘛)
  7. 软件测试中python实用技巧,精选22个Python实用技巧,秀技能必备这份技术列表!...
  8. SQL 字段保留下划线后部分
  9. PPT设计制作与美化
  10. 123457123457#0#----com.MC.konglongtianse222----前拼后广--恐龙填色mc-mc1111
  11. TI - MCU - MSP430使用指南1 - MSP430简介及选型指南
  12. 华强北airpods三代连接安卓手机没声音_安卓手机体验华强北的顶配AirPods,“翻车”还是真香?...
  13. Markdown文档数学公式的使用
  14. 用互联网思想武装自己---雷军
  15. 2010年《杨卫华谈微博架构》视频摘抄
  16. 洛谷P1080 [NOIP2012 提高组] 国王游戏
  17. android打地鼠设计报告,android开发中利用handler制作一个打地鼠小游戏
  18. linux,rpm, tar, gz, bz, bz2, rar, zip, lha, deb, 解压
  19. 最新WEB前端从入门到资深专家全套项目实战(完整)
  20. vim使用技巧-如何暂时返回终端

热门文章

  1. python picklable
  2. 微信html5页面开发教程,微信网页开发,如何在H5页面中设置分享的标题,内容以及缩略图...
  3. 《倍增商业成功宝典》全新升级上线!炙夏新品,久等终至!
  4. 寂静之地百度云在线观看迅雷下载A Quiet Place高清BT下载
  5. matlab数学实验报告syms,MATLAB验练习题(计算机) 南邮 MATLAB 数学实验大作业答案
  6. 多元线性回归分析学习笔记
  7. 基于STM32F103RC的OneNET云端智能家居环境控制系统
  8. 学习php的步骤是什么?
  9. 【车载开发系列】UDS诊断---链接控制服务($0x87)
  10. 基于 SpringBoot + Vue 的智能停车场管理平台