Rocksdb 的 MergeOperator 简单使用记录
本篇仅仅是一个记录 MergeOperator 的使用方式。
Rocksdb 使用MergeOperator 来代替Update 场景中的读改写操作,即用户的一个Update 操作需要调用rocksdb的 Get + Put 接口才能完成。
而这种情况下会引入一些额外的读写放大,对于支持SQL这种update 频繁的场景来说实在是不划算,所以Merge 操作横空出世,用户只需要实现自己的 Merge 操作,通过option 传入,接下来有update 的场景时只需要调用一个Merge
就可以完成了,后续针对当前key的 real update 都会在后台Compaction 以及 用户调研 Get
或者 迭代器操作 时会进行合并。当然,Merge本身也存在问题,就是如果kMergeType得不到及时得compaction 调度,那可能读得负载就重了,因为读需要将之前的未Merge 都进行Merge 才能返回。
因为MergeOperator
虚基类 的函数太多了,会区分 full merge 和 partial merge,但是对于很多用户来说就是一个计数累加或者 string-append 操作,并没有过于复杂的操作,所以rocksdb 提供了 更为通用的虚基类AssociativeMergeOperator
来屏蔽复杂的Full merge 和 partial merge,继承这个类则只需要主体实现一个Merge
和 Name
函数即可。
如下代码使用了Rocksdb 已经封装好的两个 Merge操作,一个是StringAppendOperator
,另一个是UInt64AddOperator
,Merge
本身就是写一个key/value,只不过key的type是kMergeType
,value 也是实际存在的。
StringAppendOperator
StringAppend
的简单测试代码如下,我们使用同一个key进行两次Merge
操作,相当于写入了两条kMergeType
的key到db中,然后调用一次Flush,会生成一个sst文件(进行Merge),再Get会发现这个key的value 按照StringAppend中的行为完成了Merge。
#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));// 默认会用 逗号分隔 不同的mergeoption.merge_operator = MergeOperators::CreateStringAppendOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled : " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;EncodeFixed64(buf, 2);s = db->Merge(rocksdb::WriteOptions(),key, "2");s = db->Merge(rocksdb::WriteOptions(),key, "3");db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << value << endl;
}int main() {OpenDB();DoWrite();return 0;
}
输出如下:
Finish open !
Finish merge !
Get merge value len: 3 data: 2,3
可以看到Get到的value 已经进行合并了。
UInt64AddOperator
这个是一个自增计数的Merge 案例。
需要主要的是如果自己实现 MergeOperator
底层有编解码,那上层用户侧请求的写入也需要 编码方式写入 以及 按照底层的解码方式读取。
Rocksdb实现的案例代码在拿到用户传入的value的时候会进行编解码:
// A 'model' merge operator with uint64 addition semantics
// Implemented as an AssociativeMergeOperator for simplicity and example.
class UInt64AddOperator : public AssociativeMergeOperator {public:bool Merge(const Slice& /*key*/, const Slice* existing_value,const Slice& value, std::string* new_value,Logger* logger) const override {uint64_t orig_value = 0;if (existing_value){// 解码以存在的value,则我们上层调用Merge 写入的时候需要按照Fixed64进行编码orig_value = DecodeInteger(*existing_value, logger);}uint64_t operand = DecodeInteger(value, logger);assert(new_value);new_value->clear();PutFixed64(new_value, orig_value + operand);return true; // Return true always since corruption will be treated as 0}const char* Name() const override { return "UInt64AddOperator"; }private:// Takes the string and decodes it into a uint64_t// On error, prints a message and returns 0uint64_t DecodeInteger(const Slice& value, Logger* logger) const {uint64_t result = 0;if (value.size() == sizeof(uint64_t)) {result = DecodeFixed64(value.data());} else if (logger != nullptr) {// If value is corrupted, treat it as 0ROCKS_LOG_ERROR(logger, "uint64 value corruption, size: %" ROCKSDB_PRIszt" > %" ROCKSDB_PRIszt,value.size(), sizeof(uint64_t));}return result;}};
案例代码:
#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;static bool LittleEndian() {int i = 1;return *((char*)(&i));
}inline uint32_t DecodeFixed32(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint32_t result;memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain loadreturn result;} else {return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));}
}inline uint64_t DecodeFixed64(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint64_t result;memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain loadreturn result;} else {uint64_t lo = DecodeFixed32(ptr);uint64_t hi = DecodeFixed32(ptr + 4);return (hi << 32) | lo;}
}inline void EncodeFixed64(char* buf, uint64_t value) {if (LittleEndian()) {memcpy(buf, &value, sizeof(value));} else {buf[0] = value & 0xff;buf[1] = (value >> 8) & 0xff;buf[2] = (value >> 16) & 0xff;buf[3] = (value >> 24) & 0xff;buf[4] = (value >> 32) & 0xff;buf[5] = (value >> 40) & 0xff;buf[6] = (value >> 48) & 0xff;buf[7] = (value >> 56) & 0xff;}
}void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));option.merge_operator = MergeOperators::CreateUInt64AddOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled : " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;// 因为底层实现的Uint64AddOperator 会进行编码 以及 解码EncodeFixed64(buf, 2);// 对同一个key ,merge 两个2,则最后Get的时候会变成4s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}cout << "Finish merge !" << endl;s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << DecodeFixed64(value.data()) << endl;
}int main() {OpenDB();DoWrite();return 0;
}
输出如下:
Finish open !
Finish merge !
Get merge value 8 4
Rocksdb 的 MergeOperator 简单使用记录相关推荐
- 语言都是相通的,学好一门语言,再学第二门语言就很简单,记录一下我复习c语言的过程。...
语言都是相通的,学好一门语言,再学第二门语言就很简单,记录一下我复习c语言的过程. 为了将本人的python培训提高一个层次,本人最近买了很多算法的书. 这个书上的代码基本都是c语言实现的,c语言很久 ...
- php记录网站访问,PHP简单实现记录网站访问量的功能
这篇文章主要介绍了PHP简单实现记录网站访问量功能,涉及php针对文件加锁读写及日期时间转换等相关操作技巧,需要的朋友可以参考下 本文实例讲述了PHP简单实现记录网站访问量功能.分享给大家供大家参考, ...
- java util logging_简单日志记录,使用java.util.logging
jsp+servlet+JavaBean模式下,可以做个简单的日志记录,日志文件保存在服务器.(Tomcat) package controller; import java.io.File; imp ...
- 简单的记录一下使用HAL库的SPI外挂W25Q32
简单的记录一下使用HAL库的SPI外挂W25Q32 抽筋了,想记录一下. cubeMX配置SPI CS脚 spi.h 里添加 #define FLASH_ID 0XEF14//指令表#define W ...
- HTML简单学习记录
文章目录 HTML简单学习记录 简介 HTML基本结构 网页基本标签 图像标签 链接标签 超链接 锚链接 功能性链接 行内元素和块元素 列表 表格 视频和音频 视频 音频 页面的简单布局 iframe ...
- python-pynput库用法 及 简单实现记录键盘
pynput软件包说明文档 控制及监听鼠标 #测试pynput第三方库 #控制鼠标 import timefrom pynput.mouse import Button,Controller #导入控 ...
- 开发一个简单错误记录功能小模块,能够记录出错的代码所在的文件名称和行号。
开发一个简单错误记录功能小模块,能够记录出错的代码所在的文件名称和行号. 处理:1.记录最多8条错误记录,对相同的错误记录(即文件名称和行号完全匹配)只记录一条,错误计数增加:(文件所在的目录不同,文 ...
- php网站统计浏览量,PHP简单实现记录网站访问量功能示例
本文实例讲述了PHP简单实现记录网站访问量功能.分享给大家供大家参考,具体如下: tongji/index.php文件: $file = dirname(__FILE__).'/tongji.db'; ...
- Ubuntu20.04下编译测试RocksDB以及遇到的问题记录
rockdb编译 git clone https://github.com/facebook/rocksdb.git //如果上面的命令报错无法连接,则换成下面这句 git clone git://g ...
最新文章
- python语言入门n-Python基础语法学习笔记
- Ranorex中利用code module对于测试数据的管理
- java配置mongo最大连接数
- Java Enum 枚举
- win2003主/辅DNS服务器详细配置
- java 工厂模式 计算器_java设计模式之简单工厂模式
- Java EE CDI处理程序方法示例
- java字符串反转异或_字符串反转总结】Java中七种方法实现
- linux用sed命令修改IP地址,通过sed命令获取IP地址
- 读书会 | 第一季读书会《蛤蟆先生去看心理医生》完美收官啦
- NPN与PNP的区别
- 制作字幕.html教程,手机拍的视频如何加字幕 字幕制作软件使用教程
- SWUN 1431 - 伊邪那美(Ⅱ)
- 回头再说说音乐--香港流行音乐25年回顾
- 2019-01-09 工作日志:记录web3连接 respon
- EasyExcel 导出 excel(二)添加序号列,设置excel打印样式,导出即可打印
- 造轮子之后台管理模板
- [Python深度学习入门]实战一·Numpy梯度下降求最小值
- 【高等数学】常见的均值不等式
- 超级细菌战:一场人类无法打赢的战争