本篇仅仅是一个记录 MergeOperator 的使用方式。
Rocksdb 使用MergeOperator 来代替Update 场景中的读改写操作,即用户的一个Update 操作需要调用rocksdb的 Get + Put 接口才能完成。
而这种情况下会引入一些额外的读写放大,对于支持SQL这种update 频繁的场景来说实在是不划算,所以Merge 操作横空出世,用户只需要实现自己的 Merge 操作,通过option 传入,接下来有update 的场景时只需要调用一个Merge 就可以完成了,后续针对当前key的 real update 都会在后台Compaction 以及 用户调研 Get 或者 迭代器操作 时会进行合并。当然,Merge本身也存在问题,就是如果kMergeType得不到及时得compaction 调度,那可能读得负载就重了,因为读需要将之前的未Merge 都进行Merge 才能返回。

因为MergeOperator虚基类 的函数太多了,会区分 full merge 和 partial merge,但是对于很多用户来说就是一个计数累加或者 string-append 操作,并没有过于复杂的操作,所以rocksdb 提供了 更为通用的虚基类AssociativeMergeOperator来屏蔽复杂的Full merge 和 partial merge,继承这个类则只需要主体实现一个MergeName函数即可。

如下代码使用了Rocksdb 已经封装好的两个 Merge操作,一个是StringAppendOperator ,另一个是UInt64AddOperatorMerge本身就是写一个key/value,只不过key的type是kMergeType,value 也是实际存在的。

StringAppendOperator

StringAppend的简单测试代码如下,我们使用同一个key进行两次Merge操作,相当于写入了两条kMergeType的key到db中,然后调用一次Flush,会生成一个sst文件(进行Merge),再Get会发现这个key的value 按照StringAppend中的行为完成了Merge。

#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));// 默认会用 逗号分隔 不同的mergeoption.merge_operator = MergeOperators::CreateStringAppendOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled :  " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;EncodeFixed64(buf, 2);s = db->Merge(rocksdb::WriteOptions(),key, "2");s = db->Merge(rocksdb::WriteOptions(),key, "3");db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << value << endl;
}int main() {OpenDB();DoWrite();return 0;
}

输出如下:

Finish open !
Finish merge !
Get merge value len: 3 data: 2,3

可以看到Get到的value 已经进行合并了。

UInt64AddOperator

这个是一个自增计数的Merge 案例。
需要主要的是如果自己实现 MergeOperator底层有编解码,那上层用户侧请求的写入也需要 编码方式写入 以及 按照底层的解码方式读取。
Rocksdb实现的案例代码在拿到用户传入的value的时候会进行编解码:

// A 'model' merge operator with uint64 addition semantics
// Implemented as an AssociativeMergeOperator for simplicity and example.
class UInt64AddOperator : public AssociativeMergeOperator {public:bool Merge(const Slice& /*key*/, const Slice* existing_value,const Slice& value, std::string* new_value,Logger* logger) const override {uint64_t orig_value = 0;if (existing_value){// 解码以存在的value,则我们上层调用Merge 写入的时候需要按照Fixed64进行编码orig_value = DecodeInteger(*existing_value, logger);}uint64_t operand = DecodeInteger(value, logger);assert(new_value);new_value->clear();PutFixed64(new_value, orig_value + operand);return true;  // Return true always since corruption will be treated as 0}const char* Name() const override { return "UInt64AddOperator"; }private:// Takes the string and decodes it into a uint64_t// On error, prints a message and returns 0uint64_t DecodeInteger(const Slice& value, Logger* logger) const {uint64_t result = 0;if (value.size() == sizeof(uint64_t)) {result = DecodeFixed64(value.data());} else if (logger != nullptr) {// If value is corrupted, treat it as 0ROCKS_LOG_ERROR(logger, "uint64 value corruption, size: %" ROCKSDB_PRIszt" > %" ROCKSDB_PRIszt,value.size(), sizeof(uint64_t));}return result;}};

案例代码:

#include <iostream>
#include <vector>#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/trace_reader_writer.h>
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;static bool LittleEndian() {int i = 1;return *((char*)(&i));
}inline uint32_t DecodeFixed32(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint32_t result;memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain loadreturn result;} else {return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));}
}inline uint64_t DecodeFixed64(const char* ptr) {if (LittleEndian()) {// Load the raw bytesuint64_t result;memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain loadreturn result;} else {uint64_t lo = DecodeFixed32(ptr);uint64_t hi = DecodeFixed32(ptr + 4);return (hi << 32) | lo;}
}inline void EncodeFixed64(char* buf, uint64_t value) {if (LittleEndian()) {memcpy(buf, &value, sizeof(value));} else {buf[0] = value & 0xff;buf[1] = (value >> 8) & 0xff;buf[2] = (value >> 16) & 0xff;buf[3] = (value >> 24) & 0xff;buf[4] = (value >> 32) & 0xff;buf[5] = (value >> 40) & 0xff;buf[6] = (value >> 48) & 0xff;buf[7] = (value >> 56) & 0xff;}
}void OpenDB() {option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));option.merge_operator = MergeOperators::CreateUInt64AddOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) {cout << "open faled :  " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() {int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;// 因为底层实现的Uint64AddOperator 会进行编码 以及 解码EncodeFixed64(buf, 2);// 对同一个key ,merge 两个2,则最后Get的时候会变成4s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));db->Flush(rocksdb::FlushOptions());if (!s.ok()) {cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}cout << "Finish merge !" << endl;s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) {cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << DecodeFixed64(value.data()) << endl;
}int main() {OpenDB();DoWrite();return 0;
}

输出如下:

Finish open !
Finish merge !
Get merge value 8 4

Rocksdb 的 MergeOperator 简单使用记录相关推荐

  1. 语言都是相通的,学好一门语言,再学第二门语言就很简单,记录一下我复习c语言的过程。...

    语言都是相通的,学好一门语言,再学第二门语言就很简单,记录一下我复习c语言的过程. 为了将本人的python培训提高一个层次,本人最近买了很多算法的书. 这个书上的代码基本都是c语言实现的,c语言很久 ...

  2. php记录网站访问,PHP简单实现记录网站访问量的功能

    这篇文章主要介绍了PHP简单实现记录网站访问量功能,涉及php针对文件加锁读写及日期时间转换等相关操作技巧,需要的朋友可以参考下 本文实例讲述了PHP简单实现记录网站访问量功能.分享给大家供大家参考, ...

  3. java util logging_简单日志记录,使用java.util.logging

    jsp+servlet+JavaBean模式下,可以做个简单的日志记录,日志文件保存在服务器.(Tomcat) package controller; import java.io.File; imp ...

  4. 简单的记录一下使用HAL库的SPI外挂W25Q32

    简单的记录一下使用HAL库的SPI外挂W25Q32 抽筋了,想记录一下. cubeMX配置SPI CS脚 spi.h 里添加 #define FLASH_ID 0XEF14//指令表#define W ...

  5. HTML简单学习记录

    文章目录 HTML简单学习记录 简介 HTML基本结构 网页基本标签 图像标签 链接标签 超链接 锚链接 功能性链接 行内元素和块元素 列表 表格 视频和音频 视频 音频 页面的简单布局 iframe ...

  6. python-pynput库用法 及 简单实现记录键盘

    pynput软件包说明文档 控制及监听鼠标 #测试pynput第三方库 #控制鼠标 import timefrom pynput.mouse import Button,Controller #导入控 ...

  7. 开发一个简单错误记录功能小模块,能够记录出错的代码所在的文件名称和行号。

    开发一个简单错误记录功能小模块,能够记录出错的代码所在的文件名称和行号. 处理:1.记录最多8条错误记录,对相同的错误记录(即文件名称和行号完全匹配)只记录一条,错误计数增加:(文件所在的目录不同,文 ...

  8. php网站统计浏览量,PHP简单实现记录网站访问量功能示例

    本文实例讲述了PHP简单实现记录网站访问量功能.分享给大家供大家参考,具体如下: tongji/index.php文件: $file = dirname(__FILE__).'/tongji.db'; ...

  9. Ubuntu20.04下编译测试RocksDB以及遇到的问题记录

    rockdb编译 git clone https://github.com/facebook/rocksdb.git //如果上面的命令报错无法连接,则换成下面这句 git clone git://g ...

最新文章

  1. python语言入门n-Python基础语法学习笔记
  2. Ranorex中利用code module对于测试数据的管理
  3. java配置mongo最大连接数
  4. Java Enum 枚举
  5. win2003主/辅DNS服务器详细配置
  6. java 工厂模式 计算器_java设计模式之简单工厂模式
  7. Java EE CDI处理程序方法示例
  8. java字符串反转异或_字符串反转总结】Java中七种方法实现
  9. linux用sed命令修改IP地址,通过sed命令获取IP地址
  10. 读书会 | 第一季读书会《蛤蟆先生去看心理医生》完美收官啦
  11. NPN与PNP的区别
  12. 制作字幕.html教程,手机拍的视频如何加字幕 字幕制作软件使用教程
  13. SWUN 1431 - 伊邪那美(Ⅱ)
  14. 回头再说说音乐--香港流行音乐25年回顾
  15. 2019-01-09 工作日志:记录web3连接 respon
  16. EasyExcel 导出 excel(二)添加序号列,设置excel打印样式,导出即可打印
  17. 造轮子之后台管理模板
  18. [Python深度学习入门]实战一·Numpy梯度下降求最小值
  19. 【高等数学】常见的均值不等式
  20. 超级细菌战:一场人类无法打赢的战争

热门文章

  1. 【傻瓜教程】CentOS 7 下 LNMP 环境搭建过程
  2. 【ArcGIS for Android】基于位置查询Graphic和Feature
  3. hdu 1166 敌兵布阵(树状数组)
  4. 十五天精通WCF——第六天 你必须要了解的3种通信模式
  5. 微信公众平台消息接口星标功能
  6. 一个虚函数和虚继承的问题。
  7. SQL SERVER 触发器示例
  8. java运行时_java编译时与运行时概念与实例详解
  9. float最大值_float.h库
  10. android zxing作用,Android / ZXing不再有效