文章目录

  • AOF简介
  • AOF定时逻辑
  • AOF 持久化过程
    • 命令传播
    • 刷新缓冲区 && 同步磁盘

AOF简介

上一篇讲的RDB是批量的插入数据,但是如果单纯使用就会有一个隐患:突然断电的时候势必要失去一部分数据,因为还没来得及轮上。

那AOF是个什么情况呢?AOF在redis运行期间,不断的将执行过的命令写入到文件中,如果redis被重启了,只要将这些命令重复执行一遍即可。

那如果单纯的使用AOF呢?占用内存就会很大了,毕竟保存一整条命令和单纯保存一块数据,孰轻孰重还是分得清的。

所以这时候怎么办?RDB做大批量的持久化,在下次RDB来之前,用AOF顶上。
此外,redis 还提供了AOF重写功能,如果一个键有多次写入,只保存最后一次写入命令。
这两个方法我们都能想到,不过是怎么实现的呢?

AOF不是不丢数据,但是由于它的时间间隔很小,所以丢失风险相较于RDB大大降低了。


AOF定时逻辑

redis 通过 serverCron 函数实现定时生成 aof 文件的任务,在上一篇RDB里面一并出现了,我就直接放相应部分源码吧:

...
/* Start a scheduled AOF rewrite if this was requested by the user while* a BGSAVE was in progress. */if (!hasActiveChildProcess() &&server.aof_rewrite_scheduled){rewriteAppendOnlyFileBackground();}/* Check if a background saving or AOF rewrite in progress terminated. */if (hasActiveChildProcess() || ldbPendingChildren()){checkChildrenDone();} else {......//执行AOF的条件:/*1、服务器开启了AOF功能2、当前AOF文件大小大于aof_rewrite_min_size3、对于上次重写后的AOF文件大小,当前AOF文件增加的空间大小所占比例已经超过了aof_rewrite_perc 配置*/if (server.aof_state == AOF_ON &&!hasActiveChildProcess() &&server.aof_rewrite_perc &&server.aof_current_size > server.aof_rewrite_min_size){long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1;long long growth = (server.aof_current_size*100/base) - 100;if (growth >= server.aof_rewrite_perc) {serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);rewriteAppendOnlyFileBackground();}}}/* AOF postponed flush: Try at every cron cycle if the slow fsync* completed. */if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);/* AOF write errors: in this case we have a buffer to flush as well and* clear the AOF error in case of success to make the DB writable again,* however to try every second is enough in case of 'hz' is set to* an higher frequency. */run_with_period(1000) {if (server.aof_last_write_status == C_ERR)flushAppendOnlyFile(0);}/* Clear the paused clients flag if needed. */clientsArePaused(); /* Don't check return value, just use the side effect.*//* Replication cron function -- used to reconnect to master,* detect transfer failures, start background RDB transfers and so forth. */run_with_period(1000) replicationCron();/* Run the Redis Cluster cron. */run_with_period(100) {if (server.cluster_enabled) clusterCron();}
...

flushAppendOnlyFile 函数用于刷新AOF缓冲区的内容,写入文件中,只有在 AOF 缓冲区操作被延迟或者写入出错时才触发。
rewriteAppendOnlyFileBackground 函数负责重写AOF文件,在beforesleep函数中触发。


AOF 持久化过程

三部曲:
命令传播、
刷新AOF缓冲区、
同步磁盘

让我们一个一个来看:


命令传播

feedAppendOnlyFile 函数负责将命令传播到 AOF 缓冲区:


void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {sds buf = sdsempty();robj *tmpargv[3];/* The DB this command was targeting is not the same as the last command* we appended. To issue a SELECT command is needed. */if (dictid != server.aof_selected_db) {char seldb[64];snprintf(seldb,sizeof(seldb),"%d",dictid);buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",(unsigned long)strlen(seldb),seldb);server.aof_selected_db = dictid;}if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||cmd->proc == expireatCommand) {/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {/* Translate SETEX/PSETEX to SET and PEXPIREAT */tmpargv[0] = createStringObject("SET",3);tmpargv[1] = argv[1];tmpargv[2] = argv[3];buf = catAppendOnlyGenericCommand(buf,3,tmpargv);decrRefCount(tmpargv[0]);buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);} else if (cmd->proc == setCommand && argc > 3) {int i;robj *exarg = NULL, *pxarg = NULL;for (i = 3; i < argc; i ++) {if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];}serverAssert(!(exarg && pxarg));if (exarg || pxarg) {/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */buf = catAppendOnlyGenericCommand(buf,3,argv);if (exarg)buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],exarg);if (pxarg)buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],pxarg);} else {buf = catAppendOnlyGenericCommand(buf,argc,argv);}} else {/* All the other commands don't need translation or need the* same translation already operated in the command vector* for the replication itself. */buf = catAppendOnlyGenericCommand(buf,argc,argv);}/* Append to the AOF buffer. This will be flushed on disk just before* of re-entering the event loop, so before the client will get a* positive reply about the operation performed. */if (server.aof_state == AOF_ON)server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));/* If a background append only file rewriting is in progress we want to* accumulate the differences between the child DB and the current one* in a buffer, so that when the child process will do its work we* can append the differences to the new append only file. */if (server.aof_child_pid != -1)aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));sdsfree(buf);
}

对于一些带有过期时间的命令,需要进行转换。
对于其他命令,直接写入buf暂存区。
如果服务器开启AOF功能,则将buf暂存区内容写入AOF缓冲区。
如果当前存在AOF进程执行AOF重写操作,则将buf暂存区的数据写入AOF重写缓冲区。


刷新缓冲区 && 同步磁盘

void flushAppendOnlyFile(int force) {ssize_t nwritten;int sync_in_progress = 0;mstime_t latency;if (sdslen(server.aof_buf) == 0) {/* Check if we need to do fsync even the aof buffer is empty,* because previously in AOF_FSYNC_EVERYSEC mode, fsync is* called only when aof buffer is not empty, so if users* stop write commands before fsync called in one second,* the data in page cache cannot be flushed in time. */if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.aof_fsync_offset != server.aof_current_size &&server.unixtime > server.aof_last_fsync &&!(sync_in_progress = aofFsyncInProgress())) {goto try_fsync;} else {return;}}//检查当前是否存在后台线程正在同步磁盘if (server.aof_fsync == AOF_FSYNC_EVERYSEC)sync_in_progress = aofFsyncInProgress();//如果存在,如果磁盘同步策略为每秒同步,则延迟该AOF缓冲区刷新操作,退出函数;//如果已延迟时间超过2s,则强制刷新AOF缓冲区,函数继续向下执行。if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {/* With this append fsync policy we do background fsyncing.* If the fsync is still in progress we can try to delay* the write for a couple of seconds. */if (sync_in_progress) {if (server.aof_flush_postponed_start == 0) {/* No previous write postponing, remember that we are* postponing the flush and return. */server.aof_flush_postponed_start = server.unixtime;return;} else if (server.unixtime - server.aof_flush_postponed_start < 2) {/* We were already waiting for fsync to finish, but for less* than two seconds this is still ok. Postpone again. */return;}/* Otherwise fall trough, and go write since we can't wait* over two seconds. */server.aof_delayed_fsync++;serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");}}/* We want to perform a single write. This should be guaranteed atomic* at least if the filesystem we are writing is a real physical one.* While this will save us against the server being killed I don't think* there is much to do about the whole server stopping for power problems* or alike */if (server.aof_flush_sleep && sdslen(server.aof_buf)) {usleep(server.aof_flush_sleep);}latencyStartMonitor(latency);nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));latencyEndMonitor(latency);/* We want to capture different events for delayed writes:* when the delay happens with a pending fsync, or with a saving child* active, and when the above two conditions are missing.* We also use an additional event name to save all samples which is* useful for graphing / monitoring purposes. */if (sync_in_progress) {latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);} else if (hasActiveChildProcess()) {latencyAddSampleIfNeeded("aof-write-active-child",latency);} else {latencyAddSampleIfNeeded("aof-write-alone",latency);}latencyAddSampleIfNeeded("aof-write",latency);/* We performed the write so reset the postponed flush sentinel to zero. */server.aof_flush_postponed_start = 0;if (nwritten != (ssize_t)sdslen(server.aof_buf)) {static time_t last_write_error_log = 0;int can_log = 0;/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {can_log = 1;last_write_error_log = server.unixtime;}/* Log the AOF write error and record the error code. */if (nwritten == -1) {if (can_log) {serverLog(LL_WARNING,"Error writing to the AOF file: %s",strerror(errno));server.aof_last_write_errno = errno;}} else {if (can_log) {serverLog(LL_WARNING,"Short write while writing to ""the AOF file: (nwritten=%lld, ""expected=%lld)",(long long)nwritten,(long long)sdslen(server.aof_buf));}if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {if (can_log) {serverLog(LL_WARNING, "Could not remove short write ""from the append-only file.  Redis may refuse ""to load the AOF the next time it starts.  ""ftruncate: %s", strerror(errno));}} else {/* If the ftruncate() succeeded we can set nwritten to* -1 since there is no longer partial data into the AOF. */nwritten = -1;}server.aof_last_write_errno = ENOSPC;}/* Handle the AOF write error. */if (server.aof_fsync == AOF_FSYNC_ALWAYS) {/* We can't recover when the fsync policy is ALWAYS since the* reply for the client is already in the output buffers, and we* have the contract with the user that on acknowledged write data* is synced on disk. */serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");exit(1);} else {/* Recover from failed write leaving data into the buffer. However* set an error to stop accepting writes as long as the error* condition is not cleared. */server.aof_last_write_status = C_ERR;/* Trim the sds buffer if there was a partial write, and there* was no way to undo it with ftruncate(2). */if (nwritten > 0) {server.aof_current_size += nwritten;sdsrange(server.aof_buf,nwritten,-1);}return; /* We'll try again on the next call... */}} else {/* Successful write(2). If AOF was in error state, restore the* OK state and log the event. */if (server.aof_last_write_status == C_ERR) {serverLog(LL_WARNING,"AOF write error looks solved, Redis can write again.");server.aof_last_write_status = C_OK;}}server.aof_current_size += nwritten;/* Re-use AOF buffer when it is small enough. The maximum comes from the* arena size of 4k minus some overhead (but is otherwise arbitrary). */if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {sdsclear(server.aof_buf);} else {sdsfree(server.aof_buf);server.aof_buf = sdsempty();}//下面属于同步磁盘范畴
try_fsync:/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are* children doing I/O in the background. */if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())return;/* Perform the fsync if needed. */if (server.aof_fsync == AOF_FSYNC_ALWAYS) {/* redis_fsync is defined as fdatasync() for Linux in order to avoid* flushing metadata. */latencyStartMonitor(latency);redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-fsync-always",latency);server.aof_fsync_offset = server.aof_current_size;server.aof_last_fsync = server.unixtime;} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.unixtime > server.aof_last_fsync)) {if (!sync_in_progress) {aof_background_fsync(server.aof_fd);server.aof_fsync_offset = server.aof_current_size;}server.aof_last_fsync = server.unixtime;}
}

【redis源码学习】持久化机制(2):AOF相关推荐

  1. redis源码阅读-持久化之aof与aof重写详解

    aof相关配置 aof-rewrite-incremental-fsync yes # aof 开关,默认是关闭的,改为yes表示开启 appendonly no # aof的文件名,默认 appen ...

  2. redis源码阅读-持久化之RDB

    持久化介绍: redis的持久化有两种方式: rdb :可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot) aof : 记录redis执行的所有写操作命令 根 ...

  3. redis源码学习笔记目录

    Redis源码分析(零)学习路径笔记 Redis源码分析(一)redis.c //redis-server.c Redis源码分析(二)redis-cli.c Redis源码剖析(三)--基础数据结构 ...

  4. Redis源码学习(20),学习感悟

      最近学习Redis源码也有半个月的时间了,有不少收获也有不少感悟,今天来好好聊聊我学习的感悟. 1 发现问题   人非圣贤孰能无过,只要是人难免会犯错,回顾我之前的学习历程,其实是可以发现不少的问 ...

  5. 【Redis学习笔记】2018-05-30 Redis源码学习之Ziplist、Server

    作者:施洪宝 顺风车运营研发团队 一. 压缩列表 压缩列表是Redis的关键数据结构之一.目前已经有大量的相关资料,下面几个链接都已经对Ziplist进行了详细的介绍. http://origin.r ...

  6. 【redis源码学习】simple dynamic strings(简单动态字符串 sds)

    文章目录 接 化 sds 结构分析 基本操作 创建字符串 释放字符串 sdsMakeRoomFor 扩容 小tip:`__attribute__ ((__packed__))` 发 接 阅读源码之前, ...

  7. 【redis源码学习】redis 中的“消息队列” Stream

    文章目录 关于redis Stream Stream 结构 Stream 操作 添加消息 新增消费组 删除消息 裁剪信息流 释放消费组 查找元素 关于redis Stream redis stream ...

  8. Redis源码学习-MasterSlave的命令交互

    0. 写在前面 Version Redis2.2.2 Redis中可以支持主从结构,本文主要从master和slave的心跳机制出发(PING),分析redis的命令行交互. 在Redis中,serv ...

  9. 【Redis学习笔记】2018-06-14 Redis源码学习之sentinel

    顺风车运营研发团队 方波 sentinel是redis的高可用解决方案,由一个或多个sentinel实例组成的系统可以同时监听多组master-slave实例(后面简称一组),当发现master进入下 ...

  10. Redis源码学习(14),t_set.c 学习(二),sismember,scard,spop 命令学习

    1 sismemberCommand 1.1 方法说明   判断一个值是否属于集合的成员,成功返回1,失败返回0. 1.2 命令实践 1.3 方法源代码 void sismemberCommand(r ...

最新文章

  1. 解锁新姿势 | 如何用配置中心实现全局动态流控?
  2. zbb20180710 maven Failed to read artifact descriptor--maven
  3. MariaDB基础(二)
  4. linux怎么不更新内核,Linux升级时不升级内核的方法
  5. 量化交易初探(图文版其一)
  6. word2016取消首字母大写 带图详细讲解
  7. php自定义请求headers,php通过header发送自定义数据方法
  8. 【SQL】连接 —— 内连接、外连接、左连接、右连接、交叉连接
  9. php mysql多条件查询界面_PHP组合查询多条件查询实例代码
  10. Proteus仿真STM32F103R6输出PWM波
  11. cholesky分解java代码,实数矩阵Cholesky分解算法的C++实现
  12. xp系统怎样启动打印机服务器,WinXP系统打印后台程序服务没有运行的解决办法...
  13. login主页面+接口+依赖
  14. python代理ip怎么写_python代理ip怎么写
  15. python 调用ocx
  16. linux查看某个端口的流量_linux中查看网卡流量六种方法
  17. (附源码)ssm+mysql+基于ssm技术的校自助阅览室的设计与实现 毕业设计242326
  18. 宜信davinci搭建
  19. 腾讯被爆内测配送机器人,与阿里顺丰直面物流竞争!
  20. php中数学如何二维数组,如何用数学知识理解多维数组

热门文章

  1. 分享几个免费IP地址查询接口(API)
  2. 共线性分析软件MCScanX安装、报错解决方法及使用
  3. 基于Nginx,搭建HLS(HTTP Live Streaming)server,点播直播
  4. gtp怎么安装系统_gpt格式硬盘如何安装win7系统教程
  5. 2007年沪市上涨前20与后20
  6. python读取文件时的相对路径
  7. 第5关:类与对象练习------Java面向对象 - 类与对象
  8. 脑科学和类脑智能技术综述学习笔记
  9. 程序猿必看的10部黑客电影
  10. 矩阵标准型的系数是特征值吗_矩阵分解术,不得不从高斯说起