文章目录

  • 日志结构
  • 读写流程
    • 写入
    • 读取
  • 崩溃恢复

当向 LevelDB 写入数据时,只需要将数据写入内存中的 MemTable,而由于内存是易失性存储,因此 LevelDB 需要一个额外的持久化文件:预写日志(Write-Ahead Log,WAL),又称重做日志。这是一个追加修改、顺序写入磁盘的文件。当宕机或者程序崩溃时 WAL 能够保证写入成功的数据不会丢失。将 MemTable 成功写入 SSTable 后,相应的预写日志就可以删除了。

日志结构

Log 文件以块为基本单位,一条记录可能全部写到一个块上,也可能跨几个块。记录的格式如下图所示:

Log记录格式

首先我们来看看 Log 中的数据格式,代码如下:

// https://github.com/google/leveldb/blob/master/db/log_format.hnamespace log {enum RecordType {// Zero is reserved for preallocated fileskZeroType = 0,kFullType = 1,// For fragmentskFirstType = 2,kMiddleType = 3,kLastType = 4};static const int kMaxRecordType = kLastType;static const int kBlockSize = 32768;// Header is checksum (4 bytes), length (2 bytes), type (1 byte).static const int kHeaderSize = 4 + 2 + 1;}  // namespace log

结合上面的代码和图片,我们可以看到每一个块大小为 32768 字节,并且每一个块由头部和正文组成。头部由 4 字节校验,2 字节的长度与 1 字节的类型构成,即每一个块的开始 7 字节属于头部。头部中的类型字段有如下 4 种:

  • kZeroType:为预分配的文件保留。
  • kFullType:表示一条记录完整地写到了一个块上。
  • kFirstType:表示该条记录的第一部分。
  • kMiddleType:表示该条记录的中间部分。
  • kLastType:表示该条记录的最后一部分。

通过记录结构可以推测出 Log 文件的读取流程,即首先根据头部的长度字段确定需要读取多少字节,然后根据头部类型字段确定该条记录是否已经完整读取,如果没有完整读取,继续按该流程进行,直到读取到记录的最后一部分,其头部类型为 kLastType。

读写流程

写入

Log 的读取主要由 Writer 中的 AddRecord 实现,代码如下:

// https://github.com/google/leveldb/blob/master/db/log_writer.hclass Writer {public:explicit Writer(WritableFile* dest);Writer(WritableFile* dest, uint64_t dest_length);Writer(const Writer&) = delete;Writer& operator=(const Writer&) = delete;~Writer();Status AddRecord(const Slice& slice);private:Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);WritableFile* dest_;int block_offset_;  // Current offset in blockuint32_t type_crc_[kMaxRecordType + 1];
};// https://github.com/google/leveldb/blob/master/db/log_writer.ccStatus Writer::AddRecord(const Slice& slice) {const char* ptr = slice.data();size_t left = slice.size();Status s;//begin表明本条记录是第一次写入,即当前块中第一条记录bool begin = true;do {//当前块剩余空间,用于判断头部能否完整写入const int leftover = kBlockSize - block_offset_;assert(leftover >= 0);if (leftover < kHeaderSize) {//如果块剩余空间小于七个字节且不等于0,说明当前无法完整写入数据,此时填充\x00,从下一个块写入if (leftover > 0) {static_assert(kHeaderSize == 7, "");dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));}//此时块正好写满,将block_offset_置为0,表明开始写入新的块block_offset_ = 0;}assert(kBlockSize - block_offset_ - kHeaderSize >= 0);//计算块剩余空间const size_t avail = kBlockSize - block_offset_ - kHeaderSize;//计算当前块能够写入的数据大小(块剩余空间和记录剩余内容中最小的)const size_t fragment_length = (left < avail) ? left : avail;RecordType type;//end表明该记录是否已经完整写入,即最后一条记录const bool end = (left == fragment_length);//根据begin与end来确定记录类型if (begin && end) {//记录为第一条且同时又是最后一条,说明当前是完整的记录,状态为kFullTypetype = kFullType;} else if (begin) {//记录为第一条,状态为kFirstTypetype = kFirstType;} else if (end) {//记录为最后一条,标记状态为kLastTypetype = kLastType;} else {//记录不为第一条,也并非最后一条,则说明是中间状态,标记为kMiddleTypetype = kMiddleType;}//将数据按照格式写入,并刷新到磁盘文件中s = EmitPhysicalRecord(type, ptr, fragment_length);ptr += fragment_length;left -= fragment_length;begin = false;} while (s.ok() && left > 0);//循环至数据完全写入或者写入失败时才停止return s;
}

写入流程如下:

  1. 判断头部能否完整写入,如果不能则将剩余空间用 \x00 填充,接着从新的块开始写入。
  2. 根据 begin 和 end 判断记录类型。
  3. 将数据按照格式写入,并刷新到磁盘文件中。
  4. 循环至数据完全写入或者写入失败后停止,将结果返回。

读取

Log 的读取主要由 Reader 中的 ReadRecord 实现,代码如下:

// https://github.com/google/leveldb/blob/master/db/log_reader.hclass Reader {public:// Interface for reporting errors.class Reporter {public:virtual ~Reporter();virtual void Corruption(size_t bytes, const Status& status) = 0;};Reader(SequentialFile* file, Reporter* reporter, bool checksum,uint64_t initial_offset);Reader(const Reader&) = delete;Reader& operator=(const Reader&) = delete;~Reader();bool ReadRecord(Slice* record, std::string* scratch);uint64_t LastRecordOffset();private:enum {kEof = kMaxRecordType + 1,kBadRecord = kMaxRecordType + 2};
};// https://github.com/google/leveldb/blob/master/db/log_reader.ccbool Reader::ReadRecord(Slice* record, std::string* scratch) {if (last_record_offset_ < initial_offset_) {if (!SkipToInitialBlock()) {return false;}}scratch->clear();record->clear();bool in_fragmented_record = false;uint64_t prospective_record_offset = 0;Slice fragment;while (true) {//ReadPhysicalRecord读取log文件并将记录保存到fragment,同时返回记录的类型const unsigned int record_type = ReadPhysicalRecord(&fragment);uint64_t physical_record_offset =end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();if (resyncing_) {if (record_type == kMiddleType) {continue;} else if (record_type == kLastType) {resyncing_ = false;continue;} else {resyncing_ = false;}}//根据记录的类型来判断是否需要将当前记录附加到scratch后并继续读取switch (record_type) {//类型为kFullType则说明当前是完整的记录,直接赋值给record后返回case kFullType:if (in_fragmented_record) {if (!scratch->empty()) {ReportCorruption(scratch->size(), "partial record without end(1)");}}prospective_record_offset = physical_record_offset;scratch->clear();*record = fragment;last_record_offset_ = prospective_record_offset;return true;//类型为kFirstType则说明当前是第一部分,先将记录复制到scratch后继续读取case kFirstType:if (in_fragmented_record) {if (!scratch->empty()) {ReportCorruption(scratch->size(), "partial record without end(2)");}}prospective_record_offset = physical_record_offset;scratch->assign(fragment.data(), fragment.size());in_fragmented_record = true;break;//类型为kMiddleType则说明当前是中间部分,先将记录追加到scratch后继续读取case kMiddleType://初始读取到的类型为kMiddleType或者kLastType,则需要忽略并且继续偏移if (!in_fragmented_record) {ReportCorruption(fragment.size(),"missing start of fragmented record(1)");} else {scratch->append(fragment.data(), fragment.size());}break;//类型为kLastType则说明当前为最后,继续追加到scratch,并将scratch赋值给record并返回case kLastType:if (!in_fragmented_record) {ReportCorruption(fragment.size(),"missing start of fragmented record(2)");} else {scratch->append(fragment.data(), fragment.size());*record = Slice(*scratch);last_record_offset_ = prospective_record_offset;return true;}break;//如果状态为kEof、kBadRecord时说明日志损坏,此时清空scratch并返回falsecase kEof:if (in_fragmented_record) {scratch->clear();}return false;case kBadRecord:if (in_fragmented_record) {ReportCorruption(scratch->size(), "error in middle of record");in_fragmented_record = false;scratch->clear();}break;//未定义的类型,输出日志,剩余同上处理default: {char buf[40];std::snprintf(buf, sizeof(buf), "unknown record type %u", record_type);ReportCorruption((fragment.size() + (in_fragmented_record ? scratch->size() : 0)),buf);in_fragmented_record = false;scratch->clear();break;}}}return false;
}

执行流程如下:

  1. ReadRecord 读取一条记录到 fragment 变量中,并且返回该条记录的类型。
  2. 根据记录的类型来判断是否需要将当前记录附加到 scratch 后并继续读取:
    • kFullType:当前是完整的记录,直接赋值给 record 后返回。
    • kFirstType:当前是第一部分,先将记录覆盖到 scratch 后继续读取。
    • kMiddleType:当前是中间部分,先将记录追加到 scratch 后继续读取。
    • kLastType:当前为最后部分,继续追加到 scratch,并将完整的 scratch 赋值给 record 后返回。
    • 其它/异常:清空 scratch 并返回 false,如果是未定义类型需要输出日志。

这里还有一个需要注意的细节,由于读取 Log 文件时可以从指定偏移量开始,所以如果初始读取到的类型为 kMiddleType 或者 kLastType,则需要忽略并且继续偏移,直到碰见第一个 kFirstType。

崩溃恢复

当打开一个 LevelDB 的数据文件时,需先检验是否进行崩溃恢复,如果需要,则会从 Log 文件生成一个MemTable,其实现代码如下:

// https://github.com/google/leveldb/blob/master/db/db_impl.ccStatus DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,bool* save_manifest, VersionEdit* edit,SequenceNumber* max_sequence) {struct LogReporter : public log::Reader::Reporter {Env* env;Logger* info_log;const char* fname;Status* status;  // null if options_.paranoid_checks==falsevoid Corruption(size_t bytes, const Status& s) override {Log(info_log, "%s%s: dropping %d bytes; %s",(this->status == nullptr ? "(ignoring error) " : ""), fname,static_cast<int>(bytes), s.ToString().c_str());if (this->status != nullptr && this->status->ok()) *this->status = s;}};mutex_.AssertHeld();//打开log文件std::string fname = LogFileName(dbname_, log_number);SequentialFile* file;Status status = env_->NewSequentialFile(fname, &file);if (!status.ok()) {MaybeIgnoreError(&status);return status;}//创建log reader.LogReporter reporter;reporter.env = env_;reporter.info_log = options_.info_log;reporter.fname = fname.c_str();reporter.status = (options_.paranoid_checks ? &status : nullptr);log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);Log(options_.info_log, "Recovering log #%llu",(unsigned long long)log_number);//读取所有的records并写入一个memtablestd::string scratch;Slice record;WriteBatch batch;int compactions = 0;MemTable* mem = nullptr;//循环读取日志文件while (reader.ReadRecord(&record, &scratch) && status.ok()) {if (record.size() < 12) {reporter.Corruption(record.size(),Status::Corruption("log record too small"));continue;}WriteBatchInternal::SetContents(&batch, record);if (mem == nullptr) {mem = new MemTable(internal_comparator_);mem->Ref();}//将records写入memtablestatus = WriteBatchInternal::InsertInto(&batch, mem);MaybeIgnoreError(&status);if (!status.ok()) {break;}const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +WriteBatchInternal::Count(&batch) - 1;if (last_seq > *max_sequence) {*max_sequence = last_seq;}//如果memtable大于阈值,则将其转换成sstable(默认4MB)   if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {compactions++;*save_manifest = true;status = WriteLevel0Table(mem, edit, nullptr);mem->Unref();mem = nullptr;if (!status.ok()) {break;}}}delete file;//判断是否应该继续重复使用最后一个日志文件if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {assert(logfile_ == nullptr);assert(log_ == nullptr);assert(mem_ == nullptr);uint64_t lfile_size;if (env_->GetFileSize(fname, &lfile_size).ok() &&env_->NewAppendableFile(fname, &logfile_).ok()) {Log(options_.info_log, "Reusing old log %s \n", fname.c_str());log_ = new log::Writer(logfile_, lfile_size);logfile_number_ = log_number;if (mem != nullptr) {mem_ = mem;mem = nullptr;} else {mem_ = new MemTable(internal_comparator_);mem_->Ref();}}}if (mem != nullptr) {if (status.ok()) {*save_manifest = true;status = WriteLevel0Table(mem, edit, nullptr);}mem->Unref();}return status;
}

具体的逻辑如下:

  1. 打开 log,创建 log reader 开始读取数据。
  2. 循环读取日志文件,并将其写入 MemTable 中。
  3. 如果 MemTable 过大,则将其转换为 SSTable。

LevelDB 源码剖析(六)WAL模块:LOG 结构、读写流程、崩溃恢复相关推荐

  1. 【Redis源码剖析】 - Redis内置数据结构之压缩列表ziplist

    在前面的一篇文章[Redis源码剖析] - Redis内置数据结构之双向链表中,我们介绍了Redis封装的一种"传统"双向链表list,分别使用prev.next指针来指向当前节点 ...

  2. 阿里中间件seata源码剖析六:TCC模式中2阶段提交实现

    目录 TM通知TC事务状态 TC通知RM分支事务提交 RM处理TC提交事务请求 总结 上篇文章中,我们以TCC模式的demo为例,讲解了seata中全局事务的开启.在这个demo中,TM作为一个全局事 ...

  3. LevelDB 源码剖析(一)准备工作:环境搭建、接口使用、常用优化

    文章目录 环境搭建 实战使用 创建.关闭数据库 数据读.写.删除 批量处理 迭代器遍历 常用优化方案 压缩 缓存 过滤器 命名 环境搭建 # 下载源码 git clone https://github ...

  4. SpringMVC源码剖析(三)- DispatcherServlet的初始化流程

    我们启动web服务器,在浏览器中输入地址,就可以看到浏览器上输出我们写好的页面.为了更好的理解上面这个过程,你需要学习关于Servlet生命周期的三个阶段,就是所谓的"init-servic ...

  5. LevelDB 源码剖析(九)DBImpl模块:Open、Get、Put、Delete、Write

    文章目录 Open Get Put.Delete.Write Open 数据库 Open 操作主要用于创建新的 LevelDB 数据库或打开一个已存在的数据库.Open 操作的主要函数共需传递 3 个 ...

  6. frp源码剖析-frp中的log模块

    前言&引入 一个好的log模块可以帮助我们排错,分析,统计 一般来说log中需要有时间.栈信息(比如说文件名行号等),这些东西一般某些底层log模块已经帮我们做好了.但在业务中还有很多我们需要 ...

  7. LevelDB 源码剖析(八)Compaction模块:Minor Compaction、Major Compaction、文件选取、执行流程、垃圾回收

    文章目录 结构 Minor Compaction 定义 触发时机 核心要点 Major Compaction 定义 触发时机 Size Compaction Seek Compaction Manua ...

  8. Apache Flink fault tolerance源码剖析(六)

    上篇文章我们分析了基于检查点的用户状态的保存机制--状态终端.这篇文章我们来分析barrier(中文常译为栅栏或者屏障,为了避免引入名称争议,此处仍用英文表示).检查点的barrier是提供exact ...

  9. LevelDB 源码剖析(三)公共基础:内存管理、数值编码、Env家族、文件操作

    文章目录 内存管理 Arena 结构 内存分配 内存使用率统计 TCMalloc Env家族 PosixEnv EnvWrapper InMemoryEnv 文件操作 SequentialFile W ...

最新文章

  1. css 层叠式样式表(2)
  2. 查询在应用程序运行得很慢, 但在SSMS运行得很快的原因探究
  3. 学界 | 进化算法可以不再需要计算集群,开普敦大学的新方法用一块GPU也能刷新MNIST记录
  4. 遗传算法(GA)中的编码方式-二进制编码、格雷编码、实数编码
  5. 康托展开式---我排第几+逆康托展开
  6. OpenCV周期性除噪滤波器
  7. 应用的生命周期各个程序运行状态时代理的回调
  8. ASP.NET Core 双因素验证2FA 实战经验分享
  9. Java总结:SpringBoot的使用cmd命令进行Gradle构建
  10. OpenShift 4 Tekton - 用Webhook实现CI/CD
  11. BCHN近期收到一笔1000 BCH的匿名捐款
  12. GCC Command Options
  13. 对抗神经网络对抗攻击_您将如何检测针对神经网络的网络攻击?
  14. Android App通过应用宝获取版本信息,从而跳转到不同应用商店更新APP
  15. 不要说话 -- 陈奕迅/小柯
  16. 新东方王强的一篇精彩演讲
  17. HanLP中文分词、人名识别、地名识别
  18. 【5G核心网】 5GC核心网之网元PCF
  19. 队伍不好带!周鸿祎要拆分360业务 鼓励内部创业
  20. 医疗器械产品 EMC 测试与整改思路

热门文章

  1. SpringBoot_数据访问-整合MyBatis(一)-基础环境搭建
  2. 基本的SQL-SELECT语句
  3. Hmac - Java加密与安全
  4. 自定义idea archetype
  5. 【报错笔记】MAVEN pom.xml 报错解决方法
  6. “==”和“equals”
  7. 2019年我总结的前端面试题
  8. Smart Link
  9. Git + Maven + Jenkins 实现自动化部署
  10. git使用的一些常用命令