0. 写在前面

Version Redis2.2.2

Redis中可以支持主从结构,本文主要从master和slave的心跳机制出发(PING),分析redis的命令行交互。

在Redis中,server为每个连接建立一个redisClient数据对象,来描述对应的连接。其中,redisClient为命令交互设置了缓冲区。querybuf用于存储客户端送过来的命令,buf和reply是用于应答的缓冲。querybuf是在文件事件readQueryFromClient中被填充,每次填充的最大字节数默认为1024B。而应答缓冲区是由addReply()函数填充,并由文件事件sendReplyToClient中发送给客户端。具体数据流如图1所示。MasterPorcess与SlaveProcess进行命令交互。其中,蓝色矩形框代表函数,白色矩形框代表数据,曲线描述数据流,折线描述数据间的从属关系。

图1. Master&Slave交互的数据流(蓝色矩形框代表函数,白色矩形框代表数据,曲线描述数据流,折线描述数据间的从属关系)

1. 相关数据结构

typedef struct redisClient {int fd;                   //connect fd...sds querybuf;            //命令缓冲区,由readQueryFromClient()事件进行填充(sds equals to char*)int argc;              //for command;记录参数个数robj **argv;         //for command;记录命令行参数int reqtype;            //命令解析协议:INLINE or MULTIBULK...time_t lastinteraction; /* 最近交互时间 */...list *reply;           //Replay object list/* Response buffer */char buf[REDIS_REPLY_CHUNK_BYTES]; //Reply buffer,由addReply()函数进行填充int bufpos;             //记录buf已填充的长度int sentlen;           //Replay阶段,记录当前buf已发送了多少字节
} redisClient;struct redisServer {...list *clients;dict *commands;             /* Command table hahs table */...list *slaves, *monitors;    //Master : slave链表char neterr[ANET_ERR_LEN];aeEventLoop *el;         //Event listint cronloops;              //ServerCorn 执行次数...redisClient *master;    //Slave :记录 master 的连接信息的clientint replstate;          //Slave :当前的状态...
};struct redisCommand readonlyCommandTable[] = {...{"sync",syncCommand,1,0,NULL,0,0,0},...{"ping",pingCommand,1,0,NULL,0,0,0},...
}

2. query的读取和命令的解析

从图1可以看出,命令交互数据query的读取是在文件事件readQueryFromClient中填充到c->querybuf中。之后,querybuf由函数processInputBuffer进行命令的解析。命令的解析过程如图2所示。在函数processInputBuffer中,将缓存与querybuf中的所有命令(命令间按\n\r分隔)进行解析。之后,查询命令hashtabe查找相关命令函数。最后调用相应命令hander执行命令。

图2.querybuf的解析

具体代码分析如下:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = (redisClient*) privdata;char buf[REDIS_IOBUF_LEN];int nread;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);nread = read(fd, buf, REDIS_IOBUF_LEN);...check...if (nread) {c->querybuf = sdscatlen(c->querybuf,buf,nread);c->lastinteraction = time(NULL);//更新时间戳} else {return;}processInputBuffer(c);//处理client传输过来的数据
}void processInputBuffer(redisClient *c) {/* 执行querybub中的所有命令*/while(sdslen(c->querybuf)) {...check.../*判定命令的解析协议 */if (!c->reqtype) {if (c->querybuf[0] == '*') {c->reqtype = REDIS_REQ_MULTIBULK;} else {c->reqtype = REDIS_REQ_INLINE;//按行解析}}if (c->reqtype == REDIS_REQ_INLINE) {/*processInlineBuffer: 1. 取出c->querybuf起始端到\r\n位置的字符串,更新c->querybuf2. 将取出的字符串按照“ ”空格进行分段解析,得到命令及其参数格式为: argc,*argv[],其中argv[0]为命令,argv[1~argc-1]为参数*/if (processInlineBuffer(c) != REDIS_OK) break;} else if (c->reqtype == REDIS_REQ_MULTIBULK) {...}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetClient(c);} else {/* Only reset the client when the command was executed. */if (processCommand(c) == REDIS_OK)   //执行命令resetClient(c);}}
}/* If this function gets called we already read a whole* command, argments are in the client argv/argc fields.* processCommand() execute the command or prepare the* server for a bulk read from the client.*/
int processCommand(redisClient *c) {struct redisCommand *cmd;.../* Now lookup the command and check ASAP about trivial error conditions* such wrong arity, bad command name and so forth. */cmd = lookupCommand(c->argv[0]->ptr);...check.../* Exec the command */if (c->flags & REDIS_MULTI &&cmd->proc != execCommand && cmd->proc != discardCommand &&cmd->proc != multiCommand && cmd->proc != watchCommand){queueMultiCommand(c,cmd);addReply(c,shared.queued);} else {if (server.vm_enabled && server.vm_max_threads > 0 &&blockClientOnSwappedKeys(c,cmd)) return REDIS_ERR;call(c,cmd);    //执行命令}return REDIS_OK;
}/* Call() is the core of Redis execution of a command */
void call(redisClient *c, struct redisCommand *cmd) {long long dirty;dirty = server.dirty;cmd->proc(c);     //执行命令dirty = server.dirty-dirty;if (server.appendonly && dirty)feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&listLength(server.slaves))replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);if (listLength(server.monitors))replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);server.stat_numcommands++;
}

3. 具体命令的执行(ping命令)

其中,addReply将相关命令执行结果放入client的reply缓冲区中。reply缓冲区的发送时机是在事件sendReplyToClient中进行。

#define REDIS_STRING 0
shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
//{"ping",pingCommand,1,0,NULL,0,0,0}
void pingCommand(redisClient *c) {addReply(c,shared.pong); //ping的回复是pong,打乒乓,呵呵
}//将命令执行的返回结构写入c->buf 或者 c->reply
void addReply(redisClient *c, robj *obj) {if (_installWriteEvent(c) != REDIS_OK) return;//创建event sendReplyToClientredisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);/* This is an important place where we can avoid copy-on-write* when there is a saving child running, avoiding touching the* refcount field of the object if it's not needed.** If the encoding is RAW and there is room in the static buffer* we'll be able to send the object to the client without* messing with its page. */if (obj->encoding == REDIS_ENCODING_RAW) {if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)_addReplyObjectToList(c,obj);} else {/* FIXME: convert the long into string and use _addReplyToBuffer()* instead of calling getDecodedObject. As this place in the* code is too performance critical. */obj = getDecodedObject(obj);if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)_addReplyObjectToList(c,obj);decrRefCount(obj);}
}

4. reply缓冲区数据的发送

将c->buf 和 c->reply中的数据发送到客户端(slave or master)。在每次文件事件中发送所有的reply缓冲区中的数据。

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = privdata;int nwritten = 0, totwritten = 0, objlen;robj *o;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);while(c->bufpos > 0 || listLength(c->reply)) {if (c->bufpos > 0) {//发送c->buf中的数据if (c->flags & REDIS_MASTER) {/* Don't reply to a master */nwritten = c->bufpos - c->sentlen;} else {nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);if (nwritten <= 0) break;}c->sentlen += nwritten;totwritten += nwritten;/* If the buffer was sent, set bufpos to zero to continue with* the remainder of the reply. */if (c->sentlen == c->bufpos) {c->bufpos = 0;c->sentlen = 0;}} else {//发送c->reply中的数据o = listNodeValue(listFirst(c->reply));objlen = sdslen(o->ptr);if (objlen == 0) {listDelNode(c->reply,listFirst(c->reply));continue;}if (c->flags & REDIS_MASTER) {/* Don't reply to a master */nwritten = objlen - c->sentlen;} else {nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);if (nwritten <= 0) break;}c->sentlen += nwritten;totwritten += nwritten;/* If we fully sent the object on head go to the next one */if (c->sentlen == objlen) {listDelNode(c->reply,listFirst(c->reply));c->sentlen = 0;}}/* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT* bytes, in a single threaded server it's a good idea to serve* other clients as well, even if a very large request comes from* super fast link that is always able to accept data (in real world* scenario think about 'KEYS *' against the loopback interfae) */if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;}...check...if (totwritten > 0) c->lastinteraction = time(NULL);//??Why delete file event of write ? ?if (listLength(c->reply) == 0) {c->sentlen = 0;aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);/* Close connection after entire reply has been sent. */if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);}
}

5. 总结

命令行交互过程中,1.为每个连接有相应的数据进行描述(redisClient),这样便于连接的管理。2.命令行交互中,引入命令缓冲区querybuf,这样可以延时处理命令,这在事件轮询机制中,是至关重要的。

原文链接 http://blog.csdn.net/ordeder/article/details/16105345

Redis源码学习-MasterSlave的命令交互相关推荐

  1. Redis源码学习(20),学习感悟

      最近学习Redis源码也有半个月的时间了,有不少收获也有不少感悟,今天来好好聊聊我学习的感悟. 1 发现问题   人非圣贤孰能无过,只要是人难免会犯错,回顾我之前的学习历程,其实是可以发现不少的问 ...

  2. redis源码学习笔记目录

    Redis源码分析(零)学习路径笔记 Redis源码分析(一)redis.c //redis-server.c Redis源码分析(二)redis-cli.c Redis源码剖析(三)--基础数据结构 ...

  3. 【Redis学习笔记】2018-05-30 Redis源码学习之Ziplist、Server

    作者:施洪宝 顺风车运营研发团队 一. 压缩列表 压缩列表是Redis的关键数据结构之一.目前已经有大量的相关资料,下面几个链接都已经对Ziplist进行了详细的介绍. http://origin.r ...

  4. 【redis源码学习】simple dynamic strings(简单动态字符串 sds)

    文章目录 接 化 sds 结构分析 基本操作 创建字符串 释放字符串 sdsMakeRoomFor 扩容 小tip:`__attribute__ ((__packed__))` 发 接 阅读源码之前, ...

  5. Redis源码学习(13),t_set.c 学习(一),sadd,srem 命令学习

      学习完 t_string.c.t_list.c.t_hash.c文件后,现在开始学习 t_set.c 的代码,从文件名可以看到是相关集合相关命令的代码文件.总共5种数据结构,我们已经学习到第4个了 ...

  6. Redis源码学习(6),t_list.c 学习(一),rpush、lpush命令实现学习

    前言   大体学习完t_string.c的代码,我们正式进入下一个文件的学习,这一次我们学习的是t_list.c文件,从文件名我们可以知道这是一个关于列表的相关命令的源代码.   在学习列表命令之前, ...

  7. Redis源码学习(10),t_hash.c 学习(一),hset、hmset 命令学习

       学习完 t_string.c.t_list.c文件后,现在开始学习 t_hash.c 的代码,从文件名可以看到是相关hash的相关命令代码. 1 hsetCommand 1.1 方法说明    ...

  8. Redis源码学习(14),t_set.c 学习(二),sismember,scard,spop 命令学习

    1 sismemberCommand 1.1 方法说明   判断一个值是否属于集合的成员,成功返回1,失败返回0. 1.2 命令实践 1.3 方法源代码 void sismemberCommand(r ...

  9. 【Redis学习笔记】2018-06-14 Redis源码学习之sentinel

    顺风车运营研发团队 方波 sentinel是redis的高可用解决方案,由一个或多个sentinel实例组成的系统可以同时监听多组master-slave实例(后面简称一组),当发现master进入下 ...

最新文章

  1. Find函数使用语法
  2. SAP MM 维护公司间STO报错-No delivery type defined for supplying plant NMI1 and document type NB-
  3. python增量爬虫_python爬虫Scrapy框架之增量式爬虫
  4. 2021暑假实习-SSM超市积分管理系统-day04笔记
  5. Linux常用的命令及操作技巧
  6. 马化腾定义腾讯是普通公司,这波重新定义“普通”可还行......
  7. countif函数比较两列不同_VLOOKUP函数批量查找,这么长的公式你可以写出来,立马加薪...
  8. ROP_return to dl-resolve学习笔记
  9. 索罗斯说,我投机了,但我不觉得我做错了什么,我做的都是合法的。
  10. 如何免费下载外文文献
  11. 从辉煌走向消亡(下)——小型机之王DEC公司
  12. vue 利用科大讯飞实现实时语音转写
  13. python+selenium实现网页全屏截图
  14. 「2019纪中集训Day12」解题报告
  15. ppt流程图字体太小_【PPT】几种处理字体的小方法,让PPT中的字体更好看
  16. linux so lazyload,linux函数深入探索——open函数打开文件是否将文件内容加载到内存空间...
  17. IBM硬件默认的管理地址
  18. 正版免费图片编辑处理软件下载_图片处理软件
  19. 机动车 合格证 二维码 解密
  20. 使用 vue-lic3 创建 vue 项目

热门文章

  1. 光流 | 基于Matlab实现Lucas-Kanade方法:方法2(附源代码)
  2. Android studio | From Zero To One ——XML文件中的单行注释与多行注释
  3. 在虚拟机装一个linux系统
  4. C语言中的二级指针和二维数组问题
  5. python restful django_如何使用Django / Python从RESTful Web服务中使用XML?
  6. uni微信小程序 下载图片跟文字_微信小程序:图片与文字无法居中 最后解决的方法是——...
  7. 怎么制作铁闸门_红茶拿铁
  8. access开发精要(4)-参考与查阅
  9. 趣学python3(8)-循环语句(2)
  10. CVPR 2022 接收结果出炉!录用 2067 篇,接收数量上升24%(附最新论文下载)