最低水位线(Low-Water Mark)

最低水位线是指在 WAL(Write Ahead Log)预写日志这种设计模式中,标记在这个位置之前的日志可以被丢弃。

问题背景

WAL(Write Ahead Log)预写日志维护了对于存储的每次更新,随着时间不断增长,这个日志文件会变得无限大。Segmented Log 分割日志这种设计模式可以让我们每次只处理一个更小的文件,但是日志如果不清理,会无休止增长以至于硬盘被占满。

解决方案

最低水位线这种设计模式会告诉系统哪一部分的日志可以被删除了,即在最低水位线之前的所有日志可以被清理掉。一般的方式是,程序内有一个线程运行一个定时任务,不断地检查哪一部分的日志可以被清理并且删除这些日志文件。

this.logCleaner = newLogCleaner(config);
this.logCleaner.startup();

这里的 LogCleaner 可以用定时任务实现:

public void startup() {scheduleLogCleaning();
}private void scheduleLogCleaning() {singleThreadedExecutor.schedule(() -> {cleanLogs();}, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
}

基于快照的最低水位线实现以及示例

大部分的分布式一致性系统(例如 Zookeeper(ZAB 简化 paxos协议),etcd(raft协议)),都实现了快照机制。在这种机制下,他们的存储引擎会定时的进行全量快照,并且记录下快照对应的日志位置,将这个位置作为最低水位线。

//进行快照
public SnapShot takeSnapshot() {//获取最近的日志idLong snapShotTakenAtLogIndex = wal.getLastLogEntryId();//利用这个日志 id 作为标识,生成快照return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
}

当生成了快照并成功存储到了磁盘上,对应的最低水位线将用来清理老的日志:

//根据位置获取这个位置之前的所有日志文件
List<WALSegment> getSegmentsBefore(Long snapshotIndex) {List<WALSegment> markedForDeletion = new ArrayList<>();List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;for (WALSegment sortedSavedSegment : sortedSavedSegments) {//如果这个日志文件的最新log id 小于快照位置,证明可以被清理掉if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) {markedForDeletion.add(sortedSavedSegment);}}return markedForDeletion;
}

zookeeper 中的最低水位线实现

定时任务位于DatadirCleanupManagerstart方法:

public void start() {//只启动一次if (PurgeTaskStatus.STARTED == purgeTaskStatus) {LOG.warn("Purge task is already running.");return;}//检查定时间隔有效性if (purgeInterval <= 0) {LOG.info("Purge task is not scheduled.");return;}//启动定时任务timer = new Timer("PurgeTask", true);TimerTask task = new PurgeTask(dataLogDir, snapDir,snapRetainCount);timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));purgeTaskStatus = PurgeTaskStatus.STARTED;
}

核心方法为PurgeTxnLogpurge方法:

public static void purge(File dataDir, File snapDir, int num) throws IOException {//保留的snapshot数量不能超过3if (num < 3) {throw new IllegalArgumentException(COUNT_ERR_MSG);}FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);//统计文件数量List<File> snaps = txnLog.findNValidSnapshots(num);int numSnaps = snaps.size();if (numSnaps > 0) {//利用上一个文件的日志偏移,清理log文件和snapshot文件purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));}
}static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {//名字包括开头的zxid,就是代表了日志位置final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT);final Set<File> retainedTxnLogs = new HashSet<File>();retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));class MyFileFilter implements FileFilter {private final String prefix;MyFileFilter(String prefix) {this.prefix = prefix;}public boolean accept(File f) {if (!f.getName().startsWith(prefix + ".")) {return false;}if (retainedTxnLogs.contains(f)) {return false;}long fZxid = Util.getZxidFromName(f.getName(), prefix);//根据文件名称代表的zxid,过滤出要删除的文件return fZxid < leastZxidToBeRetain;}}//筛选出符合条件的 log 文件和 snapshot 文件File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));List<File> files = new ArrayList<>();if (logs != null) {files.addAll(Arrays.asList(logs));}File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));if (snapshots != null) {files.addAll(Arrays.asList(snapshots));}//进行删除for (File f : files) {final String msg = String.format("Removing file: %s\t%s",DateFormat.getDateTimeInstance().format(f.lastModified()),f.getPath());LOG.info(msg);System.out.println(msg);if (!f.delete()) {System.err.println("Failed to remove " + f.getPath());}}}

那么是什么时候 snapshot 呢?查看SyncRequestProcessorrun方法,这个方法时处理请求,处理请求的时候记录操作日志到 log 文件,同时在有需要进行 snapshot 的时候进行 snapshot:

public void run() {try {//避免所有的server都同时进行snapshotresetSnapshotStats();lastFlushTime = Time.currentElapsedTime();while (true) {//获取请求代码省略// 请求操作纪录成功if (!si.isThrottled() && zks.getZKDatabase().append(si)) {//是否需要snapshotif (shouldSnapshot()) {//重置是否需要snapshot判断相关的统计resetSnapshotStats();//另起新文件zks.getZKDatabase().rollLog();//进行snapshot,先获取锁,保证只有一个进行中的snapshotif (!snapThreadMutex.tryAcquire()) {LOG.warn("Too busy to snap, skipping");} else {//异步snapshotnew ZooKeeperThread("Snapshot Thread") {public void run() {try {zks.takeSnapshot();} catch (Exception e) {LOG.warn("Unexpected exception", e);} finally {//释放锁snapThreadMutex.release();}}}.start();}}} //省略其他}} catch (Throwable t) {handleException(this.getName(), t);}
}

resetSnapshotStats()设置随机起始位,避免集群内所有实例同时进行 snapshot:

private void resetSnapshotStats() {//生成随机roll,snapCount(默认100000)randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);//生成随机size,snapSizeInBytes(默认4GB)randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
}

shouldSnapshot()根据启动时设置的随机起始位以及配置,判断是否需要 snapshot

private boolean shouldSnapshot() {//获取日志计数int logCount = zks.getZKDatabase().getTxnCount();//获取大小long logSize = zks.getZKDatabase().getTxnSize();//当日志个数大于snapCount(默认100000)/2 + 随机roll,或者日志大小大于snapSizeInBytes(默认4GB)/2+随机sizereturn (logCount > (snapCount / 2 + randRoll))|| (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
}

``

基于时间的最低水位线实现与示例

在某些系统中,日志不是用来更新系统的状态,可以在一段时间之后删除,并且不用考虑任何子系统这个最低水位线之前的是否可以删除。例如,kafka 默认保留 7 天的 log,RocketMQ 默认保留 3 天的 commit log。

RocketMQ中最低水位线实现

DefaultMeesageStoreaddScheduleTask()方法中,定义了清理的定时任务:

private void addScheduleTask() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {DefaultMessageStore.this.cleanFilesPeriodically();}}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);//忽略其他定时任务
}private void cleanFilesPeriodically() {//清理消息存储文件this.cleanCommitLogService.run();//清理消费队列文件this.cleanConsumeQueueService.run();
}

我们这里只关心清理消息存储文件,即DefaultMessageStoredeleteExpiredFiles方法:

private void deleteExpiredFiles() {int deleteCount = 0;//文件保留时间,就是文件最后一次更新时间到现在的时间间隔,如果超过了这个时间间隔,就认为可以被清理掉了long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();//删除文件的间隔,每次清理可能不止删除一个文件,这个配置指定两个文件删除之间的最小间隔int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();//清理文件时,可能文件被其他线程占用,例如读取消息,这时不能轻易删除//在第一次触发时,记录一个当前时间戳,当与当前时间间隔超过这个配置之后,强制删除int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();//判断是否要删除的时间到了boolean timeup = this.isTimeToDelete();//判断磁盘空间是否还充足boolean spacefull = this.isSpaceToDelete();//是否是手工触发boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;//满足其一,就执行清理if (timeup || spacefull || manualDelete) {if (manualDelete)this.manualDeleteFileSeveralTimes--;boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;fileReservedTime *= 60 * 60 * 1000;//清理文件deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce);if (deleteCount > 0) {} else if (spacefull) {log.warn("disk space will be full soon, but delete file failed.");}}
}

清理文件的代码MappedFiledeleteExpiredFileByTime方法:

public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {Object[] mfs = this.copyMappedFiles(0);if (null == mfs)return 0;//刨除最新的那个文件int mfsLength = mfs.length - 1;int deleteCount = 0;List<MappedFile> files = new ArrayList<MappedFile>();if (null != mfs) {for (int i = 0; i < mfsLength; i++) {MappedFile mappedFile = (MappedFile) mfs[i];long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;//如果超过了过期时间,或者需要立即清理if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {//关闭,清理并删除文件if (mappedFile.destroy(intervalForcibly)) {files.add(mappedFile);deleteCount++;if (files.size() >= DELETE_FILES_BATCH_MAX) {break;}//如果配置了删除文件时间间隔,则需要等待if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {try {Thread.sleep(deleteFilesInterval);} catch (InterruptedException e) {}}} else {break;}} else {//avoid deleting files in the middlebreak;}}}//从文件列表里面里将本次删除的文件剔除deleteExpiredFile(files);return deleteCount;
}

每日一刷,轻松提升技术,斩获各种offer:

请问你知道分布式系统设计模式的最低水位线思想么?相关推荐

  1. 分布式系统设计模式 - 最低水位线(Low-Water Mark)

    设计模式翻译自:https://martinfowler.com/articles/patterns-of-distributed-systems/low-watermark.html 最低水位线(L ...

  2. 分布式设计模式中的Quorum思想

    本文来说下分布式设计模式中的Quorum思想 文章目录 有效个数(Quorum) 问题背景 解决方案 如何设计集群个数 实现举例 Zookeeper 的两阶段提交 + 半数以上写入机制 Elastic ...

  3. 各种设计模式对比及编程思想总结

    各种设计模式对比及编程思想总结: 设计模式 一句话归纳 工厂模式(Factory) 只对结果负责,不要三无产品 单例模式(Singleton) 保证独一无二 适配器模式(Adapter) 需要一个转换 ...

  4. python不同时间周期k线_请问期货不同时间级别的k线呈现相反形态怎么判断买卖点?...

    题主的意思我用三角形态说说看. 日线级别上三角形态收敛,5分钟级别上K线是三角形态突破.这时候你就该怎么判断买卖点? 其实这个问题,就是个大小周期共振的问题. 我们一般以大周期的形态作为买卖方向的确定 ...

  5. 分布式系统设计模式,你用过哪些?

    1.布隆过滤器 Bloom过滤器是一种节省空间的概率数据结构,用于测试元素是否为某集合的成员.它用于我们只需要检查元素是否属于对象的场景. 在BigTable(和Cassandra)中,任何读取操作都 ...

  6. 19种分布式系统设计模式

    涉及与 分布式系统 相关的常见设计问题的关键模式: 1. 布隆过滤器 布隆过滤器是一种节省空间的概率数据结构,用于 测试元素是否是集合的成员 .它用于我们只需要知道元素是否属于它应该所在的地方(缓存) ...

  7. 游戏设计模式——面向数据编程思想

    前言:随着软件需求的日益复杂发展,远古时期面的向过程编程思想才渐渐萌生了面向对象编程思想. 当人们发现面向对象在应对高层软件的种种好处时,越来越沉醉于面向对象,热衷于研究如何更加优雅地抽象出对象. 然 ...

  8. 四阶龙格库塔法的基本思想_请问用四阶龙格库塔法解二阶微分方程的思想是什么?...

    默认y的单位是弧度 k=1000; t=0:0.001:1; Y=[]; err=1 K=[]; Ymax=[]; xishu=1.01; while err X=[0 0]; k=xishu*k; ...

  9. C语言编程的准则、设计模式、软件架构及思想

    文章目录 1. 概念 面向过程 面向对象 面向接口编程 模块化编程 软件分层架构 可重用性 2. 区别和联系 3. 项目代码架构思路 在文章开始之前,我们先明确一些概念,当然有些概念本身就没有统一权威 ...

最新文章

  1. ECS 实例网络带宽
  2. leetcode算法题--打印从1到最大的n位数
  3. Auto_ml与TPOT的区别
  4. 牛客网_PAT乙级_1017打印沙漏(20)
  5. mysql 闪回_MySQL数据误删除的快速解决方法(MySQL闪回工具)
  6. java 用面向接口编程的方式开发打印机_Java“打印机”模型理解面向接口编程。实现接口定义类,接口实现类,核心“业务”类分离...
  7. 计算(a+b)*c的值
  8. ModuleNotFoundError: No module named ‘pycocotools‘
  9. 10硬盘锁怎么解除_鉴定二手iPad是否拼装,扩容,隐藏ID锁的详细方法
  10. 脉冲宽度调制pdm_两个相同Vpp驱动的 PAM4 MZ调制器
  11. 4x4矩阵键盘c语言,4X4键盘矩阵键盘程序
  12. linux shell脚本 定义变量,Shell脚本应用 - 编制shell脚本、shell变量
  13. C4D新建立方体对象不显示?
  14. Python实现英文词频统计:以hamlet为例
  15. 【毕业设计】基于树莓派的指纹识别考勤系统 - 单片机 嵌入式 物联网
  16. 快速实现中文翻译多国语言
  17. 企业如何进行数字化管理?
  18. 租房心经--教你如何租房子
  19. Java matlab车牌识别,车牌识别matlab实现(蓝色车牌和新能源车牌)
  20. 应届毕业生面试软件测试...测试小白如何抓住机会

热门文章

  1. Tolua使用笔记一:开始使用Tolua的准备工作与lua文件读取方法
  2. 01-初识sketch-sketch优势
  3. 网络丢包问题的原因及解决办法
  4. 代数合并同类项计算机步骤,代数式(合并同类项)
  5. Docker 容器技术入门
  6. c#创建word 表格垂直居中
  7. 关于QQ的相关代码收集整理
  8. overflow:hidden属性
  9. 【算法】判断一个点是否在多边形之内
  10. Axon 4.4 中文版文档(十四)