文章目录

  • 前言
  • 一、延迟消息
    • 1.特点
    • 2.使用场景
    • 3.demo
  • 二、发送延迟消息
  • 三、broker端存储延迟消息
  • 四、总结
    • 1.延迟消息工作原理
    • 2.延迟消息在消费者消费重试中的应用

前言

本篇文章将会分析延迟消息的工作原理以及其在consumer端消息重试场景中的应用。


一、延迟消息

1.特点

(1)与普通消息相比,延迟消息需要设置延迟级别,注意:延迟级别从1开始,如果延迟级别等于0则表示该消息不是延迟消息
(2)延迟消息发送到broker后不会立刻被消费,而是需要等待特定时间后才被投递到真正的topic中
(3)RocketMQ不支持任意时间延迟,broker端配置文件中可以配置延迟队列等级,默认值是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,目前RocketMQ中支持的延迟时间的单位有4种:s(秒)、m(分钟)、h(小时)、d(天)

2.使用场景

(1)在电商购物场景中,如果用户在下单后没有立刻付款此时界面上就会提示:如果15分钟后没有支付那么订单将会被取消
(2)通过消息触发定时任务,例如在某一固定时间点向用户发送提醒消息

3.demo

示例源于官网。
(1)producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}

(2)consumer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");// 订阅Topicsconsumer.subscribe("TestTopic", "*");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}

二、发送延迟消息

  producer发送延迟消息与普通消息的流程是一致的,唯一需要注意的是:需要在producer端调用setDelayTimeLevel(int level)方法为消息设置延迟等级,设置延迟等级实际上是在消息的properties属性中添加<DELAY, level>键值对。延迟消息的发送流程可以参考笔者之前的笔记:RocketMQ源码分析之普通消息发送


public void setDelayTimeLevel(int level) {this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));}void putProperty(final String name, final String value) {if (null == this.properties) {this.properties = new HashMap<String, String>();}this.properties.put(name, value);}

三、broker端存储延迟消息

  broker端存储延迟消息时会发生两次消息写入操作,一次是将消息写入“SCHEDULE_TOPIC_XXXX”的“delayLevel - 1”消息队列中,一次是将消息写入实际的topic和queueId中。接下来我们看看延迟消息在broker端存储的工作原理。

1.在broker端存储时会判断消息的properties属性中DELAY的值是否大于0,如果大于0则表示该消息是延迟消息,对延迟消息的处理逻辑如下:

  • 首先会对延迟级别进行判断,判断其是否超过了broker端设置的最大延迟级别,如果大于则将其重置为broker端的最大延迟级别
  • 将消息的原始topic和queueId存储在在properties的REAL_TOPIC和REAL_QID属性中
  • 将消息的topic和queueId别重置为SCHEDULE_TOPIC_XXXX和delayLevel - 1

这一步的处理对应于Commitlog.java中的asyncPutMessage(final MessageExtBrokerInner msg)方法,具体如下:

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {//如果设置的延迟级别大于broker端配置最大延迟级别则将该消息的延迟级别重置为broker端配置的最大延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//将消息的topic和queueId分别重置为SCHEDULE_TOPIC_XXXX和delayLevel - 1,而消息原始的topic和queueId记录在properties的REAL_TOPIC和REAL_QID属性中msg.setTopic(topic);msg.setQueueId(queueId);}}public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1;}

注意:在将消息存入“SCHEDULE_TOPIC_XXXX”时,MessageQueue的queueId与delayLevel的对应关系是:queueId = delayLevel - 1。接着就是将消息写入到commitlog。

2.当commitlog中新添加消息后就会调用reputMessageService服务来构建DispatchRequest,后续会根据DispatchRequest来构建consumequeue和indexFile。在构建DispatchRequest时会调用checkMessageAndReturnSize方法,该方法中有关于延迟消息的处理需要注意,具体如下:调用computeDeliverTimestamp方法计算延迟消息的投递时间,并将投递时间放在consumequeue的tag字段,也就是此时构建的consumequeue中tag中存储的是“storeTimestamp+延迟级别对应的时间”

{String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel > 0) {tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {Long time = this.delayLevelTable.get(delayLevel);if (time != null) {return time + storeTimestamp;}return storeTimestamp + 1000;}

在将延迟消息写入commitlog(topic:SCHEDULE_TOPIC_XXXX)以及构建完其对应的consumequeue后,后续都是由ScheduleMessageService服务来处理,这里我们先介绍下有关ScheduleMessageService服务的基础信息,然后再接着上面详细分析broker后续如何处理延迟消息。

3.ScheduleMessageService

关于ScheduleMessageService我们需要了解以下信息:

  • 其初始化及启动都是在broker启动的过程中完成的,其实现原理是Timer+TimerTask
  • 在其初始化完成后会执行load函数,主要完成两个任务,一个是将文件${Rocket_HOME}/store/config/delayOffset.json加载到内存中的offsetTable,一个是获取broker端配置的messageDelayLevel并将其解析到delayLevelTable,其数据结构是<delayLevel, delay timeMillis>,在解析的过程中会确定好maxDelayLevel
public boolean load() {boolean result = super.load();result = result && this.parseDelayLevel();return result;}
public boolean parseDelayLevel() {HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();//从这里可以看到目前支持4种时间单位timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {String value = levelArray[i];String ch = value.substring(value.length() - 1);Long tu = timeUnitTable.get(ch);int level = i + 1;if (level > this.maxDelayLevel) {this.maxDelayLevel = level;}long num = Long.parseLong(value.substring(0, value.length() - 1));long delayTimeMillis = tu * num;this.delayLevelTable.put(level, delayTimeMillis);}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}
  • 在启动ScheduleMessageService时会完成两个任务,一个是遍历delayLevelTable为每个延迟级别的队列创建一个DeliverDelayedMessageTimerTask,一个是创建定时任务将offsetTable持久化
public void start() {if (started.compareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();//获取延迟级别对应的消息队列拉取进展,offsetTable中存储的是<delayLevel, consumequeue拉取进展>Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}

有了前面ScheduleMessageService的介绍,我们接着分析broker后续的处理,前面分析到构建“SCHEDULE_TOPIC_XXXX”的consumequeue,那么“SCHEDULE_TOPIC_XXXX”是由谁来消费呢?其实是有ScheduleMessageService为每个延迟队列构建的DeliverDelayedMessageTimerTask来消费。DeliverDelayedMessageTimerTask继承了TimeTask,也就是说它的本质就是一个TimeTask,其核心实现是在executeOnTimeup方法中
,我们来看下它都完成哪些操作:

  • 根据topic的名称“SCHEDULE_TOPIC_XXXX”以及delayLevel对应的queueId来查询其对应的consumequeue

  • 根据当前consumequeue的拉取进展来获取consumequeue中待读取的数据

  • 解析consumequeue中的数据:延迟消息在commitlog中的物理偏移量、消息大小以及消息tag的hashcode

  • 判断当前是否已经到了延迟消息投递时间,方法是计算投递时间与当前时间的差值countdown,如果countdown小于等于0表示已经到了消息投递时间,如果countdown大于0则表示还没有到延迟消息投递时间

  • 如果到达延迟消息投递时间则会根据该消息在commitlog中的物理偏移量以及消息大小来获取延迟消息msgExt,接着会调用messageTimeup方法,它会根据延迟消息构建一个新的消息,这里比较关键的操作有三个:一个是根据消息的tag来设置新消息的tagsCode,一个是将消息properties中key为“DELAY”的键值对删除了,最后一个是新消息的topic和queueId是原来消息中properties中REAL_TOPIC和REAL_QID对应的值,也就是说这一步是还原了最初的延迟消息,接着就是调用了putMessage方法将还原后的消息写入commitlog,如果写入失败则会在日志中打印失败的消息同时会在10秒后再次调度该DeliverDelayedMessageTimerTask任务

  • 如果没有到达延迟消息投递的时间则会在countdown时间之后再次调度该DeliverDelayedMessageTimerTask任务

  • 当该延迟队列中没有新的消息可以消费时,则会以0.1秒为周期调度DeliverDelayedMessageTimerTask任务

public void run() {try {if (isStarted()) {this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);}}
public void executeOnTimeup() {ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;if (countdown <= 0) {MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of fornextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}

当消息成功写入commitlog后,reputMessageService会构建DispatchRequest来构建consumequeue和indexFile,这样消费者就可以正常消费消息了。


四、总结

1.延迟消息工作原理

这里我们用一张图来总结延迟消息的工作原理:

延迟消息的整个流程可以概括为以下步骤:
(1)producer端发送延迟消息
(2)broker将延迟消息的topic和queueId分别替换为SCHEDULE_TOPIC_XXXX和delayLevel-1,将其存储在commitlog中并构建对应的consumequeue,此时该消息的consumequeue的tagsCode值为storeTimestamp+延迟级别对应的时间
(3)延迟队列对应的DeliverDelayedMessageTimerTask根据offsetTable中的拉取进展从consumequeue获取延迟消息在commitlog中的物理偏移量等信息
(4)从commitlog读取延迟消息并还原延迟消息的topic和queueId
(5)将还原后的消息再次写入commitlog中
(6)构建消息的consumequeue
(7)消费者正常消费消息
从上图我们可以看到整个过程中有两次消息写入,所以此时的tps会翻倍

2.延迟消息在消费者消费重试中的应用

在RocketMQ中延迟消息被用在了consumer端消息重试的场景中,现在来分析下具体是如何应用的。

(1)首先,当前consumer端消息完一条消息返回的状态是ConsumeConcurrentlyStatus.RECONSUME_LATER时,consumer端会向broker发送RequestCode.CONSUMER_SEND_MSG_BACK请求,broker在对该请求处理时有以下两点需要注意:

  • broker端存储的消息的topic名称是%RETRY%+consumerGroup,这里需要注意一点,consumer在启动的过程中除了会订阅消息本省的topic外还会订阅重试topic
private void copySubscription() throws MQClientException {try {Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {for (final Map.Entry<String, String> entry : sub.entrySet()) {final String topic = entry.getKey();final String subString = entry.getValue();SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:break;case CLUSTERING://订阅重试topicfinal String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException("subscription exception", e);}}
  • broker端会计算delayLevel,计算方法是delayLevel = 3 + msgExt.getReconsumeTimes()
  • broker端会将消息的reconsumeTimes值加1
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
...
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {newTopic = MixAll.getDLQTopic(requestHeader.getGroup());queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return CompletableFuture.completedFuture(response);}} else {if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);String originMsgId = MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

(2)在往broker写(1)中的消息时,由于消息的delayTimeLevel大于0,所以会对消息进行本文第三部分的处理,这里就和延迟消息的工作原理衔接上了。虽然消息重试次数的增加,消息被延迟处理的时间也会越长。

最后用一张图来总结下一条消费重试的消息在broker端的流转过程:

RocketMQ源码分析之延迟消息相关推荐

  1. RocketMQ源码解析之延迟消息实现原理

    原创不易,转载请注明出处 文章目录 前言 1.延时消息的demo 2.实现的原理 前言 今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场 ...

  2. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  3. FFmpeg源码分析-直播延迟-内存泄漏

    FFmpeg源码分析-直播延迟-内存泄漏|FFmpeg源码分析方法|ffmpeg播放为什么容易产生延迟|解复用.解码内存泄漏分析 专注后台服务器开发,包括C/C++,Linux,Nginx,ZeroM ...

  4. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  5. rocketmq源码分析 -生产者

    概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...

  6. RocketMQ源码分析之request-reply特性

    1.什么是request-reply?   RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返 ...

  7. RocketMQ源码分析(十二)之CommitLog同步与异步刷盘

    文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...

  8. 【RocketMQ|源码分析】namesrv启动停止过程都做了什么

    简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...

  9. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

最新文章

  1. 汇编语言中将数据、代码、栈放入不同的段
  2. 刷了几千道算法题,我私藏的刷题网站都在这里了
  3. Android学习笔记之自定义Toast
  4. linux GCC、GDB、Makefile
  5. LeetCode 1806. 还原排列的最少操作步数(模拟)
  6. “有趣”的投影:当PCA失效时怎么办?
  7. 收藏 | 一文读懂机器学习中的正则化
  8. 【华为云技术分享】云图说 | ContainerOps推出灰度发布模式,助力企业落地容器DevOps最佳实践
  9. 基于Linux和MiniGUI的嵌入式系统软件开发指南(五)
  10. Unix/Linux入门篇
  11. python三元一次方程代码_求三元一次方程计算器代码
  12. 光纤配线柜如何选择,又该怎么安装?
  13. java pgm_如何读取Java中的PGM图像?
  14. 刮刮乐微信html5源码,微信小程序canvas实现刮刮乐效果
  15. Ubuntu18.04 + 树莓派4B + wifi + 换源 +ssh + 防火墙相关 + mate桌面 + + vnc + ROS Melodic
  16. C++实现get与set
  17. 令克软件再推OpenAPI与MAS系统服务,强大引擎赋能券商多元化发展
  18. Result Maps collection already contains value for com.anoxia.mapper.XXXMapper.BaseResultMap
  19. Discuz! X2.5 数据字典
  20. LTE关键技术之一:OFDM

热门文章

  1. python socket 阻塞
  2. 计算机竞赛游戏冒险岛,冒险岛4代电脑版
  3. 1、Python基础课件
  4. 早餐 | 第九期 · Demos和Samples介绍
  5. 编程思想:我亦无他,唯手熟尔
  6. 谭浩强C程序设计第五版课后答案视频+代码讲解完整版(合集)持续跟新中~~~
  7. iOS录制回放神器AutoTouch使用介绍
  8. scope=both和scope=spfile
  9. Facebook 广告视频信息获取
  10. TexMaker中文目录乱码