用C语言从零开始实现SQLite clone系列:

没有什么比持久化存储更重要。——Calvin Coolidge我们的数据库目前支持插入,读取,但前提是必须保持程序运行。如果终止该程序并重启,则所有记录都将消失。下面是我们想要改进的:

it 'keeps data after closing connection' do

result1 = run_script([

"insert 1 user1 person1@example.com",

".exit",

])

expect(result1).to match_array([

"db > Executed.",

"db > ",

])

result2 = run_script([

"select",

".exit",

])

expect(result2).to match_array([

"db > (1, user1, person1@example.com)",

"Executed.",

"db > ",

])

end

与SQLite一样,我们将整个数据库保存到文件来持久化数据。

我们已经可以把序列化的数据存放到页面大小的内存块中。为了获得持久性,我们可以简单地将那些内存块中的数据写入文件,并在下次程序启动时将它们读回到内存中。

为了简化这个流程,我们将创建一个称为Pager的abstraction。我们向Pager询问页面编号x,pager给我们返回了一个内存地址。它首先在其缓存中查找。如果未找到,它将数据从磁盘复制到内存中(通过读取数据库文件)。

本程序与SQLite架构对应关系

Pager访问页面缓存和文件。 Table对象通过Pager发出页面请求:

+typedef struct {

+  int file_descriptor;

+  uint32_t file_length;

+  void* pages[TABLE_MAX_PAGES];

+} Pager;

+

typedef struct {

-  void* pages[TABLE_MAX_PAGES];

+  Pager* pager;

uint32_t num_rows;

} Table;

我将new_table()重命名为db_open(),因为它现在具有打开与数据库的连接的作用。连接意味着:

打开数据库文件

初始化Pager数据结构

初始化表的数据结构

-Table* new_table() {

+Table* db_open(const char* filename) {

+  Pager* pager = pager_open(filename);

+  uint32_t num_rows = pager->file_length / ROW_SIZE;

+

Table* table = malloc(sizeof(Table));

-  table->num_rows = 0;

+  table->pager = pager;

+  table->num_rows = num_rows;

return table;

}

db_open()依次调用pager_open(),这将打开数据库文件并跟踪其大小。它还将页面缓存初始化为all NULL。

+Pager* pager_open(const char* filename) {

+  int fd = open(filename,

+                O_RDWR |      // Read/Write mode

+                    O_CREAT,  // Create file if it does not exist

+                S_IWUSR |     // User write permission

+                    S_IRUSR   // User read permission

+                );

+

+  if (fd == -1) {

+    printf("Unable to open file\n");

+    exit(EXIT_FAILURE);

+  }

+

+  off_t file_length = lseek(fd, 0, SEEK_END);

+

+  Pager* pager = malloc(sizeof(Pager));

+  pager->file_descriptor = fd;

+  pager->file_length = file_length;

+

+  for (uint32_t i = 0; i

+    pager->pages[i] = NULL;

+  }

+

+  return pager;

+}

遵循我们的abstraction,我们将获取页面的逻辑移到了自己的方法中:

void* row_slot(Table* table, uint32_t row_num) {

uint32_t page_num = row_num / ROWS_PER_PAGE;

-  void* page = table->pages[page_num];

-  if (page == NULL) {

-    // Allocate memory only when we try to access page

-    page = table->pages[page_num] = malloc(PAGE_SIZE);

-  }

+  void* page = get_page(table->pager, page_num);

uint32_t row_offset = row_num % ROWS_PER_PAGE;

uint32_t byte_offset = row_offset * ROW_SIZE;

return page + byte_offset;

}

get_page()方法有一套逻辑来处理缓存未命中的问题。我们假定页面被一个接一个地保存在数据库文件中:页面0的偏移量为0,页面1的偏移量为4096,页面2的偏移量为8192,依此类推。如果请求的页面位于文件的边界之外,我们知道它应该为空,因此我们只分配一些内存并返回。当我们稍后将缓存刷新到磁盘时,该页面将被添加到文件中。

+void* get_page(Pager* pager, uint32_t page_num) {

+  if (page_num > TABLE_MAX_PAGES) {

+    printf("Tried to fetch page number out of bounds. %d > %d\n", page_num,

+           TABLE_MAX_PAGES);

+    exit(EXIT_FAILURE);

+  }

+

+  if (pager->pages[page_num] == NULL) {

+    // Cache miss. Allocate memory and load from file.

+    void* page = malloc(PAGE_SIZE);

+    uint32_t num_pages = pager->file_length / PAGE_SIZE;

+

+    // We might save a partial page at the end of the file

+    if (pager->file_length % PAGE_SIZE) {

+      num_pages += 1;

+    }

+

+    if (page_num <= num_pages) {

+      lseek(pager->file_descriptor, page_num * PAGE_SIZE, SEEK_SET);

+      ssize_t bytes_read = read(pager->file_descriptor, page, PAGE_SIZE);

+      if (bytes_read == -1) {

+        printf("Error reading file: %d\n", errno);

+        exit(EXIT_FAILURE);

+      }

+    }

+

+    pager->pages[page_num] = page;

+  }

+

+  return pager->pages[page_num];

+}

此时,我们等待用户关闭与数据库的连接,然后我们将缓存刷新到磁盘上。当用户退出时,我们将调用一个名为db_close()的新方法,该方法有如下作用:

将缓存刷新到磁盘上

关闭数据库文件

释放Pager的内存和表的数据结构

+void db_close(Table* table) {

+  Pager* pager = table->pager;

+  uint32_t num_full_pages = table->num_rows / ROWS_PER_PAGE;

+

+  for (uint32_t i = 0; i

+    if (pager->pages[i] == NULL) {

+      continue;

+    }

+    pager_flush(pager, i, PAGE_SIZE);

+    free(pager->pages[i]);

+    pager->pages[i] = NULL;

+  }

+

+  // There may be a partial page to write to the end of the file

+  // This should not be needed after we switch to a B-tree

+  uint32_t num_additional_rows = table->num_rows % ROWS_PER_PAGE;

+  if (num_additional_rows > 0) {

+    uint32_t page_num = num_full_pages;

+    if (pager->pages[page_num] != NULL) {

+      pager_flush(pager, page_num, num_additional_rows * ROW_SIZE);

+      free(pager->pages[page_num]);

+      pager->pages[page_num] = NULL;

+    }

+  }

+

+  int result = close(pager->file_descriptor);

+  if (result == -1) {

+    printf("Error closing db file.\n");

+    exit(EXIT_FAILURE);

+  }

+  for (uint32_t i = 0; i

+    void* page = pager->pages[i];

+    if (page) {

+      free(page);

+      pager->pages[i] = NULL;

+    }

+  }

+  free(pager);

+  free(table);

+}

+

-MetaCommandResult do_meta_command(InputBuffer* input_buffer) {

+MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {

if (strcmp(input_buffer->buffer, ".exit") == 0) {

+    db_close(table);

exit(EXIT_SUCCESS);

} else {

return META_COMMAND_UNRECOGNIZED_COMMAND;

在我们当前的设计中,文件的长度记录数据库中有多少行,因此我们需要在文件末尾写入部分页面(partial page)。这就是为什么pager_flush()同时获得页码和大小。这不是最好的设计,但是当我们开始使用B树时,就不再需要它了。

+void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {

+  if (pager->pages[page_num] == NULL) {

+    printf("Tried to flush null page\n");

+    exit(EXIT_FAILURE);

+  }

+

+  off_t offset = lseek(pager->file_descriptor, page_num * PAGE_SIZE, SEEK_SET);

+

+  if (offset == -1) {

+    printf("Error seeking: %d\n", errno);

+    exit(EXIT_FAILURE);

+  }

+

+  ssize_t bytes_written =

+      write(pager->file_descriptor, pager->pages[page_num], size);

+

+  if (bytes_written == -1) {

+    printf("Error writing: %d\n", errno);

+    exit(EXIT_FAILURE);

+  }

+}

最后,我们需要接受文件名作为命令行参数。别忘了还要在do_meta_command中添加额外的参数:

int main(int argc, char* argv[]) {

-  Table* table = new_table();

+  if (argc

+    printf("Must supply a database filename.\n");

+    exit(EXIT_FAILURE);

+  }

+

+  char* filename = argv[1];

+  Table* table = db_open(filename);

+

InputBuffer* input_buffer = new_input_buffer();

while (true) {

print_prompt();

read_input(input_buffer);

if (input_buffer->buffer[0] == '.') {

-      switch (do_meta_command(input_buffer)) {

+      switch (do_meta_command(input_buffer, table)) {

进行了这些更改后,我们可以关闭并重新打开数据库,而我们的数据仍然存在!

~ ./db mydb.db

db > insert 1 cstack foo@bar.com

Executed.

db > insert 2 voltorb volty@example.com

Executed.

db > .exit

~

~ ./db mydb.db

db > select

(1, cstack, foo@bar.com)

(2, voltorb, volty@example.com)

Executed.

db > .exit

~

在看些有意思的,让我们看一下mydb.db如何存储数据。我使用vim作为十六进制编辑器来查看内存中的文件样式:

vim mydb.db

:%!xxd

前四个字节是第一行的ID(4个字节是因为我们用uint32_t格式存储)。它是以低位字节顺序存储的,因此最低有效字节排在第一位(01),然后是高位字节(00 00 00)。我们使用memcpy()函数将Row结构中的字节复制到页面缓存中,这意味着该结构以低位字节序排列在内存中。这是我的电脑编译程序的一个属性。如果我们想在电脑上写入数据库文件,然后在高位字节排序的电脑上读取它,我们必须更改serialize_row()和deserialize_row()方法,让程序始终以相同的顺序存储和读取字节。

接下来33个字节将用户名存储为以空值结尾的字符串。显然,“cstack”用ASCII十六进制表示为6373 7461 636b,后跟一个空字符(00)。 33个字节的其余部分未使用。

接下来的256个字节以相同的方式存储电子邮件信息。在这里,我们可以看到终止的空字符后出现一些随机垃圾。这很可能是由于Row结构中未初始化的内存。我们将整个256字节的电子邮件缓冲区复制到文件中,包括字符串末尾的所有字节。当我们分配该结构时,内存中的内容仍然存在。但是,由于我们使用终止的空字符,因此它对行为没有影响。

注意:如果我们要确保所有字节都被初始化,则在复制serialize_row中的用户名和电子邮件字段时使用strncpy而不是memcpy,如下所示:

void serialize_row(Row* source, void* destination) {

memcpy(destination + ID_OFFSET, &(source->id), ID_SIZE);

-    memcpy(destination + USERNAME_OFFSET, &(source->username), USERNAME_SIZE);

-    memcpy(destination + EMAIL_OFFSET, &(source->email), EMAIL_SIZE);

+    strncpy(destination + USERNAME_OFFSET, source->username, USERNAME_SIZE);

+    strncpy(destination + EMAIL_OFFSET, source->email, EMAIL_SIZE);

}

结论我们已经实现了持久化存储,但还没做到尽善尽美。比如你不打.exit就杀掉了程序,你就会丢失数据。另外,我们会把所有Page写到磁盘包括那些有更改的数据和没更改的。这些问题我们以后再解决。

本篇代码如下:

+#include

+#include

#include

#include

#include

#include

#include

+#include

struct InputBuffer_t {

char* buffer;

@@ -62,9 +65,16 @@ const uint32_t PAGE_SIZE = 4096;

const uint32_t ROWS_PER_PAGE = PAGE_SIZE / ROW_SIZE;

const uint32_t TABLE_MAX_ROWS = ROWS_PER_PAGE * TABLE_MAX_PAGES;

+typedef struct {

+  int file_descriptor;

+  uint32_t file_length;

+  void* pages[TABLE_MAX_PAGES];

+} Pager;

+

typedef struct {

uint32_t num_rows;

-  void* pages[TABLE_MAX_PAGES];

+  Pager* pager;

} Table;

@@ -84,32 +94,81 @@ void deserialize_row(void *source, Row* destination) {

memcpy(&(destination->email), source + EMAIL_OFFSET, EMAIL_SIZE);

}

+void* get_page(Pager* pager, uint32_t page_num) {

+  if (page_num > TABLE_MAX_PAGES) {

+     printf("Tried to fetch page number out of bounds. %d > %d\n", page_num,

+       TABLE_MAX_PAGES);

+     exit(EXIT_FAILURE);

+  }

+

+  if (pager->pages[page_num] == NULL) {

+     // Cache miss. Allocate memory and load from file.

+     void* page = malloc(PAGE_SIZE);

+     uint32_t num_pages = pager->file_length / PAGE_SIZE;

+

+     // We might save a partial page at the end of the file

+     if (pager->file_length % PAGE_SIZE) {

+         num_pages += 1;

+     }

+

+     if (page_num <= num_pages) {

+         lseek(pager->file_descriptor, page_num * PAGE_SIZE, SEEK_SET);

+         ssize_t bytes_read = read(pager->file_descriptor, page, PAGE_SIZE);

+         if (bytes_read == -1) {

+       printf("Error reading file: %d\n", errno);

+       exit(EXIT_FAILURE);

+         }

+     }

+

+     pager->pages[page_num] = page;

+  }

+

+  return pager->pages[page_num];

+}

+

void* row_slot(Table* table, uint32_t row_num) {

uint32_t page_num = row_num / ROWS_PER_PAGE;

-  void *page = table->pages[page_num];

-  if (page == NULL) {

-     // Allocate memory only when we try to access page

-     page = table->pages[page_num] = malloc(PAGE_SIZE);

-  }

+  void *page = get_page(table->pager, page_num);

uint32_t row_offset = row_num % ROWS_PER_PAGE;

uint32_t byte_offset = row_offset * ROW_SIZE;

return page + byte_offset;

}

-Table* new_table() {

-  Table* table = malloc(sizeof(Table));

-  table->num_rows = 0;

+Pager* pager_open(const char* filename) {

+  int fd = open(filename,

+         O_RDWR |  // Read/Write mode

+             O_CREAT,  // Create file if it does not exist

+         S_IWUSR | // User write permission

+             S_IRUSR   // User read permission

+         );

+

+  if (fd == -1) {

+     printf("Unable to open file\n");

+     exit(EXIT_FAILURE);

+  }

+

+  off_t file_length = lseek(fd, 0, SEEK_END);

+

+  Pager* pager = malloc(sizeof(Pager));

+  pager->file_descriptor = fd;

+  pager->file_length = file_length;

+

for (uint32_t i = 0; i

-     table->pages[i] = NULL;

+     pager->pages[i] = NULL;

}

-  return table;

+

+  return pager;

}

-void free_table(Table* table) {

-  for (int i = 0; table->pages[i]; i++) {

-     free(table->pages[i]);

-  }

-  free(table);

+Table* db_open(const char* filename) {

+  Pager* pager = pager_open(filename);

+  uint32_t num_rows = pager->file_length / ROW_SIZE;

+

+  Table* table = malloc(sizeof(Table));

+  table->pager = pager;

+  table->num_rows = num_rows;

+

+  return table;

}

InputBuffer* new_input_buffer() {

@@ -142,10 +201,76 @@ void close_input_buffer(InputBuffer* input_buffer) {

free(input_buffer);

}

+void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {

+  if (pager->pages[page_num] == NULL) {

+     printf("Tried to flush null page\n");

+     exit(EXIT_FAILURE);

+  }

+

+  off_t offset = lseek(pager->file_descriptor, page_num * PAGE_SIZE,

+            SEEK_SET);

+

+  if (offset == -1) {

+     printf("Error seeking: %d\n", errno);

+     exit(EXIT_FAILURE);

+  }

+

+  ssize_t bytes_written = write(

+     pager->file_descriptor, pager->pages[page_num], size

+     );

+

+  if (bytes_written == -1) {

+     printf("Error writing: %d\n", errno);

+     exit(EXIT_FAILURE);

+  }

+}

+

+void db_close(Table* table) {

+  Pager* pager = table->pager;

+  uint32_t num_full_pages = table->num_rows / ROWS_PER_PAGE;

+

+  for (uint32_t i = 0; i

+     if (pager->pages[i] == NULL) {

+         continue;

+     }

+     pager_flush(pager, i, PAGE_SIZE);

+     free(pager->pages[i]);

+     pager->pages[i] = NULL;

+  }

+

+  // There may be a partial page to write to the end of the file

+  // This should not be needed after we switch to a B-tree

+  uint32_t num_additional_rows = table->num_rows % ROWS_PER_PAGE;

+  if (num_additional_rows > 0) {

+     uint32_t page_num = num_full_pages;

+     if (pager->pages[page_num] != NULL) {

+         pager_flush(pager, page_num, num_additional_rows * ROW_SIZE);

+         free(pager->pages[page_num]);

+         pager->pages[page_num] = NULL;

+     }

+  }

+

+  int result = close(pager->file_descriptor);

+  if (result == -1) {

+     printf("Error closing db file.\n");

+     exit(EXIT_FAILURE);

+  }

+  for (uint32_t i = 0; i

+     void* page = pager->pages[i];

+     if (page) {

+         free(page);

+         pager->pages[i] = NULL;

+     }

+  }

+

+  free(pager);

+  free(table);

+}

+

MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table *table) {

if (strcmp(input_buffer->buffer, ".exit") == 0) {

close_input_buffer(input_buffer);

-    free_table(table);

+    db_close(table);

exit(EXIT_SUCCESS);

} else {

return META_COMMAND_UNRECOGNIZED_COMMAND;

@@ -182,6 +308,7 @@ PrepareResult prepare_insert(InputBuffer* input_buffer, Statement* statement) {

return PREPARE_SUCCESS;

}

+

PrepareResult prepare_statement(InputBuffer* input_buffer,

Statement* statement) {

if (strncmp(input_buffer->buffer, "insert", 6) == 0) {

@@ -227,7 +354,14 @@ ExecuteResult execute_statement(Statement* statement, Table *table) {

}

int main(int argc, char* argv[]) {

-  Table* table = new_table();

+  if (argc

+      printf("Must supply a database filename.\n");

+      exit(EXIT_FAILURE);

+  }

+

+  char* filename = argv[1];

+  Table* table = db_open(filename);

+

InputBuffer* input_buffer = new_input_buffer();

while (true) {

print_prompt();

describe 'database' do

+  before do

+    `rm -rf test.db`

+  end

+

def run_script(commands)

raw_output = nil

-    IO.popen("./db", "r+") do |pipe|

+    IO.popen("./db test.db", "r+") do |pipe|

commands.each do |command|

pipe.puts command

end

@@ -28,6 +32,27 @@ describe 'database' do

])

end

+  it 'keeps data after closing connection' do

+    result1 = run_script([

+      "insert 1 user1 person1@example.com",

+      ".exit",

+    ])

+    expect(result1).to match_array([

+      "db > Executed.",

+      "db > ",

+    ])

+

+    result2 = run_script([

+      "select",

+      ".exit",

+    ])

+    expect(result2).to match_array([

+      "db > (1, user1, person1@example.com)",

+      "Executed.",

+      "db > ",

+    ])

+  end

+

it 'prints error message when table is full' do

script = (1..1401).map do |i|

"insert #{i} user#{i} person#{i}@example.com"

mysql数据的持久化_一起做个简单的数据库(五):持久化存储相关推荐

  1. 运维mysql数据库面试题_运维面试题之数据库

    mysql篇: mysql主从复制原理? mysql的复制是基于3个线程 1.master上的binlog dump线程负责把binlog 事件传到slave 2.slave上面的IO线程负责接收bi ...

  2. mysql 数据库命令大全_常用的MySQL数据库命令大全

    飞信2017V5.6.8860.0 官方正式版 类型:聊天其它大小:69.1M语言:中文 评分:9.6 标签: 立即下载 常用的MySQL命令大全 一.连接MySQL 格式: mysql -h主机地址 ...

  3. perl mysql 数据推拉_使用Perl DBI操作MySQL的一些建议

    使用perl连接mysql,这个网上有很多案例了,一般大家都是DBI下的DBD::MySQL这个模块进行.这里做一个mask弄一个TIPS: Perl DBI MySQL的字符集为UTF8 Perl ...

  4. mysql数据库容量估算_数据库性能与容量评估

    一.数据库设计 1.表结构设计 -表中的自增列(auto_increment属性)推荐使用bigint类型 -首选使用非空的唯一键, 其次选择自增列或发号器 不使用更新频繁的列,尽量不选择字符串列,不 ...

  5. mysql 数据筛选功能_关于数据筛选的详细介绍

    这篇文章主要介绍了MYSQL 一个巧用字符函数做数据筛选的题,需要的朋友可以参考下问题描述:结构:test 有两个字段,分别是col1和col2,都是字符字段,里面的内容都是用,号分隔的三个数字,并且 ...

  6. 易语言mysql数据同步程序_易语言mssql和mysql数据自动同步源码

    易语言mssql和mysql数据自动同步源码 易语言mssql和mysql数据自动同步源码 系统结构:RefreshTask,ComputeEndTime,ComputeOneTime,Compute ...

  7. java获取mysql数据定时执行_详解SpringBoot 创建定时任务(配合数据库动态执行)...

    序言:创建定时任务非常简单,主要有两种创建方式:一.基于注解(@Scheduled) 二.基于接口(SchedulingConfigurer). 前者相信大家都很熟悉,但是实际使用中我们往往想从数据库 ...

  8. mysql数据割接_割接常见问题

    #### 一.填写zkeys平台上的数据库后无法连接,提示下图 ![](https://upload.zkeys.com/2020/12/5fddf82699c72.png?token=SLOM1X6 ...

  9. redis做mysql缓存的优点_面试官:如何保障数据库和redis缓存的一致性

    随着互联网的高速发展,使用互联网产品的人也越来越多,团队不可避免得也会面对越来越复杂的高并发业务场景(如下图),比如热点视频/文章的观看(读场景),热点视频/文章的评论,点赞等(写场景). 众所周知, ...

最新文章

  1. PCA--主成分分析(Principal components analysis)-最大方差解释
  2. C#中操作XML (节点添加,修改,删除完整版)
  3. php integer
  4. 你必须具有权限才能读取此对象_win10中随心所欲设置文件/文件夹访问权限,可以轻松做到,并不难...
  5. 形象解释Momentum
  6. Redis 多线程网络模型全面揭秘|网络硬核系列
  7. 动态更新纹理闪烁问题
  8. java设置默认参数_关于java:如何设置默认方法参数值?
  9. maven无法下载依赖问题解决
  10. Unity3D 脚本实现动画效果
  11. 北华大学c语言题库百度云,北华大学C语言题库精简打印版(全).docx
  12. 特殊符号大全,方便大家输入特殊字符用
  13. java 使用类的方式描述计算机_用JAVA设计,实现并测试一个计算机类,它包括如下内容...
  14. 为什么全网通手机联通显示无服务器,手机卡无服务怎么回事
  15. Oracle 11G安装出错(Oracle执行先决条件检查失败)
  16. python的函数导入方法
  17. LoRa节点如何以OTAA方式入网TTN服务器?
  18. thinkPHP6解析二维码
  19. Ubuntu下Anaconda创建环境及环境配置
  20. 【财经期刊FM-Radio|2021年03月29日】

热门文章

  1. opj1837 Balance(dp)
  2. Robocode教程3——Robo机器剖析
  3. YOLO 检测算法分析
  4. Android高效加载大图、多图解决方案,有效避免程序内存溢出现象
  5. wine手动安装wine-mono和wine-gecko组件
  6. PulseAudio多线程通信:pthread_cond_broadcast/pthread_cond_signal/pthread_cond_wait(九)
  7. 客户端(https)与服务器交互过程
  8. C++ 调节PCM音量
  9. 毕业5年决定人的一生-- 大家千万不要错过这篇文章
  10. ios ffmpeg h264 encode