LevelDB source code analysis: data insertion and deletion (Put and Delete)
Inserting and retrieving data in leveldb

leveldb exposes the following data-manipulation interface:
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options, const Slice& key,
                   const Slice& value) = 0;

// Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;

// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options, const Slice& key,
                   std::string* value) = 0;
The interface consists mainly of Write, Delete, Get and Put. This article walks through how these operations actually execute.
Put and Delete operations
To recap: the open flow covered in the previous article calls DB::Open, which, once the open completes, hands back a db whose concrete type is DBImpl, a subclass implementing the DB interface. Calling db->Get or db->Put therefore invokes the corresponding DBImpl method, and DBImpl::Put is simply:
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}
It just forwards to the default Put implementation on the DB base class. The same holds for Delete:
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}
As the code shows, both operations first record themselves into a WriteBatch and then hand that batch to the Write method for execution.
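This record-then-apply pattern can be sketched with a stripped-down stand-in. The `MiniBatch`/`Op` names below are ours, not leveldb's, and the real WriteBatch encodes operations into a flat string rather than a vector; this only illustrates the shape of the API:

```cpp
#include <string>
#include <vector>

// Stand-in for leveldb's batch-then-write pattern: Put and Delete only
// record an operation; the whole batch is applied later in one step.
enum class OpType { kPut, kDelete };

struct Op {
  OpType type;
  std::string key;
  std::string value;  // empty for deletions
};

class MiniBatch {
 public:
  void Put(const std::string& key, const std::string& value) {
    ops_.push_back({OpType::kPut, key, value});
  }
  void Delete(const std::string& key) {
    ops_.push_back({OpType::kDelete, key, ""});
  }
  const std::vector<Op>& ops() const { return ops_; }

 private:
  std::vector<Op> ops_;
};
```

A deletion is recorded as just another typed entry, which is exactly why leveldb can funnel both Put and Delete through the same Write path.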
The WriteBatch class
class LEVELDB_EXPORT WriteBatch {
 public:
  class LEVELDB_EXPORT Handler {
   public:
    virtual ~Handler();
    virtual void Put(const Slice& key, const Slice& value) = 0;
    virtual void Delete(const Slice& key) = 0;
  };

  WriteBatch();

  // Intentionally copyable.
  WriteBatch(const WriteBatch&) = default;
  WriteBatch& operator=(const WriteBatch&) = default;

  ~WriteBatch();

  // Store the mapping "key->value" in the database.
  void Put(const Slice& key, const Slice& value);  // record an insertion

  // If the database contains a mapping for "key", erase it.  Else do nothing.
  void Delete(const Slice& key);  // record a deletion

  // Clear all updates buffered in this batch.
  void Clear();

  // The size of the database changes caused by this batch.
  //
  // This number is tied to implementation details, and may change across
  // releases. It is intended for LevelDB usage metrics.
  size_t ApproximateSize() const;

  // Copies the operations in "source" to this batch.
  //
  // This runs in O(source size) time. However, the constant factor is better
  // than calling Iterate() over the source batch with a Handler that replicates
  // the operations into this batch.
  void Append(const WriteBatch& source);  // append another batch

  // Support for iterating over the contents of a batch.
  Status Iterate(Handler* handler) const;

 private:
  friend class WriteBatchInternal;

  std::string rep_;  // See comment in write_batch.cc for the format of rep_
};

}  // namespace leveldb
The implementation is as follows:
static const size_t kHeader = 12;

WriteBatch::WriteBatch() { Clear(); }

WriteBatch::~WriteBatch() = default;

WriteBatch::Handler::~Handler() = default;

void WriteBatch::Clear() {
  rep_.clear();          // drop all buffered records
  rep_.resize(kHeader);  // keep room for the 12-byte header
}

size_t WriteBatch::ApproximateSize() const { return rep_.size(); }  // size of the rep_ string

Status WriteBatch::Iterate(Handler* handler) const {  // replay the batch through a handler
  Slice input(rep_);
  if (input.size() < kHeader) {  // smaller than the header: malformed
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);  // skip the header
  Slice key, value;
  int found = 0;
  while (!input.empty()) {  // process records until the input is exhausted
    found++;
    char tag = input[0];     // read the current tag
    input.remove_prefix(1);  // consume the tag byte
    switch (tag) {           // is this record a Put or a Delete?
      case kTypeValue:       // a Put record
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {  // decode key and value
          handler->Put(key, value);                    // hand them to the handler
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:  // a Delete record
        if (GetLengthPrefixedSlice(&input, &key)) {  // decode the key
          handler->Delete(key);                      // hand it to the handler
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");  // unrecognized tag
    }
  }
  if (found != WriteBatchInternal::Count(this)) {  // record count must match the header
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

int WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);  // read the count from the header
}

void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  EncodeFixed32(&b->rep_[8], n);  // write the count into the header
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));  // read the sequence number
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);  // write the sequence number
}

void WriteBatch::Put(const Slice& key, const Slice& value) {  // record an insertion
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);  // bump the count
  rep_.push_back(static_cast<char>(kTypeValue));  // append the type tag
  PutLengthPrefixedSlice(&rep_, key);             // append the key
  PutLengthPrefixedSlice(&rep_, value);           // append the value
}

void WriteBatch::Delete(const Slice& key) {  // record a deletion
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);  // bump the count
  rep_.push_back(static_cast<char>(kTypeDeletion));  // append the type tag
  PutLengthPrefixedSlice(&rep_, key);                // append the key
}

void WriteBatch::Append(const WriteBatch& source) {
  WriteBatchInternal::Append(this, &source);  // delegate to WriteBatchInternal::Append
}

namespace {

class MemTableInserter : public WriteBatch::Handler {  // handler that inserts into a MemTable
 public:
  SequenceNumber sequence_;
  MemTable* mem_;

  void Put(const Slice& key, const Slice& value) override {
    mem_->Add(sequence_, kTypeValue, key, value);  // sequence number, value type, key, value
    sequence_++;
  }
  void Delete(const Slice& key) override {
    mem_->Add(sequence_, kTypeDeletion, key, Slice());  // deletion type with an empty value
    sequence_++;
  }
};

}  // namespace

Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
  MemTableInserter inserter;
  inserter.sequence_ = WriteBatchInternal::Sequence(b);  // start from the batch's sequence number
  inserter.mem_ = memtable;                              // target memtable
  return b->Iterate(&inserter);                          // replay the batch into it
}

void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= kHeader);
  b->rep_.assign(contents.data(), contents.size());  // replace rep_ with the given contents
}

void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  SetCount(dst, Count(dst) + Count(src));
  assert(src->rep_.size() >= kHeader);
  dst->rep_.append(src->rep_.data() + kHeader,
                   src->rep_.size() - kHeader);  // append src's records, skipping its header
}

}  // namespace leveldb
This class wraps the insert and delete operations that eventually reach the memtable, and exposes essentially the whole interface through which callers manipulate a batch.
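The rep_ layout implied by the code above (a 12-byte header of fixed64 sequence number plus fixed32 count, followed by tag bytes and length-prefixed slices) can be reproduced with a small encoder. The helper names here are ours, though they mirror leveldb's coding utilities; the tag values 0x1 (kTypeValue) and 0x0 (kTypeDeletion) match leveldb's dbformat.h:

```cpp
#include <cstdint>
#include <string>

// Fixed-width little-endian encoders, as in leveldb's util/coding.h.
void PutFixed32(std::string* dst, uint32_t v) {
  for (int i = 0; i < 4; i++) dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}
void PutFixed64(std::string* dst, uint64_t v) {
  for (int i = 0; i < 8; i++) dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}

// Varint32 length prefix followed by the raw bytes, like PutLengthPrefixedSlice.
void PutLengthPrefixed(std::string* dst, const std::string& s) {
  uint32_t n = static_cast<uint32_t>(s.size());
  while (n >= 128) {
    dst->push_back(static_cast<char>(n | 128));
    n >>= 7;
  }
  dst->push_back(static_cast<char>(n));
  dst->append(s);
}

// Encode a single-Put batch in the rep_ format described above.
std::string EncodeBatch(uint64_t seq, const std::string& key, const std::string& value) {
  std::string rep;
  PutFixed64(&rep, seq);  // bytes 0..7: sequence number
  PutFixed32(&rep, 1);    // bytes 8..11: record count
  rep.push_back(0x1);     // tag: kTypeValue
  PutLengthPrefixed(&rep, key);
  PutLengthPrefixed(&rep, value);
  return rep;
}
```

This also makes clear why WriteBatchInternal::Append can simply concatenate: records after the header are self-delimiting, so only the count in the header needs fixing up.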
The Write function
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);      // build a Writer for this request
  w.batch = updates;      // the WriteBatch to apply
  w.sync = options.sync;  // whether this is a synchronous write
  w.done = false;         // not finished yet

  MutexLock l(&mutex_);    // acquire the db mutex
  writers_.push_back(&w);  // join the writer queue
  while (!w.done && &w != writers_.front()) {  // wait until finished by someone else or at the head
    w.cv.Wait();
  }
  if (w.done) {  // already executed as part of another writer's group
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);  // make sure there is room to write
  uint64_t last_sequence = versions_->LastSequence();    // current last sequence number
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);          // merge queued batches into one group
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);  // assign the starting sequence number
    last_sequence += WriteBatchInternal::Count(updates);          // advance past every record in the group

    // Add to log and apply to memtable. We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();  // release the mutex while doing I/O
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));  // append the group to the log
      bool sync_error = false;
      if (status.ok() && options.sync) {  // append succeeded and a synchronous write was requested
        status = logfile_->Sync();        // flush the log file to disk
        if (!status.ok()) {
          sync_error = true;  // remember that the sync failed
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);  // apply the group to the memtable
      }
      mutex_.Lock();     // re-acquire the mutex
      if (sync_error) {  // the sync failed: record the error
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();  // the group used the temporary batch: reset it
    versions_->SetLastSequence(last_sequence);       // publish the new last sequence number
  }

  while (true) {  // pop every writer that was part of this group
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {         // another thread's writer
      ready->status = status;  // propagate the group's status
      ready->done = true;      // mark it finished
      ready->cv.Signal();      // wake its waiting thread
    }
    if (ready == last_writer) break;  // stop at the last writer in the group
  }

  // Notify new head of write queue
  if (!writers_.empty()) {  // more writers are queued: wake the new head
    writers_.front()->cv.Signal();
  }

  return status;
}
Write wraps the request in a Writer, pushes it onto a queue, and waits while earlier requests are pending, since multiple queued operations can be executed together as one group. When this writer is woken (or when the queue was empty to begin with), it first checks whether it has already been completed, because another thread may have executed it as part of a batched group. If not, it becomes the leader: it calls MakeRoomForWrite to ensure there is space, and if the batch is non-null it uses BuildBatchGroup to merge the queued batches into one group, appends the group to the log with AddRecord, and finally applies it to the MemTable via InsertInto. That is the core execution flow.
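The queue-and-leader handoff can be illustrated with standard-library primitives. This is a simplified sketch (our own `WriteQueue`/`Writer` types, using std::condition_variable where leveldb uses its port layer), not leveldb's actual code:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

struct Writer {
  std::condition_variable cv;
  bool done = false;
};

class WriteQueue {
 public:
  // Called with the mutex held (via `lock`). Blocks until this writer
  // was either finished by a leader or reached the front of the queue.
  // Returns true if the caller became the leader and must do the write.
  bool Enqueue(Writer* w, std::unique_lock<std::mutex>& lock) {
    writers_.push_back(w);
    while (!w->done && w != writers_.front()) {
      w->cv.wait(lock);
    }
    return !w->done;
  }

  // Called by the leader after the group write: pop every writer up to
  // and including `last`, mark the others done, and wake the new head,
  // mirroring the loop at the end of DBImpl::Write.
  void Finish(Writer* self, Writer* last) {
    while (true) {
      Writer* ready = writers_.front();
      writers_.pop_front();
      if (ready != self) {
        ready->done = true;
        ready->cv.notify_one();
      }
      if (ready == last) break;
    }
    if (!writers_.empty()) writers_.front()->cv.notify_one();
  }

  bool empty() const { return writers_.empty(); }

 private:
  std::deque<Writer*> writers_;
};
```

The key design point is that followers never touch the log or memtable: one leader does all the I/O for its group while holding no lock, and only status propagation happens under the mutex.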
MakeRoomForWrite: checking for space
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();  // the mutex must already be held
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {  // a background error occurred: return it immediately
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {  // close to the L0 limit
      // We are getting close to hitting a hard limit on the number of
      // L0 files. Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance. Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);  // sleep for 1ms
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable: it is still below the configured
      // write buffer size, so stop looping and return.
      break;
    } else if (imm_ != nullptr) {  // an immutable memtable is still pending
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();  // wait until it has been flushed
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);  // there must be no previous log
      uint64_t new_log_number = versions_->NewFileNumber();  // allocate a new file number
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);  // open a new log file
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);  // hand the number back
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;  // install the new log file and its number
      log_ = new log::Writer(lfile);     // create a log writer for it
      imm_ = mem_;                       // the old memtable becomes immutable
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);  // start a fresh memtable
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();  // schedule the old memtable to be flushed
    }
  }
  return s;
}
It checks whether there is room to accept the write; if the current memtable is full, it switches to a fresh memtable and schedules the old one for compaction so the new data can be accepted.
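The branch order above can be condensed into a pure function, which makes it easier to see which condition wins. The thresholds 8 and 12 are leveldb's default values for kL0_SlowdownWritesTrigger and kL0_StopWritesTrigger; the enum and function names are our own simplification:

```cpp
#include <cstddef>

// Possible outcomes of one iteration of the MakeRoomForWrite loop.
enum class RoomAction { kError, kSlowDown, kProceed, kWaitForImm, kWaitForL0, kSwitchMemtable };

RoomAction DecideRoom(bool bg_error, bool allow_delay, int l0_files,
                      size_t mem_usage, size_t write_buffer_size,
                      bool imm_exists, bool force) {
  if (bg_error) return RoomAction::kError;               // propagate background errors first
  if (allow_delay && l0_files >= 8)
    return RoomAction::kSlowDown;                        // 1ms delay, at most once per write
  if (!force && mem_usage <= write_buffer_size)
    return RoomAction::kProceed;                         // room in the current memtable
  if (imm_exists) return RoomAction::kWaitForImm;        // previous memtable still compacting
  if (l0_files >= 12) return RoomAction::kWaitForL0;     // too many level-0 files
  return RoomAction::kSwitchMemtable;                    // new log + memtable, schedule compaction
}
```

Note the ordering: the soft slowdown fires before the room check, so a healthy memtable can still be throttled by L0 pressure, and the hard stop only applies once the memtable is already full.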
BuildBatchGroup: merging queued batches
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();   // the writer at the head of the queue
  WriteBatch* result = first->batch;  // start from its batch
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);  // size of the first batch

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;  // 1MB cap on the group
  if (size <= (128 << 10)) {  // small first write: cap growth at size + 128KB
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {  // scan the rest of the queue
    Writer* w = *iter;
    if (w->sync && !first->sync) {  // w needs sync but first does not: stop here
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);  // account for this batch's size
      if (size > max_size) {  // the group would get too big: stop
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {  // still pointing at the caller's batch
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);  // copy the first batch in
      }
      WriteBatchInternal::Append(result, w->batch);  // append this writer's batch
    }
    *last_writer = w;  // extend the group to include w
  }
  return result;
}
Its job is to merge the batches of consecutive queued writers into one group by appending them together, so a single log write and memtable pass can service several callers at once, improving per-write efficiency.
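The growth cap at the top of the function is worth isolating: a small first write (at most 128KB) only lets the group grow by another 128KB, while larger writes get the full 1MB cap, so a tiny write is never stuck fronting a huge group. The function name here is ours:

```cpp
#include <cstddef>

// The size cap computed at the top of BuildBatchGroup.
size_t MaxBatchGroupSize(size_t first_batch_size) {
  size_t max_size = 1 << 20;             // 1MB hard cap on any group
  if (first_batch_size <= (128 << 10)) { // first write is small (<= 128KB):
    max_size = first_batch_size + (128 << 10);  // limit extra growth to 128KB
  }
  return max_size;
}
```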
Summary
The basic Put and Delete flows both go through the WriteBatch class: insertions and deletions alike are turned into write operations, which improves write efficiency, and a delete is simply recorded in the data as an entry of the deletion type. This article only analyzed the code paths for Put and Delete; given my limited knowledge, corrections are welcome if you spot any mistakes.