前段时间给小伙伴分享redis,顺带又把redis撸了一遍了,对其源码,又有了比较深入的了解。(ps: 分享的文章再丰富下再放出来)。

数据结构

我们先看下redis 5.0的代码。本次讲解主要是zset中的跳表。压缩列表不做讲解

/*** 跳跃表节点*/
typedef struct zskiplistNode {//member对象sds ele;//权重分值double score;//后退指针struct zskiplistNode *backward;//层级描述struct zskiplistLevel {//前进指针struct zskiplistNode *forward;//跨越节点的数量unsigned long span;} level[];
} zskiplistNode;
/*** zset的数据结构跳跃表*/
typedef struct zskiplist {//头尾节点指针struct zskiplistNode *header, *tail;//节点数量unsigned long length;//最大层数int level;
} zskiplist;/*** 跳表结构的zset*/
typedef struct zset {//kv形式,存储所有的member和对应的scoredict *dict;//跳跃表zskiplist *zsl;
} zset;

zadd添加数据流程

我们从源码zadd看下zset命令(精简后的源码),在t_zset.c中

void zaddGenericCommand(client *c, int flags) {//不存在,就创建if (zobj == NULL) {/*** 根据redis的配置,如果有序集合不使用ziplist存储或者第一次插入元素的个数大于设置的ziplist最大长度,则使用跳表*/if (server.zset_max_ziplist_entries == 0 ||server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)){//这里创建了一个score为0,层级为64的元素为null的 头节点zobj = createZsetObject();} else {zobj = createZsetZiplistObject();}//插入entry到hash表dbAdd(c->db,key,zobj);} else {//存在,校验类型,不是zset,报错if (zobj->type != OBJ_ZSET) {addReply(c,shared.wrongtypeerr);goto cleanup;}}//遍历所有的<element,score>for (j = 0; j < elements; j++) {double newscore;score = scores[j];int retflags = flags;//获取元素数据的指针ele = c->argv[scoreidx+1+j*2]->ptr;//添加元素到zset,在zsetAdd 方法里进行了类型区分int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);if (retval == 0) {addReplyError(c,nanerr);goto cleanup;}//根据操作类型计数if (retflags & ZADD_ADDED) added++;if (retflags & ZADD_UPDATED) updated++;if (!(retflags & ZADD_NOP)) processed++;score = newscore;}//计数server.dirty += (added+updated);
}
/*** zset添加元素* @param zobj  zset的存储结构* @param score 添加的分值* @param ele  元素对象* @param flags* @param newscore 添加成功后的分值* @return*/
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {//存在先删后插}else if (!xx) {//超过配置长度,就将压缩链表转到了跳跃表if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||sdslen(ele) > server.zset_max_ziplist_value ||!ziplistSafeToAdd(zobj->ptr, sdslen(ele))){zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);} else {zobj->ptr = zzlInsert(zobj->ptr,ele,score);if (newscore) *newscore = score;*flags |= ZADD_ADDED;return 1;}}}if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//zset 指针zset *zs = zobj->ptr;zskiplistNode *znode;dictEntry *de;//从zset的全局hash表中查找对应的key,找到说明已经存在,如果需要更新就操作,不需要就返回de = dictFind(zs->dict,ele);if (de != NULL) {if (score != curscore) {//这里先删除,然后重新插入,单线程保证了一致性,最后插入还是走的zslInsertznode = zslUpdateScore(zs->zsl,curscore,ele,score);//更新全局hash表里的权重分值dictGetVal(de) = &znode->score; /* Update score ptr. */*flags |= ZADD_UPDATED;}            }else if (!xx) {//将元素压缩成一个紧凑型的sdsele = sdsdup(ele);//插入元素znode = zslInsert(zs->zsl,score,ele);//将元素插入到对应的hash表中serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);*flags |= ZADD_ADDED;if (newscore) *newscore = score;return 1;}}
}

通过源码:

在插入的元素的时候逻辑如下:

  • 如果第一次为空,根据元素的个数创建zset的数据结构,默认是创建ziplist
  • 当使用压缩链表ziplist的时候

跳跃表的创建与插入

我们重点看下跳跃表的操作

/*** 创建一个跳跃表(具体实现)*/
zskiplist *zslCreate(void) {int j;zskiplist *zsl;zsl = zmalloc(sizeof(*zsl));zsl->level = 1;zsl->length = 0;// header是一个权重分值为0,元素为NULL的对象zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//新创建的跳跃表的header是一个64层级的空表for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {zsl->header->level[j].forward = NULL;zsl->header->level[j].span = 0;}zsl->header->backward = NULL;zsl->tail = NULL;return zsl;
}
/*** 跳表结构插入一条数据* @param zsl 从zset上获取到跳跃表* @param score 权重分值* @param ele 元素* @return*/
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {/*** update 保存对应层级小于插入权重分值的前一个节点,如果没有为header*   新添加层级保存的是跳跃表的header指针* x 表示zskiplistNode节点指针*/zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;/*** 每一层对应到update对应层级那个位置的跨度*/unsigned int rank[ZSKIPLIST_MAXLEVEL];int i, level;serverAssert(!isnan(score));//最开始为头节点x = zsl->header;//逆序遍历当前的所有层级,找到新插入权重分值每一层左侧的数据for (i = zsl->level-1; i >= 0; i--) {/* store rank that is crossed to reach the insert position *///最上层 rank为0,否则为i+1(相当于逆序了)rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];/***  前驱指针存在,*   并且(当前指针对应的分值小于插入分值 或者(当前分值等于插入分值 并且现有元素和插入元素不相同))*  比如当前权重分值为 20,跨度5,插入权重分值为30 ,或者 权重分值都为20,但是元素长度不相同(分值相同的话看元素长度大小,小的在前)*  继续往下一个节点走,会记录下满足条件的跨度*  通过这块,可以看到在zset里是根据分数权重,然后根据元素的长度大小升序排序*/while (x->level[i].forward &&(x->level[i].forward->score < score ||(x->level[i].forward->score == score &&sdscmp(x->level[i].forward->ele,ele) < 0))){//将符合条件的层级跨度收集起来rank[i] += x->level[i].span;//链表往下走x = x->level[i].forward;}//将每一层的要插入值的最近一个节点更新到update数组里update[i] = x;}/*** 随机层级*   1层级的概率为 100%;*   2层级的概率为 1/4*   3层级的概率为 1/4 * 1/4* 后续每增加一层级的概率都是指数级上升*/level = zslRandomLevel();// 扩容层级,随机出来的层级> 当前层级if (level > zsl->level) {//这块增加的可能1层,也可能多层,最多(64-当前层级)for (i = zsl->level; i < level; i++) {//新添加的层级rank都为0rank[i] = 0;//新添加层级取的是zskplist的header对应的指针update[i] = zsl->header;//新添加层级的跨度就是元素的总数update[i]->level[i].span = zsl->length;}//更新层级数zsl->level = level;}//给新插入的元素和权重分值创建zskiplistNode,层级为刚随机出来的层级x = zslCreateNode(level,score,ele);/*** 把新插入的节点,插入到每一层级中* 更新每一层级的链表结构*  并将新插入节点对应层级的前驱指针和跨度维护进去*/for (i = 0; i < level; i++) {//链表插入节点x->level[i].forward = update[i]->level[i].forward;/***   新的层级update[i]为 header节点*/update[i]->level[i].forward = x;/* update span covered by update[i] as x is inserted here *///计算每一层级的跨度并更新进去x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);update[i]->level[i].span = (rank[0] - rank[i]) + 1;}/* increment span for untouched levels *//*** 对于没有达到的层级,增加1*/for (i = level; i < zsl->level; i++) {update[i]->level[i].span++;}//新插入节点的回退指针为最底层的前一个节点x->backward = (update[0] == zsl->header) ? NULL : update[0];if (x->level[0].forward)x->level[0].forward->backward = x;elsezsl->tail = x;zsl->length++;return x;
}
/*** 随机层级*   1层级的概率为 100%;*   2层级的概率为 1/4*   3层级的概率为 1/4 * 1/4* 后续每增加一层级的概率都是指数级上升* @return*/
int zslRandomLevel(void) {int level = 1;/*** 完全靠随机, 0xFFFF = 65535 ,  ZSKIPLIST_P = 0.25* 如果随机每次随机出来都小于 0.25*65535  level都加1,直到随机出大于*/while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))level += 1;return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

总结一下:

  • 在操作zset的时候,如果zset为null,则根据redis配置创建一个数据结构为跳跃表或者ziplist结构的zset

    • 在创建zset跳跃表结构的时候,会创建一个64层级,权重分值为0,元素为NUll的zskiplistNode作为头节点(这个节点从了当头节点,其他的分值什么的都没有任何意义)
    • header节点只是一个位置标识,新插入的节点在level[0]层都会在header后形成一个双向链表
    • hader节点不会进入双向链表(一会看图)
  • 通过zslInsert将元素插入到跳跃表中
    • ①:先找到所有层级中在插入元素分值左侧的元素,并插入到update[]里,然后把跨度填到rank[]中
    • ②:随机一个层级出来zslRandomLevel(),最小层级为1
    • ③:如果随机的层级>原来的层级,扩容update,新增加的层级为header节点,rank都为0
    • ④:创建元素
    • ⑤:链表插入,以update[]对应层级的元素为起始点插入元素
    • ⑥:如果随机的层级比原来的层级小,上面的层级跨度都得+1
    • ⑦:针对最底层也就是level[0] 需要做双向链表的引入
      • 这个时候,如果前面的节点是header节点,不做回退指向
      • 除了level[0]是一个双向链表
      • 其他层级都是一个单向链表
    • 通过上面
      • 跳跃表不是一个平衡的树
      • 极端情况下,跳跃表可能退化为链表,查询复杂度由O(1)降到O(n)
      • 业务元素都是在header节点后形成的链表
      • 在score相同的情况下,根据元素的长度排序,谁短谁在前
  • 插入成功后放入对应zset的 hash表中,key为元素,value为score
    • 所以能O(1)的速度拿到元素的对应的权重分值

上图

ADD添加对应的元素

# 语法
ZADD key score member [[score member] [score member] …]
# 添加
zadd yxkong 50 'a' 60 'b' 75 'c' 88 'd' 90 'e' 100 'f'

ZRANGEBYSCORE 获取分值在某个区间的元素(以跳跃表为例)

ZRANGEBYSCORE key min max WITHSCORES limit offset num
# min 可以是具体的数值,也可以是-inf(负无穷),也可以是(10
# max 可以是具体的数值,也可以是+inf(正无穷)  也可以是(30
# WITHSCORES 返回元素带score
# limit 限制返回的数量
# offset 从哪个索引开始
# num 返回的数量
如:
office:0>ZRANGEBYSCORE yxkong 55  80 WITHSCORES 1)  "b"2)  "60"3)  "c"4)  "75"

查看下源码

/***  获取指定score区间的元素* @param c* @param reverse  是否取反,0表示不取反,1表示取反*/
void genericZrangebyscoreCommand(client *c, int reverse) {//将分值区间解析到range中if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {addReplyError(c,"min or max is not a float");return;}if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {......} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//最终都是找链表上的起始点if (reverse) {//查找终点  80  50ln = zslLastInRange(zsl,&range);} else {//查找起点 比如  50 80ln = zslFirstInRange(zsl,&range);}//遍历链表,如果有limit就递减,0为假while (ln && limit--) {/* Abort when the node is no longer in range. *///如果获取到的对象权重分值,已经不在范围内了,直接breakif (reverse) {if (!zslValueGteMin(ln->score,&range)) break;} else {if (!zslValueLteMax(ln->score,&range)) break;}rangelen++;//添加到回复缓冲区addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));if (withscores) {//需要带权重分值,将权重分值添加到回复缓冲区addReplyDouble(c,ln->score);}/* Move to next node *///就是正反序遍历链表if (reverse) {//回退,永远是level[0]ln = ln->backward;} else {//最底层前进ln = ln->level[0].forward;}}        }
}/*** 获取第一个分值所在的节点* @param zsl* @param range  50 ~ 80* @return*/
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {zskiplistNode *x;int i;if (!zslIsInRange(zsl,range)) return NULL;x = zsl->header;//从上到下遍历所有的层级,定位到最小的元素for (i = zsl->level-1; i >= 0; i--) {/* Go forward while *OUT* of range. *///定位最小元素所在的位置while (x->level[i].forward &&!zslValueGteMin(x->level[i].forward->score,range))x = x->level[i].forward;}x = x->level[0].forward;serverAssert(x != NULL);/* Check if score <= max. */if (!zslValueLteMax(x->score,range)) return NULL;return x;
}

查找过程

  • ①:找链表的开始位置:通过zsl.level 获取最大的层级,然后倒序遍历层级,然后找到元素的开始或结束位置
  • ②:通过开始位置遍历链表
    • 如果有数量限制limit>0,递减limit查找,直到limit=0为止
    • 在取元素的时候,每次都会判断新遍历的元素的socre是否超范围了

阅读了redis的源码,redis好多地方都是用的近似算法,LFU、LRU、淘汰策略、以及这次的zset,跳跃表的索引也是近似,一但出现了极端情况,跳跃表就直接退化为了链表。

redis系列文章

redis源码阅读-入门篇

redis源码阅读二-终于把redis的启动流程搞明白了

redis源码阅读三-终于把主线任务执行搞明白了

redis源码阅读四-我把redis6里的io多线程执行流程梳理明白了

redis源码阅读五-为什么大量过期key会阻塞redis?

redis源码六-redis中的缓存淘汰策略处理分析

redis源码阅读-之哨兵流程

redis源码阅读-持久化之RDB

阅读redis源码的时候一些c知识

阅读redis持久化RDB源码的时候一些c知识

linux中的文件描述符与套接字socket

redis中的IO多路复用select和epoll

Reactor模式详解及redis如何使用

redis的key过期了还能取出来?

本文是Redis源码剖析系列博文,有想深入学习Redis的同学,欢迎star和关注;
Redis中文注解版:https://github.com/yxkong/redis/tree/5.0
如果觉得本文对你有用,欢迎一键三连;
同时可以关注微信公众号5ycode获取第一时间的更新哦;

redis源码阅读-zset相关推荐

  1. Redis源码阅读-Adlist双向链表

    Redis源码阅读-链表部分- 链表数据结构在Adlist.h   Adlist.c Redis的链表是双向链表,内部定义了一个迭代器. 双向链表的函数主要是链表创建.删除.节点插入.头插入.尾插入. ...

  2. redis源码阅读-持久化之RDB

    持久化介绍: redis的持久化有两种方式: rdb :可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot) aof : 记录redis执行的所有写操作命令 根 ...

  3. redis源码阅读-持久化之aof与aof重写详解

    aof相关配置 aof-rewrite-incremental-fsync yes # aof 开关,默认是关闭的,改为yes表示开启 appendonly no # aof的文件名,默认 appen ...

  4. redis 源码阅读

    双向链表(adlist.h/adlist.c) - redis 源码解析 1.0 documentation (redissrc.readthedocs.io) 如何阅读 Redis 源码? - bl ...

  5. Redis源码阅读,从入门到放弃

    作为后端工程师,我们在面试和工作中都会用到 Redis,特别是大型互联网公司面试时,不仅要求面试者能简单使用 Redis,还要求懂 Redis 源码层面的实现原理,具备解决常见问题的能力.可以说,熟练 ...

  6. Redis源码阅读01-读了一下redis启动流程涉及的源码我都读了个啥

    阅读源码是学习一门技术的必经之路,经过1周左右的c语言入门学习,我就开始硬读redis的源码了.因为公司的多版本的改造,所以源码就选择redis6.x的最高版本redis6.2.7. 在阅读源码前,首 ...

  7. redis源码阅读(1)

    redis 是c 编写的,首先看下redis 代码目录结构(对应版本3.25): 开发相关的放在deps下面: 主要代码放置在deps和src下面,utils 下面放置的是rb 脚本 首先看下src ...

  8. Redis源码阅读笔记(1)——简单动态字符串sds实现原理

    首先,sds即simple dynamic string,redis实现这个的时候使用了一个技巧,并且C99将其收录为标准,即柔性数组成员(flexible array member),参考资料见这里 ...

  9. redis源码阅读--hashTable

    为什么80%的码农都做不了架构师?>>>    公司有个项目用到hash,同事从redis的代码里抠了hashTable的源码出来,最近抽空看了一下,设计还是很精妙的. 现在把阅读的 ...

最新文章

  1. 一个小型网站多少台服务器,小型企业局域网需要服务器的价格在多少左右(40台机器)...
  2. centos 配置redis
  3. OpenGL HDR渲染
  4. 36 | 套路篇:怎么评估系统的网络性能?
  5. nv4_disp.dll 蓝屏
  6. Android 系统(71)---Android系统build.prop文件生成过程
  7. android 轻量级工具,轻量级文件粉碎工具(File Shredder Tool) v1.0 免费版
  8. C#中英文字符串固定长度换行
  9. 《探索需求》——阅读笔记三
  10. ASP.NET Core使用Session
  11. Windows7主题包:Aero Pure 0.2 更像Win8
  12. 【百度LOGO权限】三天时间从获取LOGO权限到审核通过
  13. linux7开放3306端口,CentOS 7 开放3306端口
  14. 风险偏好情绪有所改善,非美低位反弹
  15. 仙人掌圆方树学习笔记
  16. 学习笔记10----学成在线案例
  17. Heavy Pitch趣味漫画风格字体 for mac
  18. Matlab——行星轨道问题
  19. Java 性能调优的技巧有哪些?
  20. three.js 后期处理通道postprocessing

热门文章

  1. 小程序学习:bindtap和catchtap,wx:for和wx:for-item
  2. Open CV入门教程--电子书高清免费下载
  3. Mac|关于mac如何卸载mysql数据库超级简单
  4. 凡人如何让自己的工作独一无二
  5. 疯狂的程序员-第六十四章
  6. java实现冒泡排序+图解冒泡排序+代码实现+代码解析(java)
  7. 苏嵌实训——day17
  8. zsy后台管理系统-界面
  9. 小男孩与苹果树的故事(中英互译)
  10. 创建时间Time类,并实例化对象访问测试