RocketMQ源码分析之延迟消息
文章目录
- 前言
- 一、延迟消息
- 1.特点
- 2.使用场景
- 3.demo
- 二、发送延迟消息
- 三、broker端存储延迟消息
- 四、总结
- 1.延迟消息工作原理
- 2.延迟消息在消费者消费重试中的应用
前言
本篇文章将会分析延迟消息的工作原理以及其在consumer端消息重试场景中的应用。
一、延迟消息
1.特点
(1)与普通消息相比,延迟消息需要设置延迟级别,注意:延迟级别从1开始,如果延迟级别等于0则表示该消息不是延迟消息
(2)延迟消息发送到broker后不会立刻被消费,而是需要等待特定时间后才被投递到真正的topic中
(3)RocketMQ不支持任意时间延迟,broker端配置文件中可以配置延迟队列等级,默认值是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,目前RocketMQ中支持的延迟时间的单位有4种:s(秒)、m(分钟)、h(小时)、d(天)
2.使用场景
(1)在电商购物场景中,如果用户在下单后没有立刻付款此时界面上就会提示:如果15分钟后没有支付那么订单将会被取消
(2)通过消息触发定时任务,例如在某一固定时间点向用户发送提醒消息
3.demo
示例源于官网。
(1)producer
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}
(2)consumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");// 订阅Topicsconsumer.subscribe("TestTopic", "*");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}
二、发送延迟消息
producer发送延迟消息与普通消息的流程是一致的,唯一需要注意的是:需要在producer端调用setDelayTimeLevel(int level)方法为消息设置延迟等级,设置延迟等级实际上是在消息的properties属性中添加<DELAY, level>键值对。延迟消息的发送流程可以参考笔者之前的笔记:RocketMQ源码分析之普通消息发送
public void setDelayTimeLevel(int level) {this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));}void putProperty(final String name, final String value) {if (null == this.properties) {this.properties = new HashMap<String, String>();}this.properties.put(name, value);}
三、broker端存储延迟消息
broker端存储延迟消息时会发生两次消息写入操作,一次是将消息写入“SCHEDULE_TOPIC_XXXX”的“delayLevel - 1”消息队列中,一次是将消息写入实际的topic和queueId中。接下来我们看看延迟消息在broker端存储的工作原理。
1.在broker端存储时会判断消息的properties属性中DELAY的值是否大于0,如果大于0则表示该消息是延迟消息,对延迟消息的处理逻辑如下:
- 首先会对延迟级别进行判断,判断其是否超过了broker端设置的最大延迟级别,如果大于则将其重置为broker端的最大延迟级别
- 将消息的原始topic和queueId存储在在properties的REAL_TOPIC和REAL_QID属性中
- 将消息的topic和queueId别重置为SCHEDULE_TOPIC_XXXX和delayLevel - 1
这一步的处理对应于Commitlog.java中的asyncPutMessage(final MessageExtBrokerInner msg)方法,具体如下:
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {//如果设置的延迟级别大于broker端配置最大延迟级别则将该消息的延迟级别重置为broker端配置的最大延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//将消息的topic和queueId分别重置为SCHEDULE_TOPIC_XXXX和delayLevel - 1,而消息原始的topic和queueId记录在properties的REAL_TOPIC和REAL_QID属性中msg.setTopic(topic);msg.setQueueId(queueId);}}public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1;}
注意:在将消息存入“SCHEDULE_TOPIC_XXXX”时,MessageQueue的queueId与delayLevel的对应关系是:queueId = delayLevel - 1。接着就是将消息写入到commitlog。
2.当commitlog中新添加消息后就会调用reputMessageService服务来构建DispatchRequest,后续会根据DispatchRequest来构建consumequeue和indexFile。在构建DispatchRequest时会调用checkMessageAndReturnSize方法,该方法中有关于延迟消息的处理需要注意,具体如下:调用computeDeliverTimestamp方法计算延迟消息的投递时间,并将投递时间放在consumequeue的tag字段,也就是此时构建的consumequeue中tag中存储的是“storeTimestamp+延迟级别对应的时间”
{String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel > 0) {tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {Long time = this.delayLevelTable.get(delayLevel);if (time != null) {return time + storeTimestamp;}return storeTimestamp + 1000;}
在将延迟消息写入commitlog(topic:SCHEDULE_TOPIC_XXXX)以及构建完其对应的consumequeue后,后续都是由ScheduleMessageService服务来处理,这里我们先介绍下有关ScheduleMessageService服务的基础信息,然后再接着上面详细分析broker后续如何处理延迟消息。
3.ScheduleMessageService
关于ScheduleMessageService我们需要了解以下信息:
- 其初始化及启动都是在broker启动的过程中完成的,其实现原理是Timer+TimerTask
- 在其初始化完成后会执行load函数,主要完成两个任务,一个是将文件${Rocket_HOME}/store/config/delayOffset.json加载到内存中的offsetTable,一个是获取broker端配置的messageDelayLevel并将其解析到delayLevelTable,其数据结构是<delayLevel, delay timeMillis>,在解析的过程中会确定好maxDelayLevel
public boolean load() {boolean result = super.load();result = result && this.parseDelayLevel();return result;}
public boolean parseDelayLevel() {HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();//从这里可以看到目前支持4种时间单位timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {String value = levelArray[i];String ch = value.substring(value.length() - 1);Long tu = timeUnitTable.get(ch);int level = i + 1;if (level > this.maxDelayLevel) {this.maxDelayLevel = level;}long num = Long.parseLong(value.substring(0, value.length() - 1));long delayTimeMillis = tu * num;this.delayLevelTable.put(level, delayTimeMillis);}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}
- 在启动ScheduleMessageService时会完成两个任务,一个是遍历delayLevelTable为每个延迟级别的队列创建一个DeliverDelayedMessageTimerTask,一个是创建定时任务将offsetTable持久化
public void start() {if (started.compareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();//获取延迟级别对应的消息队列拉取进展,offsetTable中存储的是<delayLevel, consumequeue拉取进展>Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}
有了前面ScheduleMessageService的介绍,我们接着分析broker后续的处理,前面分析到构建“SCHEDULE_TOPIC_XXXX”的consumequeue,那么“SCHEDULE_TOPIC_XXXX”是由谁来消费呢?其实是有ScheduleMessageService为每个延迟队列构建的DeliverDelayedMessageTimerTask来消费。DeliverDelayedMessageTimerTask继承了TimeTask,也就是说它的本质就是一个TimeTask,其核心实现是在executeOnTimeup方法中
,我们来看下它都完成哪些操作:
根据topic的名称“SCHEDULE_TOPIC_XXXX”以及delayLevel对应的queueId来查询其对应的consumequeue
根据当前consumequeue的拉取进展来获取consumequeue中待读取的数据
解析consumequeue中的数据:延迟消息在commitlog中的物理偏移量、消息大小以及消息tag的hashcode
判断当前是否已经到了延迟消息投递时间,方法是计算投递时间与当前时间的差值countdown,如果countdown小于等于0表示已经到了消息投递时间,如果countdown大于0则表示还没有到延迟消息投递时间
如果到达延迟消息投递时间则会根据该消息在commitlog中的物理偏移量以及消息大小来获取延迟消息msgExt,接着会调用messageTimeup方法,它会根据延迟消息构建一个新的消息,这里比较关键的操作有三个:一个是根据消息的tag来设置新消息的tagsCode,一个是将消息properties中key为“DELAY”的键值对删除了,最后一个是新消息的topic和queueId是原来消息中properties中REAL_TOPIC和REAL_QID对应的值,也就是说这一步是还原了最初的延迟消息,接着就是调用了putMessage方法将还原后的消息写入commitlog,如果写入失败则会在日志中打印失败的消息同时会在10秒后再次调度该DeliverDelayedMessageTimerTask任务
如果没有到达延迟消息投递的时间则会在countdown时间之后再次调度该DeliverDelayedMessageTimerTask任务
当该延迟队列中没有新的消息可以消费时,则会以0.1秒为周期调度DeliverDelayedMessageTimerTask任务
public void run() {try {if (isStarted()) {this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);}}
public void executeOnTimeup() {ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;if (countdown <= 0) {MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of fornextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}
当消息成功写入commitlog后,reputMessageService会构建DispatchRequest来构建consumequeue和indexFile,这样消费者就可以正常消费消息了。
四、总结
1.延迟消息工作原理
这里我们用一张图来总结延迟消息的工作原理:
延迟消息的整个流程可以概括为以下步骤:
(1)producer端发送延迟消息
(2)broker将延迟消息的topic和queueId分别替换为SCHEDULE_TOPIC_XXXX和delayLevel-1,将其存储在commitlog中并构建对应的consumequeue,此时该消息的consumequeue的tagsCode值为storeTimestamp+延迟级别对应的时间
(3)延迟队列对应的DeliverDelayedMessageTimerTask根据offsetTable中的拉取进展从consumequeue获取延迟消息在commitlog中的物理偏移量等信息
(4)从commitlog读取延迟消息并还原延迟消息的topic和queueId
(5)将还原后的消息再次写入commitlog中
(6)构建消息的consumequeue
(7)消费者正常消费消息
从上图我们可以看到整个过程中有两次消息写入,所以此时的tps会翻倍
2.延迟消息在消费者消费重试中的应用
在RocketMQ中延迟消息被用在了consumer端消息重试的场景中,现在来分析下具体是如何应用的。
(1)首先,当前consumer端消息完一条消息返回的状态是ConsumeConcurrentlyStatus.RECONSUME_LATER时,consumer端会向broker发送RequestCode.CONSUMER_SEND_MSG_BACK请求,broker在对该请求处理时有以下两点需要注意:
- broker端存储的消息的topic名称是%RETRY%+consumerGroup,这里需要注意一点,consumer在启动的过程中除了会订阅消息本省的topic外还会订阅重试topic
private void copySubscription() throws MQClientException {try {Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {for (final Map.Entry<String, String> entry : sub.entrySet()) {final String topic = entry.getKey();final String subString = entry.getValue();SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:break;case CLUSTERING://订阅重试topicfinal String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException("subscription exception", e);}}
- broker端会计算delayLevel,计算方法是delayLevel = 3 + msgExt.getReconsumeTimes()
- broker端会将消息的reconsumeTimes值加1
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
...
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {newTopic = MixAll.getDLQTopic(requestHeader.getGroup());queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return CompletableFuture.completedFuture(response);}} else {if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);String originMsgId = MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
(2)在往broker写(1)中的消息时,由于消息的delayTimeLevel大于0,所以会对消息进行本文第三部分的处理,这里就和延迟消息的工作原理衔接上了。虽然消息重试次数的增加,消息被延迟处理的时间也会越长。
最后用一张图来总结下一条消费重试的消息在broker端的流转过程:
RocketMQ源码分析之延迟消息相关推荐
- RocketMQ源码解析之延迟消息实现原理
原创不易,转载请注明出处 文章目录 前言 1.延时消息的demo 2.实现的原理 前言 今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场 ...
- 《RocketMQ源码分析》NameServer如何处理Broker的连接
<RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...
- FFmpeg源码分析-直播延迟-内存泄漏
FFmpeg源码分析-直播延迟-内存泄漏|FFmpeg源码分析方法|ffmpeg播放为什么容易产生延迟|解复用.解码内存泄漏分析 专注后台服务器开发,包括C/C++,Linux,Nginx,ZeroM ...
- RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...
- rocketmq源码分析 -生产者
概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...
- RocketMQ源码分析之request-reply特性
1.什么是request-reply? RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返 ...
- RocketMQ源码分析(十二)之CommitLog同步与异步刷盘
文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...
- 【RocketMQ|源码分析】namesrv启动停止过程都做了什么
简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...
- RocketMQ 源码分析 —— 集成 Spring Boot
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
最新文章
- 汇编语言中将数据、代码、栈放入不同的段
- 刷了几千道算法题,我私藏的刷题网站都在这里了
- Android学习笔记之自定义Toast
- linux GCC、GDB、Makefile
- LeetCode 1806. 还原排列的最少操作步数(模拟)
- “有趣”的投影:当PCA失效时怎么办?
- 收藏 | 一文读懂机器学习中的正则化
- 【华为云技术分享】云图说 | ContainerOps推出灰度发布模式,助力企业落地容器DevOps最佳实践
- 基于Linux和MiniGUI的嵌入式系统软件开发指南(五)
- Unix/Linux入门篇
- python三元一次方程代码_求三元一次方程计算器代码
- 光纤配线柜如何选择,又该怎么安装?
- java pgm_如何读取Java中的PGM图像?
- 刮刮乐微信html5源码,微信小程序canvas实现刮刮乐效果
- Ubuntu18.04 + 树莓派4B + wifi + 换源 +ssh + 防火墙相关 + mate桌面 + + vnc + ROS Melodic
- C++实现get与set
- 令克软件再推OpenAPI与MAS系统服务,强大引擎赋能券商多元化发展
- Result Maps collection already contains value for com.anoxia.mapper.XXXMapper.BaseResultMap
- Discuz! X2.5 数据字典
- LTE关键技术之一:OFDM