一、BIO 之后台IO操作
BIO : Background I/O service for Redis.

负责我们需要在后台执行的操作。现在redis的版本中只有两类的操作,后台的close及fsync 系统调用。

将数据从内存写入磁盘这点非常重要,即fdatasync、因此就需要调用 fsync() 把文件数据和文件元信息写入强制刷新到磁盘中,这个速度是比较慢的、而其调用频度又会很高,所以有必要不能因IO而堵住现有的流程操作。

REDIS 允许有三种不同的策略:

<span style="font-size:18px;">/* Append only defines */
// 让kernel后台线程去做  这个线程默认可能是30秒去做一次
#define AOF_FSYNC_NO 0
// 每次有write操作到AOF里 就会调用fsync
// 每秒调用一次fsync


<span style="font-size:18px;">/* 刷新缓存区的内容到磁盘中 */
void flushAppendOnlyFile(int force) {int sync_in_progress = 0;if (server.aof_fsync == AOF_FSYNC_EVERYSEC) // 这个判定是否后台正在执行 fsync sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;// 这里根据时间进行判定if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {/* With this append fsync policy we do background fsyncing.* If the fsync is still in progress we can try to delay* the write for a couple of seconds. */if (sync_in_progress) {if (server.aof_flush_postponed_start == 0) {/* No previous write postponing, remember that we are* postponing the flush and return. */server.aof_flush_postponed_start = server.unixtime;return;} else if (server.unixtime - server.aof_flush_postponed_start < 2) {/* We were already waiting for fsync to finish, but for less* than two seconds this is still ok. Postpone again. */return;}/* Otherwise fall trough, and go write since we can't wait* over two seconds. */server.aof_delayed_fsync++;serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");}}/* We want to perform a single write. This should be guaranteed atomic* at least if the filesystem we are writing is a real physical one.* While this will save us against the server being killed I don't think* there is much to do about the whole server stopping for power problems* or alike *///在进行写入操作的时候,还监听了延迟、write函数由于aof_buf一般不大很快就能返回,而阻塞的是fdatasync导致write等待latencyStartMonitor(latency);nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));latencyEndMonitor(latency);// 调用偏移量server.aof_current_size += nwritten;// 优化内存重复使用性/* Re-use AOF buffer when it is small enough. The maximum comes from the* arena size of 4k minus some overhead (but is otherwise arbitrary). */if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {sdsclear(server.aof_buf);} else {sdsfree(server.aof_buf);server.aof_buf = sdsempty();}/* Perform the fsync if needed. */if (server.aof_fsync == AOF_FSYNC_ALWAYS) {/* aof_fsync is defined as fdatasync() for Linux in order to avoid* flushing metadata. */latencyStartMonitor(latency);aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-fsync-always",latency);server.aof_last_fsync = server.unixtime;} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.unixtime > server.aof_last_fsync)) {// 交由后台执行fsync操作if (!sync_in_progress) aof_background_fsync(server.aof_fd);server.aof_last_fsync = server.unixtime;}


<span style="font-size:18px;">/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_NUM_OPS       2</span>

主要两类作业类型:1.close 2.aof_fsync

<span style="font-size:18px;">//使用互斥量+条件变量,作为线程的保护条件
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];//两类作业的队列、工作与挂起队列
static list *bio_jobs[REDIS_BIO_NUM_OPS];
static unsigned long long bio_pending[REDIS_BIO_NUM_OPS];/* This structure represents a background Job. It is only used locally to this* file as the API does not expose the internals at all. */
struct bio_job {time_t time; /* Time at which the job was created. *//* Job specific arguments pointers. If we need to pass more than three* arguments we can just pass a pointer to a structure or alike. */void *arg1, *arg2, *arg3;
};// 初始化相应变量并建立后台线程bioProcessBackgroundJobs
/* Initialize the background system, spawning the thread. */
void bioInit(void) {pthread_attr_t attr;pthread_t thread;size_t stacksize;int j;/* Initialization of state vars and objects */for (j = 0; j < BIO_NUM_OPS; j++) {pthread_mutex_init(&bio_mutex[j],NULL);pthread_cond_init(&bio_condvar[j],NULL);bio_jobs[j] = listCreate();bio_pending[j] = 0;}/* Set the stack size as by default it may be small in some system */pthread_attr_init(&attr);pthread_attr_getstacksize(&attr,&stacksize);if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;pthread_attr_setstacksize(&attr, stacksize);/* Ready to spawn our threads. We use the single argument the thread* function accepts in order to pass the job ID the thread is* responsible of. */for (j = 0; j < BIO_NUM_OPS; j++) {void *arg = (void*)(unsigned long) j;if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");exit(1);}bio_threads[j] = thread;}
}// 创建后台作业、并将作业挂成链表
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {struct bio_job *job = zmalloc(sizeof(*job));job->time = time(NULL);job->arg1 = arg1;job->arg2 = arg2;job->arg3 = arg3;pthread_mutex_lock(&bio_mutex[type]);// 作业加入到队尾listAddNodeTail(bio_jobs[type],job);// 挂起的作业队列数目加1bio_pending[type]++;pthread_cond_signal(&bio_condvar[type]);pthread_mutex_unlock(&bio_mutex[type]);
}// 后台处理线程
void *bioProcessBackgroundJobs(void *arg) {struct bio_job *job;unsigned long type = (unsigned long) arg;sigset_t sigset;/* Check that the type is within the right interval. */if (type >= BIO_NUM_OPS) {serverLog(LL_WARNING,"Warning: bio thread started with wrong type %lu",type);return NULL;}/* Make the thread killable at any time, so that bioKillThreads()* can work reliably. */pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);pthread_mutex_lock(&bio_mutex[type]);/* Block SIGALRM so we are sure that only the main thread will* receive the watchdog signal. */sigemptyset(&sigset);sigaddset(&sigset, SIGALRM);if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))serverLog(LL_WARNING,"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));while(1) {listNode *ln;// 是否有作业未做,如果无任何作业则wait/* The loop always starts with the lock hold. */if (listLength(bio_jobs[type]) == 0) {pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);continue;}// 取出链表头作业结点/* Pop the job from the queue. */ln = listFirst(bio_jobs[type]);job = ln->value;/* It is now possible to unlock the background system as we know have* a stand alone job structure to process.*/pthread_mutex_unlock(&bio_mutex[type]);// 真正执行/* Process the job accordingly to its type. */if (type == BIO_CLOSE_FILE) {close((long)job->arg1);} else if (type == BIO_AOF_FSYNC) {aof_fsync((long)job->arg1);} else {serverPanic("Wrong job type in bioProcessBackgroundJobs().");}zfree(job);/* Lock again before reiterating the loop, if there are no longer* jobs to process we'll block again in pthread_cond_wait(). */pthread_mutex_lock(&bio_mutex[type]);// 从链表中删除已完成的作业listDelNode(bio_jobs[type],ln);// 挂起的作业队列数目减1bio_pending[type]--;}

1、针对耗时的 close及fsync 进行另起线程后台执行、可以避免主线程阻塞问题。

二、RIO (统一buffer、file、socket不同对象IO操作)

   read、write、tell、flush操作。比如,对于file rio对象,底层通过fwrite函数完成写操作,通过fread
先看一下 struct rio 结构:

<span style="font-size:18px;">// 系统IO操作的封装
struct _rio {/* Backend functions.* Since this functions do not tolerate short writes or reads the return* value is simplified to: zero on error, non zero on complete success. */// 数据流的读方法size_t (*read)(struct _rio *, void *buf, size_t len);// 数据流的写方法size_t (*write)(struct _rio *, const void *buf, size_t len);// 获取当前的读写偏移量off_t (*tell)(struct _rio *);// flush操作int (*flush)(struct _rio *);/* The update_cksum method if not NULL is used to compute the checksum of* all the data that was read or written so far. The method should be* designed so that can be called with the current checksum, and the buf* and len fields pointing to the new block of data to add to the checksum* computation. */// 更新校验和void (*update_cksum)(struct _rio *, const void *buf, size_t len);/* The current checksum */// 当前校验和uint64_t cksum;/* number of bytes read or written */// 已读或已写的字节数size_t processed_bytes;/* maximum single read or write chunk size */// 每次读或写操作的最大字节数size_t max_processing_chunk;/* Backend-specific vars. */// 不同的io变量union {/* In-memory buffer target. */// 内存缓冲区buffer结构体(buffer指针及偏移量)struct {sds ptr;off_t pos;} buffer;/* Stdio file pointer target. */// 文件结构体(文件句柄)struct {FILE *fp;// 最后一个fsync后写入的字节数off_t buffered; /* Bytes written since last fsync. */// 多少字节进行一次fsync操作off_t autosync; /* fsync after 'autosync' bytes written. */} file;/* Multiple FDs target (used to write to N sockets). */// 封装了多个文件描述符结构体(写同样的数据到多个socket fd中)struct {// 文件描述符数组int *fds;       /* File descriptors. */int *state;     /* Error state of each fd. 0 (if ok) or errno. */// 文件描述符的个数int numfds;// 偏移量off_t pos;// 缓冲区sds buf;} fdset;} io;


<span style="font-size:18px;">static inline size_t rioWrite(rio *r, const void *buf, size_t len) {while (len) {//判断当前操作字节长度是否超过最大长度size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;//写入新的数据时,更新校验和if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);//执行写方法if (r->write(r,buf,bytes_to_write) == 0)return 0;buf = (char*)buf + bytes_to_write;len -= bytes_to_write;//操作字节数增加  r->processed_bytes += bytes_to_write;}return 1;
}static inline size_t rioRead(rio *r, void *buf, size_t len) {while (len) {//判断当前操作字节长度是否超过最大长度 size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;//读数据方法  if (r->read(r,buf,bytes_to_read) == 0)return 0;//读数据时,更新校验和  if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);buf = (char*)buf + bytes_to_read;len -= bytes_to_read;r->processed_bytes += bytes_to_read;}return 1;


下面继续分析 buffer IO和File IO及Socket IO.
rioFdsetIO使用多个socket fd写数据的IO操作

<span style="font-size:18px;">static const rio rioBufferIO = {rioBufferRead,rioBufferWrite,rioBufferTell,rioBufferFlush,NULL,           /* update_checksum */0,              /* current checksum */0,              /* bytes read or written */0,              /* read/write chunk size */{ { NULL, 0 } } /* union for io-specific vars */
};static const rio rioFileIO = {rioFileRead,rioFileWrite,rioFileTell,rioFileFlush,NULL,           /* update_checksum */0,              /* current checksum */0,              /* bytes read or written */0,              /* read/write chunk size */{ { NULL, 0 } } /* union for io-specific vars */
};static const rio rioFdsetIO = {rioFdsetRead,rioFdsetWrite,rioFdsetTell,rioFdsetFlush,NULL,           /* update_checksum */0,              /* current checksum */0,              /* bytes read or written */0,              /* read/write chunk size */{ { NULL, 0 } } /* union for io-specific vars */

以上的几个函数都很简单、稍微看下就能明白意思,就不细讲了。这里说下file write函数,有个细节是

<span style="font-size:18px;">static size_t rioFileWrite(rio *r, const void *buf, size_t len) {size_t retval;retval = fwrite(buf,len,1,r->io.file.fp);r->io.file.buffered += len;//判读是否需要同步if (r->io.file.autosync &&r->io.file.buffered >= r->io.file.autosync){fflush(r->io.file.fp);aof_fsync(fileno(r->io.file.fp));r->io.file.buffered = 0;}return retval;


<span style="font-size:18px;">// 以【"*<count>\r\n"】 的形式将count以字符串的格式写入rio对象中,返回写入的字节数。
size_t rioWriteBulkCount(rio *r, char prefix, int count);// 以【"$<count>\r\n<payload>\r\n"】格式往rio对象中写入二进制安全字符串。
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);// 以【"$<count>\r\n<payload>\r\n"】的格式往rio对象中写入long long类型的值。
size_t rioWriteBulkLongLong(rio *r, long long l);// 以【"$<count>\r\n<payload>\r\n"】的格式往rio对象中写入double类型的值。
size_t rioWriteBulkDouble(rio *r, double d);


Redis 之BIO与RIO相关推荐

  1. Redis源码剖析和注释(十六)---- Redis输入输出的抽象(rio)

    Redis源码剖析和注释(十六)---- Redis输入输出的抽象(rio) . https://blog.csdn.net/men_wen/article/details/71131550 Redi ...

  2. 【Redis源码剖析】 - Redis IO操作之rio

    原创作品,转载请标明:http://blog.csdn.net/xiejingfa/article/details/51433696 Redis源码剖析系列文章汇总:传送门 Reids内部封装了一个I ...

  3. Redis源码解析——前言

    今天开启Redis源码的阅读之旅.对于一些没有接触过开源代码分析的同学来说,可能这是一件很麻烦的事.但是我总觉得做一件事,不管有多大多难,我们首先要在战略上蔑视它,但是要在战术上重视它.除了一些高大上 ...

  4. Redis数据持久化机制AOF原理分析一---转

    http://blog.csdn.net/acceptedxukai/article/details/18136903 http://blog.csdn.net/acceptedxukai/artic ...

  5. Redis的搭建和Redis的集群搭建

    1.Redis的官网:https://redis.io/      Redis的测试网站:http://try.redis.io/ 2.参考博客:https://www.cnblogs.com/maf ...

  6. Redis数据持久化机制AOF原理分析二

    本文所引用的源码全部来自Redis2.8.2版本. Redis AOF数据持久化机制的实现相关代码是redis.c, redis.h, aof.c, bio.c, rio.c, config.c 在阅 ...

  7. 草稿——记录一下计网学习问题以及redis学习日志

    计算机网络最近看到了TCP的三次握手四次挥手详解,为什么需要三次握手?二次,四次为什么不行?为什么需要四次挥手?三次握手的各个状态是什么?四次挥手的各个状态是什么?close_wait有什么意义?ti ...

  8. redis3.0.2安装

    1:下载http://download.redis.io/releases/redis-3.0.2.tar.gz [root@web02 jifeng]# wget http://download.r ...

  9. redis bio线程任务队列

    bio部分是redis中负责后台进行文件关闭,aof文件缓冲区同步到磁盘,清理对象三种任务的部分. /* Background job opcodes */ #define BIO_CLOSE_FIL ...


  1. Linux centos7 利用公钥,私钥实现免密登录SSH
  2. cocoapos错误信息
  3. Linux C 算法——查找
  4. lintcode 中等题:Single number III 落单的数III
  5. php 语义解析,[扩展推荐] PHP 语义化版本(SemVer)辅助库
  6. xadmin在Django 1.11中的使用及中英文切换
  7. 08-09 性能测试--CPU分析
  8. BitNami-Redmine1.1.0安装和VisualSVN-Server配合使用
  9. SGU 201 Non Absorbing DFA (DP)
  10. java字节对齐原则_C struct 中字节对齐问题
  11. 分子重构技术_分子影像重构精准未来:百名专家云端共筑 One MI 生态圈
  12. 基于深度学习的目标检测(object detection)—— rcnn、fast-rcnn、faster-rcnn、SSD、YOLO
  13. 引用 你唯一能把握的是变成最好的自己
  14. unity实现游戏帧同步之确定性物理引擎
  15. 数据分析师岗位需求分析
  16. R/S方法计算Hurst指数
  17. 智能名片小程序开发文档概要
  18. 更改vs code的界面颜色 vscode修改界面颜色及风格(中文英文界面都有) 手动设置vs code的界面背景颜色
  19. 关于telnet逛bbs论坛
  20. 关于OLAP数仓,这大概是史上最全面的总结!(万字干货)


  1. 杰理ac18芯片_AC6905B/AC6905C杰理JL24脚四合一蓝牙芯片
  2. java学习笔记2(datawhale教程):运算符和表达式、流程控制、数组
  3. PYthon 素数,质数的密码
  4. 如何学习数值模拟(一)
  5. 软考高级,考网规还是考项管?
  6. Easier UVM Coding Guidelines / 便捷UVM 编码指南
  7. linux wol 戴尔工作站,linux 通过wol远程开机【转】
  8. It做形式主语和宾语
  9. 第二章:图像基本操作 1-计算机眼中的图像
  10. 解决fatal error C1859: “Debug\thread5.pch”意外的预编译头错误的方法