Redis源码剖析(十二)--客户端和服务器
客户端属性
客户端的状态保存在结构体 redisClient 中,下面给出redisClient的部分属性:
typedef struct redisClient{// 套接字描述符int fd; // 客户端状态标志int flags;// 输入缓冲区 sds querybuf;// 命令参数robj** argv;int argc;// 命令的实现函数struct redisCommand *cmd;// 固定输出缓冲区char buf[REDIS_REPLY_CHUNK_BYTES];int bufpos;// 可变大小输出缓冲区list* reply;// ...... };
fd 属性:客户端使用的套接字描述符,伪客户端的fd属性为 -1,普通客户端的fd属性为大于0的整数。
flags 属性:客户端状态。在主从服务器复制时,主服务器和从服务器互为客户端,REDIS_MASTER 标志表示客户端代表的是一个主服务器,REDIS_SLAVE 标志表示客户端代表的是一个从服务器。
querybuf 属性:保存客户端发送的命令请求。
argv、argc 属性:对客户端的命令请求分析,得到的命令参数及命令参数的个数。
cmd 属性:服务器从客户端发送的命令请求中分析得到argv、argc参数后,会根据argv[0]的值,去查找该命令对应的实现函数,并使cmd指针指向该实现函数。
buf、bufpos 属性:bufpos属性记录了buf 数组已使用的字节数量。
reply 属性:当buf 数组空间不够用时,服务器会使用 reply 可变大小缓冲区。
命令请求
命令的执行过程
服务器在接收到命令后,会将命令以对象的形式保存在服务器client的参数列表 robj** argv 中,因此服务器执行命令请求时,服务器已经读入了一套命令参数保存在参数列表中。执行命令的过程对应的函数是processCommand(),部分源码如下:
int processCommand(redisClient *c) {// 查找命令,并进行命令合法性检查,以及命令参数个数检查c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);if (!c->cmd) {// 没找到指定的命令 flagTransaction(c);addReplyErrorFormat(c,"unknown command '%s'",(char*)c->argv[0]->ptr);return REDIS_OK;} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||(c->argc < -c->cmd->arity)) {// 参数个数错误 flagTransaction(c);addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);return REDIS_OK;}// ....../* Exec the command */if (c->flags & REDIS_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){// 在事务上下文中// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外// 其他所有命令都会被入队到事务队列中 queueMultiCommand(c);addReply(c,shared.queued);} else {// 执行命令 call(c,REDIS_CALL_FULL);c->woff = server.master_repl_offset;// 处理那些解除了阻塞的键if (listLength(server.ready_keys))handleClientsBlockedOnLists();}return REDIS_OK; }
我们总结出执行命令的大致过程:
查找命令。对应的代码是:c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr)
执行命令前的准备
执行命令。对应代码是:call(c,REDIS_CALL_FULL)
查找命令
lookupCommand 函数是对 dictFetchValue 函数的封装。dictFetchValue 函数会从 server.commands 字典中查找 name 命令。这个保存命令表的字典,键是命令的名称,值是命令表的地址。服务器初始化时会创建一张命令表。命令表部分代码如下:
struct redisCommand redisCommandTable[] = {{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},{"setnx",setnxCommand,3,"wm",0,NULL,1,1,1,0,0},// ...... };
执行命令
执行命令调用了call(c, CMD_CALL_FULL)函数,该函数是执行命令的核心。该函数其实是对 c->cmd->proc(c) 的封装, proc 指向命令的实现函数。
void call(redisClient *c, int flags) {// start 记录命令开始执行的时间long long dirty, start, duration;// 记录命令开始执行前的 FLAGint client_old_flags = c->flags; // 如果可以的话,将命令发送到 MONITORif (listLength(server.monitors) &&!server.loading &&!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR)){replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);}/* Call the command. */c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);redisOpArrayInit(&server.also_propagate);// 保留旧 dirty 计数器值dirty = server.dirty;// 计算命令开始执行的时间start = ustime();// 执行实现函数c->cmd->proc(c);// 计算命令执行耗费的时间duration = ustime()-start;// 计算命令执行之后的 dirty 值dirty = server.dirty-dirty; // 不将从 Lua 中发出的命令放入 SLOWLOG ,也不进行统计if (server.loading && c->flags & REDIS_LUA_CLIENT)flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); // 如果调用者是 Lua ,那么根据命令 FLAG 和客户端 FLAG// 打开传播(propagate)标志if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {if (c->flags & REDIS_FORCE_REPL)server.lua_caller->flags |= REDIS_FORCE_REPL;if (c->flags & REDIS_FORCE_AOF)server.lua_caller->flags |= REDIS_FORCE_AOF;} // 如果有需要,将命令放到 SLOWLOG 里面if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand)slowlogPushEntryIfNeeded(c->argv,c->argc,duration);// 更新命令的统计信息if (flags & REDIS_CALL_STATS) {c->cmd->microseconds += duration;c->cmd->calls++;} // 将命令复制到 AOF 和 slave 节点if (flags & REDIS_CALL_PROPAGATE) {int flags = REDIS_PROPAGATE_NONE;// 强制 REPL 传播if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;// 强制 AOF 传播if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;// 如果数据库有被修改,那么启用 REPL 和 AOF 传播if (dirty)flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);if (flags != REDIS_PROPAGATE_NONE)propagate(c->cmd,c->db->id,c->argv,c->argc,flags);} // 将客户端的 FLAG 恢复到命令执行之前// 因为 call 可能会递归执行c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL); // 传播额外的命令if (server.also_propagate.numops) {int j;redisOp *rop;for (j = 0; j < server.also_propagate.numops; j++) {rop = &server.also_propagate.ops[j];propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);}redisOpArrayFree(&server.also_propagate);}server.stat_numcommands++; }
执行命令 c->cmd->proc(c) 就相当于执行了命令实现的函数,然后会在执行完成后,由这些函数产生相应的命令回复,根据回复的大小,会将回复保存在输出缓冲区 buf 或可变输出缓冲区链表 reply 中。
maxmemory策略
Redis 服务器对内存使用会有一个server.maxmemory的限制,如果超过这个限制,就要通过删除一些键空间来释放一些内存,具体函数对应freeMemoryIfNeeded()。释放内存时,可以指定不同的策略。策略保存在maxmemory_policy中,可以指定以下的几个值:
#define MAXMEMORY_VOLATILE_LRU 0 #define MAXMEMORY_VOLATILE_TTL 1 #define MAXMEMORY_VOLATILE_RANDOM 2 #define MAXMEMORY_ALLKEYS_LRU 3 #define MAXMEMORY_ALLKEYS_RANDOM 4 #define MAXMEMORY_NO_EVICTION 5
可以看出主要分为三种:
- LRU:优先删除最近最少使用的键。
- TTL:优先删除生存时间最短的键。
- RANDOM:随机删除。
而ALLKEYS和VOLATILE的不同之处就是要确定是从数据库的键值对字典还是过期键字典中删除。
int freeMemoryIfNeeded(void) {size_t mem_used, mem_tofree, mem_freed;int slaves = listLength(server.slaves);// 计算出 Redis 目前占用的内存总数,但有两个方面的内存不会计算在内:// 1)从服务器的输出缓冲区的内存// 2)AOF 缓冲区的内存mem_used = zmalloc_used_memory();if (slaves) {listIter li;listNode *ln;listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = listNodeValue(ln);unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);if (obuf_bytes > mem_used)mem_used = 0;elsemem_used -= obuf_bytes;}}if (server.aof_state != REDIS_AOF_OFF) {mem_used -= sdslen(server.aof_buf);mem_used -= aofRewriteBufferSize();} // 如果目前使用的内存大小比设置的 maxmemory 要小,那么无须执行进一步操作if (mem_used <= server.maxmemory) return REDIS_OK;// 如果占用内存比 maxmemory 要大,但是 maxmemory 策略为不淘汰,那么直接返回if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)return REDIS_ERR; /* We need to free memory, but policy forbids. */// 计算需要释放多少字节的内存mem_tofree = mem_used - server.maxmemory;// 初始化已释放内存的字节数为 0mem_freed = 0;// 根据 maxmemory 策略,// 遍历字典,释放内存并记录被释放内存的字节数while (mem_freed < mem_tofree) {int j, k, keys_freed = 0;// 遍历所有字典for (j = 0; j < server.dbnum; j++) {long bestval = 0; /* just to prevent warning */sds bestkey = NULL;dictEntry *de;redisDb *db = server.db+j;dict *dict;if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM){// 如果策略是 allkeys-lru 或者 allkeys-random // 那么淘汰的目标为所有数据库键dict = server.db[j].dict;} else {// 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl // 那么淘汰的目标为带过期时间的数据库键dict = server.db[j].expires;}// 跳过空字典if (dictSize(dict) == 0) continue;/* volatile-random and allkeys-random policy */// 如果使用的是随机策略,那么从目标字典中随机选出键if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM){de = dictGetRandomKey(dict);bestkey = dictGetKey(de);} // 如果使用的是 LRU 策略,// 那么从一集 sample 键中选出 IDLE 时间最长的那个键else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU){struct evictionPoolEntry *pool = db->eviction_pool;while(bestkey == NULL) {evictionPoolPopulate(dict, db->dict, db->eviction_pool);/* Go backward from best to worst element to evict. */for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) {if (pool[k].key == NULL) continue;de = dictFind(dict,pool[k].key);/* Remove the entry from the pool. */sdsfree(pool[k].key);/* Shift all elements on its right to left. */memmove(pool+k,pool+k+1,sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));/* Clear the element on the right which is empty* since we shifted one position to the left. */pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL;pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0;/* If the key exists, is our pick. Otherwise it is* a ghost and we need to try the next element. */if (de) {bestkey = dictGetKey(de);break;} else {/* Ghost... */continue;}}}}// 策略为 volatile-ttl ,从一集 sample 键中选出过期时间距离当前时间最接近的键else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {for (k = 0; k < server.maxmemory_samples; k++) {sds thiskey;long thisval;de = dictGetRandomKey(dict);thiskey = dictGetKey(de);thisval = (long) dictGetVal(de);/* Expire sooner (minor expire unix timestamp) is better* candidate for deletion */if (bestkey == NULL || thisval < bestval) {bestkey = thiskey;bestval = thisval;}}} // 删除被选中的键if (bestkey) {long long delta;robj *keyobj = createStringObject(bestkey,sdslen(bestkey));propagateExpire(db,keyobj);// 计算删除键所释放的内存数量delta = (long long) zmalloc_used_memory();dbDelete(db,keyobj);delta -= (long long) zmalloc_used_memory();mem_freed += delta;// 对淘汰键的计数器增一server.stat_evictedkeys++;notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",keyobj, db->id);decrRefCount(keyobj);keys_freed++; if (slaves) flushSlavesOutputBuffers();}}if (!keys_freed) return REDIS_ERR; /* nothing to free... */}return REDIS_OK; }
转载于:https://www.cnblogs.com/lizhimin123/p/10215368.html
Redis源码剖析(十二)--客户端和服务器相关推荐
- Redis源码剖析(二)io多路复用函数及事件驱动流程
作为服务器监听客户端请求的方法,io多路复用起到了不可忽略的作用,利用io复用监听的方法叫Reactor模式,在前一篇也提到过,使用io复用是现在常用的提高并发性的方法,而且效果显著. 通常io多路复 ...
- grpc-go源码剖析十二之平衡器基本原理介绍
本小节主要介绍平衡器的相关原理: 例如: 如何实现一个平衡器? 知道如何实现平衡器后,那么如何将平衡器注册到grpc框架里呢? 将平衡器注册到grpc框架内部后,grpc框架是如何创建 ...
- redis源码剖析(十五)——客户端思维导图整理
redis源码剖析(十五)--客户端执行逻辑结构整理 加载略慢
- Redis源码剖析和注释(十六)---- Redis输入输出的抽象(rio)
Redis源码剖析和注释(十六)---- Redis输入输出的抽象(rio) . https://blog.csdn.net/men_wen/article/details/71131550 Redi ...
- Redis源码剖析之GEO——Redis是如何高效检索地理位置的?
Redis GEO 用做存储地理位置信息,并对存储的信息进行操作.通过geo相关的命令,可以很容易在redis中存储和使用经纬度坐标信息.Redis中提供的Geo命令有如下几个: geoadd:添加经 ...
- 【Redis源码剖析】 - Redis持久化之RDB
原创作品,转载请标明:http://blog.csdn.net/xiejingfa/article/details/51553370 Redis源码剖析系列文章汇总:传送门 Redis是一个高效的内存 ...
- Redis源码剖析之内存淘汰策略(Evict)
文章目录 何为Evict 如何Evict Redis中的Evict策略 源码剖析 LRU具体实现 LFU具体实现 LFU计数器增长 LFU计数器衰减 evict执行过程 evict何时执行 evict ...
- STL源码剖析学习二:空间配置器(allocator)
STL源码剖析学习二:空间配置器(allocator) 标准接口: vlaue_type pointer const_pointer reference const_reference size_ty ...
- 价值4500的国际版多语言点赞抖音分享点赞任务平台源码(十二种语言)
介绍: 平台会员分享给我的,他自己搭建成功了,测试可用!我就不测试了,需要的拿! 九种语言 :西班牙语,泰语.日语,印度尼西亚语言.越南语言.英文.繁体中文,简体中文,印度语 前台支持更换5种颜色风格 ...
- 【Redis源码剖析】 - Redis内置数据结构之压缩列表ziplist
在前面的一篇文章[Redis源码剖析] - Redis内置数据结构之双向链表中,我们介绍了Redis封装的一种"传统"双向链表list,分别使用prev.next指针来指向当前节点 ...
最新文章
- crontab 备份mysql数据库_crontab定时备份mySQL数据库
- CXF的webservice接口中字符串参数中文问题
- 【错误记录】Android 应用运行报错 ( You need to use a Theme.AppCompat theme (or descendant) with this activity. )
- gantt project 使用
- display block 无法显示_display:inline-block产生的问题
- 【JFreeChart】JFreeChart—输出区域图
- PAT (Basic Level) - 1025 反转链表(模拟)
- 【Java正则表达式】正则基本语法、使用方式(分组、替换、分割)、简单爬虫基础
- 下班以后看什么,决定你人生的高度
- mysql5.7 gtid问题_MySQL 5.7.5: 新语法WAIT_FOR_EXECUTED_GTID_SET 及存在的问题-阿里云开发者社区...
- xcode armv6 armv7 armv7s arm64架构分析
- Android 8.1 Launcher3实现动态指针时钟
- java把行政区划放到一个节点树形中
- NLP 前置知识2 —— 深度学习算法
- 码支付系统 无授权—个人免签约支付系统二维码收款即时到账源码 –
- 计算机屏幕出现蓝色条,我的电脑屏幕中间为何会有一道蓝色线条
- 软件开发搞定计算机组成原理:计算篇
- sqlserver对数据进行加密、解密
- 同学录 — 二叉树实现
- 【1049】晶晶赴约会