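Contents

  • Message storage mechanism
    • 1.Preface
    • 2.Core storage class: DefaultMessageStore
    • 3.Message storage flow
    • 4.Message storage files
    • 5.Memory mapping of storage files
      • 5.1.MappedFileQueue
      • 5.2.MappedFile
        • 5.2.1.commit
        • 5.2.2.flush
      • 5.3.TransientStorePool
    • 6.Flush mechanism
      • 6.1.Synchronous flush
      • 6.2.Asynchronous flush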

Message storage mechanism

1.Preface

This article explains how a Broker persists a message to disk after receiving it from a producer.

2.Core storage class: DefaultMessageStore

private final MessageStoreConfig messageStoreConfig; // message store configuration
private final CommitLog commitLog;      // CommitLog storage implementation; messages are stored in the CommitLog
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;    // ConsumeQueue cache table, grouped by topic
private final FlushConsumeQueueService flushConsumeQueueService;    // service thread that flushes ConsumeQueue files
private final CleanCommitLogService cleanCommitLogService;  // service that deletes expired CommitLog files
private final CleanConsumeQueueService cleanConsumeQueueService;    // service that deletes expired ConsumeQueue files
private final IndexService indexService;    // index service
private final AllocateMappedFileService allocateMappedFileService;  // MappedFile allocation service; memory-maps CommitLog and ConsumeQueue files
private final ReputMessageService reputMessageService; // dispatches CommitLog messages to build the ConsumeQueue and IndexFile files
private final HAService haService;  // master-slave replication service
private final ScheduleMessageService scheduleMessageService;    // scheduling service for delayed messages
private final StoreStatsService storeStatsService;  // store statistics service
private final MessageArrivingListener messageArrivingListener;  // listener notified when a message arrives (used by long-polling pulls)
private final TransientStorePool transientStorePool;    // off-heap buffer pool for messages
private final BrokerStatsManager brokerStatsManager;    // Broker statistics manager
private final BrokerConfig brokerConfig;    // Broker configuration
private StoreCheckpoint storeCheckpoint;    // flush checkpoint
private final LinkedList<CommitLogDispatcher> dispatcherList; // dispatchers that forward CommitLog entries

The fields above are the core of message storage; it is worth knowing what each one does.

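3.Message storage flow

[Figure: sequence diagram of the message storage flow]

Entry point for message storage: DefaultMessageStore#putMessage. The code below is the batch, asynchronous variant, which operates on a MessageExtBatch: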

// Check the store status: reject the write if the store is shut down, the Broker is a Slave, or the store is not writable
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
    return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// Check that the topic and the message body length are legal
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
    return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
// Record the start time
long beginTime = this.getSystemClock().now();
// Write the messages
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
resultFuture.thenAccept((result) -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
    }
    // Record statistics
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    // The store failed
    if (null == result || !result.isOk()) {
        // Increment the failure counter in the stats service
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});
return resultFuture;

DefaultMessageStore#checkStoreStatus

// The store service has been shut down
if (this.shutdown) {
    log.warn("message store has shutdown, so putMessage is forbidden");
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// The Broker is a Slave, so writes are rejected
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("broke role is slave, so putMessage is forbidden");
    }
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// Not writable: the disk is full, or writing the logic queue or the index file failed
if (!this.runningFlags.isWriteable()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
            "the broker's disk is full, write to logic queue error, write to index file error, etc");
    }
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
    this.printTimes.set(0);
}
// The OS page cache is busy
if (this.isOSPageCacheBusy()) {
    return PutMessageStatus.OS_PAGECACHE_BUSY;
}
return PutMessageStatus.PUT_OK;

CommitLog#asyncPutMessages

// Record the message store time
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
// Batch messages must not be transactional
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
//....
// Get the last MappedFile, the concrete memory-mapping implementation
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// Appending requires the lock, so writes are serialized
putMessageLock.lock();
try {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    this.beginTimeInLock = beginLockTimestamp;
    // Set the store timestamp inside the lock to keep messages ordered
    messageExtBatch.setStoreTimestamp(beginLockTimestamp);
    // If the mappedFile is null or full, create a new one
    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    }
    // If creation failed, return immediately
    if (null == mappedFile) {
        log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
    }
    // Key step: append the messages to the mappedFile
    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
    // Handle the append result
    switch (result.getStatus()) {
        case PUT_OK:
            break;
        case END_OF_FILE:
            unlockMappedFile = mappedFile;
            // Create a new file, re-write the message
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            if (null == mappedFile) {
                // XXX: warn and notify me
                log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
            }
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
            break;
        case MESSAGE_SIZE_EXCEEDED:
        case PROPERTIES_SIZE_EXCEEDED:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
        case UNKNOWN_ERROR:
        default:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    }
    elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    beginTimeInLock = 0;
} finally {
    putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
    log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
// Flush to disk according to the flush policy
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
// Master-slave replication
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);

MappedFile#appendMessagesInner

assert messageExt != null;
assert cb != null;
// Current write pointer (write position)
int currentPos = this.wrotePosition.get();
// The write pointer is still inside the file
if (currentPos < this.fileSize) {
    // Buffer to write into: the off-heap writeBuffer if present, otherwise the mapped buffer
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result;
    // Single and batch messages are handled differently
    if (messageExt instanceof MessageExtBrokerInner) {
        // Single message: the callback writes it into the buffer (CommitLog#doAppend)
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
            (MessageExtBrokerInner) messageExt, putMessageContext);
    } else if (messageExt instanceof MessageExtBatch) {
        // Batch message
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
            (MessageExtBatch) messageExt, putMessageContext);
    } else {
        // Unknown message type: return an error result
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    // Advance the write pointer
    this.wrotePosition.addAndGet(result.getWroteBytes());
    // Update the store timestamp
    this.storeTimestamp = result.getStoreTimestamp();
    // Return the append result
    return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);

CommitLog#doAppend

public AppendMessageResult doAppend(final long fileFromOffset,            // starting offset of the file
                                    final ByteBuffer byteBuffer,          // NIO buffer to write into
                                    final int maxBlank,                   // maximum number of writable bytes left
                                    final MessageExtBrokerInner msgInner, // message entity
                                    PutMessageContext putMessageContext) {
    // Physical offset at which this message is written
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // Build the msgId lazily
    Supplier<String> msgIdSupplier = () -> {
        // System flag
        int sysflag = msgInner.getSysFlag();
        // The msgId is 16 bytes for an IPv4 store host (4 IP + 4 port + 8 offset) and 28 bytes for IPv6
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        // Allocate msgIdLen bytes
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        // Store host: for IPv4, the IP and the port take 4 bytes each
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        // Reset position and limit, because socketAddress2ByteBuffer flips the buffer; the content is kept
        msgIdBuffer.clear();
        // The last 8 bytes store the CommitLog offset (wroteOffset)
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    };
    // Unique key of the topic and queue
    String key = putMessageContext.getTopicQueueTableKey();
    // Look up the current consume queue offset by key
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consume queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    // Total length of the encoded message
    final int msgLen = preEncodeBuffer.getInt(0);
    // Determines whether there is sufficient free space
    // If not, write an end-of-file marker and return END_OF_FILE so that the caller creates a new CommitLog file
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
            maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
            msgIdSupplier, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }
    int pos = 4 + 4 + 4 + 4 + 4;
    // 6 QUEUEOFFSET
    preEncodeBuffer.putLong(pos, queueOffset);
    pos += 8;
    // 7 PHYSICALOFFSET
    preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
    int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
    pos += 8 + 4 + 8 + ipLen;
    // refresh store time stamp in lock
    preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
    // Build the AppendMessageResult
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}
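
The msgId layout built by msgIdSupplier can be illustrated with a minimal, self-contained sketch. It assumes an IPv4 store host and uses hypothetical names (MsgIdSketch, createMsgId, decodeOffset) that are not part of RocketMQ; it only mirrors the 4 + 4 + 8 byte layout described in the comments above.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: class and method names are hypothetical, not RocketMQ API.
class MsgIdSketch {
    // Encodes an IPv4 address, a port and a CommitLog offset into a 16-byte msgId (hex string).
    static String createMsgId(byte[] ipv4, int port, long commitLogOffset) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 8);
        buffer.put(ipv4);                 // 4 bytes: IP address
        buffer.putInt(port);              // 4 bytes: port
        buffer.putLong(commitLogOffset);  // 8 bytes: physical offset in the CommitLog
        StringBuilder hex = new StringBuilder(32);
        for (byte b : buffer.array()) {
            hex.append(String.format("%02X", b));
        }
        return hex.toString();
    }

    // Reads the CommitLog offset back from the last 16 hex characters of the msgId.
    static long decodeOffset(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(msgId.length() - 16), 16);
    }

    public static void main(String[] args) {
        String msgId = createMsgId(new byte[] {127, 0, 0, 1}, 10911, 4096L);
        System.out.println(msgId);
        System.out.println(decodeOffset(msgId));
    }
}
```

Because the offset is embedded in the ID, a message can be located in the CommitLog from its msgId alone, without consulting an index.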

AppendMessageResult

public class AppendMessageResult {
    private AppendMessageStatus status;         // append result status
    private long wroteOffset;                   // offset at which the message was written
    private int wroteBytes;                     // number of bytes written
    private String msgId;                       // message ID
    private Supplier<String> msgIdSupplier;     // lazily builds the message ID
    private long storeTimestamp;                // store timestamp
    private long logicsOffset;                  // offset in the consume queue
    private long pagecacheRT = 0;               // time spent writing into the page cache
}

Once the append result is returned, control goes back to CommitLog#asyncPutMessages:

result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
    case PUT_OK:
        break;
}
// Release the lock
putMessageLock.unlock();
// Update store statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
// Flush to disk according to the flush policy
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
// Master-slave replication
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
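
The excerpt stops once the two futures are created. One plausible way to merge them into a single result is CompletableFuture#thenCombine. The sketch below is an assumption for illustration, not the source's code: the Status enum, the class name and the precedence rule (a flush failure is reported first) are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: Status and combine() are hypothetical stand-ins.
class PutResultCombineSketch {
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

    // The overall status is PUT_OK only when both the flush and the replication succeeded.
    static CompletableFuture<Status> combine(CompletableFuture<Status> flushOK,
                                             CompletableFuture<Status> replicaOK) {
        return flushOK.thenCombine(replicaOK, (flushStatus, replicaStatus) -> {
            if (flushStatus != Status.PUT_OK) {
                return flushStatus;
            }
            return replicaStatus;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Status> flush = new CompletableFuture<>();
        CompletableFuture<Status> replica = CompletableFuture.completedFuture(Status.PUT_OK);
        CompletableFuture<Status> overall = combine(flush, replica);
        // The combined future stays pending until the flush future completes.
        System.out.println(overall.isDone());
        flush.complete(Status.FLUSH_DISK_TIMEOUT);
        System.out.println(overall.join());
    }
}
```

The point of this shape is that the thread holding putMessageLock never blocks on disk or network; the waiting is expressed as future composition.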

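4.Message storage files

  • commitlog: message storage directory
  • config: configuration information
  • consumequeue: consume queue storage directory
  • index: message index file storage directory
  • abort: marker that records an abnormal Broker shutdown
  • checkpoint: checkpoint file that stores the last flush timestamps of the commitlog, consumequeue, and index files.

5.Memory mapping of storage files

RocketMQ uses memory-mapped files to improve I/O performance. CommitLog, ConsumeQueue, and IndexFile files are all designed with a fixed length per file.

When a file is full, a new file is created and named after the global physical offset of its first message.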
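
The naming rule can be made concrete with a short sketch. It assumes a file size of 1 GB and a 20-digit, zero-padded file name; the class and method names are hypothetical and only illustrate the rule stated above.

```java
import java.util.Locale;

// Illustrative sketch only: names are hypothetical; 1 GB per file and 20 digits are assumptions.
class CommitLogFileNameSketch {
    // Zero-pads the starting physical offset of a file to 20 digits.
    static String offsetToFileName(long offset) {
        return String.format(Locale.ROOT, "%020d", offset);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // assumed size of one CommitLog file
        for (int i = 0; i < 3; i++) {
            // Each file is named after the offset of the first byte it holds.
            System.out.println(offsetToFileName(i * fileSize));
        }
    }
}
```

Because names encode offsets and every file has the same length, the file holding a given offset can be found with integer division instead of a search.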

5.1.MappedFileQueue

// Storage directory
private final String storePath;
// Size of a single file
protected final int mappedFileSize;
// Collection of MappedFile objects
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// Service thread that allocates MappedFile objects
private final AllocateMappedFileService allocateMappedFileService;
// Flush pointer
protected long flushedWhere = 0;
// Current commit pointer
private long committedWhere = 0;

Find the MappedFile for a given store timestamp

public MappedFile getMappedFileByTime(final long timestamp) {
    // Copy the mapped files
    Object[] mfs = this.copyMappedFiles(0);
    if (null == mfs) {
        return null;
    }
    // Iterate over the mapped files
    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        // Return the first file whose last-modified time is at or after the given timestamp
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }
    return (MappedFile) mfs[mfs.length - 1];
}

Find the MappedFile for a given message offset

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        // Get the first and the last mapped file
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        // Proceed only if both exist
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // Compute the file index
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                // Target mapped file
                MappedFile targetFile = null;
                try {
                    // Look up the target file by index
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }
                // Validate the result: not null and the offset lies inside the file
                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }
                // Fall back to a linear scan
                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }
            // Return the first mapped file if requested
            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }
    return null;
}
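
The index computation in findMappedFileByOffset subtracts the index of the first surviving file, because older files may already have been deleted. A minimal sketch with a hypothetical helper (indexOf is not a RocketMQ method) and small made-up numbers:

```java
// Illustrative sketch only: indexOf is a hypothetical helper that mirrors the formula above.
class MappedFileIndexSketch {
    // Index of the file containing `offset`, relative to the first file still in the queue.
    static int indexOf(long offset, long firstFileFromOffset, int mappedFileSize) {
        return (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
    }

    public static void main(String[] args) {
        int fileSize = 1024;        // made-up file size for readability
        long firstFileFrom = 2048;  // files starting at 0 and 1024 are assumed to be deleted
        // Offset 5000 lies in the file covering [4096, 5120), the third file in the queue.
        System.out.println(indexOf(5000, firstFileFrom, fileSize));
    }
}
```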

Get the minimum offset of the storage files

public long getMinOffset() {
    if (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
    }
    return -1;
}

Get the maximum offset of the storage files

public long getMaxOffset() {
    // The last mapped file
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

Get the current write pointer of the storage files

public long getMaxWrotePosition() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }
    return 0;
}

5.2.MappedFile

// OS page size used for flushing, 4 KB by default
public static final int OS_PAGE_SIZE = 1024 * 4;
// Total virtual memory mapped by MappedFile objects in this JVM
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// Number of MappedFile objects in this JVM
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// Write pointer of this file
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// Commit pointer of this file
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// Flush pointer
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// File size
protected int fileSize;
// File channel
protected FileChannel fileChannel;
/**
 * Off-heap ByteBuffer
 * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
 */
protected ByteBuffer writeBuffer = null;
// Off-heap buffer pool
protected TransientStorePool transientStorePool = null;
// File name
private String fileName;
// Starting offset of this file
private long fileFromOffset;
// Physical file
private File file;
// Mapped buffer of the file
private MappedByteBuffer mappedByteBuffer;
// Store timestamp
private volatile long storeTimestamp = 0;
// Whether this is the first file created in the queue
private boolean firstCreateInQueue = false;

MappedFile initialization

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    // Make sure the parent directory exists
    ensureDirOK(this.file.getParent());
    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

Note the configuration option transientStorePoolEnable. When it is true, data is first written to off-heap memory, then a commit thread commits it to the FileChannel, and finally a flush thread flushes it to disk.

Initialization with transientStorePoolEnable turned on

public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    // Borrow an off-heap buffer from the pool
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}

5.2.1.commit

The commit path during a flush is roughly:

DefaultMessageStore#flush→CommitLog→MappedFileQueue→MappedFile

//DefaultMessageStore
public long flush() {
    return this.commitLog.flush();
}
//CommitLog
public long flush() {
    //----------↓-----------
    this.mappedFileQueue.commit(0);
    this.mappedFileQueue.flush(0);
    return this.mappedFileQueue.getFlushedWhere();
}
//MappedFileQueue
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        //----------↓-----------
        int offset = mappedFile.commit(commitLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.committedWhere;
        this.committedWhere = where;
    }
    return result;
}

Finally, MappedFile performs the actual commit:

MappedFile#commit

public int commit(final int commitLeastPages) {
    // writeBuffer == null means transientStorePoolEnable is off, so nothing needs to be committed to the fileChannel.
    // Treat wrotePosition as committedPosition and return; the caller then goes straight to flush.
    if (writeBuffer == null) {
        return this.wrotePosition.get();
    }
    // There are at least commitLeastPages dirty pages to commit
    if (this.isAbleToCommit(commitLeastPages)) {
        // Check that the MappedFile has not been destroyed:
        // hold() -> isAvailable() -> MappedFile.available (inherited from ReferenceResource)
        // See shutdown() below for how a file is destroyed
        if (this.hold()) {
            //--↓--
            commit0();
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }
    // All dirty data has been committed to FileChannel.
    // Once the whole file is committed, return the buffer to the pool
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}

MappedFile#isAbleToCommit

// Commit pointer (the local variable is named flush in the source)
int flush = this.committedPosition.get();
// Write pointer of the file
int write = this.wrotePosition.get();
// The file is full
if (this.isFull()) {
    return true;
}
if (commitLeastPages > 0) {
    // Commit only when at least commitLeastPages pages are dirty
    return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;
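
The page arithmetic can be checked with a standalone sketch. The function below mirrors the logic above but takes all state as parameters; the class name and parameter list are hypothetical.

```java
// Illustrative sketch only: a parameterized copy of the threshold logic above.
class CommitThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    static boolean isAbleToCommit(int wrote, int committed, int fileSize, int commitLeastPages) {
        if (wrote == fileSize) {
            return true; // a full file is always committed
        }
        if (commitLeastPages > 0) {
            // Integer division counts whole pages, so partial pages do not trigger a commit.
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrote > committed;
    }

    public static void main(String[] args) {
        int fileSize = 1 << 20;
        // Three full pages plus 100 bytes: below a 4-page threshold.
        System.out.println(isAbleToCommit(3 * OS_PAGE_SIZE + 100, 0, fileSize, 4));
        // Exactly four pages: the threshold is reached.
        System.out.println(isAbleToCommit(4 * OS_PAGE_SIZE, 0, fileSize, 4));
        // With a threshold of 0, any uncommitted byte qualifies.
        System.out.println(isAbleToCommit(100, 0, fileSize, 0));
    }
}
```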

MappedFile#commit0

// Write pointer
int writePos = this.wrotePosition.get();
// Last commit pointer
int lastCommittedPosition = this.committedPosition.get();
// Commit only if the write pointer is ahead of the last commit pointer
if (writePos - lastCommittedPosition > 0) {
    try {
        // Create a view that shares the content of writeBuffer
        ByteBuffer byteBuffer = writeBuffer.slice();
        // Start at the last commit position
        byteBuffer.position(lastCommittedPosition);
        // End at the write position
        byteBuffer.limit(writePos);
        // Position the fileChannel at the last commit position
        this.fileChannel.position(lastCommittedPosition);
        // Copy the data between lastCommittedPosition and writePos into the FileChannel
        this.fileChannel.write(byteBuffer);
        // Move the commit pointer to writePos so the same data is not committed twice
        this.committedPosition.set(writePos);
    } catch (Throwable e) {
        log.error("Error occurred when commit data to FileChannel.", e);
    }
}

5.2.2.flush

Flushing calls force() on the MappedByteBuffer or the FileChannel to persist in-memory data to disk. After a flush, flushedPosition equals the largest readable position:

  • If writeBuffer is not null, flushedPosition should equal the last commit pointer, because only committed data has reached the FileChannel;
  • If writeBuffer is null, data goes directly into the MappedByteBuffer and wrotePosition is the pointer in the MappedByteBuffer, so flushedPosition is set to wrotePosition.

After the data has been committed to the fileChannel, flushing proceeds as follows:

CommitLog#flush→MappedFileQueue#flush→MappedFile#flush

MappedFile#flush

// The flush condition is met
if (this.isAbleToFlush(flushLeastPages)) {
    // hold() takes a reference so that the file cannot be destroyed while flushing
    if (this.hold()) {
        // Largest readable position
        int value = getReadPosition();
        try {
            // TransientStorePool enabled  -> data went through the fileChannel
            // TransientStorePool disabled -> data went into the mappedByteBuffer
            //We only append data to fileChannel or mappedByteBuffer, never both.
            // Data was committed from writeBuffer to the fileChannel -> force the channel
            if (writeBuffer != null || this.fileChannel.position() != 0) {
                this.fileChannel.force(false);
            }
            // Data was written directly to the mappedByteBuffer -> force the mapping
            else {
                this.mappedByteBuffer.force();
            }
        } catch (Throwable e) {
            log.error("Error occurred when force data to disk.", e);
        }
        // Update the flush position
        this.flushedPosition.set(value);
        this.release();
    } else {
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        this.flushedPosition.set(getReadPosition());
    }
}
return this.getFlushedPosition();

MappedFile#getReadPosition

/**
 * Get the largest readable position of this file
 * @return The max position which have valid data
 */
public int getReadPosition() {
    // If writeBuffer is null, return the write pointer; otherwise return the last commit pointer.
    // Only data that has reached the MappedByteBuffer or the FileChannel is considered safe to read.
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
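
The interplay of the three pointers (write, commit, flush) can be reproduced outside RocketMQ with plain NIO. The sketch below is a simplified model under stated assumptions: it always uses an off-heap write buffer (the transientStorePoolEnable case), it is single-threaded, it omits reference counting and page thresholds, and every name in it is hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: a single-threaded model of append -> commit -> flush.
class WriteCommitFlushSketch implements AutoCloseable {
    private final ByteBuffer writeBuffer;   // stands in for the pooled off-heap buffer
    private final FileChannel fileChannel;
    private int wrotePosition;
    private int committedPosition;
    private int flushedPosition;

    WriteCommitFlushSketch(Path file, int fileSize) throws IOException {
        this.writeBuffer = ByteBuffer.allocateDirect(fileSize);
        this.fileChannel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    // Step 1: append to the off-heap buffer only; nothing reaches the file yet.
    void append(byte[] data) {
        ByteBuffer slice = writeBuffer.slice();
        slice.position(wrotePosition);
        slice.put(data);
        wrotePosition += data.length;
    }

    // Step 2: copy [committedPosition, wrotePosition) from the buffer into the FileChannel.
    void commit() throws IOException {
        if (wrotePosition > committedPosition) {
            ByteBuffer slice = writeBuffer.slice();
            slice.position(committedPosition);
            slice.limit(wrotePosition);
            fileChannel.position(committedPosition);
            while (slice.hasRemaining()) {
                fileChannel.write(slice);
            }
            committedPosition = wrotePosition;
        }
    }

    // Step 3: force committed data to the storage device.
    void flush() throws IOException {
        fileChannel.force(false);
        flushedPosition = committedPosition;
    }

    // With a write buffer in use, only committed data is readable.
    int getReadPosition() {
        return committedPosition;
    }

    int getFlushedPosition() {
        return flushedPosition;
    }

    @Override
    public void close() throws IOException {
        fileChannel.close();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-sketch", ".bin");
        try (WriteCommitFlushSketch sketch = new WriteCommitFlushSketch(file, 1024)) {
            sketch.append("hello".getBytes(StandardCharsets.UTF_8));
            System.out.println("readable before commit: " + sketch.getReadPosition());
            sketch.commit();
            sketch.flush();
            System.out.println("readable after commit: " + sketch.getReadPosition());
            System.out.println("flushed: " + sketch.getFlushedPosition());
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

The invariant worth noticing is flushedPosition ≤ committedPosition ≤ wrotePosition: each stage can only advance up to the pointer of the stage before it.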

MappedFile#shutdown

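A MappedFile is destroyed through destroy(long intervalForcibly), which first calls shutdown(intervalForcibly) inherited from ReferenceResource. intervalForcibly is the maximum time a file that is still referenced may refuse destruction after the first shutdown attempt. The body of shutdown is:

if (this.available) {
    // Mark the MappedFile as unavailable
    this.available = false;
    // Record the time of the first shutdown attempt
    this.firstShutdownTimestamp = System.currentTimeMillis();
    // Release the initial reference
    this.release();
} else if (this.getRefCount() > 0) {
    // Still referenced: force the release once intervalForcibly has elapsed
    if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
        this.refCount.set(-1000 - this.getRefCount());
        this.release();
    }
}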
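
The hold/release/shutdown protocol is a form of reference counting. A minimal sketch of the idea follows; it is a hypothetical class written for illustration, it omits the forced release after intervalForcibly, and its cleanup step only sets a flag where real code would unmap the buffer.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: not the RocketMQ ReferenceResource class.
class RefCountedResourceSketch {
    private final AtomicLong refCount = new AtomicLong(1); // the owner holds the initial reference
    private volatile boolean available = true;
    private volatile boolean cleanedUp = false;

    // Takes a reference; fails once the resource has been shut down.
    boolean hold() {
        if (available) {
            if (refCount.getAndIncrement() > 0) {
                return true;
            }
            refCount.getAndDecrement();
        }
        return false;
    }

    // Drops a reference; the last release triggers the cleanup.
    void release() {
        if (refCount.decrementAndGet() <= 0) {
            cleanedUp = true; // real code would unmap the buffer here
        }
    }

    // Rejects new holders and drops the owner's initial reference.
    void shutdown() {
        if (available) {
            available = false;
            release();
        }
    }

    boolean isCleanedUp() {
        return cleanedUp;
    }

    public static void main(String[] args) {
        RefCountedResourceSketch resource = new RefCountedResourceSketch();
        System.out.println(resource.hold());        // a reader takes a reference
        resource.shutdown();                        // owner shuts down while the reader is active
        System.out.println(resource.isCleanedUp()); // cleanup waits for the reader
        resource.release();                         // reader finishes
        System.out.println(resource.isCleanedUp());
    }
}
```

This is why commit and flush wrap their work in hold() and release(): a file that is being written to disk cannot be unmapped underneath them.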

5.3.TransientStorePool

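A pool for short-term data storage. RocketMQ allocates separate off-heap ByteBuffers to hold data temporarily: data is first written to such a buffer, and a commit thread then copies it to the FileChannel of the target physical file. The main reason for this mechanism is memory locking: the off-heap memory is pinned in RAM so that the operating system does not swap it out to disk.

private final int poolSize;      // number of buffers in the pool
private final int fileSize;     // size of each ByteBuffer
private final Deque<ByteBuffer> availableBuffers; // deque that holds the available buffers
private final MessageStoreConfig storeConfig;       // message store configuration

Initialization:

public void init() {
    // Create poolSize off-heap buffers
    for (int i = 0; i < poolSize; i++) {
        // Allocate the memory
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        // Memory address
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        // Lock the memory through JNA (com.sun.jna) so that it is not swapped out, which improves storage performance
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}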
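
The borrow/return cycle of such a pool can be sketched with the standard library alone. This is a simplified illustration: the mlock call is left out because it needs JNA, and the class name and the details of borrowBuffer and returnBuffer are assumptions rather than a copy of the RocketMQ class.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Illustrative sketch only: memory locking (mlock) is intentionally omitted.
class BufferPoolSketch {
    private final int poolSize;
    private final int bufferSize;
    private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    BufferPoolSketch(int poolSize, int bufferSize) {
        this.poolSize = poolSize;
        this.bufferSize = bufferSize;
    }

    // Pre-allocates all off-heap buffers up front.
    void init() {
        for (int i = 0; i < poolSize; i++) {
            availableBuffers.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // Returns null when the pool is exhausted.
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    // Resets the buffer before putting it back so the next borrower starts clean.
    void returnBuffer(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(bufferSize);
        availableBuffers.offerFirst(buffer);
    }

    int availableBufferNums() {
        return availableBuffers.size();
    }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(2, 1024);
        pool.init();
        ByteBuffer first = pool.borrowBuffer();
        ByteBuffer second = pool.borrowBuffer();
        System.out.println(pool.borrowBuffer() == null); // pool exhausted
        pool.returnBuffer(first);
        System.out.println(pool.availableBufferNums());
    }
}
```

Pre-allocating a fixed number of buffers keeps allocation off the write path; the cost is that the pool size bounds how many files can be written through off-heap buffers at once.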

6.Flush mechanism

6.1.Synchronous flush

CommitLog#submitFlushRequest

// Synchronous flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    // Service thread that flushes the CommitLog
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    // The producer wants to wait for the store result
    if (messageExt.isWaitStoreMsgOK()) {
        // Build the flush request
        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
            this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        // Hand the request to the flush service thread
        //--------↓--------
        service.putRequest(request);
        // The future completes when the flush result is available
        return request.future();
    } else {
        // Wake up the synchronous flush thread
        service.wakeup();
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
else {
    // Asynchronous flush
    ....
}

GroupCommitRequest

public static class GroupCommitRequest {
    private final long nextOffset;
    private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
    private final long startTimestamp = System.currentTimeMillis();
    private long timeoutMillis = Long.MAX_VALUE;
}

GroupCommitService

class GroupCommitService extends FlushCommitLogService {
    // requestsWrite receives new requests; requestsRead holds the requests being processed by the flush thread
    private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
    private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
    // Spin lock that keeps the containers above thread-safe
    private final PutMessageSpinLock lock = new PutMessageSpinLock();
}

GroupCommitService#putRequest

// Acquire the spin lock
lock.lock();
try {
    // Add the request to the write container
    this.requestsWrite.add(request);
} finally {
    lock.unlock();
}
// Wake up the flush thread
this.wakeup();

GroupCommitService#run

CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
    try {
        // Wait for up to 10 ms, or until woken up
        this.waitForRunning(10);
        // Process the pending flush requests
        this.doCommit();
    } catch (Exception e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
    Thread.sleep(10);
} catch (InterruptedException e) {
    CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
    this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");

GroupCommitService#doCommit

if (!this.requestsRead.isEmpty()) {
    // Iterate over requestsRead
    for (GroupCommitRequest req : this.requestsRead) {
        // The flush has succeeded once the flushed position is at or beyond the request's next offset
        // The message may continue in the next file, so flush at most twice
        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        for (int i = 0; i < 2 && !flushOK; i++) {
            CommitLog.this.mappedFileQueue.flush(0);
            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        }
        // Wake up the waiting producer
        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
    }
    // Update the flush checkpoint
    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
    if (storeTimestamp > 0) {
        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
    }
    // Reset the request container
    this.requestsRead = new LinkedList<>();
} else {
    // Some messages are configured not to wait for the flush, so this branch is reached
    CommitLog.this.mappedFileQueue.flush(0);
}
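
The two request lists form a double buffer: producers append to one list while the flush thread drains the other, and the lists are swapped between rounds. The following sketch models only that handoff. It is an assumption-laden simplification: the flushed position is passed in as a parameter instead of being read from mappedFileQueue, a Boolean replaces PutMessageStatus, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: models the write/read list swap of a group commit.
class GroupCommitSketch {
    static final class Request {
        final long nextOffset;
        final CompletableFuture<Boolean> flushOK = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();

    // Called by producer threads: enqueue a request and get a future for its result.
    synchronized CompletableFuture<Boolean> putRequest(long nextOffset) {
        Request request = new Request(nextOffset);
        requestsWrite.add(request);
        return request.flushOK;
    }

    // Called by the flush thread: exchange the two lists so producers keep writing undisturbed.
    synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Called only by the flush thread. flushedWhere stands in for the position reached after flushing.
    void doCommit(long flushedWhere) {
        for (Request request : requestsRead) {
            request.flushOK.complete(flushedWhere >= request.nextOffset);
        }
        requestsRead = new ArrayList<>();
    }

    public static void main(String[] args) {
        GroupCommitSketch service = new GroupCommitSketch();
        CompletableFuture<Boolean> first = service.putRequest(100);
        CompletableFuture<Boolean> second = service.putRequest(300);
        service.swapRequests();
        service.doCommit(200); // pretend the flush reached offset 200
        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```

One flush can therefore acknowledge many requests at once, which is where the name group commit comes from.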

6.2.Asynchronous flush

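With asynchronous flushing, the Broker replies to the producer as soon as the message has been appended to memory. If transientStorePoolEnable is on, RocketMQ allocates off-heap memory of the same size as the target physical file (CommitLog) and locks it so that it is not swapped out. Messages are first appended to the off-heap memory, then committed to the FileChannel of the physical file, and finally flushed to disk. If transientStorePoolEnable is off, messages are appended directly to the memory-mapped file of the physical file and then flushed to disk.

Asynchronous flush steps with transientStorePoolEnable turned on:

  • The message is appended to the off-heap ByteBuffer
  • The CommitRealTimeService thread commits the messages in the ByteBuffer to the fileChannel every 200 ms
  • When the commit succeeds, committedPosition moves forward
  • The FlushRealTimeService thread flushes the fileChannel data to disk every 500 ms

// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    ...
}
// Asynchronous flush
else {
    // transientStorePoolEnable is off
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // Wake up the flushCommitLogService thread
        flushCommitLogService.wakeup();
    } else  {
        // transientStorePoolEnable is on: wake up the commit thread first
        commitLogService.wakeup();
    }
    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}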

CommitRealTimeService#run

How the commit thread works:

while (!this.isStopped()) {
    // Interval: 200 ms
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    // Minimum number of pages per commit: 4
    int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    // Maximum interval between two commits: 200 ms
    int commitDataThoroughInterval =
        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
    // If more than commitDataThoroughInterval has passed since the last commit, ignore commitDataLeastPages and commit directly
    long begin = System.currentTimeMillis();
    if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
        this.lastCommitTimestamp = begin;
        // Ignore the minimum page requirement
        commitDataLeastPages = 0;
    }
    try {
        // Commit the pending data to the FileChannel of the physical file and get the result
        boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
        long end = System.currentTimeMillis();
        // result == false means some data was committed
        if (!result) {
            this.lastCommitTimestamp = end; // result = false means some data committed.
            //now wake up flush thread.
            // Wake up the flush thread FlushRealTimeService (a subclass of FlushCommitLogService)
            flushCommitLogService.wakeup();
        }
        if (end - begin > 500) {
            log.info("Commit data to file costs {} ms", end - begin);
        }
        this.waitForRunning(interval);
    } catch (Throwable e) {
        CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
    }
}

FlushRealTimeService#run

How the flush thread works:

// Loop until the thread is stopped
while (!this.isStopped()) {
    // Interval between runs: 500 ms
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    // Minimum number of pages per flush: 4
    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    // Maximum interval between two flushes: 10 s
    int flushPhysicQueueThoroughInterval =
        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    boolean printFlushProgress = false;
    // Print flush progress
    long currentTimeMillis = System.currentTimeMillis();
    // If the maximum flush interval has passed since the last flush, ignore flushPhysicQueueLeastPages (set it to 0) and flush directly
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
        this.lastFlushTimestamp = currentTimeMillis;
        flushPhysicQueueLeastPages = 0;
        printFlushProgress = (printTimes++ % 10) == 0;
    }
    try {
        if (flushCommitLogTimed) {
            // Sleep for the run interval (500 ms)
            Thread.sleep(interval);
        } else {
            this.waitForRunning(interval);
        }
        if (printFlushProgress) {
            this.printFlushProgress();
        }
        long begin = System.currentTimeMillis();
        // Flush to disk
        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        // Update the timestamp in the store checkpoint file
        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        long past = System.currentTimeMillis() - begin;
        if (past > 500) {
            log.info("Flush data to disk costs {} ms", past);
        }
    } catch (Throwable e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        this.printFlushProgress();
    }
}
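
Both service threads share the same rule: normally wait until enough pages are dirty, but drop that requirement once too much time has passed since the last run. A minimal sketch of the rule as a pure function (the class and method names are hypothetical):

```java
// Illustrative sketch only: the "least pages unless overdue" rule as a pure function.
class FlushPolicySketch {
    // Returns 0 (flush whatever is dirty) when the thorough interval has elapsed, otherwise leastPages.
    static int effectiveLeastPages(long now, long lastFlushTimestamp, long thoroughIntervalMs, int leastPages) {
        return now >= lastFlushTimestamp + thoroughIntervalMs ? 0 : leastPages;
    }

    public static void main(String[] args) {
        long lastFlush = 5_000;
        // 7 s after the last flush with a 10 s thorough interval: keep the 4-page threshold.
        System.out.println(effectiveLeastPages(12_000, lastFlush, 10_000, 4));
        // 15 s after the last flush: overdue, so the threshold is dropped.
        System.out.println(effectiveLeastPages(20_000, lastFlush, 10_000, 4));
    }
}
```

The page threshold batches small writes into fewer disk operations, while the thorough interval bounds how long a small amount of data can stay unflushed.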

This article is a personal study note; corrections of any mistakes or omissions are welcome.
