leveldb数据查询

查询的示例代码如下:

string res;
status = db->Get(ReadOptions(), "KeyNameExample", &res);

本文就先分析一下数据的获取流程。

db->Get获取数据

主要就是调用db的Get方法来查找数据;

Status DBImpl::Get(const ReadOptions& options, const Slice& key,std::string* value) {Status s;MutexLock l(&mutex_);SequenceNumber snapshot;if (options.snapshot != nullptr) {                                  // 检查快照是否为空指针snapshot =static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();   // 获取快照对应的序列号} else {snapshot = versions_->LastSequence();                                        // 否则就获取版本最新的序列号}MemTable* mem = mem_;                                                           // 当前memtable内容MemTable* imm = imm_;                                                           // 不可变内容Version* current = versions_->current();                                        // 获取当前的版本mem->Ref();if (imm != nullptr) imm->Ref();current->Ref();bool have_stat_update = false;                                                  // 是否有更新标志位 设置为FalseVersion::GetStats stats;// Unlock while reading from files and memtables{mutex_.Unlock();                                                              // 获取锁// First look in the memtable, then in the immutable memtable (if any).LookupKey lkey(key, snapshot);                                                // 将内容包装成LookupKey实例if (mem->Get(lkey, value, &s)) {                                              // 先在mem当中查找key// Done} else if (imm != nullptr && imm->Get(lkey, value, &s)) {                     // 如果在mem中没有查找到该key则在imm中查找数据// Done} else {s = current->Get(options, lkey, value, &stats);                             // 最后再文件中查找 即level层级的数据块中查找have_stat_update = true;                                                    // 此时设置更新为true}mutex_.Lock();}if (have_stat_update && current->UpdateStats(stats)) {                          // 如果在level层级文件中查找 并且当前的内容有更改则调用合并MaybeScheduleCompaction();}mem->Unref();                                                                   // 引用计数恢复if (imm != nullptr) imm->Unref();current->Unref();return s;                                                                       // 返回状态
}

从执行流程可知,获取数据时的优先级主要就是三个;

  1. 从当前内存memTable中获取;
  2. 如果第一步未获取到,则从当前的不可修改的imm中获取;
  3. 如果第二步未获取到,则从level层中去获取数据;
从memeTable中查找
mem->Get(lkey, value, &s)

此时调用的就是mem的Get方法来查找;

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {Slice memkey = key.memtable_key();                                          // 先获取key的数据Table::Iterator iter(&table_);                                              // 生成table的迭代器iter.Seek(memkey.data());                                                   // 查找数据 if (iter.Valid()) {                                                         // 如果找到// entry format is://    klength  varint32//    userkey  char[klength]//    tag      uint64//    vlength  varint32//    value    char[vlength]// Check that it belongs to same user key.  We do not check the// sequence number since the Seek() call above should have skipped// all entries with overly large sequence numbers.                        // 获取整个数据const char* entry = iter.key();uint32_t key_length;const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);      // 获取指针内容值if (comparator_.comparator.user_comparator()->Compare(Slice(key_ptr, key_length - 8), key.user_key()) == 0) {           // key内容是否相同// Correct user keyconst uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);           // 获取该key的标志位  是删除还是新增数据switch (static_cast<ValueType>(tag & 0xff)) {case kTypeValue: {Slice v = GetLengthPrefixedSlice(key_ptr + key_length);             // 如果是未删除数据value->assign(v.data(), v.size());                                  // 设置到velue中并返回return true;}case kTypeDeletion:*s = Status::NotFound(Slice());                                     // 如果该数据为删除则标记为NotFoundreturn true;}}}return false;
}

其中有关iter.Seek的方法,本质上其实就是调用的是table_.Seek方法,而table_又是SkipList类型所以最终调用的是FindGreaterOrEqual;

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {node_ = list_->FindGreaterOrEqual(target, nullptr);
}template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,Node** prev) const {Node* x = head_;                                      // 获取头部int level = GetMaxHeight() - 1;                       // 获取层级while (true) {Node* next = x->Next(level);                        // 依次遍历下一级if (KeyIsAfterNode(key, next)) {                    // 检查当前key的大小是否大于next的key大小// Keep searching in this listx = next;                                         // 如果是之后则继续深入} else {if (prev != nullptr) prev[level] = x;             // 如果指向不为空  且当前是最小数据长度  则 设置成头指针if (level == 0) {                                 // 如果为零就返回当前查找到的 否则下一个层级查找return next;                                    } else {// Switch to next listlevel--;}}}
}

在mem中查找的过程其实就和数据插入的过程比较类似。因为imm不可变table查找方式与该流程一样故不再叙述。

从level文件中查找
s = current->Get(options, lkey, value, &stats)

此时current其实就是Version对象,此时就是调用的Version的Get方法;

Status Version::Get(const ReadOptions& options, const LookupKey& k,std::string* value, GetStats* stats) {stats->seek_file = nullptr;                                             // 初始化 stats seek_file为空 查找层级为-1stats->seek_file_level = -1;struct State {                                                          // 定义一个State结构体Saver saver;GetStats* stats;const ReadOptions* options;                                           // 设置选项Slice ikey;FileMetaData* last_file_read;                                         // 设置文件源信息int last_file_read_level;VersionSet* vset;Status s;bool found;static bool Match(void* arg, int level, FileMetaData* f) {            // 匹配方法State* state = reinterpret_cast<State*>(arg);if (state->stats->seek_file == nullptr &&state->last_file_read != nullptr) {// We have had more than one seek for this read.  Charge the 1st file.state->stats->seek_file = state->last_file_read;                        // 设置当前查找值state->stats->seek_file_level = state->last_file_read_level;}state->last_file_read = f;                                                // 设置当前元信息state->last_file_read_level = level;                                      // 设置当前层级state->s = state->vset->table_cache_->Get(*state->options, f->number,f->file_size, state->ikey,&state->saver, SaveValue);      // 查找具体数据 调用table_cache_的Get方法查找if (!state->s.ok()) {                                                     // 如果查找是否 设置已查找 返回Falsestate->found = true;return false;}switch (state->saver.state) {                                             // 状态判断case kNotFound:return true;  // Keep searching in other filescase kFound:state->found = true;                                                  // 如果找到则返回falsereturn false;case kDeleted:return false;                                                         // 如果已删除也返回falsecase kCorrupt:state->s =Status::Corruption("corrupted key for ", state->saver.user_key);state->found = true;return false;}}};State state;                                                    // 初始化 state实例state.found = false;state.stats = stats;state.last_file_read = nullptr;state.last_file_read_level = -1;state.options = &options;state.ikey = k.internal_key();state.vset = vset_;state.saver.state = kNotFound;state.saver.ucmp = vset_->icmp_.user_comparator();state.saver.user_key = k.user_key();state.saver.value = value;ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);      // 查找keyreturn state.found ? state.s : Status::NotFound(Slice());
}

主要在该方法内部定义了一个State结构体,然后调用ForEachOverlapping方法去具体查找内容;

void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,bool (*func)(void*, int, FileMetaData*)) {const Comparator* ucmp = vset_->icmp_.user_comparator();                    // 先获取比较方法// Search level-0 in order from newest to oldest.std::vector<FileMetaData*> tmp;                                             // 初始化 一个 列表tmp.reserve(files_[0].size());                                              // 设置为层级为第一层的大小for (uint32_t i = 0; i < files_[0].size(); i++) {                           // 遍历第一层FileMetaData* f = files_[0][i];                                           // 获取元文件信息if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&ucmp->Compare(user_key, f->largest.user_key()) <= 0) {                // 如果当前值比最小的要大 比最大的要小 则压入数据中tmp.push_back(f);}}if (!tmp.empty()) {                                                         // 如果当前查找的不为空std::sort(tmp.begin(), tmp.end(), NewestFirst);                           // 排序该列表 排序规则按照大小排序for (uint32_t i = 0; i < tmp.size(); i++) {if (!(*func)(arg, 0, tmp[i])) {                                         // 遍历当前列表 并执行回调函数 该回调函数就是State中的Match函数如果找到则返回return;}}}// Search other levels.for (int level = 1; level < config::kNumLevels; level++) {                  // 如果在第一层没有找到 开始从第二层开始查找size_t num_files = files_[level].size();                                  // 获取当前层级的大小if (num_files == 0) continue;                                             // 如果当前层级为空则循环下一个// Binary search to find earliest index whose largest key >= internal_key.uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);     // 查找文件 中是否包含该值if (index < num_files) {                                                  // 如果当前索引值小于 当前层级数FileMetaData* f = files_[level][index];                                 // 获取当前的元信息if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {              // 编辑是否小于该文件最小的值 如果比最小值要大// All of "f" is past any data for user_key} else {if (!(*func)(arg, level, f)) {                                        // 使用回调函数检查是否找到该值return;}}}}
}

通过该方法可知,首先查找第一层,如果第一层找到了就直接返回,如果第一层没有找到则继续往下层查找,默认最高层数是7,此时查找的过程中,都是先比较每一层的元文件信息,比较该数据是否在该元文件信息之间,如果是之间,则在调用传入的回调函数Match进行精准查找,在Match方法中主要的比较方法如下;

state->s = state->vset->table_cache_->Get(*state->options, f->number,f->file_size, state->ikey,&state->saver, SaveValue);

调用了vset的table_cache_方法中的Get方法;

Status TableCache::Get(const ReadOptions& options, uint64_t file_number,uint64_t file_size, const Slice& k, void* arg,void (*handle_result)(void*, const Slice&,const Slice&)) {Cache::Handle* handle = nullptr;Status s = FindTable(file_number, file_size, &handle);                            // 查找文件if (s.ok()) {Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;       // 如果找到 则获取tables = t->InternalGet(options, k, arg, handle_result);cache_->Release(handle);}return s;
}

其中传入了SaveValue的回调处理函数,来再次确认找到的值,并将值保存;此时首先调用FindTable方法去查找文件;

Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,Cache::Handle** handle) {Status s;char buf[sizeof(file_number)];                                          // 获取当前的文件大小EncodeFixed64(buf, file_number);Slice key(buf, sizeof(buf));*handle = cache_->Lookup(key);                                          // 调用cache_的Lookup方法来查找该key 使用了LRU算法if (*handle == nullptr) {                                               // 如果没有找到 则新生成std::string fname = TableFileName(dbname_, file_number);              // 新生成一个TableFileName RandomAccessFile* file = nullptr;Table* table = nullptr;s = env_->NewRandomAccessFile(fname, &file);                          // 初始化 检查 该文件是否可用if (!s.ok()) {std::string old_fname = SSTTableFileName(dbname_, file_number);     // 生成一个SSTTableFileName文件实例if (env_->NewRandomAccessFile(old_fname, &file).ok()) {             // 检查是否成功s = Status::OK();}}if (s.ok()) {s = Table::Open(options_, file, file_size, &table);                 // 如果检查成功 则打开该文件}if (!s.ok()) {                                                        // ruguo 打开失败则 重置数据assert(table == nullptr);delete file;// We do not cache error results so that if the error is transient,// or somebody repairs the file, we recover automatically.} else {TableAndFile* tf = new TableAndFile;                                // 新生成一个TableFileName tf->file = file;tf->table = table;*handle = cache_->Insert(key, tf, 1, &DeleteEntry);                 // 在缓存中插入该数据}}return s;
}

主要就是检查输入的文件是否可以打开,并检查模式是否可以,最后将生成的文件加入到缓存中,以便后续查找该值能够更快找到;

Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,void (*handle_result)(void*, const Slice&,const Slice&)) {Status s;Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);       // 设置迭代器iiter->Seek(k);                                                                   // 查找该key if (iiter->Valid()) {                                                             // 如果找到Slice handle_value = iiter->value();                                            // 设置该值FilterBlockReader* filter = rep_->filter;BlockHandle handle;if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&!filter->KeyMayMatch(handle.offset(), k)) {                                 // 通过过滤器来检查该值是否合法// Not found} else {Iterator* block_iter = BlockReader(this, options, iiter->value());            // 设置一个迭代器    block_iter->Seek(k);                                                          // 查找该值if (block_iter->Valid()) {(*handle_result)(arg, block_iter->key(), block_iter->value());              // 调用回调函数处理}s = block_iter->status();                                                     // 获取状态delete block_iter;}}if (s.ok()) {s = iiter->status();}delete iiter;return s;
}

该方法主要就是查找值,通过多层次的iter迭代器的包装,主要是为了加入其它的如加入缓存,或者注册相关的处理事件,所以导致BlockReader和rep_->index_block->NewIterator多次检查了待查找的值;

Iterator* Table::BlockReader(void* arg, const ReadOptions& options,const Slice& index_value) {Table* table = reinterpret_cast<Table*>(arg);                 // 获取table Cache* block_cache = table->rep_->options.block_cache;        // 获取cache  Block* block = nullptr;Cache::Handle* cache_handle = nullptr;BlockHandle handle;Slice input = index_value;Status s = handle.DecodeFrom(&input);// We intentionally allow extra stuff in index_value so that we// can add more features in the future.if (s.ok()) {BlockContents contents;if (block_cache != nullptr) {char cache_key_buffer[16];EncodeFixed64(cache_key_buffer, table->rep_->cache_id);             // 获取缓存的值EncodeFixed64(cache_key_buffer + 8, handle.offset());               // 获取八个偏移内容Slice key(cache_key_buffer, sizeof(cache_key_buffer));cache_handle = block_cache->Lookup(key);                            // 查找该值if (cache_handle != nullptr) {block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));     //如果找到则 设置} else {s = ReadBlock(table->rep_->file, options, handle, &contents);           // 新生成一个if (s.ok()) {block = new Block(contents);                                          // 新生成一个blockif (contents.cachable && options.fill_cache) {cache_handle = block_cache->Insert(key, block, block->size(),&DeleteCachedBlock);             // 在缓存中插入}}}} else {s = ReadBlock(table->rep_->file, options, handle, &contents);if (s.ok()) {block = new Block(contents);}}}Iterator* iter;                                                                 // 设置迭代器if (block != nullptr) {iter = block->NewIterator(table->rep_->options.comparator);                   // 生成一个默认的迭代器if (cache_handle == nullptr) {iter->RegisterCleanup(&DeleteBlock, block, nullptr);                        // 注册一个删除 列表 等到执行完成后删除} else {iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);}} else {iter = NewErrorIterator(s);}return iter;
}

至此,在层级文件中的查找流程基本完成,从流程可以看出在层级查找的过程中,机制更为复杂,设置了更多的缓存与检查机制。

总结

本文主要是讲述了leveldb数据的获取流程,当获取数据的时候可能会出现从当前内存中的memtable中获取或者是不可变immtable中获取,如果两者都获取不到,则去层级文件中去查找,在层级文件中查找,还需要确定查找的内容是属于哪一层级,然后通过添加缓存等方式,来提高读的性能,然后注册相应的回调机制来保证数据的流程的高效。由于本人才疏学浅,如有错误请批评指正。

leveldb源码分析:数据查询相关推荐

  1. Leveldb源码分析--1

    [前言:看了一点oceanbase,没有意志力继续坚持下去了,暂时就此中断,基本上算把master看完了,比较重要的update server和merge server代码却没有细看.中间又陆续研究了 ...

  2. 【VUE】源码分析 - 数据劫持的基本原理

    tips:本系列博客的代码部分(示例等除外),均出自vue源码内容,版本为2.6.14.但是为了增加易读性,会对不相关内容做选择性省略.如果大家想了解完整的源码,建议自行从官方下载.https://g ...

  3. leveldb源码分析:数据插入续(跳表)

    leveldb数据的插入-跳表 本文主要是接着上一篇文章,继续深入探索Write函数调用插入之后的流程. status = WriteBatchInternal::InsertInto(updates ...

  4. leveldb源码分析:Open启动流程

    leveldb概述 Leveldb 是一个持久化的KV存储系统,主要将大部分数据存储在磁盘上,在存储数据的过程中,根据记录的key值有序存储,当然使用者也可以自定义Key大小比较函数,一个leveld ...

  5. android+小米文件管理器源码,[MediaStore]小米文件管理器android版源码分析——数据来源...

    打开小米的文件管理器,我们很快会看到如下图所示的界面: 其中,会把各种文件分类显示.并且显示出每种文件的个数. 这是怎么做到的呢?当然不是每次启动都查询sdcard和应用程序data目录文件啦,那样实 ...

  6. 风讯dotNETCMS源码分析—数据存取篇

    前几天突然对CMS感兴趣,就去下载了风讯dotNETCMS源码.当前版本是dotnetcms1.0 sp5免费版,风讯的官方主页上可以下载. 用Visual Studio 2008打开后,初步分析了它 ...

  7. Nginx源码分析--数据对齐posix_memalign和memalign函数

    posix_memalign函数() /*  * 背景:  *      1)POSIX 1003.1d  *      2)POSIX 标明了通过malloc( ), calloc( ), 和 re ...

  8. leveldb源码分析:数据插入与删除(Put与Delete)

    leveldb数据的插入与获取 leveldb提供的数据的交互接口如下: // Set the database entry for "key" to "value&qu ...

  9. LevelDB 源码分析

    本文基于leveldb 1.9.0代码. 整体架构 如上图,leveldb的数据存储在内存以及磁盘上,其中: memtable:存储在内存中的数据,使用skiplist实现. immutable me ...

最新文章

  1. delphi编程模拟发送QQ2008消息!
  2. Mysql 常用函数汇总
  3. LeetCode单链表题目测试代码(只需添加对应题目,本地即可debug)
  4. Java-Character String StringBuffer StringBuilder
  5. Image Control
  6. js 函数实参列表arguments和形参的那点事儿
  7. c语言中变量的大小,C语言变量定义
  8. 陈丽琳:如何以大数据助力商场运营
  9. 编程指南_halide编程技术指南(连载一)
  10. 每个java小应用程序必须是,每个Java小应用程序必须定义为()。 A.Applet类或JApplet类的子类B.JFrame类的子类...
  11. 全国计算机二级c语言题库,计算机二级c语言题库及答案
  12. Python实现简单命令行版《中国象棋》不使用第三方库
  13. ArcGis中计算栅格数据指定区域的面积
  14. easyui combobox设置只能选择下拉
  15. 人生杂感随笔-观佛教与道教(六道轮回)
  16. java中override快捷键_Java高级应用简笔
  17. 【学习笔记】特殊数论函数求和
  18. Java操作桌面应用 --- Desktop 类
  19. 怎样获得手机的外网ip???
  20. Python绘制图像(Matplotlib)(Ⅵ)

热门文章

  1. 基于聚类的图像分割(Python)
  2. 芯片开发语言:Verilog 在左,Chisel 在右
  3. 作为西二旗程序员,我是这样学习的.........
  4. 滴滴自动驾驶部门成立独立公司,CTO张博兼任新公司CEO
  5. Facebook 的AI翻身之战!
  6. 1024块TPU在燃烧!BERT训练从3天缩短到76分钟 | 技术头条
  7. 详解 | Dropout为何能防止过拟合?
  8. 帅气逼人的Redis可视化工具
  9. 面试问了这两个问题,很多人的回答都自相矛盾
  10. Redis是如何实现点赞、取消点赞的?