RocketMQ源码分析(十二)之CommitLog同步与异步刷盘
文章目录
- 版本
- 简介
- FlushCommitLogService
- 同步刷盘
- GroupCommitService
- 异步刷盘
- CommitRealTimeService
- FlushRealTimeService
版本
- 基于
rocketmq-all-4.3.1
版本
简介
RocketMQ消息存储是首先将消息追加到内存中,然后根据刷盘策略在不同时间刷盘。
- 同步刷盘,消息追加到内存,调用**MappedByteBuffer.force()**方法实现刷盘
- 异步刷盘,消息追加到内存后,立即返回给Producer。使用单独的异步线程按照一定的频率执行刷盘操作
Index文件的刷盘并不是采取定时刷盘机制,而是每更新一次Index文件就会将上一次的改动写入磁盘
刷盘代码CommitLog#handleDiskFlush,可以看到同步刷盘由GroupCommitService完成
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {//刷盘策略,同步刷盘阻塞等待,异步刷盘唤醒commitLogService// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {//构建刷盘请求放入GroupCommitService队列中(List中)GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);//GroupCommitService线程在broker启动时会启动,阻塞,等待线程刷盘完成,默认超时时间5s,如果超时返回false//即如果超时,响应给生产者的是FLUSH_DISK_TIMEOUTboolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//使用MappedByteBuffer,默认策略flushCommitLogService.wakeup();} else {//异步刷盘,如果开启TransientStorePool,使用写入缓冲区+FileChannelcommitLogService.wakeup();}} }
FlushCommitLogService
UML图
实现类
CommitRealTimeService
:异步刷盘并且transientStorePoolEnable
设置为trueFlushRealTimeService
:异步刷盘并且transientStorePoolEnable
设置为falseGroupCommitService
:同步刷盘
FlushCommitLogService没有任何实现,只是定义了一个常量
abstract class FlushCommitLogService extends ServiceThread {protected static final int RETRY_TIMES_OVER = 10; }
同步刷盘
- 同步刷盘指的是在消息追加到内存映射文件(MappedByteBuffer)的内存中后,立即将数据从内存写入磁盘文件(MappedByteBuffer.force())
GroupCommitService
同步刷盘由GroupCommitService完成
- 第一步:构建刷盘请求对象
GroupCommitRequest
,并将对象添加到requestsWrite
队列中 - 第二步:默认等待5s,如果返回false,响应给生产者的是
FLUSH_DISK_TIMEOUT
- 第一步:构建刷盘请求对象
GroupCommitService有一个写队列和一个读队列,即将请求和刷盘进行读写分离。请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复
GroupCommitRequest
public static class GroupCommitRequest {private final long nextOffset;private final CountDownLatch countDownLatch = new CountDownLatch(1);//刷盘结果private volatile boolean flushOK = false;public GroupCommitRequest(long nextOffset) {this.nextOffset = nextOffset;}public long getNextOffset() {return nextOffset;}//唤醒阻塞等待的线程//FIXME by jannal 此处有并发问题,this.flushOK = flushOK不是原子操作。正常需要加同步//由于只有一个线程操作,所以即使不是原子性也问题不大public void wakeupCustomer(final boolean flushOK) {this.flushOK = flushOK;this.countDownLatch.countDown();}//等待刷盘public boolean waitForFlush(long timeout) {try {this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);return this.flushOK;} catch (InterruptedException e) {log.error("Interrupted", e);return false;}} }
GroupCommitService源码分析
class GroupCommitService extends FlushCommitLogService {//读写容器private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();public synchronized void putRequest(final GroupCommitRequest request) {//FIXME by jannal 思考:既然方法已经加锁,为什么此处需要再次加锁?//swapRequests可能在其他线程并发执行,所以需要给requestsWrite单独加锁//swapRequests导致requestsWrite的引用变化,会不会出现问题?//可以将swapRequests加一个与操作requestsWrite的锁,来优化此处代码,避免不好理解synchronized (this.requestsWrite) {this.requestsWrite.add(request);}// 通知服务线程已经接收到GroupCommitRequest//FIXME 直接调用父类的this.wakeUp()多好?if (hasNotified.compareAndSet(false, true)) {waitPoint.countDown(); // notify}}private void swapRequests() {// volatile可以保证可见性,requestsWrite写入时加锁了,所以此处无需加锁,通过volatile可以实现低开销的读List<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;}private void doCommit() {synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {for (GroupCommitRequest req : this.requestsRead) {// There may be a message in the next file, so a maximum of// two times the flushboolean flushOK = false;/*** 两个MappedFile(写第N个消息时,MappedFile 已满,创建了一个新的),所以需要有循环2次。*/for (int i = 0; i < 2 && !flushOK; i++) {//请求的offset超过已经flushed的offset,则强制刷盘flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();if (!flushOK) {CommitLog.this.mappedFileQueue.flush(0);}}req.wakeupCustomer(flushOK);}long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {//更新刷盘检测点StoreCheckpoint中的physicMsg Timestamp//刷盘检测点的刷盘操作将在刷写消息队列文件时触发CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}this.requestsRead.clear();} else {// Because of individual messages is set to not sync flush, it// will come to this processCommitLog.this.mappedFileQueue.flush(0);}}}public void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {//调用swapRequests=>doCommitthis.waitForRunning(10);this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// Under normal circumstances shutdown, wait for the arrival of the// request, and then flushtry {Thread.sleep(10);} catch (InterruptedException e) {CommitLog.log.warn("GroupCommitService Exception, ", e);}//FIXME by jannal 上面没有加锁,这里为啥加锁?synchronized (this) {this.swapRequests();}this.doCommit();CommitLog.log.info(this.getServiceName() + " service end");}@Overrideprotected void onWaitEnd() {this.swapRequests();}@Overridepublic String getServiceName() {return GroupCommitService.class.getSimpleName();}@Overridepublic long getJointime() {return 1000 * 60 * 5;} }
异步刷盘
CommitLog#handleDiskFlush中异步刷盘代码如下。异步刷盘有两种方式
- 开启
transientStorePoolEnable=true
机制则启动CommitRealTimeService
异步刷盘方式。 - 如果没有开启
transientStorePoolEnable=false
,则启动FlushRealTimeService
- CommitRealTimeService在commit成功后,会执行
flushCommitLogService.wakeup();
也就是让FlushRealTimeService将Page Cache中的数据同步至磁盘。
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//使用MappedByteBuffer,默认策略flushCommitLogService.wakeup(); } else {//异步刷盘,如果开启TransientStorePool,使用写入缓冲区+FileChannelcommitLogService.wakeup(); }
- 开启
异步刷盘流程
CommitRealTimeService
如果
transientStorePoolEnable=true
,Broker会申请一个与CommitLog同样大小的堆外内存,该堆外内存会使用内存锁定(mlock),将其变为常驻内存,避免被操作系统调到swap空间中。- 消息追加到堆外内存
- 提交到内存映射文件中
- 使用flush刷盘
CommitRealTimeService服务线程执行逻辑
- 默认每200ms将ByteBuffer新追加的数据(
新追加的数据=wrotePosition-commitedPosition
)提交到FileChannel中
- 默认每200ms将ByteBuffer新追加的数据(
FlushRealTimeService
- 无论是否开启写入缓冲池,刷盘最终都由
FlushRealTimeService
来执行,CommitRealTimeService
在commit成功后,会执行flushCommitLogService.wakeup();
也就是让FlushRealTimeService将Page Cache中的数据同步至磁盘。 - 将内存(Page Cache)中的数据同步至磁盘(flush)有一些前提条件
- 若当前时间距离上次实际刷盘时间已经超过10S,则会忽略其他所有前提,确定刷盘,这样即使服务器宕机了最多也仅丢失10S的数据,提高了消息队列的可靠性。
- 正常情况下刷盘需要满足持久化数据大于配置的最小页数,默认4,也就是新写入内存中的数据大于或等于16KB(4*4KB)
- 当开启写入缓冲,也就是追加到fileChannel的数据大于或等于16KB
- 未开启写入缓冲则是追加到mappedByteBuffer的数据大于或等于16KB
RocketMQ源码分析(十二)之CommitLog同步与异步刷盘相关推荐
- 【转】ABP源码分析十二:本地化
本文逐个分析ABP中涉及到localization的接口和类,以及他们之间的关系.本地化主要涉及两个方面:一个是语言(Language)的管理,这部分相对简单.另一个是语言对应得本地化资源(Local ...
- 【vue-router源码】十二、useRoute、useRouter、useLink源码分析
[vue-rouer源码]系列文章 [vue-router源码]一.router.install解析 [vue-router源码]二.createWebHistory.createWebHashHis ...
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- GCC源码分析(十六) — gimple转RTL(pass_expand)(下)
版权声明:本文为CSDN博主「ashimida@」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明. 原文链接:https://blog.csdn.net/lidan1 ...
- Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)
一.综述 HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode.DataNode.DFSClient等众多角色的分工与合作. 首先上一段代码,客户端是如何写文件的: ...
- Spring Cloud源码分析(二)Ribbon(续)
因文章长度限制,故分为两篇.上一篇:<Spring Cloud源码分析(二)Ribbon> 负载均衡策略 通过上一篇对Ribbon的源码解读,我们已经对Ribbon实现的负载均衡器以及其中 ...
- Koa源码分析(二) -- co的实现
Abstract 本系列是关于Koa框架的文章,目前关注版本是Koa v1.主要分为以下几个方面: Koa源码分析(一) -- generator Koa源码分析(二) -- co的实现 Koa源码分 ...
- Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Cowboy 源码分析(十八)
在上一篇中,我们整理了下cowboy_http_protocol:header/3函数,在文章的末尾留下2个没有讲到的函数,今天,我们先看下cowboy_http_protocol:error_ter ...
最新文章
- 同时上哈佛,还一起一作发Nature!这对95后学霸情侣让人慕了……
- linux运维实战练习-2015年9月13日-9月15日课程作业(练习)安排
- 处理数字_6_NULL值的列的个数
- php 自带过滤和转义函数
- php符号 set,PHP 符号大全
- IDC:大数据——数字化转型时代的大商机
- 香农-范诺算法(Shannon-Fano coding)原理
- python中del怎么用_Python范例中的del关键字
- python3文本文件读取方法_Python3读取文件常用方法实例分析
- 如何查看2020最新版谷歌地球高精度卫星地图(附下载方法)
- 如何用计算机巧记英语词汇,小学英语单词巧记法
- 《日语综合教程》第七册 第六課 自然と人間
- 1182 -- 对决
- 牛客网华为云服务器,把通过牛客网注册的华为云服务器用起来!
- CTFhub命令注入,过滤了cat命令,过滤了空格,过滤目录分隔符,过滤运算符,综合练习
- 在iPad应用中嵌入字体的方法——非人云亦云版
- nginx和tomcat本地部署
- 985、211学校分为哪八个档次?
- 软件网关工业生产设备PLC数据采集转存数据库记录仪IOT gateway
- PNAS:青少年大脑功能连接的保守和破坏性模式变化