刷盘策略

CommitLogasyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略:

public class CommitLog {public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// ...try {// 获取上一次写入的文件MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// ...// 写入消息result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);// ...} finally {beginTimeInLock = 0;putMessageLock.unlock();}// ...// 执行刷盘CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// ...}
}

刷盘有两种策略:

  • 同步刷盘,表示消息写入到内存之后需要立刻刷到磁盘文件中。

    同步刷盘会构建GroupCommitRequest组提交请求并设置本次刷盘后的位置偏移量的值(写入位置偏移量+写入数据字节数),然后将请求添加到flushDiskWatcherGroupCommitService中进行刷盘。

  • 异步刷盘,表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。

public class CommitLog {// 监控刷盘private final FlushDiskWatcher flushDiskWatcher;public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 是否是同步刷盘if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// 获取GroupCommitServicefinal GroupCommitService service = (GroupCommitService) this.flushCommitLogService;// 是否等待if (messageExt.isWaitStoreMsgOK()) {// 构建组提交请求,传入本次刷盘后位置的偏移量:写入位置偏移量+写入数据字节数GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());// 添加到wather中flushDiskWatcher.add(request);// 添加到serviceservice.putRequest(request);// 返回return request.future();} else {service.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// 如果是异步刷盘else {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else  {commitLogService.wakeup();}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}
}

同步刷盘

如果使用的是同步刷盘,首先获取了GroupCommitService,然后构建GroupCommitRequest组提交请求,将请求添加到flushDiskWatcherGroupCommitService中,其中flushDiskWatcher用于监控刷盘是否超时,GroupCommitService用于提交刷盘数据。

构建GroupCommitRequest提交请求

GroupCommitRequestCommitLog的内部类:

  • nextOffset:写入位置偏移量+写入数据字节数,也就是本次刷盘成功后应该对应的flush偏移量
  • flushOKFuture:刷盘结果
  • deadLine:刷盘的限定时间,值为当前时间 + 传入的超时时间,超过限定时间还未刷盘完毕会被认为超时
public class CommitLog {public static class GroupCommitRequest {private final long nextOffset;// 刷盘状态private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();private final long deadLine;// 刷盘的限定时间,超过限定时间还未刷盘完毕会被认为超时public GroupCommitRequest(long nextOffset, long timeoutMillis) {this.nextOffset = nextOffset;// 设置限定时间:当前时间 + 超时时间this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);}public void wakeupCustomer(final PutMessageStatus putMessageStatus) {// 结束刷盘,设置刷盘状态this.flushOKFuture.complete(putMessageStatus);}public CompletableFuture<PutMessageStatus> future() {// 返回刷盘状态return flushOKFuture;}}
}

GroupCommitService处理刷盘

GroupCommitServiceCommitLog的内部类,从继承关系中可知它实现了Runnable接口,在run方法调用waitForRunning等待刷盘请求的提交,然后处理刷盘,不过这个线程是在什么时候启动的呢?

public class CommitLog {/*** GroupCommit Service*/class GroupCommitService extends FlushCommitLogService {// ...// run方法public void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 等待刷盘请求的到来this.waitForRunning(10);// 处理刷盘this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// ...}}
}

刷盘线程的启动

在BrokerController的启动方法中,可以看到调用了messageStore的start方法,前面可知使用的是DefaultMessageStore,进入到DefaultMessageStore的start方法,它又调用了commitLog的start方法,在CommitLogstart方法中,启动了刷盘的线程和监控刷盘的线程:

public class BrokerController {public void start() throws Exception {if (this.messageStore != null) {// 启动this.messageStore.start();}// ...}
}public class DefaultMessageStore implements MessageStore {/*** @throws Exception*/public void start() throws Exception {// ...this.flushConsumeQueueService.start();// 调用CommitLog的启动方法this.commitLog.start();this.storeStatsService.start();// ...}
}public class CommitLog {private final FlushCommitLogService flushCommitLogService; // 刷盘private final FlushDiskWatcher flushDiskWatcher; // 监控刷盘private final FlushCommitLogService commitLogService; // commitLogServicepublic void start() {// 启动刷盘的线程this.flushCommitLogService.start();flushDiskWatcher.setDaemon(true);// 启动监控刷盘的线程flushDiskWatcher.start();if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {this.commitLogService.start();}}
}

刷盘请求的处理

既然知道了线程在何时启动的,接下来详细看一下GroupCommitService是如何处理刷盘提交请求的。

前面知道在GroupCommitService的run方法中,调用了waitForRunning方法等待刷盘请求,waitForRunningGroupCommitService父类ServiceThread中实现。ServiceThread是一个抽象类,实现了Runnable接口,里面使用了CountDownLatch进行线程间的通信,大小设为1。

waitForRunning方法在进入的时候先判断hasNotified是否为true(已通知),并尝试将其更新为false(未通知),由于hasNotified的初始化值为false,所以首次进入的时候条件不成立,不会进入到这个处理逻辑,会继续执行后面的代码。

接着调用 waitPoint的reset方法将其重置为1,并调用waitPoint的await方法进行等待:

// ServiceThread
public abstract class ServiceThread implements Runnable {// 是否通知,初始化为falseprotected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);// CountDownLatch用于线程间的通信protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);// 等待运行protected void waitForRunning(long interval) {// 判断hasNotified是否为true,并尝试将其更新为falseif (hasNotified.compareAndSet(true, false)) {// 调用onWaitEndthis.onWaitEnd();return;}// 重置waitPoint的值,也就是值为1waitPoint.reset();try {// 会一直等待waitPoint值降为0waitPoint.await(interval, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("Interrupted", e);} finally {// 是否被通知设置为falsehasNotified.set(false);this.onWaitEnd();}}
}

一、添加刷盘请求,唤醒刷盘线程

上面可知需要刷盘的时候调用了GroupCommitServiceputRequest方法添加刷盘请求,在putRequest方法中,将刷盘请求GroupCommitRequest添加到了requestsWrite组提交写请求链表中,然后调用wakeup方法唤醒刷盘线程,wakeup方法在它的父类ServiceThread中实现。

wakeup方法中可以看到,首先将hasNotified更改为了true表示处于已通知状态,然后调用了countDown方法,此时waitPoint值变成0,就会唤醒之前waitForRunning方法中一直在等待的线程。

public class CommitLog {/*** 组提交Service*/class GroupCommitService extends FlushCommitLogService {// 组提交写请求链表private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();// ...// 添加提交请求public synchronized void putRequest(final GroupCommitRequest request) {// 加锁lock.lock();try {// 加入到写请求链表this.requestsWrite.add(request);} finally {lock.unlock();}// 唤醒线程执行提交任务this.wakeup();}   // ...}}// ServiceThread
public abstract class ServiceThread implements Runnable {// CountDownLatch用于线程间的通信protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);// 唤醒刷盘线程public void wakeup() {// 更改状态为已通知状态if (hasNotified.compareAndSet(false, true)) {// waitPoint的值减1,由于大小设置为1,减1之后变为0,会唤醒等待的线程waitPoint.countDown(); }}// ...
}

二、线程被唤醒,执行刷盘前的操作

waitForRunning方法中的await方法一直在等待countdown的值变为0,当上一步调用了wakeup后,就会唤醒该线程,然后开始往下执行,在finally中可以看到将是否被通知hasNotified又设置为了false,然后调用了onWaitEnd方法,GroupCommitService方法中重写了该方法,里面又调用了swapRequests方法将读写请求列表的数据进行了交换,putRequest方法中将提交的刷盘请求放在了写链表中,经过交换,数据会被放在读链表中,后续进行刷盘时会从读链表中获取请求进行处理

// ServiceThread
public abstract class ServiceThread implements Runnable {// CountDownLatchprotected final CountDownLatch2 waitPoint = new CountDownLatch2(1);// 等待运行protected void waitForRunning(long interval) {if (hasNotified.compareAndSet(true, false)) {// 交换this.onWaitEnd();return;}// 重置waitPoint.reset();try {// 会一直等待countdown为0waitPoint.await(interval, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("Interrupted", e);} finally {// 是否被通知设置为falsehasNotified.set(false);this.onWaitEnd();}}
}public class CommitLog {/*** 组提交Service*/class GroupCommitService extends FlushCommitLogService {// 组提交写请求链表private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();// 组提交读请求链表private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();@Overrideprotected void onWaitEnd() {// 交换读写请求列表的数据请求this.swapRequests();}private void swapRequests() {// 加锁lock.lock();try {// 将读写请求链表的数据进行交换LinkedList<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;} finally {lock.unlock();}}// ...}
}

这里使用读写链表进行交换应该是为了提升性能,如果只使用一个链表,在提交请求的时候需要往链表中添加请求,此时需要加锁,而刷盘线程在处理完请求之后是需要从链表中移除请求的,假设添加请求时加的锁还未释放,刷盘线程就要一直等待,而添加和处理完全可以同时进行,所以使用了两个链表,在添加请求的时候使用写链表,处理请求的时候对读写链表的数据进行交换使用读链表,这样只需在交换数据的时候加锁,以此来提升性能。

三、执行刷盘

waitForRunning执行完毕后,会回到GroupCommitService中的run方法开始继续往后执行代码,从代码中可以看到接下来会调用doCommit方法执行刷盘。

doCommit方法中对读链表中的数据进行了判空,如果不为空,进行遍历处理每一个提交请求,处理逻辑如下:

  1. 获取CommitLog映射文件记录的刷盘位置偏移量flushedWhere,判断是否大于请求设定的刷盘位置偏移量nextOffset,正常情况下flush的位置应该小于本次刷入数据后的偏移量,所以如果flush位置大于等于本次请求设置的flush偏移量,本次将不能进行刷盘

  1. 开启一个循环,调用mappedFileQueueflush方法执行刷盘(具体的实现在异步刷盘的时候再看),由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行。

  2. 请求处理之后会清空读链表。

public class CommitLog {/*** 组提交Service*/class GroupCommitService extends FlushCommitLogService {  // 运行public void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果没有停止while (!this.isStopped()) {try {// 等待唤醒刷盘线程this.waitForRunning(10);// 进行提交this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// 睡眠10毫秒try {Thread.sleep(10);} catch (InterruptedException e) {CommitLog.log.warn(this.getServiceName() + " Exception, ", e);}synchronized (this) {this.swapRequests();}// 停止之前提交一次this.doCommit();CommitLog.log.info(this.getServiceName() + " service end");}// 提交刷盘private void doCommit() {// 如果不为空if (!this.requestsRead.isEmpty()) {// 遍历刷盘请求for (GroupCommitRequest req : this.requestsRead) {// 获取映射文件的flush位置,判断是否大于请求设定的刷盘位置boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();for (int i = 0; i < 2 && !flushOK; i++) {// 进行刷盘CommitLog.this.mappedFileQueue.flush(0);// 由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();}// 设置刷盘结果req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);}long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}// 请求处理完之后清空链表this.requestsRead = new LinkedList<>();} else {// Because of individual messages is set to not sync flush, it// will come to this processCommitLog.this.mappedFileQueue.flush(0);}}}}

刷盘超时监控

FlushDiskWatcher用于监控刷盘请求的耗时,它也继承了ServiceThread,在Broker启动时开启了该线程,在run方法中,使用while循环,只要服务未停止,会一直从阻塞队列中获取提交的刷盘请求,开启while循环隔一段时间判断一下刷盘是否完成,如果未完成,会做如下判断:

  1. 使用当前时间减去请求设置的刷盘截止时间,如果已经超过截止时间,说明刷盘时间已经超时,调用wakeupCustomer方法设置刷盘结果为已超时
  2. 如果未超时,为了避免当前线程频繁的进行判断,将当前线程睡眠一会儿,睡眠的计算方式是使用刷盘请求设置的截止时间 - 当前时间,表示剩余的时间,然后除以1000000化为毫秒,得到距离刷盘截止时间的毫秒数sleepTime:
    • sleepTime如果为0,只能是当前时间等于截止时间,也就是到了截止时间,此时同样调用wakeupCustomer方法设置刷盘结果为已超时
    • sleepTime不为0,在10毫秒和sleepTime的值之间取较小的那个作为睡眠的毫秒数将当前线程睡眠,等待刷盘任务执行
public class FlushDiskWatcher extends ServiceThread {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 阻塞队列,存放提交请求private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();@Overridepublic String getServiceName() {return FlushDiskWatcher.class.getSimpleName();}@Overridepublic void run() {// 如果未停止while (!isStopped()) {GroupCommitRequest request = null;try {// 从阻塞队列中获取提交请求request = commitRequests.take();} catch (InterruptedException e) {log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");continue;}// 如果还未完成while (!request.future().isDone()) {long now = System.nanoTime();// 如果已经超时if (now - request.getDeadLine() >= 0) {// 设置刷盘结果为超时request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);break;}// 避免频繁的判断,使用(截止时间 - 当前时间)/1000000 计算一个毫秒数long sleepTime = (request.getDeadLine() - now) / 1_000_000;// 在计算的毫秒数与10之间取最小的sleepTime = Math.min(10, sleepTime);// 如果sleepTime为0表示已经到了截止时间if (sleepTime == 0) {// 设置刷盘结果为超时request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);break;}try {// 睡眠等待刷盘任务的执行Thread.sleep(sleepTime);} catch (InterruptedException e) {log.warn("An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");break;}}}}
}

异步刷盘

上面讲解了同步刷盘,接下来去看下异步刷盘,首先会判断是否使用了暂存池,如果未开启调用flushCommitLogServicewakeup唤醒刷盘线程,否则使用commitLogService先将数据写入到FileChannel,然后统一进行刷盘:

 public class CommitLog {private final FlushDiskWatcher flushDiskWatcher;public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 是否是同步刷盘if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// ...}// 如果是异步刷盘else {// 如果未使用暂存池if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 唤醒刷盘线程进行刷盘flushCommitLogService.wakeup();} else  {// 如果使用暂存池,使用commitLogService,先将数据写入到FILECHANNEL,然后统一进行刷盘commitLogService.wakeup();}// 返回结果return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}
}

CommitLog的构造函数中可以看到,commitLogService使用的是CommitRealTimeService进行实例化的,flushCommitLogService需要根据设置决定使用哪种类型进行实例化:

  • 如果是同步刷盘,使用GroupCommitService,由前面的同步刷盘可知,使用的就是GroupCommitService进行刷盘的。
  • 如果是异步刷盘,使用FlushRealTimeService

所以接下来需要关注CommitRealTimeServiceFlushRealTimeService

public class CommitLog {    private final FlushCommitLogService flushCommitLogService;// 刷盘Serviceprivate final FlushCommitLogService commitLogService;public CommitLog(final DefaultMessageStore defaultMessageStore) {// 如果设置的同步刷盘if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// 使用GroupCommitServicethis.flushCommitLogService = new GroupCommitService();} else {// 使用FlushRealTimeServicethis.flushCommitLogService = new FlushRealTimeService();}// commitLogServicethis.commitLogService = new CommitRealTimeService();}
}

CommitRealTimeService

在开启暂存池时,会使用CommitRealTimeService,它继承了FlushCommitLogService,所以会实现run方法,处理逻辑如下:

  1. 从配置信息中获取提交间隔每次提交的最少页数两次提交的最大间隔时间
  2. 如果当前时间大于上次提交时间+两次提交的最大间隔时间,意味着已经有比较长的一段时间没有进行提交了,需要尽快刷盘,此时将每次提交的最少页数设置为0不限制提交页数
  3. 调用mappedFileQueuecommit方法进行提交,并返回提交的结果:
    • 如果结果为true表示未提交任何数据
    • 如果结果为false表示进行了数据提交,需要等待刷盘
  4. 判断提交返回结果是否返回false,如果是调用flushCommitLogService的wakeup方法唤醒刷盘线程,进行刷盘
  5. 调用waitForRunning等待下一次提交处理
class CommitRealTimeService extends FlushCommitLogService {// 上次提交时间戳private long lastCommitTimestamp = 0;@Overridepublic void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果未停止while (!this.isStopped()) {// 获取提交间隔int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();// 一次提交的最少页数int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();// 两次提交的最大间隔时间int commitDataThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();// 开始时间long begin = System.currentTimeMillis();// 如果当前时间大于上次提交时间+提交的最大间隔时间if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {this.lastCommitTimestamp = begin; // 提交时间commitDataLeastPages = 0;// 最少提交页数设为0,表示不限制提交页数}try {// 提交boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);// 提交结束时间long end = System.currentTimeMillis();// 如果返回false表示提交了一部分数据但是还未进行刷盘if (!result) {// 再次更新提交时间戳this.lastCommitTimestamp = end;// 唤醒flush线程进行刷盘flushCommitLogService.wakeup();}if (end - begin > 500) {log.info("Commit data to file costs {} ms", end - begin);}// 等待下一次提交this.waitForRunning(interval);} catch (Throwable e) {CommitLog.log.error(this.getServiceName() + " service has exception. ", e);}}boolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {result = CommitLog.this.mappedFileQueue.commit(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}CommitLog.log.info(this.getServiceName() + " service end");}}

提交

提交的方法在MappedFileQueuecommit方法中实现,处理逻辑如下:

  1. 根据记录的CommitLog文件提交位置的偏移量获取映射文件,如果获取不为空,调用MappedFile的commit方法进行提交,然后返回本次提交数据的偏移量
  2. 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量
  3. 判断本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何数据,返回结果置为true,否则表示提交了数据,等待刷盘,返回结果为false
  4. 更新上一次提交偏移量committedWhere的值为本次的提交偏移量的值
public class MappedFileQueue {protected long flushedWhere = 0; // flush的位置偏移量private long committedWhere = 0; // 提交的位置偏移量public boolean commit(final int commitLeastPages) {boolean result = true;// 根据提交位置的偏移量获取映射文件MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {// 调用mappedFile的commit方法进行提交,返回提交数据的偏移量int offset = mappedFile.commit(commitLeastPages);// 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量long where = mappedFile.getFileFromOffset() + offset;// 设置返回结果,如果本次提交偏移量等于上一次的提交偏移量为true,表示什么也没干,否则表示提交了数据,等待刷盘result = where == this.committedWhere;// 更新上一次提交偏移量的值为本次的this.committedWhere = where;}return result;}
}

MappedFile

MappedFile中记录CommitLog的写入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,调用了isAbleToCommit判断是否可以提交数据,判断的流程如下:

  1. 获取提交数据的位置偏移量和写入数据的位置偏移量

  2. 如果最少提交页数大于0,计算本次写入的页数是否大于或等于最少提交页数

    本次写入数据的页数计算方法:写入位置/页大小 - flush位置/页大小

  3. 如果以上条件都满足,判断写入位置是否大于flush位置,如果大于表示有一部数据未flush可以进行提交

满足提交条件后,就会调用commit0方法提交数据,将数据写入到fileChannel中:

public class MappedFile extends ReferenceResource {// 数据写入位置protected final AtomicInteger wrotePosition = new AtomicInteger(0);// 数据提交位置protected final AtomicInteger committedPosition = new AtomicInteger(0);// 数据flush位置private final AtomicInteger flushedPosition = new AtomicInteger(0);// 提交数据public int commit(final int commitLeastPages) {// 如果writeBuffer为空if (writeBuffer == null) {// 不需要提交任何数据到,返回之前记录的写入位置return this.wrotePosition.get();}// 如果可以提交数据if (this.isAbleToCommit(commitLeastPages)) {if (this.hold()) {// 提交数据commit0();this.release();} else {log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());}}// All dirty data has been committed to FileChannel.if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {this.transientStorePool.returnBuffer(writeBuffer);this.writeBuffer = null;}// 返回提交位置return this.committedPosition.get();}// 是否可以提交数据protected boolean isAbleToCommit(final int commitLeastPages) {// 获取提交数据的位置偏移量int flush = this.committedPosition.get();// 获取写入数据的位置偏移量int write = this.wrotePosition.get();if (this.isFull()) {return true;}// 如果最少提交页数大于0if (commitLeastPages > 0) {// 写入位置/页大小 - flush位置/页大小 是否大于至少提交的页数return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;}// 判断是否需要flush数据return write > flush;}protected void commit0() {// 获取写入位置int writePos = this.wrotePosition.get();// 获取上次提交的位置int lastCommittedPosition = this.committedPosition.get();if (writePos - lastCommittedPosition > 0) {try {// 创建共享缓冲区ByteBuffer byteBuffer = writeBuffer.slice();// 设置上一次提交位置byteBuffer.position(lastCommittedPosition);byteBuffer.limit(writePos);this.fileChannel.position(lastCommittedPosition);// 数据写入fileChannelthis.fileChannel.write(byteBuffer);// 更新写入的位置this.committedPosition.set(writePos);} catch (Throwable e) {log.error("Error occurred when commit data to FileChannel.", e);}}}
}

FlushRealTimeService

如果未开启暂存池,会直接使用FlushRealTimeService进行刷盘,当然如果开启暂存池,写入一批数据后,同样会使用FlushRealTimeService进行刷盘FlushRealTimeService同样继承了FlushCommitLogService,是用于执行刷盘的线程,处理逻辑与提交刷盘数据逻辑相似,只不过不是提交数据,而是调用flush方法将提交的数据刷入磁盘:

  1. 从配置信息中获取flush间隔每次flush的最少页数两次flush的最大间隔时间
  2. 如果当前时间大于上次flush时间+两次flush的最大间隔时间,意味着已经有比较长的一段时间没有进行flush,此时将每次flush的最少页数设置为0不限制flush页数
  3. 调用waitForRunning等待被唤醒
  4. 如果被唤醒,调用mappedFileQueueflush方法进行刷盘
class FlushRealTimeService extends FlushCommitLogService {private long lastFlushTimestamp = 0; // 上一次flush的时间private long printTimes = 0;public void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果未停止while (!this.isStopped()) {// boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();// 获取flush间隔int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();// flush至少包含的页数int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();// 两次flush的时间间隔int flushPhysicQueueThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();boolean printFlushProgress = false;long currentTimeMillis = System.currentTimeMillis();// 如果当前毫秒数 大于上次flush时间 + 两次flush之间的间隔if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis; // 更新flush时间flushPhysicQueueLeastPages = 0; // flush至少包含的页数置为0printFlushProgress = (printTimes++ % 10) == 0;}try {// if (flushCommitLogTimed) {// 睡眠Thread.sleep(interval);} else {// 等待flush被唤醒this.waitForRunning(interval);}if (printFlushProgress) {// 打印刷盘进程this.printFlushProgress();}long begin = System.currentTimeMillis();// 进行刷盘CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}long past = System.currentTimeMillis() - begin;if (past > 500) {log.info("Flush data to disk costs {} ms", past);}} catch (Throwable e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);this.printFlushProgress();}}// 如果服务停止,确保数据被刷盘boolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {// 进行刷盘result = CommitLog.this.mappedFileQueue.flush(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}this.printFlushProgress();CommitLog.log.info(this.getServiceName() + " service end");}

刷盘

刷盘的方法在MappedFileQueueflush方法中实现,处理逻辑如下:

  1. 根据 flush的位置偏移量获取映射文件
  2. 调用mappedFile的flush方法进行刷盘,并返回刷盘后的位置偏移量
  3. 计算最新的flush偏移量
  4. 更新flushedWhere的值为最新的flush偏移量
public class MappedFileQueue {protected long flushedWhere = 0; // flush的位置偏移量private long committedWhere = 0; // 提交的位置偏移量// flush刷盘public boolean flush(final int flushLeastPages) {boolean result = true;// 获取flush的位置偏移量映射文件MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 获取时间戳long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用MappedFile的flush方法进行刷盘,返回刷盘后的偏移量int offset = mappedFile.flush(flushLeastPages);// 计算最新的flush偏移量long where = mappedFile.getFileFromOffset() + offset;result = where == this.flushedWhere;// 更新flush偏移量this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}// 返回flush的偏移量return result;}
}

flush的逻辑也与commit方法的逻辑类似:

  1. 调用isAbleToFlush判断是否满足刷盘条件,获取上次flush位置偏移量和当前写入位置偏移量进行如下校验:

    • 文件是否已写满,即文件大小是否与写入数据位置相等,如果相等说明文件已经写满需要执行刷盘,满足刷盘条件

    • 如果最少flush页数大于0,计算本次flush的页数是否大于或等于最少flush页数,如果满足可以进行刷盘

      本次flush数据的页数计算方法:写入位置/页大小 - flush位置/页大小

    • 如果写入位置偏移量是否大于flush位置偏移量,如果大于表示有数据未进行刷盘,满足刷盘条件

  2. 调用fileChannel的force或者mappedByteBuffer的force方法进行刷盘

  3. 记录本次flush的位置,并作为结果返回

public class MappedFile extends ReferenceResource {protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);private final AtomicInteger flushedPosition = new AtomicInteger(0);/*** 进行刷盘并返回flush后的偏移量*/public int flush(final int flushLeastPages) {// 是否可以刷盘if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value = getReadPosition();try {// 如果writeBuffer不为空if (writeBuffer != null || this.fileChannel.position() != 0) {// 将数据刷到硬盘this.fileChannel.force(false);} else {this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}// 记录flush位置this.flushedPosition.set(value);this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}// 返回flush位置return this.getFlushedPosition();}// 是否可以刷盘private boolean isAbleToFlush(final int flushLeastPages) {// 获取上次flush位置int flush = this.flushedPosition.get();// 写入位置偏移量int write = getReadPosition();if (this.isFull()) {return true;}// 如果flush的页数大于0,校验本次flush的页数是否满足条件if (flushLeastPages > 0) {// 本次flush的页数:写入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPagesreturn ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;} // 写入位置偏移量是否大于flush位置偏移量return write > flush;}// 文件是否已写满public boolean isFull() {// 文件大小是否与写入数据位置相等return this.fileSize == this.wrotePosition.get();}/*** 返回当前有效数据的位置*/public int getReadPosition() {// 如果writeBuffer为空使用写入位置,否则使用提交位置return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();}
}

总结

参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3

【RocketMQ】消息的刷盘机制相关推荐

  1. RocketMQ消息存储之刷盘机制(原理篇)

    一.前言 RocketMQ的刷盘机制是一种确保消息可靠性的机制,简单来说就是Broker收到消息后,将消息存储到磁盘上.这样可以解决几个问题: 存储空间问题.内存空间有限,存入磁盘可以维护更多消息. ...

  2. RocketMQ5.0.0消息存储<四>_刷盘机制

    目录 一.刷盘概览 二.Broker刷盘机制 1. 同步刷盘 2. 异步刷盘 1):未开启堆外内存池 2):开启堆外内存池 三.参考资料 一.刷盘概览 RocketMQ存储与读写是基于JDK NIO的 ...

  3. RocketMQ刷盘机制

    概览 RocketMQ的存储读写是基于JDK NIO的内存映射机制的,消息存储时首先将消息追加到内存中.在根据不同的刷盘策略在不同的时间进行刷盘.如果是同步刷盘,消息追加到内存后,将同步调用Mappe ...

  4. 深入源码聊聊RocketMQ刷盘机制

    大家好,我是Leo. 今天聊一下RocketMQ的三种刷盘机制. 同步刷盘 异步刷盘(RocketMQ默认) 异步刷盘+缓冲区 出自微信公众号[欢少的成长之路] 本章概括 同步刷盘 整个同步刷盘策略由 ...

  5. RocketMQ的刷盘机制

    RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失.同时这样才可以让存储的消息量可以超出内存的限制.RocketMQ为了提高性能,会尽量保证磁盘的顺序写.消息在写入磁盘时,有两种写 ...

  6. 顺藤摸瓜RocketMQ之刷盘机制debug解析

    文章目录 Rocketmq 刷盘机制 三个文件 indexFile consumeQueue commitlog 异步刷盘 consumerqueue和indexfile文件是什么时候更新的 同步刷盘 ...

  7. mysql刷盘机制详解

    目录 刷盘机制总览 log buffer(innodb的,由存储引擎分配) binlog cache(由server分配) buffer pool 自适应刷脏页Adaptive Flushing 刷盘 ...

  8. MySQL数据和日志的刷盘机制以及双一配置

    详细介绍了MySQL数据和日志的刷盘机制以及双一配置,双一配置可以保证Mysql日志数据不丢失. 文章目录 1 内存数据的刷盘机制 2 MySQL数据的刷盘 2.1 刷盘数据来源 2.2 脏页以及刷盘 ...

  9. RocketMQ消息存储、刷盘、负载均衡

    消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分. 消息存储总体架构 消息存储架构图: minOffset:当前队列的最小消息偏移量,如果消费时指定从最早消费,就是从该偏移量消费. m ...

最新文章

  1. 给图片加上带版权的水印
  2. 【EASYDOM系列教程】之 textContent 属性
  3. 成功解决CondaError: Error reading file, file should be a text file containing packages conda create --he
  4. goroutine sync.Mutex互斥锁Lock的使用
  5. PHP与MySQL开发中页面乱码的产生与解决
  6. java当前时间转化毫秒_Java中将毫秒转化为日期的方法
  7. HTMLParser-实战
  8. JDK8 lambda的会话指南–术语表
  9. vfp程序改错 计算机和英语,vfp程序改错教案.doc
  10. ICCV 2019丨基于跨视角信息融合的三维人体姿态估计
  11. 华为泛BYOD融合网络解决方案实践与演示
  12. CUDA编程:与OpenCV结合
  13. mysql导出数据大概得多久_MySQL 导出数据
  14. xdf文件转换成pdf_PDF文件转换成PPT演示文稿教程
  15. 计算机房维修保养记录表,机房设备系统运行及维护记录学习表格.docx
  16. Android 调用系统拍照后返回的图片变小了(变模糊了)
  17. Linux下软连接(softlink)和硬连接(hardlink)的区别
  18. Django-登录注册
  19. js获取网页元素文本
  20. 180亿美元估值,快手用什么来支撑?

热门文章

  1. 【转】当你 林俊杰 歌曲链接(支持QQ空间)
  2. ebay 获取商品详细信息 getitem getItemByLegacyId FindItemsByProduct getProductDetailsRequest
  3. 炫彩logo粒子效果
  4. 费马小定理证明 (copy的,自己捋清楚)
  5. 【日语口语词典学习】第0001页
  6. PICE(3):CassandraStreaming - gRPC-CQL Service
  7. 基于mmdetection 旋转目标检测(OBB detection)+DOTA数据集自定义数据集+配docker
  8. 利用iSpring Free上传PPt课件并上传至Moodle教学平台
  9. android studio lua插件,[置顶] android Studio 配置LUA 开发环境
  10. [系统分享]Windows 10 家庭中文版 纯净无捆绑