leveldb数据的插入-跳表

本文主要是接着上一篇文章,继续深入探索Write函数调用插入之后的流程。

status = WriteBatchInternal::InsertInto(updates, mem_);

InsertInto插入数据函数

namespace {
class MemTableInserter : public WriteBatch::Handler {                     // MemTable插入类public:SequenceNumber sequence_;MemTable* mem_;void Put(const Slice& key, const Slice& value) override {               // 添加内容mem_->Add(sequence_, kTypeValue, key, value);                         // 添加序列号 插入类型  key  valuesequence_++;}void Delete(const Slice& key) override {mem_->Add(sequence_, kTypeDeletion, key, Slice());                    // 添加内容 序列号  删除类型  key 空的valuesequence_++;}
};
}  // namespaceStatus WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {MemTableInserter inserter;inserter.sequence_ = WriteBatchInternal::Sequence(b);                   // 先获取序列号inserter.mem_ = memtable;                                               // 设置memtabereturn b->Iterate(&inserter);                                           // 迭代插入
}

可以得知,真正的插入数据的操作是在调用InsertInto函数,将序列化好的数据设置到inserter的sequence_属性中,传入当前的memtable,此时就调用WriteBatch的Iterate方法,来插入数据。

Status WriteBatch::Iterate(Handler* handler) const {                    // 迭代器Slice input(rep_);if (input.size() < kHeader) {                                         // 如果输入的大小小于头部信息的大小 则太小了return Status::Corruption("malformed WriteBatch (too small)");}input.remove_prefix(kHeader);                                         // 移除头部Slice key, value;int found = 0;while (!input.empty()) {                                              // 检查是否为空found++;char tag = input[0];                                                // 获取当前的taginput.remove_prefix(1);                                             // 移除一个该位switch (tag) {                                                      // 检查该tag是Put还是Deletecase kTypeValue:                                                  // 如果是添加if (GetLengthPrefixedSlice(&input, &key) &&                       GetLengthPrefixedSlice(&input, &value)) {                   // 分别获取key 和 valuehandler->Put(key, value);                                     //  调用handler去添加} else {return Status::Corruption("bad WriteBatch Put");}break;case kTypeDeletion:                                               // 如果是删除if (GetLengthPrefixedSlice(&input, &key)) {                     // 获取对应的keyhandler->Delete(key);                                         // 调用handle的删除方法} else {return Status::Corruption("bad WriteBatch Delete");}break;default:return Status::Corruption("unknown WriteBatch tag");            // 如果tag不对则 返回错误}}if (found != WriteBatchInternal::Count(this)) {                       // 检查查找到的与当前数据保存的数据是否相同 return Status::Corruption("WriteBatch has wrong count");} else {return Status::OK();                                                // 返回成功}
}

此时就调用了迭代的方法来插入数据,此时从执行流程可知,先检查头部信息,检查完成头部信息之后,然后再检查该数据的标志位,调用handler的Put或者Delete方法。

Status WriteBatch::Iterate(Handler* handler) const {                    // 迭代器Slice input(rep_);if (input.size() < kHeader) {                                         // 如果输入的大小小于头部信息的大小 则太小了return Status::Corruption("malformed WriteBatch (too small)");}input.remove_prefix(kHeader);                                         // 移除头部Slice key, value;int found = 0;while (!input.empty()) {                                              // 检查是否为空found++;char tag = input[0];                                                // 获取当前的taginput.remove_prefix(1);                                             // 移除一个该位switch (tag) {                                                      // 检查该tag是Put还是Deletecase kTypeValue:                                                  // 如果是添加if (GetLengthPrefixedSlice(&input, &key) &&                       GetLengthPrefixedSlice(&input, &value)) {                   // 分别获取key 和 valuehandler->Put(key, value);                                     //  调用handler去添加} else {return Status::Corruption("bad WriteBatch Put");}break;case kTypeDeletion:                                               // 如果是删除if (GetLengthPrefixedSlice(&input, &key)) {                     // 获取对应的keyhandler->Delete(key);                                         // 调用handle的删除方法} else {return Status::Corruption("bad WriteBatch Delete");}break;default:return Status::Corruption("unknown WriteBatch tag");            // 如果tag不对则 返回错误}}if (found != WriteBatchInternal::Count(this)) {                       // 检查查找到的与当前数据保存的数据是否相同 return Status::Corruption("WriteBatch has wrong count");} else {return Status::OK();                                                // 返回成功}
}

此时执行的handler就是MemTableInserter的实例,并调用该Put和Delete方法;

void Put(const Slice& key, const Slice& value) override {               // 添加内容mem_->Add(sequence_, kTypeValue, key, value);                         // 添加序列号 插入类型  key  valuesequence_++;}void Delete(const Slice& key) override {mem_->Add(sequence_, kTypeDeletion, key, Slice());                    // 添加内容 序列号  删除类型  key 空的valuesequence_++;}

此时就是调用了mem_的Add方法,只不过就是利用了不同的Type来标记是新增数据还是删除数据;此时查看MemTable相关内容

MemTable细节相关

MemTable就是内存中保存的数据,当内存数据规模达到阈值时,就会将内存数据写入到文件中,此时先查看Add方法。

mem_->Add方法
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,const Slice& value) {// Format of an entry is concatenation of://  key_size     : varint32 of internal_key.size()//  key bytes    : char[internal_key.size()]//  value_size   : varint32 of value.size()//  value bytes  : char[value.size()]size_t key_size = key.size();                                               // 获取key大小size_t val_size = value.size();                                             // 获取value大小size_t internal_key_size = key_size + 8;                                    // 头部加上8个字节大小 表示是添加还是删除const size_t encoded_len = VarintLength(internal_key_size) +internal_key_size + VarintLength(val_size) +val_size;                                        // 包括保存数据的大小 即既保持数据又保存数据大小char* buf = arena_.Allocate(encoded_len);                                   // 申请内存char* p = EncodeVarint32(buf, internal_key_size);                           // 转换成字符偏移memcpy(p, key.data(), key_size);                                            // 拷贝数据到指针指向的位置p += key_size;EncodeFixed64(p, (s << 8) | type);                                          // 将类型大小存入p += 8;p = EncodeVarint32(p, val_size);                                            // 转换value字节大小memcpy(p, value.data(), val_size);                                          // 将数据拷贝到指定位置处assert(p + val_size == buf + encoded_len);table_.Insert(buf);                                                         // 此时就就将内容填充到buf处 调用table插入
}

此时主要就是将数据转换为buf,并调用table插入。数据格式如下;

SkipList跳表

在上一节中,最后调用了table_.Insert函数插入数据,此时的table定义如下;

  typedef SkipList<const char*, KeyComparator> Table;Table table_;

此时定义的就是SkipList的Insert方法,在MemTable初始化过程中;

MemTable::MemTable(const InternalKeyComparator& comparator): comparator_(comparator), refs_(0), table_(comparator_, &arena_) {}# DBImpl初始化MemTable
mem = new MemTable(internal_comparator_)

此时可知初始化table_的参数来源一个来自于DBImpl中的internal_comparator_,区域则来自于申请的内存地址。

此时查看SkipList的初始化过程;

template <typename Key, class Comparator>
SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena): compare_(cmp),arena_(arena),head_(NewNode(0 /* any key will do */, kMaxHeight)),max_height_(1),rnd_(0xdeadbeef) {for (int i = 0; i < kMaxHeight; i++) {head_->SetNext(i, nullptr);}
}

在初始化过程中,会初始化一个头部节点,然后初始化对应长度(默认是12)的链表,让列表中的数据都为空。

跳表插入数据
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()// here since Insert() is externally synchronized.Node* prev[kMaxHeight];                                                 // 数组Node* x = FindGreaterOrEqual(key, prev);                                // 查找或者创建// Our data structure does not allow duplicate insertionassert(x == nullptr || !Equal(key, x->key));int height = RandomHeight();                                            // 获取随机的heightif (height > GetMaxHeight()) {                                          // 如果获取的值比当前保存的值大for (int i = GetMaxHeight(); i < height; i++) {                       // 遍历循环prev[i] = head_;                                                    // 将对应的头部数据设置为head_}// It is ok to mutate max_height_ without any synchronization// with concurrent readers.  A concurrent reader that observes// the new value of max_height_ will see either the old value of// new level pointers from head_ (nullptr), or a new value set in// the loop below.  In the former case the reader will// immediately drop to the next level since nullptr sorts after all// keys.  In the latter case the reader will use the new node.max_height_.store(height, std::memory_order_relaxed);                 // 修改当前的height值 原子修改}x = NewNode(key, height);                                               // 生成一个节点for (int i = 0; i < height; i++) {                                      // 遍历当前列表// NoBarrier_SetNext() suffices since we will add a barrier when// we publish a pointer to "x" in prev[i].x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));                  prev[i]->SetNext(i, x);                                               // 插入该节点}
}

此时的执行过程,首先会FindGreaterOrEqual查找当前的该函数主要是将当前的key遍历列表查找一个比该key晓得列表,如果没有则创建一个,这样是数据格式以大小来排序。然后再就是设置到当的数据到跳表中。

template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {// null n is considered infinitereturn (n != nullptr) && (compare_(n->key, key) < 0);       // 比较key的大小 如果传入的n 不为空指针, 并且传入的长度值 小于 当前传入的key值
}template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,Node** prev) const {Node* x = head_;                                      // 获取头部int level = GetMaxHeight() - 1;                       // 获取层级while (true) {Node* next = x->Next(level);                        // 依次遍历下一级if (KeyIsAfterNode(key, next)) {                    // 检查当前key的大小是否大于next的key大小// Keep searching in this listx = next;                                         // 如果是之后则继续深入} else {if (prev != nullptr) prev[level] = x;             // 如果指向不为空  且当前是最小数据长度  则 设置成头指针if (level == 0) {                                 // 如果为零就返回当前查找到的 否则下一个层级查找return next;                                    } else {// Switch to next listlevel--;}}}
}

其中compare_在默认情况下,其实调用的是Slice的比较函数;

inline int Slice::compare(const Slice& b) const {const size_t min_len = (size_ < b.size_) ? size_ : b.size_;int r = memcmp(data_, b.data_, min_len);if (r == 0) {if (size_ < b.size_)r = -1;else if (size_ > b.size_)r = +1;}return r;
}

此时就将数据按照长度大小插入到了跳表中。有关跳表的基本内容大家可自行查阅。

跳表的插入概述

初始如下

此时插入3.5:c之后

总结

本文主要是继续分析了数据插入到最后,将数据插入到跳表中的基本过程,源码相对查看的数据流程相对较繁琐,只要思路理解,就大致能理解数据整个的插入过程,至此插入与删除流程就分析完成。由于本人才疏学浅,如有错误请批评指正。

leveldb源码分析:数据插入续(跳表)相关推荐

  1. Leveldb源码分析--1

    [前言:看了一点oceanbase,没有意志力继续坚持下去了,暂时就此中断,基本上算把master看完了,比较重要的update server和merge server代码却没有细看.中间又陆续研究了 ...

  2. 【VUE】源码分析 - 数据劫持的基本原理

    tips:本系列博客的代码部分(示例等除外),均出自vue源码内容,版本为2.6.14.但是为了增加易读性,会对不相关内容做选择性省略.如果大家想了解完整的源码,建议自行从官方下载.https://g ...

  3. Django源码分析9:model.py表结构的初始化概述

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-model概述 Django项目中提供了内置的orm框架,只需要在models.py文件中添加 ...

  4. JNI实现源码分析【三 间接引用表】

    在JNI实现源码分析[二 数据结构]的参数传递一节中,我们提到,JNI为了安全性的考虑使用了形如jobject的结构来传递参数.而jobject被表述为指针,但又不是直接指向Object的指针那么jo ...

  5. Redis源码剖析(十一)跳表

    在树形结构中,常见的平衡树有AVL树和红黑树,但是由于AVL树过于平衡,导致维护平衡所需的代价过大,使用的不多,不过其中几种旋转算法还是值得学习的.取而代之的是较为平衡的红黑树,STL中的map和se ...

  6. leveldb源码分析:数据查询

    leveldb数据查询 查询的示例代码如下: string res; status = db->Get(ReadOptions(), "KeyNameExample", &a ...

  7. leveldb源码分析:Open启动流程

    leveldb概述 Leveldb 是一个持久化的KV存储系统,主要将大部分数据存储在磁盘上,在存储数据的过程中,根据记录的key值有序存储,当然使用者也可以自定义Key大小比较函数,一个leveld ...

  8. LevelDB 源码分析

    本文基于leveldb 1.9.0代码. 整体架构 如上图,leveldb的数据存储在内存以及磁盘上,其中: memtable:存储在内存中的数据,使用skiplist实现. immutable me ...

  9. Nginx源码分析--数据对齐posix_memalign和memalign函数

    posix_memalign函数() /*  * 背景:  *      1)POSIX 1003.1d  *      2)POSIX 标明了通过malloc( ), calloc( ), 和 re ...

最新文章

  1. Windows系统安全管理
  2. union与struct的区别?
  3. 蚂蚁动态卡片,让App首页实现敏捷更新
  4. 取消Conda每次创建环境时默认下载的依赖包
  5. Qt文档阅读笔记-共享库的创建与调用
  6. 计算机英语input,人教版高中英语选修计算机英语VoiceInput.ppt
  7. 485通信自动收发数据实现
  8. InfluxDB简介,InfluxDB的基本操作
  9. 2018linux市场份额数据,2018年7月Windows 10市场份额上涨,Linux仅占1.35%
  10. 互联网日报 | 1月30日 星期六 | 苹果单季营收首破1000亿美元;特斯拉连续六个季度盈利;全球新冠肺炎确诊病例超1亿例...
  11. 山东科技大学计算机篮球,山东科技大学第十七届学生男子篮球赛开幕
  12. demo h5 touch 移动_移动端Touch事件与H5-Canvas像素点检测实现刮刮乐
  13. 交流电中为什么要用相量法?
  14. android 转场动画 监听,Android 中的转场动画及兼容处理
  15. 做一个快乐的程序员,去感受爱
  16. paper reading——《Improving Person Re-identification by Attribute and Identity Learning》
  17. 电脑辐射,电脑辐射危害大 五妙招正确防辐射
  18. 国家推行电子货币见解
  19. 网络传输中的数据长度
  20. 计算机基本结构quiz

热门文章

  1. 推荐 6 个好用到爆的 Pycharm 插件
  2. 7000 字精华总结,Pandas/Sklearn 进行机器学习之特征筛选,有效提升模型性能
  3. 加速产业生态算力升级,华为鲲鹏展翅福州
  4. Python画出心目中的自己
  5. 号称3个月发布最强量子计算机,卖口罩的霍尼韦尔凭什么?
  6. 力挺Python!同是程序员,为啥同事年前就实现了财务自由?
  7. ICLR 2020论文投稿2600篇,GNN、BERT、Transformer领跑热门研究方向
  8. AI一分钟 | 蔚来赴美IPO,开盘跌破发行价;TensorFlow开源新库TFDV
  9. python语音识别终极指南
  10. 李彦宏成为首登《时代周刊》的互联网大佬,百度研究院再添三名大牛