文章目录

  • 前言
  • 使用方式
  • 实现原理
  • 总结

前言

很多时候,我们使用数据库时会有离线向数据库导入数据的需求。比如大量用户在本地的一些离线数据,想要将这一些数据导入到已有的数据库中;或者说NewSQL场景中部分机器离线,重新上线之后的数据增量/全量同步 等场景。这个时候 我们并不想要让这一些数据占用过多的系统资源,更不希望他们对正常的线上业务有影响,所以尽可能高效得完成这一些数据的同步就需要深入设计一番。

而如果底层引擎使用的是rocksdb,那就非常省事了,只需要组织好你们的数据调用接口就完事了,剩下的导入过程由引擎完成。 tikv便是通过 rocksdb的这个功能完成集群异常恢复之后 region之间的全量增量同步的。回到今天我们要讨论的主题,便是rocksdb的这个数据导入过程是如何尽可能快、尽可能高效得完成的。

使用方式

讲解实现原理之前我们先看看如何使用这个功能,功能的易用性也很重要,用户还是希望尽可能得少写代码来完成这个工作。使用上主要是两部分:创建SST文件 和 导入SST文件。

  • 创建sst文件:这一步主要是通过一个sst_filter_writer,将需要导入的 k/v 数据转换成sst文件

    需要注意的是:

    1. 用户k/v 数据需要按照options.comparator 严格有序,默认是按照key的字典序
    2. 这里的options 建议和db写入的options用一套(压缩选项,sst文件相关选项等)
    Options options;SstFileWriter sst_file_writer(EnvOptions(), options);
    // 指定形成的sst文件的路径
    std::string file_path = "/home/usr/file1.sst";// open file_path
    Status s = sst_file_writer.Open(file_path);
    for (...) {// 写入sst,用户保证k/v 的顺序s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n", key.c_str(),s.ToString().c_str());return 1;}
    }// 完成写入
    s = sst_file_writer.Finish();
    
  • 导入sst文件:这个步骤就是将创建好的一个或者多个sst文件导入到db中,也允许向多个cf中导入

IngestExternalFileOptions ifo;
// Ingest the 2 passed SST files into the DB
// 导入数据
Status s = db_->IngestExternalFile({"/home/usr/file1.sst", "/home/usr/file2.sst"}, ifo);

使用还是比较简单的,整体的使用过程如下:

#include <iostream>
#include <vector>#include <gflags/gflags.h>#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/sst_file_writer.h>#define DATA_SIZE 10
#define VALUE_SIZE 1024using namespace std;// 比较函数
bool cmp(pair<string, string> str1,pair<string, string> str2) {if(str1.first < str2.first) {return true;} else if (str1.first == str2.first && str1.second < str2.second) {return true;} else {return false;}
}// 随机字符串
static string rand_data(long data_range) {char buff[30];unsigned long long num = 1;for (int i = 0;i < 4; ++i) {num *= (unsigned long long )rand();}sprintf(buff, "%llu", num % (unsigned long long)data_range );string data(buff);return data;
}// 构造有序数据
void construct_data(vector<pair<string,string>> &input) {int i;string key;string value;for (i = 0;i < DATA_SIZE; i++) {if(key == "0") {continue;}key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);input.push_back(make_pair(key, value));}
}void traverse_data(vector<pair<string,string>> input) {int i;for(auto data : input) {cout << data.first << " " << data.second << endl;}
}// 创建sst文件
int create_sst(string file_path) {vector<pair<string,string>> input;vector<pair<string,string>>::iterator input_itr;rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), option);rocksdb::Status s = sst_file_writer.Open(file_path);if (!s.ok()) {printf("Error while opening file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}// 需要保证数据有序后再写入construct_data(input);sort(input.begin(), input.end(), cmp);traverse_data(input);// Insert rows into the SST file, note that inserted keys must be // strictly increasing (based on options.comparator)for (input_itr = input.begin(); input_itr != input.end();input_itr ++) {rocksdb::Slice key(input_itr->first);rocksdb::Slice value(input_itr->second);s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n",key.ToString().c_str(),s.ToString().c_str());return 1;}}// Close the files = sst_file_writer.Finish();if (!s.ok()) {printf("Error while finishing file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}return 0;
}static rocksdb::DB *db;void create_db() {rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::Status s = rocksdb::DB::Open( option,"./db", &db);if (!s.ok()) {printf("Open db failed : %s\n", s.ToString().c_str());return;}
}void db_write(int num_keys) {rocksdb::WriteOptions write_option;write_option.sync = true;rocksdb::Slice key;rocksdb::Slice value;rocksdb::Status s;int i;printf("begin write \n");for (i = 0;i < num_keys; i++) {key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);s = db->Put(write_option, key, value);if (!s.ok()) {printf("Put db failed : %s\n", s.ToString().c_str());return;}}db->Flush(rocksdb::FlushOptions());printf("finish write \n");
}int main() {// 先写入一批数据create_db();db_write(100000);// 创建sst文件if (create_sst("./test.sst") == 0) {printf("creates sst success !\n");} else {printf("creates sst failed !\n");}// 导入数据rocksdb::IngestExternalFileOptions ifo;// Ingest the 2 passed SST files into the DBprintf("Ingest sst !\n");rocksdb::Status s = db->IngestExternalFile({"test.sst"}, ifo);if (!s.ok()) {printf("Error while adding file test.sst , Error %s\n",s.ToString().c_str());return 1;}return 0;
}

运行输出如下:

begin write
finish write
# consturct data,需按照字典序,如果没有按照字典序构造的话会报错
1008 232
240 880
288 63
410 768
506 56
534 256
640 180
72 248
800 672
944 217
creates sst success !

通过db日志可以看到我们创建的sst文件test.sst被成功导入到db,形成了./db/000020.sst,且在db目录中。

╰─$ cat db/LOG |grep ingested
[AddFile] External SST file test.sst was ingested in L0 with path ./db/000020.sst (global_seqno=200012)╰─$ ls db
000017.log               000020.sst               IDENTITY                 LOG                      LOG.old.1618643738564935 OPTIONS-000008
000019.sst               CURRENT                  LOCK                     LOG.old.1618123487361092 MANIFEST-000013          OPTIONS-000016

实现原理

从如何使用这个功能上我们能够感觉到这一些数据并不是通过rocksdb正常的I/O流程写入的。如果使用正常的接口,那我们用户不需要排序,而是直接通过db->Put接口将k/v写入,凡事都有但是,但是这样来导入离线数据在rocksdb内部后续的flush/compaction 都会消耗大量的系统资源,而这并不是我们想要的高效。所以,rocksdb提供的ingest接口肯定不会让这一些要导入的数据消耗过多的资源,接下来我们一起看看底层的详细实现。

为了更形象得告诉大家在rocksdb作为存储引擎的场景,如果通过传统的put接口导入数据会多出哪一些I/O,如下图

其中红色的尖头 是ingest file 相比于传统的put接口 少的I/O部分,可以说ingest方式导入数据极大得节约了整个系统资源的开销(包括但不限于I/O , CPU 资源的开销)。

下面主要介绍的是有了sst文件,接下来如何导入到db中的过程。关于通过sst_file_writer创建具体的sst文件的过程就不多说了,也就是按照sst文件的格式(datablock,index block…footer)等将有序的数据一个个添加进去而已。

主要有如下几步:

  1. 为待插入的sst文件创建file link到db目录,或者直接拷贝进去
  2. 停止写入,需要保证即将导入的sst文件在db中拥有一个安全合理的seqno,如果持续写入,那这个seqno可能不会全局递增了。
  3. 检查导入的sst文件是否和memtable中的key-range有重叠,有的话需要flush memtable
  4. 为这个sst文件 按照其key-range挑选一个合适的level放进去
  5. 为这个问天添加一个全局的seqno
  6. 恢复db的写入

其中停止写入到恢复写入这段时间对于用户来说越小越好,所以ingest的性能很重要。

接下来看看详细的源代码实现:

导入数据的函数入口是DBImpl::IngestExternalFiles

导入的sst文件最后都需要形成一个db内部的sst文件,因为这个时候已经停止写入了,所以会从最新的sst文件编号之后取一个文件编号,后续的其他要导入的sst文件会不断追加。

Status DBImpl::IngestExternalFiles(const std::vector<IngestExternalFileArg>& args) {...// 构造文件编号到next_file_number中Status status = ReserveFileNumbersBeforeIngestion(static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,pending_output_elem, &next_file_number);if (!status.ok()) {InstrumentedMutexLock l(&mutex_);ReleaseFileNumberFromPendingOutputs(pending_output_elem);return status;}...
}

有了在db内部的合法文件编号,我们就可以进行文件迁移了,将待导入的sst文件迁移到db内部已经构造好的sst文件编号之中。

会为每一个cf构造一个ingest_job, 将待导入文件拷贝/移动到 db内部的sst文件中,这个过程是在接下来的Prepare函数中。

  uint64_t start_file_number = next_file_number;for (size_t i = 1; i != num_cfs; ++i) {start_file_number += args[i - 1].external_files.size();auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);// prepare 函数exec_results[i].second = ingestion_jobs[i].Prepare(args[i].external_files, start_file_number, super_version);exec_results[i].first = true;CleanupSuperVersion(super_version);}

看看Prepare的函数实现:

  1. 拿着输入的多个sst文件,如果有多个,则需要检查这一些文件之间是否有重叠key,有的话就不支持了(rocksdb除了l0,其他层不允许有重叠key)。
  2. 根据用户指定的ingest option: move_files 是否为true,来将待导入文件move到db中, 如果move失败了就拷贝文件。
Status ExternalSstFileIngestionJob::Prepare(const std::vector<std::string>& external_files_paths,uint64_t next_file_number, SuperVersion* sv) {// 解析文件信息for (const std::string& file_path : external_files_paths) {IngestedFileInfo file_to_ingest;status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);if (!status.ok()) {return status;}files_to_ingest_.push_back(file_to_ingest);}// 确保导入的多个sst文件之间没有重叠......} else if (num_files > 1) {// Verify that passed files dont have overlapping rangesautovector<const IngestedFileInfo*> sorted_files;for (size_t i = 0; i < num_files; i++) {sorted_files.push_back(&files_to_ingest_[i]);}std::sort(sorted_files.begin(), sorted_files.end(),[&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {return sstableKeyCompare(ucmp, info1->smallest_internal_key,info2->smallest_internal_key) < 0;});// 如果有重叠的话,ingest也无法支持,因为在db中大于level0的更高层level内部的// sst文件之间是不允许有重叠的,加速更高层的二分查找。for (size_t i = 0; i < num_files - 1; i++) {if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,sorted_files[i + 1]->smallest_internal_key) >= 0) {files_overlap_ = true;break;}}}......// 根据用户参数move文件if (ingestion_options_.move_files) {status = env_->LinkFile(path_outside_db, path_inside_db);...} else { // 否则就拷贝文件f.copy_file = true;}if (f.copy_file) {TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",nullptr);// CopyFile also sync the new file.status = CopyFile(env_, path_outside_db, path_inside_db, 0,db_options_.use_fsync);}...
}

到此,文件就已经进入到了rocksdb 之中,ingest_job的prepare流程就结束了。

接下来 就到了我们前面介绍总步骤的第二步,停止用户对当前db的写入:

DBImpl::IngestExternalFilesWriteThread::EnterUnbatched

其中WriteThread::EnterUnbatched函数会让当前db的写入线程都处于wait状态。

接下来就是检查当前要导入的文件是否和memtable中的key-range有重叠,函数调用如下:

DBImpl::IngestExternalFilesExternalSstFileIngestionJob::NeedsFlushColumnFamilyData::RangesOverlapWithMemtables

这个函数ColumnFamilyData::RangesOverlapWithMemtables会拿着从ingest files中构造好的key-range和memtable中的 key-range 进行对比,如果有重叠key,则会将memtable flush置为true

Status ColumnFamilyData::RangesOverlapWithMemtables(const autovector<Range>& ranges, SuperVersion* super_version,bool* overlap) {...Status status;// 拿着ingest files的range中的每一个key,看是否能够从memtable中找到for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {auto* vstorage = super_version->current->storage_info();auto* ucmp = vstorage->InternalComparator()->user_comparator();InternalKey range_start(ranges[i].start, kMaxSequenceNumber,kValueTypeForSeek);// 从memtable中找memtable_iter->Seek(range_start.Encode());status = memtable_iter->status();ParsedInternalKey seek_result;if (status.ok()) {if (memtable_iter->Valid() &&!ParseInternalKey(memtable_iter->key(), &seek_result)) {status = Status::Corruption("DB have corrupted keys");}}// 找到了,则置overlap为trueif (status.ok()) {if (memtable_iter->Valid() &&ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {*overlap = true;} else if (range_del_agg.IsRangeOverlapped(ranges[i].start,ranges[i].limit)) {*overlap = true;}}}...
}

在后续的DBImpl::FlushMemTable函数中会flush memtable,不同的cf是分开进行的

DBImpl::IngestExternalFilesDBImpl::FlushMemTable

接下来就开始了第四步和第五步的处理逻辑,需要为每一个落到db中的sst文件挑选合适的level以及分配全局seqno,处理逻辑在Run函数中:

DBImpl::IngestExternalFilesExternalSstFileIngestionJob::Run

主要处理逻辑如下:

一个一个ingest file进行处理

  1. 选择一个合适的level,将ingest file插入进去
    如果user配置了allow_ingest_behind=true,即允许导入的数据直接插入到最后一层的文件位置,且ingest的时候配置的ingest option中ingest_behind=true,则会先尝试插入到bottomest level,如果最后一层的文件和待插入的文件有重叠,则插入失败。处理逻辑在CheckLevelForIngestedBehindFile函数之中。

    否则逐层遍历,找到第一个和这一些key-range有重叠的level即可。函数AssignLevelAndSeqnoForIngestedFile

  2. 找到了合适的level的同时会记录一个assigned_seqno,是在当前last_sequence的基础上+1得到的。函数AssignLevelAndSeqnoForIngestedFile之中。

  3. 为当前ingest_file 写入一个global seq no, 并执行fsync/sync。函数AssignGlobalSeqnoForIngestedFile之中。

  4. 最后就是将当完成更新的ingest file的元信息更新到VersionEdit之中。

接下来就进入尾声了:

  1. 将更新的VersionEdit写入到MANIFEST文件之中
  2. 更新每个ingest file对应的cf信息,并且调度compaction/flush, 因为之前ingest file时找的是有重叠key的一层。
  3. 恢复db的写入
         // 将`VersionEdit`写入到MANIFEST文件之中status =versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,edit_lists, &mutex_, directories_.GetDbDir());}if (status.ok()) {for (size_t i = 0; i != num_cfs; ++i) {auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();if (!cfd->IsDropped()) {//更新每个ingest file对应的cf信息,并且调度compaction/flush, 因为之前ingest file时找的是有重叠key的一层InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],*cfd->GetLatestMutableCFOptions());...}}}// 恢复db的写入,唤醒db的其他所有的writerwrite_thread_.ExitUnbatched(&w);

到此,整个ingest就算是结束了。

总结

通过ingest的实现,我们能够看到rocksdb通过ingest的方式支持离线数据导入确实能够极大得降低系统资源的开销。不需要一个key在LSM中被反复的写入、读取。

Rocksdb 通过ingestfile 来支持高效的离线数据导入相关推荐

  1. 离线数据同步神器:DataX,支持几乎所有异构数据源的离线同步到MaxCompute

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 概述 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlSer ...

  2. CI130X智能语音芯片应用于智能面板,支持红外设备离线语音控制、场景控制等功能

    随着人们生活水平的提高,用户对产品的追求呈现多元化趋势,不仅仅只关注面板开关去控制的灯具亮灭,更注重产品的使用体验感.面板控制也演变了多种方式,从机械开关,轻触按键,手持遥控器到手机端APP,智能语音 ...

  3. Android的webview支持HTML5的离线应用功能

    HTML5的离线应用功能可以使得WebApp即使在网络断开的情况下仍能正常使用,这是个非常有用的功能.近来工作中也要用到HTML5离线应用功能,由于是在Android平台上做,所以自然而然的选择Web ...

  4. 滴滴海量离线数据的在线化 — FastLoad

    本文作者:赵锐,滴滴 |高级工程师,从事分布式存储NoSQL/NewSQL的相关研发,参与从零开始构建滴滴分布式存储Fusion,有PB级别存储.千万QPS的存储经验. 0. 目录 1. 业务背景:雄 ...

  5. 邀您参与 | 阿里巴巴如何扩展 K8s 调度器支持 AI 和大数据任务?

    简介:2020 年 7 月 15 日上午 10:00,<阿里巴巴如何扩展 K8s 调度器支持 AI 和大数据任务?>主题线上网络研讨会即将召开. 随着 Kubernetes 的广泛应用,越 ...

  6. 挖掘网站数据价值——大规模离线数据的分析处理应用

    文 / 蔡文志    汤子楠 为什么我们使用搜索引擎时,不同的用户搜索同样的关键词看到的广告却不同?为什么我们到电子商务网站购物时,每次浏览同样的商品时都可以得到不同的商品推荐?作为网站服务的开发者, ...

  7. DataX离线数据同步

    目录 1 DataX 2 ODPS同步数据到HDFS HA 配置 Kerberos 配置 域外访问配置 3 HDFS同步数据到另一个HDFS 4 MongoDB同步数据到HDFS 5 带 Kerber ...

  8. 离线数据同步平台datax+报表可视化平台metabase

    datax DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlServer.Postgre.HDFS.Hive.ADS.HBase.TableS ...

  9. hybriddb mysql移植_HybridDB for MySQL 实现在线与离线数据分离的实践

    本文将重点介绍HybridDB for MySQL 实现在线与离线数据分离的实践,特别推荐! 核心业务简介 任务中心汇聚了集团的所有工作流任务,并提供统一的入口给用户处理集团的工作任务. 面临主要问题 ...

最新文章

  1. 英特尔用英伟达显卡,给GTA5打了个超强画质补丁
  2. Python PIP Install throws TypeError: unsupported operand type(s) for -=: 'Retry' and 'int'
  3. IRC BOT原来是利用IRC下发CC命令——在xx云环境遇到了,恶意软件开的是6666端口...
  4. Spring AOP学习
  5. 新版pycharm,亮瞎我的狗眼
  6. springboot actuator监控笔记
  7. 刚发现的2011年最给力的春联
  8. PyTorch多GPU并行训练方法及问题整理
  9. strstr 函数的实现
  10. c语言输入一串字符辨别奇偶,c语言设计输入一个正整数判断其中各个数字是否奇数偶数交替出现是输出yes不是输出no...
  11. Jfinal的七牛云存储插件:qiniuPlugin for jfinal.
  12. cwm oracle,ORA-06512: at OLAPSYS.CWM2_OLAP_UTILITY
  13. 按键精灵调用python文件_Python按键精灵自动化
  14. 8.7. Enumerated Types
  15. 微信小程序请求函数的封装
  16. 浏览器安全——Web页面安全浏览器网络安全(HTTPS)浏览器系统安全
  17. 找不到apt和vim命令
  18. 运行docker run显示is already in use by container
  19. xftp的免费下载和安装教程
  20. vue 点击打开小窗口

热门文章

  1. 合并本地Maven仓库
  2. Android手机指令操作释疑
  3. Caused by: java.sql.BatchUpdateException
  4. DP_knapsack
  5. linux 唯一行数量,linux – 确定bash中具有awk或类似内容的唯一行数
  6. python 空指针_python 空指针
  7. 三下乡辅导孩子计算机知识,【青春“三下乡”】普及家庭教育知识,共促孩子健康成长...
  8. python npz文件_numpy的文件存储 .npy .npz 文件
  9. linux分区通俗讲解,linux硬盘分区基础及设备号的解释
  10. Linux系统的快照是什么,linux – 文件系统快照与简单复制文件有何不同?