设计模式翻译自:https://martinfowler.com/articles/patterns-of-distributed-systems/low-watermark.html

最低水位线(Low-Water Mark)

最低水位线是指在 WAL(Write Ahead Log)预写日志这种设计模式中,标记在这个位置之前的日志可以被丢弃。

问题背景

WAL(Write Ahead Log)预写日志维护了对于存储的每次更新,随着时间不断增长,这个日志文件会变得无限大。Segmented Log 分割日志这种设计模式可以让我们每次只处理一个更小的文件,但是日志如果不清理,会无休止增长以至于硬盘被占满。

解决方案

最低水位线这种设计模式会告诉系统哪一部分的日志可以被删除了,即在最低水位线之前的所有日志可以被清理掉。一般的方式是,程序内有一个线程运行一个定时任务,不断地检查哪一部分的日志可以被清理并且删除这些日志文件。

this.logCleaner = newLogCleaner(config);
this.logCleaner.startup();

这里的 LogCleaner 可以用定时任务实现:

public void startup() {scheduleLogCleaning();
}private void scheduleLogCleaning() {singleThreadedExecutor.schedule(() -> {cleanLogs();}, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
}

基于快照的最低水位线实现以及示例

大部分的分布式一致性系统(例如 Zookeeper(ZAB 简化 paxos协议),etcd(raft协议)),都实现了快照机制。在这种机制下,他们的存储引擎会定时的进行全量快照,并且记录下快照对应的日志位置,将这个位置作为最低水位线。

//进行快照
public SnapShot takeSnapshot() {//获取最近的日志idLong snapShotTakenAtLogIndex = wal.getLastLogEntryId();//利用这个日志 id 作为标识,生成快照return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
}

当生成了快照并成功存储到了磁盘上,对应的最低水位线将用来清理老的日志:

//根据位置获取这个位置之前的所有日志文件
List<WALSegment> getSegmentsBefore(Long snapshotIndex) {List<WALSegment> markedForDeletion = new ArrayList<>();List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;for (WALSegment sortedSavedSegment : sortedSavedSegments) {//如果这个日志文件的最新log id 小于快照位置,证明可以被清理掉if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) {markedForDeletion.add(sortedSavedSegment);}}return markedForDeletion;
}

zookeeper 中的最低水位线实现

定时任务位于DatadirCleanupManagerstart方法:

public void start() {//只启动一次if (PurgeTaskStatus.STARTED == purgeTaskStatus) {LOG.warn("Purge task is already running.");return;}//检查定时间隔有效性if (purgeInterval <= 0) {LOG.info("Purge task is not scheduled.");return;}//启动定时任务timer = new Timer("PurgeTask", true);TimerTask task = new PurgeTask(dataLogDir, snapDir,snapRetainCount);timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));purgeTaskStatus = PurgeTaskStatus.STARTED;
}

核心方法为PurgeTxnLogpurge方法:

public static void purge(File dataDir, File snapDir, int num) throws IOException {//保留的snapshot数量不能超过3if (num < 3) {throw new IllegalArgumentException(COUNT_ERR_MSG);}FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);//统计文件数量List<File> snaps = txnLog.findNValidSnapshots(num);int numSnaps = snaps.size();if (numSnaps > 0) {//利用上一个文件的日志偏移,清理log文件和snapshot文件purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));}
}static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {//名字包括开头的zxid,就是代表了日志位置final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT);final Set<File> retainedTxnLogs = new HashSet<File>();retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));class MyFileFilter implements FileFilter {private final String prefix;MyFileFilter(String prefix) {this.prefix = prefix;}public boolean accept(File f) {if (!f.getName().startsWith(prefix + ".")) {return false;}if (retainedTxnLogs.contains(f)) {return false;}long fZxid = Util.getZxidFromName(f.getName(), prefix);//根据文件名称代表的zxid,过滤出要删除的文件return fZxid < leastZxidToBeRetain;}}//筛选出符合条件的 log 文件和 snapshot 文件File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));List<File> files = new ArrayList<>();if (logs != null) {files.addAll(Arrays.asList(logs));}File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));if (snapshots != null) {files.addAll(Arrays.asList(snapshots));}//进行删除for (File f : files) {final String msg = String.format("Removing file: %s\t%s",DateFormat.getDateTimeInstance().format(f.lastModified()),f.getPath());LOG.info(msg);System.out.println(msg);if (!f.delete()) {System.err.println("Failed to remove " + f.getPath());}}}

那么是什么时候 snapshot 呢?查看SyncRequestProcessorrun方法,这个方法时处理请求,处理请求的时候记录操作日志到 log 文件,同时在有需要进行 snapshot 的时候进行 snapshot:

public void run() {try {//避免所有的server都同时进行snapshotresetSnapshotStats();lastFlushTime = Time.currentElapsedTime();while (true) {//获取请求代码省略// 请求操作纪录成功if (!si.isThrottled() && zks.getZKDatabase().append(si)) {//是否需要snapshotif (shouldSnapshot()) {//重置是否需要snapshot判断相关的统计resetSnapshotStats();//另起新文件zks.getZKDatabase().rollLog();//进行snapshot,先获取锁,保证只有一个进行中的snapshotif (!snapThreadMutex.tryAcquire()) {LOG.warn("Too busy to snap, skipping");} else {//异步snapshotnew ZooKeeperThread("Snapshot Thread") {public void run() {try {zks.takeSnapshot();} catch (Exception e) {LOG.warn("Unexpected exception", e);} finally {//释放锁snapThreadMutex.release();}}}.start();}}} //省略其他}} catch (Throwable t) {handleException(this.getName(), t);}
}

resetSnapshotStats()设置随机起始位,避免集群内所有实例同时进行 snapshot:

private void resetSnapshotStats() {//生成随机roll,snapCount(默认100000)randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);//生成随机size,snapSizeInBytes(默认4GB)randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
}

shouldSnapshot()根据启动时设置的随机起始位以及配置,判断是否需要 snapshot

private boolean shouldSnapshot() {//获取日志计数int logCount = zks.getZKDatabase().getTxnCount();//获取大小long logSize = zks.getZKDatabase().getTxnSize();//当日志个数大于snapCount(默认100000)/2 + 随机roll,或者日志大小大于snapSizeInBytes(默认4GB)/2+随机sizereturn (logCount > (snapCount / 2 + randRoll))|| (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
}

``

基于时间的最低水位线实现与示例

在某些系统中,日志不是用来更新系统的状态,可以在一段时间之后删除,并且不用考虑任何子系统这个最低水位线之前的是否可以删除。例如,kafka 默认保留 7 天的 log,RocketMQ 默认保留 3 天的 commit log。

RocketMQ中最低水位线实现

DefaultMeesageStoreaddScheduleTask()方法中,定义了清理的定时任务:

private void addScheduleTask() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {DefaultMessageStore.this.cleanFilesPeriodically();}}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);//忽略其他定时任务
}private void cleanFilesPeriodically() {//清理消息存储文件this.cleanCommitLogService.run();//清理消费队列文件this.cleanConsumeQueueService.run();
}

我们这里只关心清理消息存储文件,即DefaultMessageStoredeleteExpiredFiles方法:

private void deleteExpiredFiles() {int deleteCount = 0;//文件保留时间,就是文件最后一次更新时间到现在的时间间隔,如果超过了这个时间间隔,就认为可以被清理掉了long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();//删除文件的间隔,每次清理可能不止删除一个文件,这个配置指定两个文件删除之间的最小间隔int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();//清理文件时,可能文件被其他线程占用,例如读取消息,这时不能轻易删除//在第一次触发时,记录一个当前时间戳,当与当前时间间隔超过这个配置之后,强制删除int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();//判断是否要删除的时间到了boolean timeup = this.isTimeToDelete();//判断磁盘空间是否还充足boolean spacefull = this.isSpaceToDelete();//是否是手工触发boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;//满足其一,就执行清理if (timeup || spacefull || manualDelete) {if (manualDelete)this.manualDeleteFileSeveralTimes--;boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;fileReservedTime *= 60 * 60 * 1000;//清理文件deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce);if (deleteCount > 0) {} else if (spacefull) {log.warn("disk space will be full soon, but delete file failed.");}}
}

清理文件的代码MappedFiledeleteExpiredFileByTime方法:

public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {Object[] mfs = this.copyMappedFiles(0);if (null == mfs)return 0;//刨除最新的那个文件int mfsLength = mfs.length - 1;int deleteCount = 0;List<MappedFile> files = new ArrayList<MappedFile>();if (null != mfs) {for (int i = 0; i < mfsLength; i++) {MappedFile mappedFile = (MappedFile) mfs[i];long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;//如果超过了过期时间,或者需要立即清理if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {//关闭,清理并删除文件if (mappedFile.destroy(intervalForcibly)) {files.add(mappedFile);deleteCount++;if (files.size() >= DELETE_FILES_BATCH_MAX) {break;}//如果配置了删除文件时间间隔,则需要等待if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {try {Thread.sleep(deleteFilesInterval);} catch (InterruptedException e) {}}} else {break;}} else {//avoid deleting files in the middlebreak;}}}//从文件列表里面里将本次删除的文件剔除deleteExpiredFile(files);return deleteCount;
}

分布式系统设计模式 - 最低水位线(Low-Water Mark)相关推荐

  1. 请问你知道分布式系统设计模式的最低水位线思想么?

    最低水位线(Low-Water Mark) 最低水位线是指在 WAL(Write Ahead Log)预写日志这种设计模式中,标记在这个位置之前的日志可以被丢弃. 问题背景 WAL(Write Ahe ...

  2. Oracle 高水位(HWM: High Water Mark) 说明

    一. 准备知识:ORACLE的逻辑存储管理.        ORACLE在逻辑存储上分4个粒度: 表空间, 段, 区 和 块.        1.1 块: 是粒度最小的存储单位,现在标准的块大小是8K ...

  3. oracle water,对于Oracle High Water Mark(HWM)的理解

    在网上看到不少关于Oracle High Water Mark(HWM)的理解,但是都弄的不太清楚,而且有些说法是不对的,所以还是逼迫自己写篇文章来学习一下,如有不恰当之处请指教,呵呵~ 先看看官方对 ...

  4. oracle hwm 查询,Oracle HWM( High Water Mark)

    Oracle HWM( High Water Mark) 1.什么是HWM 注意:此部分内容请先了解oracle物理结构和逻辑结构 顾名思义,这是一条水位线,oracle的每一个对象都是一个segem ...

  5. zeromq: hwm; high water mark: 高水位线

    文章目录 参考 描述 HWM是怎么触发线程block? How does the HWM (high water mark) work with any socket type? 参考 libzmq/ ...

  6. 分布式系统设计模式,你用过哪些?

    1.布隆过滤器 Bloom过滤器是一种节省空间的概率数据结构,用于测试元素是否为某集合的成员.它用于我们只需要检查元素是否属于对象的场景. 在BigTable(和Cassandra)中,任何读取操作都 ...

  7. 19种分布式系统设计模式

    涉及与 分布式系统 相关的常见设计问题的关键模式: 1. 布隆过滤器 布隆过滤器是一种节省空间的概率数据结构,用于 测试元素是否是集合的成员 .它用于我们只需要知道元素是否属于它应该所在的地方(缓存) ...

  8. 【计算广告】在线分配算法之 —— HWM(High water mark)介绍

    该算法是雅虎工程师提出的一个解决合约制广告或者说GD(担保式投放)投放系统在线分配问题的贪心算法,思路很直接,下面是本人对照其论文整理的思路,里面有自己的理解. 论文题目:Ad Serving Usi ...

  9. C++设计模式实践——线上购物系统

    项目源码:https://github.com/Pistachiout/DesignPattern 一.系统的主要目标与功能   在本次设计中,考虑到目前疫情反复不断,为了方便群众,超市都推出在线购物 ...

最新文章

  1. 电商搜索能力解读-实体识别(NER)
  2. java rc4_nodejs 和 java 进行 rc4 加密得到的结果不一样
  3. 谈谈Tensorflow的Batch Normalization
  4. python3 x和python2 x区别_Python3.x和Python2.x的区别(转存参考)
  5. C# WindowService 动态修改服务名
  6. Google Chrome 势要消灭不安全的非 HTTPS 页面!
  7. webService调用模式比较
  8. JavaSE总结(适合Java期末考试复习,JavaSE部分知识回顾)
  9. 华为热设计工程师待遇_【华为热设计工程师面试】华为热设计工程师大家要慎重考虑。-看准网...
  10. java 获取当天的0点和24点
  11. Python实现大文本文件分割成多个小文件
  12. 设计模式——访问者模式
  13. 电脑端bilibili视频缓存合并视频的解惑
  14. ps奥顿柔焦效果+提取线稿
  15. idea 2019.2顶部菜单栏隐藏的恢复办法
  16. 购买完域名之后能干什么事儿?
  17. java增删改查 jsp生成_jsp+servlet实现最简单的增删改查代码分享
  18. python快递费用计算_Python实现快递查询
  19. Linux内核和用户空间通信的方法
  20. 华为鸿蒙系统手表,鸿蒙2.0系统发布!年底适配最新华为旗舰,系统比安卓还要好?...

热门文章

  1. eeprom与pcf859
  2. 深度学习 数码管_创新研发基于深度学习的可见光智能检测技术
  3. linux编辑conf,Linux:我如何编辑resolv.conf
  4. 微型计算机原理中jge,微机原理第三章课件.ppt
  5. 评价数据离散度方法(转)
  6. 望尽天涯路--从理财角度看高可用
  7. 【CCAI 2016】人工智能青年论坛:论青年正确拥抱AI的姿势
  8. 项目管理- 项目阶段划分
  9. AndroidStudio中虚拟机的联网问题
  10. 数学和编程到底是什么关系?