文章目录

  • Log文件组织方式
  • Log文件的写入
    • AddRecord
    • EmitPhysicalRecord
  • Log文件的读取
    • SkipToInitialBlock
    • ReadPhysicalRecord
    • ReadRecord

Log文件组织方式

前文《leveldb源码解析系列—整体架构》中提到了Log文件,本篇对Log文件的组织方式进行解析

leveldb/doc/log_format.md中介绍了Log文件的组织方式

The log file contents are a sequence of 32KB blocks. The only exception is that the tail of the file may contain a partial block.

Each block consists of a sequence of records

Log文件是由一系列32KB的block组成,一个block包含一个或多个record,record的组织方式如下:

record :=checksum: uint32     // crc32c of type and data[] ; little-endianlength: uint16       // little-endiantype: uint8          // One of FULL, FIRST, MIDDLE, LASTdata: uint8[length]

可以看到,record有四种类型,因为record是变长的,所以需要通过type将它们进行区分:

The FULL record contains the contents of an entire user record.

FIRST, MIDDLE, LAST are types used for user records that have been split into multiple fragments (typically because of block boundaries). FIRST is the type of the first fragment of a user record, LAST is the type of the last fragment of
a user record, and MIDDLE is the type of all interior fragments of a user record.

关于Log文件组织方式更详细的介绍可参考doc/log_format.md,下面就Log文件的写入和读取进行解析

Log文件的写入

关于log文件写入的相关代码位于db/log_writer.hdb/log_writer.cc

class Writer {public:// Create a writer that will append data to "*dest".// "*dest" must be initially empty.// "*dest" must remain live while this Writer is in use.explicit Writer(WritableFile* dest);// Create a writer that will append data to "*dest".// "*dest" must have initial length "dest_length".// "*dest" must remain live while this Writer is in use.Writer(WritableFile* dest, uint64_t dest_length);Writer(const Writer&) = delete;Writer& operator=(const Writer&) = delete;~Writer();Status AddRecord(const Slice& slice);private:Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);WritableFile* dest_;int block_offset_;  // Current offset in block// crc32c values for all supported record types.  These are// pre-computed to reduce the overhead of computing the crc of the// record type stored in the header.uint32_t type_crc_[kMaxRecordType + 1];
};

可以看到主要的类为leveldb::log::Writer,主要包含dest_block_offset_两个成员变量,和AddRecord成员函数,其中leveldb::log::Writer使用到了leveldb::WritableFile,关于WritableFile暂不分析

leveldb::log::Writer的每一个构造函数中都调用了静态函数InitTypeCrc来初始化type_crc_,下面着重分析一下AddReocrEmitPhysicalRecord函数

AddRecord

Status Writer::AddRecord(const Slice& slice) {const char* ptr = slice.data();size_t left = slice.size();// Fragment the record if necessary and emit it.  Note that if slice// is empty, we still want to iterate once to emit a single// zero-length recordStatus s;bool begin = true;do {const int leftover = kBlockSize - block_offset_;assert(leftover >= 0);if (leftover < kHeaderSize) {// Switch to a new blockif (leftover > 0) {// Fill the trailer (literal below relies on kHeaderSize being 7)static_assert(kHeaderSize == 7, "");dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));}block_offset_ = 0;}// Invariant: we never leave < kHeaderSize bytes in a block.assert(kBlockSize - block_offset_ - kHeaderSize >= 0);const size_t avail = kBlockSize - block_offset_ - kHeaderSize;const size_t fragment_length = (left < avail) ? left : avail;RecordType type;const bool end = (left == fragment_length);if (begin && end) {type = kFullType;} else if (begin) {type = kFirstType;} else if (end) {type = kLastType;} else {type = kMiddleType;}s = EmitPhysicalRecord(type, ptr, fragment_length);ptr += fragment_length;left -= fragment_length;begin = false;} while (s.ok() && left > 0);return s;
}
  1. 获取slice数据和长度,赋值给ptr和left
  2. 循环,直到数据全部写入
    1. 计算当前块内剩余容量,如果小于kHeaderSize,则插入x00,并且置block_offset_ = 0
    2. 计算除kHeaderSize外剩余容量,并且与left比较取较小值,判断是否到达end
    3. 根据beginend判断类型
    4. 执行EmitPhysicalRecord写到log文件中
    5. 增加ptr并且减少left

EmitPhysicalRecord

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,size_t length) {assert(length <= 0xffff);  // Must fit in two bytesassert(block_offset_ + kHeaderSize + length <= kBlockSize);// Format the headerchar buf[kHeaderSize];buf[4] = static_cast<char>(length & 0xff);buf[5] = static_cast<char>(length >> 8);buf[6] = static_cast<char>(t);// Compute the crc of the record type and the payload.uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);crc = crc32c::Mask(crc);  // Adjust for storageEncodeFixed32(buf, crc);// Write the header and the payloadStatus s = dest_->Append(Slice(buf, kHeaderSize));if (s.ok()) {s = dest_->Append(Slice(ptr, length));if (s.ok()) {s = dest_->Flush();}}block_offset_ += kHeaderSize + length;return s;
}

执行的操作就是构造Header,调用WritableFileappendflush,将record刷到物理磁盘上

Log文件的读取

class Reader {public:// Interface for reporting errors.class Reporter {public:virtual ~Reporter();// Some corruption was detected.  "bytes" is the approximate number// of bytes dropped due to the corruption.virtual void Corruption(size_t bytes, const Status& status) = 0;};// Create a reader that will return log records from "*file".// "*file" must remain live while this Reader is in use.//// If "reporter" is non-null, it is notified whenever some data is// dropped due to a detected corruption.  "*reporter" must remain// live while this Reader is in use.//// If "checksum" is true, verify checksums if available.//// The Reader will start reading at the first record located at physical// position >= initial_offset within the file.Reader(SequentialFile* file, Reporter* reporter, bool checksum,uint64_t initial_offset);Reader(const Reader&) = delete;Reader& operator=(const Reader&) = delete;~Reader();// Read the next record into *record.  Returns true if read// successfully, false if we hit end of the input.  May use// "*scratch" as temporary storage.  The contents filled in *record// will only be valid until the next mutating operation on this// reader or the next mutation to *scratch.bool ReadRecord(Slice* record, std::string* scratch);// Returns the physical offset of the last record returned by ReadRecord.//// Undefined before the first call to ReadRecord.uint64_t LastRecordOffset();private:// Extend record types with the following special valuesenum {kEof = kMaxRecordType + 1,// Returned whenever we find an invalid physical record.// Currently there are three situations in which this happens:// * The record has an invalid CRC (ReadPhysicalRecord reports a drop)// * The record is a 0-length record (No drop is reported)// * The record is below constructor's initial_offset (No drop is reported)kBadRecord = kMaxRecordType + 2};// Skips all blocks that are completely before "initial_offset_".//// Returns true on success. Handles reporting.bool SkipToInitialBlock();// Return type, or one of the preceding special valuesunsigned int ReadPhysicalRecord(Slice* result);// Reports dropped bytes to the reporter.// buffer_ must be updated to remove the dropped bytes prior to invocation.void ReportCorruption(uint64_t bytes, const char* reason);void ReportDrop(uint64_t bytes, const Status& reason);SequentialFile* const file_;Reporter* const reporter_;bool const checksum_;char* const backing_store_;Slice buffer_;bool eof_;  // Last Read() indicated EOF by returning < kBlockSize// Offset of the last record returned by ReadRecord.uint64_t last_record_offset_;// Offset of the first location past the end of buffer_.uint64_t end_of_buffer_offset_;// Offset at which to start looking for the first record to returnuint64_t const initial_offset_;// True if we are resynchronizing after a seek (initial_offset_ > 0). In// particular, a run of kMiddleType and kLastType records can be silently// skipped in this modebool resyncing_;
};

leveldb::log::Reader中有几个成员变量需要解释

char* const backing_store_          //实际存储Slice数据的指针
Slice buffer_;                      //读取物理record时的内存buffer,如果不遇到eof则是一个kBlockSize
uint64_t last_record_offset_;       //Offset of the last record returned by ReadRecord.
uint64_t end_of_buffer_offset_;     //当前的读取偏移
uint64_t const initial_offset_;     //Offset at which to start looking for the first record to return
bool resyncing_;

SkipToInitialBlock

bool Reader::SkipToInitialBlock() {const size_t offset_in_block = initial_offset_ % kBlockSize;uint64_t block_start_location = initial_offset_ - offset_in_block;// Don't search a block if we'd be in the trailerif (offset_in_block > kBlockSize - 6) {block_start_location += kBlockSize;}end_of_buffer_offset_ = block_start_location;// Skip to start of first block that can contain the initial recordif (block_start_location > 0) {Status skip_status = file_->Skip(block_start_location);if (!skip_status.ok()) {ReportDrop(block_start_location, skip_status);return false;}}return true;
}

根据initial_offset_执行调整和跳转

  1. 计算块内偏移offset_in_block和起始块地址block_start_location
  2. 如果块内偏移位于trailer,起始块地址跳到下一个块
  3. end_of_buffer_offset = block_start_location
  4. file_->Skip(block_start_location)

ReadPhysicalRecord

unsigned int Reader::ReadPhysicalRecord(Slice* result) {while (true) {if (buffer_.size() < kHeaderSize) {if (!eof_) {// Last read was a full read, so this is a trailer to skipbuffer_.clear();Status status = file_->Read(kBlockSize, &buffer_, backing_store_);end_of_buffer_offset_ += buffer_.size();if (!status.ok()) {buffer_.clear();ReportDrop(kBlockSize, status);eof_ = true;return kEof;} else if (buffer_.size() < kBlockSize) {eof_ = true;}continue;} else {// Note that if buffer_ is non-empty, we have a truncated header at the// end of the file, which can be caused by the writer crashing in the// middle of writing the header. Instead of considering this an error,// just report EOF.buffer_.clear();return kEof;}}// Parse the headerconst char* header = buffer_.data();const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;const unsigned int type = header[6];const uint32_t length = a | (b << 8);if (kHeaderSize + length > buffer_.size()) {size_t drop_size = buffer_.size();buffer_.clear();if (!eof_) {ReportCorruption(drop_size, "bad record length");return kBadRecord;}// If the end of the file has been reached without reading |length| bytes// of payload, assume the writer died in the middle of writing the record.// Don't report a corruption.return kEof;}if (type == kZeroType && length == 0) {// Skip zero length record without reporting any drops since// such records are produced by the mmap based writing code in// env_posix.cc that preallocates file regions.buffer_.clear();return kBadRecord;}// Check crcif (checksum_) {uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);if (actual_crc != expected_crc) {// Drop the rest of the buffer since "length" itself may have// been corrupted and if we trust it, we could find some// fragment of a real log record that just happens to look// like a valid log record.size_t drop_size = buffer_.size();buffer_.clear();ReportCorruption(drop_size, "checksum mismatch");return kBadRecord;}}buffer_.remove_prefix(kHeaderSize + length);// Skip physical record that started before initial_offset_if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <initial_offset_) {result->clear();return kBadRecord;}*result = Slice(header + kHeaderSize, length);return type;}
}

读取下一个物理record

  1. 读取block数据到buffer_中,如果读取到的buffer_ < kBlockSize,说明读到了文件末尾,重新进入下一次循环后,如果buffer_.size() < kHeaderSize说明header被截断了,返回kEof。如果解析header后发现kHeaderSize + length > buffer_.size(),说明数据不完整,返回kEof
  2. 如果读取到了zero length record,返回kBadRecord
  3. 检查crc
  4. Skip physical record that started before initial_offset_
  5. 构造result

ReadRecord

bool Reader::ReadRecord(Slice* record, std::string* scratch) {if (last_record_offset_ < initial_offset_) {if (!SkipToInitialBlock()) {return false;}}scratch->clear();record->clear();bool in_fragmented_record = false;// Record offset of the logical record that we're reading// 0 is a dummy value to make compilers happyuint64_t prospective_record_offset = 0;Slice fragment;while (true) {const unsigned int record_type = ReadPhysicalRecord(&fragment);// ReadPhysicalRecord may have only had an empty trailer remaining in its// internal buffer. Calculate the offset of the next physical record now// that it has returned, properly accounting for its header size.uint64_t physical_record_offset =end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();if (resyncing_) {if (record_type == kMiddleType) {continue;} else if (record_type == kLastType) {resyncing_ = false;continue;} else {resyncing_ = false;}}switch (record_type) {case kFullType:if (in_fragmented_record) {// Handle bug in earlier versions of log::Writer where// it could emit an empty kFirstType record at the tail end// of a block followed by a kFullType or kFirstType record// at the beginning of the next block.if (!scratch->empty()) {ReportCorruption(scratch->size(), "partial record without end(1)");}}prospective_record_offset = physical_record_offset;scratch->clear();*record = fragment;last_record_offset_ = prospective_record_offset;return true;case kFirstType:if (in_fragmented_record) {// Handle bug in earlier versions of log::Writer where// it could emit an empty kFirstType record at the tail end// of a block followed by a kFullType or kFirstType record// at the beginning of the next block.if (!scratch->empty()) {ReportCorruption(scratch->size(), "partial record without end(2)");}}prospective_record_offset = physical_record_offset;scratch->assign(fragment.data(), fragment.size());in_fragmented_record = true;break;case kMiddleType:if (!in_fragmented_record) {ReportCorruption(fragment.size(),"missing start of fragmented record(1)");} else {scratch->append(fragment.data(), fragment.size());}break;case kLastType:if (!in_fragmented_record) {ReportCorruption(fragment.size(),"missing start of fragmented record(2)");} else {scratch->append(fragment.data(), fragment.size());*record = Slice(*scratch);last_record_offset_ = prospective_record_offset;return true;}break;case kEof:if (in_fragmented_record) {// This can be caused by the writer dying immediately after// writing a physical record but before completing the next; don't// treat it as a corruption, just ignore the entire logical record.scratch->clear();}return false;case kBadRecord:if (in_fragmented_record) {ReportCorruption(scratch->size(), "error in middle of record");in_fragmented_record = false;scratch->clear();}break;default: {char buf[40];std::snprintf(buf, sizeof(buf), "unknown record type %u", record_type);ReportCorruption((fragment.size() + (in_fragmented_record ? scratch->size() : 0)),buf);in_fragmented_record = false;scratch->clear();break;}}}return false;
}

读取下一个逻辑record

  1. 根据initial_offset_跳转到调用者指定的位置,开始读取日志文件。跳转就是调用SequentialFile的Seek接口
  2. 初始化in_fragmented_record = false,如果遇到kFirstType,这个标志置为true,记录该逻辑record是碎片组成的,分布在多个block内
  3. 初始化prospective_record_offset = 0,prospective_record_offset是逻辑record的偏移地址
  4. 执行循环,直到读取到所有的物理record
    1. 读取下一个物理record
    2. 判断record_type,如果是kFullType,更新last_record_offset_并返回;如果是kFirstType,更新scratch,并且置in_fragmented_record为true;如果是kMiddleType,先检查in_fragmented_record是否为true,如果不为true,report “missing start of fragmented record(1)”,否则更新scratch;如果是kLastType,同样检查in_fragmented_record,更新scratch并赋值给record,记录last_record_offset_,然后返回;如果是其它类型,会做一些错误处理
    3. 注意在record_type为kFullType和kFirstType时,同样也会检查in_fragmented_record,这是对leveldb早期版本的work around,leveldb早期是可以emit an empty kFirstType record at the tail end of a block的,但是这里并未做一些处理,而是直接ReportCorruption

leveldb源码解析系列—Log相关推荐

  1. TiKV 源码解析系列文章(二)raft-rs proposal 示例情景分析

    作者:屈鹏 本文为 TiKV 源码解析系列的第二篇,按照计划首先将为大家介绍 TiKV 依赖的周边库 raft-rs .raft-rs 是 Raft 算法的 Rust 语言实现.Raft 是分布式领域 ...

  2. Tomcat源码解析系列二:Tomcat总体架构

    Tomcat即是一个HTTP服务器,也是一个servlet容器,主要目的就是包装servlet,并对请求响应相应的servlet,纯servlet的web应用似乎很好理解Tomcat是如何装载serv ...

  3. TiKV 源码解析系列 - Raft 的优化

    这篇文章转载TiDB大牛 唐刘 的博客:https://mp.weixin.qq.com/s?__biz=MzI3NDIxNTQyOQ==&mid=2247484544&idx=1&a ...

  4. openGauss数据库源码解析系列文章--openGauss简介(一)

    openGauss数据库是华为深度融合在数据库领域多年经验,结合企业级场景要求推出的新一代企业级开源数据库.此前,Gauss松鼠会已经发布了openGauss数据库核心技术系列文章,介绍了openGa ...

  5. Redux 源码解析系列(一) -- Redux的实现思想

    文章来源: IMweb前端社区 黄qiong(imweb.io) IMweb团队正在招聘啦,简历发至jayccchen@tencent.com Redux 其实是用来帮我们管理状态的一个框架,它暴露给 ...

  6. prometheus变量_TiKV 源码解析系列文章(四)Prometheus(下)

    本文为 TiKV 源码解析系列的第四篇,接上篇继续为大家介绍 rust-prometheus.上篇主要介绍了基础知识以及最基本的几个指标的内部工作机制,本篇会进一步介绍更多高级功能的实现原理. 与上篇 ...

  7. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  8. Mybatis3 源码解析系列

    简介 Mybatis作为一个优秀的Java持久化框架,在我们的日常工作中相信都会用到,本次源码解析系列,就开始探索下Mybatis 总结 在MyBatis的学习中,首先通读了<MyBatis3源 ...

  9. openGauss数据库源码解析系列文章——openGauss开发快速入门(二)

    在上一篇openGauss数据库源码解析系列文章--openGauss开发快速入门(上)中,我们介绍了openGauss的安装部署方法,本篇将具体介绍openGauss基本使用. 二. openGau ...

最新文章

  1. LeetCode简单题之交替合并字符串
  2. flex--unable to transcode image
  3. C/C++变量存储区域
  4. 【正一专栏】欧冠四强猜想—不是冤家不聚首
  5. 面试官:为什么HTTPS是安全的
  6. oracle快速了解法,【oracle】rownum的快速了解
  7. cc java开发环境搭建_Windows系统下java开发环境搭建
  8. php api命名历史,PHP历史上的今天查询api源码
  9. 存储过程存储函数得简记(转)
  10. WVI职业价值观测量表
  11. C#下拉列表绑定数据库的使用三层实现
  12. java中文文档官方下载
  13. ubuntu 12.04 LTS 安装配置JDK1.6.0_45
  14. 遗传算法(Genetic Algorithm, GA)及MATLAB实现
  15. 协议--SIP/SDP
  16. ThingWorx公开课圆满结束
  17. 文献检索、整理、归纳
  18. ACL 2021 | 基于词依存信息类型映射记忆神经网络的关系抽取
  19. rqt teb参数动态调试工具_teb
  20. GitHub加载过慢或加载不出来应该怎么解决

热门文章

  1. Swin Transformer 论文精读,并解析其模型结构
  2. 【六】HTML之媒体标签
  3. Understand基本使用
  4. 如何编写微信小程序的网络请求的代码
  5. 【AAAI 2021】在线知识蒸馏中的对等协同学习:Peer Collaborative Learning for Online Knowledge Distillation
  6. 【深度学习】线上租用设备平台体验以及踩过的坑(非广告)
  7. 企业自动运行系统——产品策略
  8. 2021 年苏州大学计算机考研专业课 872 真题 --- 数据结构部分
  9. 偏差(Bias)与方差(Variance)
  10. python 网格数据插值_python – 网格数据的快速插值