LevelDB data insertion and retrieval

LevelDB exposes the following interfaces for interacting with data:

// Set the database entry for "key" to "value".  Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options, const Slice& key,
                   const Slice& value) = 0;

// Remove the database entry (if any) for "key".  Returns OK on
// success, and a non-OK status on error.  It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;

// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options, const Slice& key,
                   std::string* value) = 0;

The interface mainly provides the Write, Delete, Get and Put operations; this article analyzes how each of them is carried out.
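Before diving into the internals, a minimal usage sketch of these interfaces may help (the database path and key names below are made up for illustration):

#include <cassert>
#include <string>

#include "leveldb/db.h"

int main() {
  leveldb::DB* db = nullptr;
  leveldb::Options options;
  options.create_if_missing = true;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);  // hypothetical path
  assert(s.ok());

  leveldb::WriteOptions wopt;
  wopt.sync = false;                      // set to true for a durable (fsync'ed) write
  s = db->Put(wopt, "k1", "v1");          // insert
  assert(s.ok());

  std::string value;
  s = db->Get(leveldb::ReadOptions(), "k1", &value);   // lookup
  assert(s.ok() && value == "v1");

  s = db->Delete(wopt, "k1");             // delete
  assert(s.ok());

  delete db;
  return 0;
}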

Put and Delete operations

To recap: the previous article walked through the open flow, where DB::Open is called. After completing the open, that function hands back a db object whose concrete type is DBImpl, a subclass that implements the DB interface. So a call to db->Get or db->Put actually invokes the corresponding DBImpl method, and DBImpl::Put looks like this:

Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

It simply forwards to the default DB::Put implementation provided by the base class; Delete follows the same pattern:

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}

As the code shows, both operations first record the update in a WriteBatch and then call Write to apply that batch.
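In other words, Put and Delete are just one-entry batches. A caller can also build a WriteBatch directly and pass it to Write, so that several updates travel through a single log record. A small sketch under that assumption (the helper name and keys are made up):

#include <string>

#include "leveldb/db.h"
#include "leveldb/write_batch.h"

// Move a value from one key to another: the delete and the put are placed in
// the same batch, so they reach the log and the memtable together.
leveldb::Status MoveValue(leveldb::DB* db, const leveldb::Slice& from,
                          const leveldb::Slice& to) {
  std::string value;
  leveldb::Status s = db->Get(leveldb::ReadOptions(), from, &value);
  if (!s.ok()) return s;

  leveldb::WriteBatch batch;
  batch.Delete(from);        // recorded as a kTypeDeletion entry
  batch.Put(to, value);      // recorded as a kTypeValue entry
  return db->Write(leveldb::WriteOptions(), &batch);
}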

The WriteBatch class
class LEVELDB_EXPORT WriteBatch {
 public:
  class LEVELDB_EXPORT Handler {
   public:
    virtual ~Handler();
    virtual void Put(const Slice& key, const Slice& value) = 0;
    virtual void Delete(const Slice& key) = 0;
  };

  WriteBatch();

  // Intentionally copyable.
  WriteBatch(const WriteBatch&) = default;
  WriteBatch& operator=(const WriteBatch&) = default;

  ~WriteBatch();

  // Store the mapping "key->value" in the database.
  void Put(const Slice& key, const Slice& value);                // add an entry

  // If the database contains a mapping for "key", erase it.  Else do nothing.
  void Delete(const Slice& key);                                 // delete an entry

  // Clear all updates buffered in this batch.
  void Clear();

  // The size of the database changes caused by this batch.
  //
  // This number is tied to implementation details, and may change across
  // releases. It is intended for LevelDB usage metrics.
  size_t ApproximateSize() const;

  // Copies the operations in "source" to this batch.
  //
  // This runs in O(source size) time. However, the constant factor is better
  // than calling Iterate() over the source batch with a Handler that replicates
  // the operations into this batch.
  void Append(const WriteBatch& source);                         // append another batch

  // Support for iterating over the contents of a batch.
  Status Iterate(Handler* handler) const;

 private:
  friend class WriteBatchInternal;

  std::string rep_;  // See comment in write_batch.cc for the format of rep_
};

}  // namespace leveldb

The implementation of this class is as follows:

static const size_t kHeader = 12;

WriteBatch::WriteBatch() { Clear(); }

WriteBatch::~WriteBatch() = default;

WriteBatch::Handler::~Handler() = default;

void WriteBatch::Clear() {
  rep_.clear();                                // clear the buffer
  rep_.resize(kHeader);                        // keep room for the 12-byte header
}

size_t WriteBatch::ApproximateSize() const { return rep_.size(); }   // size of the rep_ string

Status WriteBatch::Iterate(Handler* handler) const {                 // replay the batch through a handler
  Slice input(rep_);
  if (input.size() < kHeader) {                                      // smaller than the header: malformed
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);                                      // skip the header
  Slice key, value;
  int found = 0;
  while (!input.empty()) {                                           // until all records are consumed
    found++;
    char tag = input[0];                                             // read the record tag
    input.remove_prefix(1);                                          // consume the tag byte
    switch (tag) {                                                   // is the record a Put or a Delete?
      case kTypeValue:                                               // Put record
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {                // parse key and value
          handler->Put(key, value);                                  // forward to the handler
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:                                            // Delete record
        if (GetLengthPrefixedSlice(&input, &key)) {                  // parse the key
          handler->Delete(key);                                      // forward to the handler
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");         // unknown tag: corruption
    }
  }
  if (found != WriteBatchInternal::Count(this)) {                    // record count must match the stored count
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

int WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);                          // read the record count
}

void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  EncodeFixed32(&b->rep_[8], n);                                     // write the record count
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));              // read the sequence number
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);                                   // write the sequence number
}

void WriteBatch::Put(const Slice& key, const Slice& value) {         // record a Put
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);  // bump the count
  rep_.push_back(static_cast<char>(kTypeValue));                     // append the record type
  PutLengthPrefixedSlice(&rep_, key);                                // append the key
  PutLengthPrefixedSlice(&rep_, value);                              // append the value
}

void WriteBatch::Delete(const Slice& key) {                          // record a Delete
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);  // bump the count
  rep_.push_back(static_cast<char>(kTypeDeletion));                  // append the record type
  PutLengthPrefixedSlice(&rep_, key);                                // append the key
}

void WriteBatch::Append(const WriteBatch& source) {
  WriteBatchInternal::Append(this, &source);                         // delegate to WriteBatchInternal::Append
}

namespace {
class MemTableInserter : public WriteBatch::Handler {                // handler that applies records to a MemTable
 public:
  SequenceNumber sequence_;
  MemTable* mem_;

  void Put(const Slice& key, const Slice& value) override {
    mem_->Add(sequence_, kTypeValue, key, value);                    // add sequence number, value type, key, value
    sequence_++;
  }
  void Delete(const Slice& key) override {
    mem_->Add(sequence_, kTypeDeletion, key, Slice());               // add sequence number, deletion type, key, empty value
    sequence_++;
  }
};
}  // namespace

Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
  MemTableInserter inserter;
  inserter.sequence_ = WriteBatchInternal::Sequence(b);              // start from the batch's sequence number
  inserter.mem_ = memtable;                                          // target memtable
  return b->Iterate(&inserter);                                      // replay the batch into the memtable
}

void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= kHeader);
  b->rep_.assign(contents.data(), contents.size());                  // replace rep_ with the given contents
}

void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  SetCount(dst, Count(dst) + Count(src));
  assert(src->rep_.size() >= kHeader);
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);  // append src's records, skipping its header
}

}  // namespace leveldb

This class encodes the operations (puts and deletes) that will later be applied to the memtable, and it exposes the public interface for building and appending batches. Everything is serialized into the rep_ string: a 12-byte header holding an 8-byte sequence number and a 4-byte record count, followed by one tagged, length-prefixed record per operation.
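The Handler interface implemented by MemTableInserter is public, so the same replay mechanism can be reused outside the engine. As a minimal sketch, a handler that just prints the operations recorded in a batch (the class and function names are made up for illustration):

#include <cstdio>
#include <string>

#include "leveldb/slice.h"
#include "leveldb/write_batch.h"

// Iterate() walks the tagged records stored in rep_ and dispatches each one to
// Put()/Delete(), exactly the way MemTableInserter is driven when a batch is
// applied to the memtable.
class DumpHandler : public leveldb::WriteBatch::Handler {
 public:
  void Put(const leveldb::Slice& key, const leveldb::Slice& value) override {
    std::printf("Put %s -> %s\n", key.ToString().c_str(), value.ToString().c_str());
  }
  void Delete(const leveldb::Slice& key) override {
    std::printf("Del %s\n", key.ToString().c_str());
  }
};

void DumpBatch(const leveldb::WriteBatch& batch) {
  DumpHandler handler;
  leveldb::Status s = batch.Iterate(&handler);
  if (!s.ok()) std::printf("corrupt batch: %s\n", s.ToString().c_str());
}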

The Write function
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);                                    // build a Writer for this request
  w.batch = updates;                                    // the WriteBatch to apply
  w.sync = options.sync;                                // synchronous write or not
  w.done = false;                                       // not finished yet

  MutexLock l(&mutex_);                                 // acquire the mutex
  writers_.push_back(&w);                               // enqueue the writer
  while (!w.done && &w != writers_.front()) {           // wait until done, or until at the front of the queue
    w.cv.Wait();
  }
  if (w.done) {                                         // already completed as part of another writer's group
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);            // make sure there is room to write
  uint64_t last_sequence = versions_->LastSequence();              // current last sequence number
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions   // room available and there is something to write
    WriteBatch* updates = BuildBatchGroup(&last_writer);                          // merge queued writers into one batch
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);                  // assign the starting sequence number
    last_sequence += WriteBatchInternal::Count(updates);                          // advance by the number of records

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();                                                  // release the mutex while writing the log
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));  // append the batch to the write-ahead log
      bool sync_error = false;
      if (status.ok() && options.sync) {                                // synchronous write requested
        status = logfile_->Sync();                                      // flush the log file to disk
        if (!status.ok()) {                                             // sync failed
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);         // apply the batch to the memtable
      }
      mutex_.Lock();                                                    // re-acquire the mutex
      if (sync_error) {                                                 // record the failure
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();                     // the merged group used tmp_batch_, so clear it
    versions_->SetLastSequence(last_sequence);                          // publish the new last sequence number
  }

  while (true) {                                                        // pop every writer covered by this group
    Writer* ready = writers_.front();
    writers_.pop_front();                                               // remove the front writer
    if (ready != &w) {                                                  // a writer whose work was done on its behalf
      ready->status = status;                                           // propagate the status
      ready->done = true;                                               // mark it finished
      ready->cv.Signal();                                               // wake it up
    }
    if (ready == last_writer) break;                                    // stop at the last writer in the group
  }

  // Notify new head of write queue
  if (!writers_.empty()) {                                              // wake the new head of the queue, if any
    writers_.front()->cv.Signal();
  }
  return status;                                                        // return the final status
}

Write wraps the request in a Writer and pushes it onto a queue. If the queue already holds pending work and this writer is not at the front, it waits, because several queued operations can be merged and committed together. When the writer wakes up (or the queue was otherwise idle), it first checks whether its work is already done, since it may have been committed as part of another writer's group. If not, it calls MakeRoomForWrite to make sure there is space to write; then, when updates has content, it uses BuildBatchGroup to merge the queued batches into one, appends the merged batch to the log with AddRecord, and finally applies it to the MemTable via InsertInto. That is the basic execution flow.
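From the caller's point of view this grouping is invisible: Put is safe to call from multiple threads, and concurrent writers simply queue up inside DBImpl::Write and may be committed together by whichever writer reaches the front of the queue. A sketch of that usage, with an arbitrary thread count and key scheme:

#include <string>
#include <thread>
#include <vector>

#include "leveldb/db.h"

// Each Put() becomes a Writer in writers_; the writer at the front of the
// queue builds a group batch, writes the log once, fills the memtable, and
// marks the writers it absorbed as done.
void ConcurrentPuts(leveldb::DB* db) {
  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) {
    threads.emplace_back([db, t] {
      for (int i = 0; i < 1000; ++i) {
        std::string key = "key-" + std::to_string(t) + "-" + std::to_string(i);
        db->Put(leveldb::WriteOptions(), key, "value");
      }
    });
  }
  for (auto& th : threads) th.join();
}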

MakeRoomForWrite: checking for space
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();                                      // the mutex must already be held
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {                                  // a background error occurred: give up and return it
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {   // getting close to the L0 file limit: slow writes down
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);                     // sleep for 1ms
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {  // memtable has not reached write_buffer_size
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {                           // the previous memtable is still being compacted
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();              // wait until the background flush finishes
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");   // hard limit on L0 files reached: wait
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);              // there must be no previous log
      uint64_t new_log_number = versions_->NewFileNumber(); // allocate a new file number
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);  // create a new log file
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);         // give the file number back
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;                     // switch to the new log file and number
      log_ = new log::Writer(lfile);                        // new log writer
      imm_ = mem_;                                          // the old memtable becomes immutable
      has_imm_.store(true, std::memory_order_release);      // publish the immutable memtable
      mem_ = new MemTable(internal_comparator_);            // start a fresh memtable
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();                            // schedule compaction of the old memtable
    }
  }
  return s;
}

MakeRoomForWrite checks whether the current memtable still has room; if not, it either waits for background work to finish or switches to a brand-new memtable (and a new log file) to hold the incoming data.
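The thresholds driving these decisions come from Options and the config constants; in the tree discussed here write_buffer_size defaults to 4MB and kL0_SlowdownWritesTrigger / kL0_StopWritesTrigger are 8 and 12, though these values may differ across versions. A hedged sketch of tuning the buffer when opening the database:

#include <string>

#include "leveldb/db.h"

// A larger write buffer means MakeRoomForWrite switches to a new memtable
// (and schedules a flush) less often, trading memory usage and recovery time
// for fewer write stalls.
leveldb::Status OpenWithBiggerBuffer(const std::string& path, leveldb::DB** db) {
  leveldb::Options options;
  options.create_if_missing = true;
  options.write_buffer_size = 16 * 1024 * 1024;  // default is 4MB in the version discussed here
  return leveldb::DB::Open(options, path, db);
}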

BuildBatchGroup: merging queued updates
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();                         // the writer at the front of the queue
  WriteBatch* result = first->batch;                        // start from its batch
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch); // size of the first batch

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;                                // default cap: 1MB
  if (size <= (128 << 10)) {                                // small write: cap the group at size + 128KB
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();    // iterate over the queued writers
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {                          // a sync write must not ride on a non-sync group
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);       // add this writer's batch size
      if (size > max_size) {                                // group would become too big: stop
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {                         // still pointing at the caller's batch
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);   // copy the first batch into tmp_batch_
      }
      WriteBatchInternal::Append(result, w->batch);         // append this writer's batch
    }
    *last_writer = w;                                       // remember the last writer included in the group
  }
  return result;
}

It merges the batches of the queued writers by appending them into a single batch, so that one log write and one memtable pass serve several callers at once and the per-operation overhead drops.
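As a worked example of the size cap: a 4KB front batch lets the group grow to 4KB + 128KB, while a 512KB front batch allows the full 1MB. A standalone sketch of just that calculation:

#include <cstddef>

// Mirrors the cap computed in BuildBatchGroup: a small front batch keeps the
// group small so a tiny write is not delayed behind a huge group.
size_t GroupMaxSize(size_t first_batch_size) {
  size_t max_size = 1 << 20;                  // default cap: 1MB
  if (first_batch_size <= (128 << 10)) {      // front batch is at most 128KB
    max_size = first_batch_size + (128 << 10);
  }
  return max_size;
}

// GroupMaxSize(4 << 10)   == 135168   (4KB + 128KB)
// GroupMaxSize(512 << 10) == 1048576  (1MB)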

Summary

The basic Put and Delete flows both go through the WriteBatch class: inserts and deletes alike are turned into write operations, which improves write efficiency, and a delete is simply recorded as a typed entry in the batch rather than removing data immediately. This article only traced the code paths behind Put and Delete. Corrections are welcome if anything here is mistaken.
