文章目录

  • 版本
  • 简介
  • FlushCommitLogService
  • 同步刷盘
    • GroupCommitService
  • 异步刷盘
    • CommitRealTimeService
    • FlushRealTimeService

版本

  1. 基于rocketmq-all-4.3.1版本

简介

  1. RocketMQ消息存储是首先将消息追加到内存中,然后根据刷盘策略在不同时间刷盘。

    • 同步刷盘,消息追加到内存,调用**MappedByteBuffer.force()**方法实现刷盘
    • 异步刷盘,消息追加到内存后,立即返回给Producer。使用单独的异步线程按照一定的频率执行刷盘操作
  2. Index文件的刷盘并不是采取定时刷盘机制,而是每更新一次Index文件就会将上一次的改动写入磁盘

  3. 刷盘代码CommitLog#handleDiskFlush,可以看到同步刷盘由GroupCommitService完成

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {//刷盘策略,同步刷盘阻塞等待,异步刷盘唤醒commitLogService// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {//构建刷盘请求放入GroupCommitService队列中(List中)GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);//GroupCommitService线程在broker启动时会启动,阻塞,等待线程刷盘完成,默认超时时间5s,如果超时返回false//即如果超时,响应给生产者的是FLUSH_DISK_TIMEOUTboolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//使用MappedByteBuffer,默认策略flushCommitLogService.wakeup();} else {//异步刷盘,如果开启TransientStorePool,使用写入缓冲区+FileChannelcommitLogService.wakeup();}}
    }

FlushCommitLogService

  1. UML图

  2. 实现类

    • CommitRealTimeService:异步刷盘并且transientStorePoolEnable设置为true
    • FlushRealTimeService:异步刷盘并且transientStorePoolEnable设置为false
    • GroupCommitService:同步刷盘
  3. FlushCommitLogService没有任何实现,只是定义了一个常量

    abstract class FlushCommitLogService extends ServiceThread {protected static final int RETRY_TIMES_OVER = 10;
    }
    

同步刷盘

  1. 同步刷盘指的是在消息追加到内存映射文件(MappedByteBuffer)的内存中后,立即将数据从内存写入磁盘文件(MappedByteBuffer.force()

GroupCommitService

  1. 同步刷盘由GroupCommitService完成

    • 第一步:构建刷盘请求对象GroupCommitRequest,并将对象添加到requestsWrite队列中
    • 第二步:默认等待5s,如果返回false,响应给生产者的是FLUSH_DISK_TIMEOUT
  2. GroupCommitService有一个写队列和一个读队列,即将请求和刷盘进行读写分离。请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复

  3. GroupCommitRequest

    public static class GroupCommitRequest {private final long nextOffset;private final CountDownLatch countDownLatch = new CountDownLatch(1);//刷盘结果private volatile boolean flushOK = false;public GroupCommitRequest(long nextOffset) {this.nextOffset = nextOffset;}public long getNextOffset() {return nextOffset;}//唤醒阻塞等待的线程//FIXME by jannal 此处有并发问题,this.flushOK = flushOK不是原子操作。正常需要加同步//由于只有一个线程操作,所以即使不是原子性也问题不大public void wakeupCustomer(final boolean flushOK) {this.flushOK = flushOK;this.countDownLatch.countDown();}//等待刷盘public boolean waitForFlush(long timeout) {try {this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);return this.flushOK;} catch (InterruptedException e) {log.error("Interrupted", e);return false;}}
    }
    
  4. GroupCommitService源码分析

    class GroupCommitService extends FlushCommitLogService {//读写容器private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();public synchronized void putRequest(final GroupCommitRequest request) {//FIXME by jannal 思考:既然方法已经加锁,为什么此处需要再次加锁?//swapRequests可能在其他线程并发执行,所以需要给requestsWrite单独加锁//swapRequests导致requestsWrite的引用变化,会不会出现问题?//可以将swapRequests加一个与操作requestsWrite的锁,来优化此处代码,避免不好理解synchronized (this.requestsWrite) {this.requestsWrite.add(request);}// 通知服务线程已经接收到GroupCommitRequest//FIXME 直接调用父类的this.wakeUp()多好?if (hasNotified.compareAndSet(false, true)) {waitPoint.countDown(); // notify}}private void swapRequests() {// volatile可以保证可见性,requestsWrite写入时加锁了,所以此处无需加锁,通过volatile可以实现低开销的读List<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;}private void doCommit() {synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {for (GroupCommitRequest req : this.requestsRead) {// There may be a message in the next file, so a maximum of// two times the flushboolean flushOK = false;/*** 两个MappedFile(写第N个消息时,MappedFile 已满,创建了一个新的),所以需要有循环2次。*/for (int i = 0; i < 2 && !flushOK; i++) {//请求的offset超过已经flushed的offset,则强制刷盘flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();if (!flushOK) {CommitLog.this.mappedFileQueue.flush(0);}}req.wakeupCustomer(flushOK);}long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {//更新刷盘检测点StoreCheckpoint中的physicMsg Timestamp//刷盘检测点的刷盘操作将在刷写消息队列文件时触发CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}this.requestsRead.clear();} else {// Because of individual messages is set to not sync flush, it// will come to this processCommitLog.this.mappedFileQueue.flush(0);}}}public void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {//调用swapRequests=>doCommitthis.waitForRunning(10);this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// Under normal circumstances shutdown, wait for the arrival of the// request, and then flushtry {Thread.sleep(10);} catch (InterruptedException e) {CommitLog.log.warn("GroupCommitService Exception, ", e);}//FIXME by jannal 上面没有加锁,这里为啥加锁?synchronized (this) {this.swapRequests();}this.doCommit();CommitLog.log.info(this.getServiceName() + " service end");}@Overrideprotected void onWaitEnd() {this.swapRequests();}@Overridepublic String getServiceName() {return GroupCommitService.class.getSimpleName();}@Overridepublic long getJointime() {return 1000 * 60 * 5;}
    }
    

异步刷盘

  1. CommitLog#handleDiskFlush中异步刷盘代码如下。异步刷盘有两种方式

    • 开启transientStorePoolEnable=true机制则启动CommitRealTimeService异步刷盘方式。
    • 如果没有开启transientStorePoolEnable=false,则启动FlushRealTimeService
    • CommitRealTimeService在commit成功后,会执行flushCommitLogService.wakeup();也就是让FlushRealTimeService将Page Cache中的数据同步至磁盘。
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//使用MappedByteBuffer,默认策略flushCommitLogService.wakeup();
    } else {//异步刷盘,如果开启TransientStorePool,使用写入缓冲区+FileChannelcommitLogService.wakeup();
    }
    
  2. 异步刷盘流程

CommitRealTimeService

  1. 如果transientStorePoolEnable=true,Broker会申请一个与CommitLog同样大小的堆外内存,该堆外内存会使用内存锁定(mlock),将其变为常驻内存,避免被操作系统调到swap空间中。

    • 消息追加到堆外内存
    • 提交到内存映射文件中
    • 使用flush刷盘
  2. CommitRealTimeService服务线程执行逻辑

    • 默认每200ms将ByteBuffer新追加的数据(新追加的数据=wrotePosition-commitedPosition)提交到FileChannel中

FlushRealTimeService

  1. 无论是否开启写入缓冲池,刷盘最终都由FlushRealTimeService来执行,CommitRealTimeService在commit成功后,会执行flushCommitLogService.wakeup();也就是让FlushRealTimeService将Page Cache中的数据同步至磁盘。
  2. 将内存(Page Cache)中的数据同步至磁盘(flush)有一些前提条件
    • 若当前时间距离上次实际刷盘时间已经超过10S,则会忽略其他所有前提,确定刷盘,这样即使服务器宕机了最多也仅丢失10S的数据,提高了消息队列的可靠性。
    • 正常情况下刷盘需要满足持久化数据大于配置的最小页数,默认4,也就是新写入内存中的数据大于或等于16KB(4*4KB)
      • 当开启写入缓冲,也就是追加到fileChannel的数据大于或等于16KB
      • 未开启写入缓冲则是追加到mappedByteBuffer的数据大于或等于16KB

RocketMQ源码分析(十二)之CommitLog同步与异步刷盘相关推荐

  1. 【转】ABP源码分析十二:本地化

    本文逐个分析ABP中涉及到localization的接口和类,以及他们之间的关系.本地化主要涉及两个方面:一个是语言(Language)的管理,这部分相对简单.另一个是语言对应得本地化资源(Local ...

  2. 【vue-router源码】十二、useRoute、useRouter、useLink源码分析

    [vue-rouer源码]系列文章 [vue-router源码]一.router.install解析 [vue-router源码]二.createWebHistory.createWebHashHis ...

  3. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  4. GCC源码分析(十六) — gimple转RTL(pass_expand)(下)

    版权声明:本文为CSDN博主「ashimida@」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明. 原文链接:https://blog.csdn.net/lidan1 ...

  5. Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)

    一.综述 HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode.DataNode.DFSClient等众多角色的分工与合作. 首先上一段代码,客户端是如何写文件的: ...

  6. Spring Cloud源码分析(二)Ribbon(续)

    因文章长度限制,故分为两篇.上一篇:<Spring Cloud源码分析(二)Ribbon> 负载均衡策略 通过上一篇对Ribbon的源码解读,我们已经对Ribbon实现的负载均衡器以及其中 ...

  7. Koa源码分析(二) -- co的实现

    Abstract 本系列是关于Koa框架的文章,目前关注版本是Koa v1.主要分为以下几个方面: Koa源码分析(一) -- generator Koa源码分析(二) -- co的实现 Koa源码分 ...

  8. Flume 1.7 源码分析(二)整体架构

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  9. Cowboy 源码分析(十八)

    在上一篇中,我们整理了下cowboy_http_protocol:header/3函数,在文章的末尾留下2个没有讲到的函数,今天,我们先看下cowboy_http_protocol:error_ter ...

最新文章

  1. 同时上哈佛,还一起一作发Nature!这对95后学霸情侣让人慕了……
  2. linux运维实战练习-2015年9月13日-9月15日课程作业(练习)安排
  3. 处理数字_6_NULL值的列的个数
  4. php 自带过滤和转义函数
  5. php符号 set,PHP 符号大全
  6. IDC:大数据——数字化转型时代的大商机
  7. 香农-范诺算法(Shannon-Fano coding)原理
  8. python中del怎么用_Python范例中的del关键字
  9. python3文本文件读取方法_Python3读取文件常用方法实例分析
  10. 如何查看2020最新版谷歌地球高精度卫星地图(附下载方法)
  11. 如何用计算机巧记英语词汇,小学英语单词巧记法
  12. 《日语综合教程》第七册 第六課 自然と人間
  13. 1182 -- 对决
  14. 牛客网华为云服务器,把通过牛客网注册的华为云服务器用起来!
  15. CTFhub命令注入,过滤了cat命令,过滤了空格,过滤目录分隔符,过滤运算符,综合练习
  16. 在iPad应用中嵌入字体的方法——非人云亦云版
  17. nginx和tomcat本地部署
  18. 985、211学校分为哪八个档次?
  19. 软件网关工业生产设备PLC数据采集转存数据库记录仪IOT gateway
  20. PNAS:青少年大脑功能连接的保守和破坏性模式变化

热门文章

  1. Java web导出excel文件 - poi
  2. Using Hibernate ResultTransformer
  3. ping命令和traceroute命令
  4. MySQL 数据库连接使用
  5. layui tips
  6. 医院全成本分析---数据来源网络,有部分自己的理解
  7. Java如何转换图片的格式?
  8. 【设计模式系列】5.装饰器模式和适配器模式
  9. Android 10 录屏适配
  10. 华为服务器连接显示器节电模式,显示器黑屏显示省电模式怎么办_显示器黑屏显示省电模式如何解决...