文章目录

  • 前言
  • 一、Radix Tree 是什么?
  • 二、Radix Tree 数据结构
    • 1. 结构
    • 2. 非压缩节点
    • 3. 压缩节点
    • 4. 图解Radix Tree
  • 三、源码分析
    • 1. 查询
    • 2. 插入
    • 3. 删除
    • 4. 遍历
  • 总结

前言

本文参考源码版本为 redis6.2

前缀树是字符串查找时,经常使用的一种数据结构,能够在一个字符串集合中快速查找到某个字符串,如下图所示:

              (f) ""\(o) "f"\(o) "fo"/  \(t)    (b) "foo"/      \"foot" (e)     (a) "foob"/           \"foote" (r)         (r) "fooba"/               \"footer" []               [] "foobar"

由于树中每个节点只存储字符串中的一个字符,故而有时会造成空间的浪费。

Rax 的出现就是为了解决这一问题。Redis 对于 Rax 的解释为 A radix tree implement,基数树的一种实现。

在前缀树中,如果一系列单字符节点之间的分支连接是唯一的,那么这些单字符节点就可以合并成一个节点,而这种结构的树,就正是 Radix Tree,也被称为基数树

Rax中不仅可以存储字符串,同时还可以为这个字符串设置一个值,也就是key-value。


一、Radix Tree 是什么?

Radix Tree 是属于前缀树的一种类型。前缀树也称为 Trie Tree,其特点是,保存在树上的每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。

前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。

不同的是,rax 在前缀树上做了一些优化,对上图做一些改变:

                  ["foo"] ""|[t   b] "foo"/     \"foot" ("er")    ("ar") "foob"/          \"footer" []          [] "foobar"

如果不理解,继续往下看~

二、Radix Tree 数据结构

1. 结构

raxNode定义:


typedef struct raxNode {uint32_t iskey:1;     //节点是否包含keyuint32_t isnull:1;    //节点的值是否为NULLuint32_t iscompr:1;   //节点是否被压缩uint32_t size:29;     //节点大小unsigned char data[]; //节点的实际存储数据
} raxNode;

该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。

  • iskey:表示从 Radix Tree 的根节点到当前节点路径上的字符组成的字符串,是否表示了一个完整的 key。如果是的话,那么 iskey 的值为 1。否则,iskey 的值为 0。不过,这里需要注意的是,当前节点所表示的 key,并不包含该节点自身的内容。
  • isnull:表示当前节点是否为空节点。如果当前节点是空节点,那么该节点就不需要为指向 value 的指针分配内存空间了。
  • iscompr:表示当前节点是非压缩节点,还是压缩节点。
  • size:表示当前节点的大小,具体值会根据节点是压缩节点还是非压缩节点而不同。如果当前节点是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示该节点指向的子节点个数。

这 4 个元数据就对应了压缩节点非压缩节点头部的 HDR,其中,iskey、isnull 和 iscompr 分别用 1 bit 表示,而 size 占用 29 bit

另外,从 raxNode 结构体中,可以看到,除了元数据,该结构体中还有 char 类型数组 data。需要知道的是,data 是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:

  • 对于非压缩节点来说,data 数组包括子节点对应的字符、指向子节点的指针,以及节点表示 key 时对应的 value 指针;
  • 对于压缩节点来说,data 数组包括子节点对应的合并字符串、指向子节点的指针,以及节点为 key 时的 value 指针。

在 raxNode 的实现中,无论是非压缩节点还是压缩节点,具有两个特点:

  • 它们所代表的 key,是从根节点到当前节点路径上的字符串,但并不包含当前节点;
  • 它们本身就已经包含了子节点代表的字符或合并字符串。而对于它们的子节点来说,也都属于非压缩或压缩节点,所以,子节点本身又会保存,子节点的子节点所代表的字符或合并字符串。

而这两个特点给 Radix Tree 实际保存数据时的结构,有两个特征。

  • 首先,Radix Tree 非叶子节点,要么是压缩节点,只指向单个子节点,要么是非压缩节点,指向多个子节点,但每个子节点只表示一个字符。所以,非叶子节点无法同时指向表示单个字符的子节点和表示合并字符串的子节点。
  • 其次,对于 Radix Tree 的叶子节点来说,因为它没有子节点,所以,Redis 会用一个不包含子节点指针的 raxNode 节点来表示叶子节点,也就是说,叶子节点的 raxNode 元数据 size 为 0,没有子节点指针。如果叶子节点代表了一个 key,那么它的 raxNode 中是会保存这个 key 的 value 指针的。

如果你觉得晦涩不易懂,坚持,继续往下看~

2. 非压缩节点

这类节点会包含多个指向不同子节点的指针,以及多个子节点所对应的字符

同时,如果从根节点到一个非压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么这个非压缩节点中还包含了指向这个 key 对应的 value 的指针。

结合 raxNode 结构来看看,如果是一个非压缩 node ,当我们有size个字符(bytes),就会有相对应size 个指向子节点 raxNode 的指针;

需要注意的是,字符不是存在子节点,而是存在于父节点中(可以思考下, 为什么不存在子节点中?)。

[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)

3. 压缩节点

这类节点会包含一个指向子节点的指针,以及子节点所代表的合并的字符串

和非压缩节点类似,如果从根节点到一个压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么,这个压缩节点中还包含指向这个 key 对应的 value 的指针。

注意,压缩节点仅有一个子节点

[header iscompr=1][abc][c-ptr](value-ptr?)

4. 图解Radix Tree

假设Radix Tree树中包含以下几个字符串 “foo”, “foobar” 和 “footer”;如果node节点代表rax树中的一个key,就写在 [] 里面,反之写在 () 里面。

              (f) ""\(o) "f"\(o) "fo"\[t   b] "foo"/     \"foot" (e)     (a) "foob"/         \"foote" (r)         (r) "fooba"/             \"footer" []             [] "foobar"

然而,这里有个常见优化点,对于连续只有单个子节点的node可以压缩成一个 「压缩节点」,因此,上面的树形图变成了下面这种:

                  ["foo"] ""|[t   b] "foo"/     \"foot" ("er")    ("ar") "foob"/          \"footer" []          [] "foobar"

这便是Radix Tree的模型图了。

不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo” 就不再是一个公共前缀了,最终树形图如下:

                    (f) ""/(i o) "f"/   \"fi"  ("rst")  (o) "fo"/        \"first" []       [t   b] "foo"/     \"foot" ("er")    ("ar") "foob"/          \"footer" []          [] "foobar"

三、源码分析

1. 查询

rax提供了查找key的接口raxFind,该接口用于获取key对应的value值,定义如下:

/* Find a key in the rax, returns raxNotFound special void pointer value* if the item was not found, otherwise the value associated with the* item is returned. */
void *raxFind(rax *rax, unsigned char *s, size_t len) {raxNode *h;int splitpos = 0;size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)return raxNotFound;return raxGetData(h);
}

其中,该方法的作用有是,在rax中查找长度为 len 的字符串 s (s为rax中的key),找到之后返回 key 对应的 value。raxLowWalk 方法返回 s 的匹配长度,当 s ! = len 时,表示未查找到该key;当s == len时,需要检验 stopnode 是否为 key,并且当 stopnode 为压缩节点时,还需要检查splitpos是否为0(可能匹配到某个压缩节点中间的某个元素)。

raxLowWalk 方法作为底层方法,在增删查改操作中均有调用,接口定义为:

static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts)

其中,入参定义为:

  • rax 为待查找的 Rax;
  • s 为待查找的 key;
  • len 为 s 的长度;
  • stopnode 为查找过程中的终止节点,也就意味着,当 rax 查找到该节点时,待查找的 key 已经匹配完成,或者当前节点无法与带查找的 key 匹配;
  • plink 用于记录父节点中指向 stopnode 的指针的位置,当 stopnode 变化时,也需要修改父节点指向该节点的指针;
  • splitpos 用于记录压缩节点的匹配位置;用于压缩节点中切割并插入
  • 当 ts 不为空时,会将查找该 key 的路径写入该变量

具体实现如下:

    raxNode *h = rax->head; // 从跟节点开始匹配raxNode **parentlink = &rax->head;size_t i = 0; /* Position in the string. */size_t j = 0; /* Position in the node children (or bytes if compressed).*/while(h->size && i < len) {unsigned char *v = h->data;if (h->iscompr) {// 压缩节点,挨个比对,相同则继续往下找;反之,则退出,说明匹配到这就开始不同或者已经到末尾了for (j = 0; j < h->size && i < len; j++, i++) {if (v[j] != s[i]) break;}if (j != h->size) break;} else { // 非压缩节点,只要找到一个相同的就可以继续往下,在子节点中查找了/* Even when h->size is large, linear scan provides good* performances compared to other approaches that are in theory* more sounding, like performing a binary search. */ for (j = 0; j < h->size; j++) {if (v[j] == s[i]) break;}if (j == h->size) break;// 可以匹配到当前字符,继续往下匹配下一个字符,然后会移动到子节点进行匹配i++;}// 记录路径信息if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */// 找到第一个子节点的地址raxNode **children = raxNodeFirstChildPtr(h);if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */// 将当前节点移动到其第j个子节点memcpy(&h,children+j,sizeof(h));parentlink = children+j;j = 0; /* If the new node is non compressed and we do notiterate again (since i == len) set the splitposition to 0 to signal this node representsthe searched key. */}if (stopnode) *stopnode = h;if (plink) *plink = parentlink;if (splitpos && h->iscompr) *splitpos = j;return i;

实现逻辑:

  • 从根节点开始匹配,直到当前待查找节点无子节点或者 s 查找完毕。对于每个节点来说,如果为压缩节点,则需要与 s 中的字符完全匹配。如果为非压缩节点,则查找与当前待匹配字符相同的字符。
  • 如果当前待匹配节点能够与 s 匹配,则移动位置到其子节点,继续匹配,直至匹配完成。
  • 记录相关信息,比如路径等

2. 插入

插入元素比较复杂,源码中也花了大量注释说明。

向 rax 中插入 key-value 对,对于已存在的 key, rax 提供了2种方案,覆盖或者不覆盖原有的 value,对应的接口分别为 raxInsert、raxTryInsert。

插入操作的真正实现函数 raxGenericInsert,方法定义如下:

int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite)

第一步,当完全匹配并且目标位置的节点不需要切割

    size_t i;int j = 0; // 切割位置,当定位到具体的压缩节点时,j用于定位该压缩节点待插入的位置raxNode *h, **parentlink;// 在插入元素前,会通过 raxLowWalk 判断 key 是否存在或者找到待插入位置。i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);// 1. i == len(代表已经匹配到了整个字符串)// 2. 如果不是压缩节点 或者 是压缩节点但该压缩节点不需要切割(j==0)// 注:对于压缩节点node,如果要在node中插入新元素 或者 该node中代表了这次要插入元素的key, 那么将会重新分配该node, 确保空间足够if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {/* Make space for the value pointer if needed. */if (!h->iskey || (h->isnull && overwrite)) {h = raxReallocForData(h,data);if (h) memcpy(parentlink,&h,sizeof(h));}if (h == NULL) {errno = ENOMEM;return 0;}// 如果key存在,根据方法入参进行处理(获取旧数据或者重写)if (h->iskey) {if (old) *old = raxGetData(h);if (overwrite) raxSetData(h,data);errno = 0;return 0; /* Element already exists. */}// 给新增节点赋值raxSetData(h,data);rax->numele++;return 1; /* Element inserted. */}

第二步:对于需要切割的压缩节点,有几种情况:

假设我们当前所在节点 h 包含字符串 “ANNIBALE” ,h 有且仅有一个子节点(“SC”),并且该节点没有子节点,用 ‘0’ 表示,结构如下:

"ANNIBALE" -> "SCO" -> []

案例1: 插入 “ANNIENTARE”

  |B| -> "ALE" -> "SCO" -> []"ANNI" -> |-||E| -> (... continue algo ...) "NTARE" -> []

需要将 “ANNIBALE” 拆分成3部分:压缩节点,非压缩节点,压缩节点

案例2: 插入 “ANNIBALI”

                  |E| -> "SCO" -> []"ANNIBAL" -> |-||I| -> (... continue algo ...) []

需要将 “ANNIBALE” 拆成2部分:压缩节点,非压缩节点。

案例3: 插入 “AGO”(和案例1类似,不过,需要在原节点中设置 iscompr = 0)

            |N| -> "NIBALE" -> "SCO" -> []|A| -> |-||G| -> (... continue algo ...) |O| -> []

需要将 “ANNIBALE” 节点拆分为3部分:非压缩节点,非压缩节点,压缩节点。

案例4: 插入 “CIAO”

     |A| -> "NNIBALE" -> "SCO" -> []|-||C| -> (... continue algo ...) "IAO" -> []

此时需要将 “ANNIBALE” 节点拆分为2部分:第一部分是非压缩节点,第二部分为压缩节点。

案例5: 插入 “ANNI”

"ANNI" -> "BALE" -> "SCO" -> []

将 “ANNIBALE” 拆分成2个压缩节点。

综上,对于压缩节点拆分过程,总体而言分为2类:

  • 一类是新插入的key是当前节点的一部分
  • 另一类是新插入的key和压缩节点的某个位置不匹配

源码实现上,是用了两种算法覆盖以上5种案例:

算法1: 覆盖案例 1~4,出现字符不匹配的位置,是在某个压缩节点上。

SPLITPOS 代表压缩节点切割位置,例如 压缩节点包含 “ANNIBALE”,将插入 “ANNIENTARE”,那么 SPLITPOS = 4,也就是 未匹配元素的下标。

  • 1.保存压缩节点 NEXT 指针(指向唯一的子节点)
  • 2.创建切割节点 “split node”,即第一个未匹配位置,len = 1;例如,这里的 “B” 将作为切割节点。
  • 3a.如果 SPLITPOS == 0,用 “split node” 代替旧节点,并拷贝数据;也就意味着,该压缩节点只会被分成两部分
  • 3b. 如果 SPLITPOS != 0,也就说该压缩节点存在匹配成功的前缀部分,切割压缩节点,并修改相应指针。如果新压缩节点len == 1, 设置 iscompr = 0
  • 4a. 如果原压缩节点切割后,剩余后缀长度 postfix len 大于 0,就创建一个 “postfix node” 后缀节点;如果 “postfix node” len == 1, 设置 iscompr = 0;设置 “postfix node” 的子节点指针 指向 NEXT
  • 4b. 如果 postfix len == 0, 用 NEXT 作为 postfix 指针;这个时候,也只切割成两部分,公共前缀 和 split node
  • 5.设置 postfix 节点作为 splitnode 的第一个子节点
  • 6.设置 “split node” 为当前节点, 然后将处理,新key未匹配部分

源码实现如下:

    /* ------------------------- ALGORITHM 1 --------------------------- */if (h->iscompr && i != len) {/* 1: Save next pointer. */// 用 NEXT 保存 h的子节点指针raxNode **childfield = raxNodeLastChildPtr(h);raxNode *next;memcpy(&next,childfield,sizeof(next));if (h->iskey) {debugf("key value is %p\n", raxGetData(h));}// 设置新增节点长度,这里 j 是压缩节点切割位置,因此压缩节点会被切割成两部分,长度分别是 j 和 postfixlensize_t trimmedlen = j;size_t postfixlen = h->size - j - 1;// 仔细想想?当压缩节点被切割成两部分,并且没有公共前缀;如果该压缩节点本身 iskey, 那么切割后 split_node 自然也是 iskey (注意:iskey表示从根节点到压缩节点的父节点组成的key)int split_node_is_key = !trimmedlen && h->iskey && !h->isnull;size_t nodesize;/* 2: 创建 split node 节点,并分配空间 */// 可以看到,这里会切割成三部分,当切割位置在第一个字符时(j==0),即trimmedlen=0 表示没有公共前缀,此时会切割成两部分raxNode *splitnode = raxNewNode(1, split_node_is_key);  // 切割位置raxNode *trimmed = NULL; // 已匹配的前缀部分raxNode *postfix = NULL; // 切割位置之后,未匹配的部分// 如果匹配前缀大于0,分配新空间if (trimmedlen) {nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+sizeof(raxNode*);if (h->iskey && !h->isnull) nodesize += sizeof(void*);trimmed = rax_malloc(nodesize);}// 未匹配部分长度大于0,分配空间if (postfixlen) {nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+sizeof(raxNode*);postfix = rax_malloc(nodesize);}/* OOM? Abort now that the tree is untouched. */// 分配失败if (splitnode == NULL ||(trimmedlen && trimmed == NULL) ||(postfixlen && postfix == NULL)){rax_free(splitnode);rax_free(trimmed);rax_free(postfix);errno = ENOMEM;return 0;}splitnode->data[0] = h->data[j];// j == 0,意味着压缩节点只需拆分成两部分,split node 与 后缀部分已经拆分了,因此 parentlink 将指向 split nodeif (j == 0) {// 3a. 仔细想想?原压缩节点 iskey, 到这一步,split node 同样 iskey,因此将数据拷贝过去if (h->iskey) {void *ndata = raxGetData(h);raxSetData(splitnode,ndata);}memcpy(parentlink,&splitnode,sizeof(splitnode));} else {// 3b. 切割公共前缀,也就意味着,压缩节点将被分成三部分trimmed->size = j;memcpy(trimmed->data,h->data,j);trimmed->iscompr = j > 1 ? 1 : 0;trimmed->iskey = h->iskey;trimmed->isnull = h->isnull;if (h->iskey && !h->isnull) {void *ndata = raxGetData(h);raxSetData(trimmed,ndata);}raxNode **cp = raxNodeLastChildPtr(trimmed);memcpy(cp,&splitnode,sizeof(splitnode));memcpy(parentlink,&trimmed,sizeof(trimmed));parentlink = cp; /* Set parentlink to splitnode parent. */rax->numnodes++;}// 4. 创建 postfix 节点,即原压缩节点未匹配的后缀部分if (postfixlen) {/* 4a: create a postfix node. */postfix->iskey = 0;postfix->isnull = 0;postfix->size = postfixlen;postfix->iscompr = postfixlen > 1;memcpy(postfix->data,h->data+j+1,postfixlen);raxNode **cp = raxNodeLastChildPtr(postfix);memcpy(cp,&next,sizeof(next));rax->numnodes++;} else {/* 4b: just use next as postfix node. */postfix = next;}// 5. 设置 postfix 节点作为 splitnode 的第一个子节点raxNode **splitchild = raxNodeLastChildPtr(splitnode);memcpy(splitchild,&postfix,sizeof(postfix));// 6. 继续插入。以上只处理了原压缩节点,对于新插入key, 未匹配部分还未插入,源码稍后介绍;这里插入之后,splitnode将会多一个子节点rax_free(h);h = splitnode;}

算法2:应对场景5,即在压缩节点某个位置处,完成了字符串 s 的匹配,此时,只需将压缩节点拆分成两部分即可。

例如,rax树中存在节点 “ANNIBALE”,将插入 “ANNI”,此时 SPLITPOS = 4。

  • 1.保存当前压缩节点 NEXT 指针
  • 2.创建后缀节点 postfix node,保存从 SPLITPOS 之后的所有字符,如果 postfix node 的 len == 1, 设置 iscompr = 0
  • 3.切割匹配的公共部分 trimmed 节点,下标为0~SPLITPOS
  • 4.设置 postfix 为 trimmed 的第一个子节点

源码实现如下:

else if (h->iscompr && i == len) {/* ------------------------- ALGORITHM 2 --------------------------- */// 切割并分配 postfix 和 trimmed 节点size_t postfixlen = h->size - j;size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+sizeof(raxNode*);if (data != NULL) nodesize += sizeof(void*);raxNode *postfix = rax_malloc(nodesize);nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*);if (h->iskey && !h->isnull) nodesize += sizeof(void*);raxNode *trimmed = rax_malloc(nodesize);// 分配失败if (postfix == NULL || trimmed == NULL) {rax_free(postfix);rax_free(trimmed);errno = ENOMEM;return 0;}/* 1: Save next pointer. */raxNode **childfield = raxNodeLastChildPtr(h);raxNode *next;memcpy(&next,childfield,sizeof(next));/* 2: Create the postfix node. */postfix->size = postfixlen;postfix->iscompr = postfixlen > 1;postfix->iskey = 1;postfix->isnull = 0;memcpy(postfix->data,h->data+j,postfixlen);raxSetData(postfix,data);raxNode **cp = raxNodeLastChildPtr(postfix);memcpy(cp,&next,sizeof(next));rax->numnodes++;/* 3: Trim the compressed node. */trimmed->size = j;trimmed->iscompr = j > 1;trimmed->iskey = 0;trimmed->isnull = 0;memcpy(trimmed->data,h->data,j);memcpy(parentlink,&trimmed,sizeof(trimmed));if (h->iskey) {void *aux = raxGetData(h);raxSetData(trimmed,aux);}/* Fix the trimmed node child pointer to point to* the postfix node. */cp = raxNodeLastChildPtr(trimmed);memcpy(cp,&postfix,sizeof(postfix));/* Finish! We don't need to continue with the insertion* algorithm for ALGO 2. The key is already inserted. */rax->numele++;rax_free(h);return 1; /* Key inserted. */}

到这里,还有一点工作需要完成, 那就是新key未匹配的部分,还需要插入rax树,源码实现如下:

    while(i < len) {raxNode *child;// 这里就是判断要处理成压缩节点还是非压缩节点if (h->size == 0 && len-i > 1) {debugf("Inserting compressed node\n");size_t comprsize = len-i;if (comprsize > RAX_NODE_MAX_SIZE)comprsize = RAX_NODE_MAX_SIZE;raxNode *newh = raxCompressNode(h,s+i,comprsize,&child);if (newh == NULL) goto oom;h = newh;memcpy(parentlink,&h,sizeof(h));parentlink = raxNodeLastChildPtr(h);i += comprsize;} else {debugf("Inserting normal node\n");raxNode **new_parentlink;raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink);if (newh == NULL) goto oom;h = newh;memcpy(parentlink,&h,sizeof(h));parentlink = new_parentlink;i++;}rax->numnodes++;h = child;}raxNode *newh = raxReallocForData(h,data);if (newh == NULL) goto oom;h = newh;if (!h->iskey) rax->numele++;raxSetData(h,data);memcpy(parentlink,&h,sizeof(h));return 1; /* Element inserted. */

以上便是插入新key的全部过程。可以看出,实现上根据列举的5种案例进行处理。

3. 删除

rax 的删除操作主要有3个接口,可以删除 rax 中的某个 key,或者释放整个 rax,在释放 rax 时,还可以设置释放回调函数,在释放rax 的每个 key 时,都会调用这个回调函数,3个接口的定义如下:

// 在 rax 中删除长度为 len 的 s
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
// 释放 rax
void raxFree(rax *rax);
// 释放 rax, 释放每个 key 时,都会调用 free_callback 方法
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));

其中,rax 的释放操作,采用的是深度优先算法

下面重点介绍 raxRemove 方法:

第一步:当删除 rax 中的某个 key-value 对时,首先查找 key 是否存在,不存在则直接返回,存在则需要进行删除操作,实现如下:

    raxNode *h;raxStack ts;raxStackInit(&ts); // 保存路径int splitpos = 0;size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts);if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) { // 未找到目标 key, 直接退出raxStackFree(&ts);return 0;}

第二步,如果 key 存在,则需要进行删除操作,删除操作完成后,rax 树可能需要进行压缩。具体可以分为下面 2 种情况,此处所说的压缩是指将某个节点与其子节点压缩成一个节点,叶子节点没有子节点,不能进行压缩。

  • 某个节点只有一个子节点,该子节点之前是key,经删除操作后不再是key,此时可以将该节点与其子节点压缩。
  • 某个节点有两个子节点,经过删除操作后,只剩下一个子节点,如果这个子节点不是key,则可以将该节点与这个子节点压缩。

案例1:
假设 rax 树存储了 “FOO” = 1 和 “FOOBAR” = 2,结构如下:

"FOO" -> "BAR" -> [] (2)(1)

删除 “FOO” ,压缩后,结构如下:

"FOOBAR" -> [] (2)

案例2:
假设 rax 树存储了 “FOOBAR” = 1 和 “FOOTER” = 2

          |B| -> "AR" -> [] (1)"FOO" -> |-||T| -> "ER" -> [] (2)

删除 “FOOTER” 后,结构如下:

"FOO" -> |B| -> "AR" -> [] (1)

可以压缩成的结果如下:

"FOOBAR" -> [] (1)

删除操作具体可以分为2个阶段,删除阶段以及压缩阶段

1)删除阶段:

利用查找key时记录的匹配路径,依次向上直到无法删除为止。

    if (h->size == 0) {debugf("Key deleted in node without children. Cleanup needed.\n");raxNode *child = NULL;while(h != rax->head) {child = h;debugf("Freeing child %p [%.*s] key:%d\n", (void*)child,(int)child->size, (char*)child->data, child->iskey);rax_free(child);rax->numnodes--;h = raxStackPop(&ts);// 如果节点为 key,或者子节点 size != 1,则无法删除if (h->iskey || (!h->iscompr && h->size != 1)) break;}if (child) {raxNode *new = raxRemoveChild(h,child);if (new != h) {raxNode *parent = raxStackPeek(&ts);raxNode **parentlink;if (parent == NULL) {parentlink = &rax->head;} else {parentlink = raxFindParentLink(parent,h);}memcpy(parentlink,&new,sizeof(new));}// 删除后尝试进行压缩if (new->size == 1 && new->iskey == 0) {trycompress = 1;h = new;}}} else if (h->size == 1) {// 可以尝试进行压缩trycompress = 1;}

2)压缩阶段: 压缩过程可以细化为2步。

① 找到可以进行压缩的第一个元素,之后将所有可进行压缩的节点进行压缩。由于raxRowWalk函数已经记录了查找key的过程,压缩时只需从记录栈中不断弹出元素,即可找到可进行压缩的第一个元素,过程如下:

        raxNode *parent;while(1) {parent = raxStackPop(&ts);if (!parent || parent->iskey ||(!parent->iscompr && parent->size != 1)) break;h = parent;}// 可以进行压缩的第一个节点raxNode *start = h;

② 找到第一个可压缩节点后,进行数据压缩。由于可压缩的节点都只有一个子节点,压缩过程只需要读取每个节点的内容,创建新的节点,并填充新节点的内容即可:

       /* Scan chain of nodes we can compress. */size_t comprsize = h->size;int nodes = 1;while(h->size != 0) {raxNode **cp = raxNodeLastChildPtr(h);memcpy(&h,cp,sizeof(h));if (h->iskey || (!h->iscompr && h->size != 1)) break;/* Stop here if going to the next node would result into* a compressed node larger than h->size can hold. */if (comprsize + h->size > RAX_NODE_MAX_SIZE) break;nodes++;comprsize += h->size;}if (nodes > 1) {/* If we can compress, create the new node and populate it. */size_t nodesize =sizeof(raxNode)+comprsize+raxPadding(comprsize)+sizeof(raxNode*);raxNode *new = rax_malloc(nodesize);/* An out of memory here just means we cannot optimize this* node, but the tree is left in a consistent state. */if (new == NULL) {raxStackFree(&ts);return 1;}new->iskey = 0;new->isnull = 0;new->iscompr = 1;new->size = comprsize;rax->numnodes++;/* Scan again, this time to populate the new node content and* to fix the new node child pointer. At the same time we free* all the nodes that we'll no longer use. */comprsize = 0;h = start;while(h->size != 0) {memcpy(new->data+comprsize,h->data,h->size);comprsize += h->size;raxNode **cp = raxNodeLastChildPtr(h);raxNode *tofree = h;memcpy(&h,cp,sizeof(h));rax_free(tofree); rax->numnodes--;if (h->iskey || (!h->iscompr && h->size != 1)) break;}/* Now 'h' points to the first node that we still need to use,* so our new node child pointer will point to it. */raxNode **cp = raxNodeLastChildPtr(new);memcpy(cp,&h,sizeof(h));/* Fix parent link. */if (parent) {raxNode **parentlink = raxFindParentLink(parent,start);memcpy(parentlink,&new,sizeof(new));} else {rax->head = new;}}

4. 遍历

为了能够遍历 rax 中所有的 key, Redis 提供了迭代器操作。Redis 中实现的迭代器为双向迭代器,可以向前,也可以向后,顺序是按照 key 的字典序排列的。通过 rax 的结构图可以看出,如果某个节点为 key,则其子节点的 key 按照字典序比该节点的 key 大。另外,如果当前节点为非压缩节点,则其最左侧节点的key是其所有子节点的 key 中最小的。主要方法有:

void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
int raxPrev(raxIterator *it);

1)raxStart 用于初始化 raxIterator 结构:

/* Initialize a Rax iterator. This call should be performed a single time* to initialize the iterator, and must be followed by a raxSeek() call,* otherwise the raxPrev()/raxNext() functions will just return EOF. */
void raxStart(raxIterator *it, rax *rt) {it->flags = RAX_ITER_EOF; /* No crash if the iterator is not seeked. */it->rt = rt;it->key_len = 0;it->key = it->key_static_string;it->key_max = RAX_ITER_STATIC_LEN;it->data = NULL;it->node_cb = NULL;raxStackInit(&it->stack);
}

2)raxSeek,在raxStart初始化迭代器后,必须调用raxSeek函数初始化迭代器的位置。

int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
  • it为raxStart初始化的迭代器。
  • op为查找操作符,可以为大于(>)、小于(<)、大于等于(>=)、小于等于(<=)、等于(=)、首个元素(^)、末尾元素($)。
  • ele为待查找的key。
  • len为ele的长度。

查找末尾元素可以直接在Rax中找到最右侧的叶子节点,查找首个元素被转换为查找大于等于空的操作。

处理大于、小于、等于等操作主要分为以下几步:

  • 在rax中查找key:
  • 如果key找到,并且op中设置了等于,则操作完成:
  • 如果仅仅设置等于,并没有找到key,则将迭代器的标志位设置为末尾。
  • 如果设置了等于但没有找到key,或者设置了大于或者小于符号,则需要继续查找。

3)raxNext & raxPrev,主要用于迭代上一个或者下一个key

/* Go to the next element in the scope of the iterator 'it'.* If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is* returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxNext(raxIterator *it) {if (!raxIteratorNextStep(it,0)) {errno = ENOMEM;return 0;}if (it->flags & RAX_ITER_EOF) {errno = 0;return 0;}return 1;
}/* Go to the previous element in the scope of the iterator 'it'.* If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is* returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxPrev(raxIterator *it) {if (!raxIteratorPrevStep(it,0)) {errno = ENOMEM;return 0;}if (it->flags & RAX_ITER_EOF) {errno = 0;return 0;}return 1;
}

4)raxStop & raxEOF:
raxEOF用于标识迭代器迭代结束,raxStop用于结束迭代并释放相关资源:

/* Free the iterator. */
void raxStop(raxIterator *it) {if (it->key != it->key_static_string) rax_free(it->key);raxStackFree(&it->stack);
}/* Return if the iterator is in an EOF state. This happens when raxSeek()* failed to seek an appropriate element, so that raxNext() or raxPrev()* will return zero, or when an EOF condition was reached while iterating* with raxNext() and raxPrev(). */
int raxEOF(raxIterator *it) {return it->flags & RAX_ITER_EOF;
}

总结

首先,要理解 rax 底层实现,第一步需要理解 rax 的结构;其次,要理解 压缩节点 和 非压缩节点 的区别

来,再回顾下 压缩节点 和 非压缩节点 的区别:
它们的相同之处在于:

  • 都有保存元数据的节点头 HDR;
  • 都会包含指向子节点的指针,
  • 以及子节点所代表的字符串。从根节点到当前节点路径上的字符串如果是 Radix Tree 的一个 key,它们都会包含指向 key 对应 value 的指针。

不同之处在于:

  • 非压缩节点指向的子节点,每个子节点代表一个字符,非压缩节点可以指向多个子节点;
  • 压缩节点指向的子节点,代表的是一个合并字符串,压缩节点只能指向一个子节点。

相关参考:

  • redis readme.md
  • Redis · 引擎特性 · radix tree 源码解析

深入分析redis之rax底层原理,前缀树?相关推荐

  1. 深入分析 Redis Lua 脚本运行原理

    Redis 提供了非常丰富的指令集,但是用户依然不满足,希望可以自定义扩充若干指令来完成一些特定领域的问题.Redis 为这样的用户场景提供了 lua 脚本支持,用户可以向服务器发送 lua 脚本来执 ...

  2. 深入Redis数据结构和底层原理

    1 概述 Redis为什么能支持每秒钟十万级的高并发? 基于内存的存取方式 高效的数据结构 单线程,使用多路I/O复用模型,非阻塞IO - 其中一个重要的原因,就是Redis中高效的数据结构,因此我们 ...

  3. redis scan 命令底层原理(为什么会重复扫描?)

    文章目录 前言 一.迭代器 1. 全遍历 2. 间断遍历 二.scan 扫描原理 1. 扫描算法: 2. 减少重复扫描? 2.1 扩容 2.2 缩容 3. 迭代过程中正在进行rehash 4. 完整的 ...

  4. Linq的底层原理探讨

    前言 有一篇文章ABP-引入SqlSugar很多人都在催促我,出下一章因为工作忙一直没写.现在开第二节课Linq的底层原理探讨.一起探讨完,看看有没有引起SqlSugar的新思路. 这文章叫linq的 ...

  5. golang gin框架源码分析(二)---- 渐入佳境 摸索Engine ServeHTTP访问前缀树真正原理

    文章目录 全系列总结博客链接 前引 golang gin框架源码分析(二)---- 渐入佳境 摸索Engine ServeHTTP访问前缀树真正远原理 1.再列示例代码 从示例代码入手 2.r.Run ...

  6. 什么是m叉树_不懂数据库索引的底层原理?那是因为你心里没点b树

    点击上方"后端技术精选",选择"置顶公众号" 技术文章第一时间送达! 作者:苏苏喂 cnblogs.com/sujing/p/11110292.html 题外话 ...

  7. long 转为string_面试必问 Redis数据结构底层原理String、List篇

    点击关注上方"Java大厂面试官",第一时间送达技术干货. 阅读文本大概需要 8 分钟. 前言 今天来整理学习下Redis有哪些常用数据结构,都是怎么使用的呢?首先看下全局存储结构 ...

  8. redis单线程原理___Redis为何那么快-----底层原理浅析

    redis单线程原理 redis单线程问题 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程. 1. 为什么说redis能够快速执行 ...

  9. oracle树子类遍历父类_不懂数据库索引的底层原理?那是因为你心里没点b树

    前几天下班回到家后正在处理一个白天没解决的bug,厕所突然传来对象的声音: 对象:xx,你有<时间简史>吗? 我:我去!妹子,你这啥癖好啊,我有时间也不会去捡屎啊! 对象:...人家说的是 ...

  10. 不懂数据库索引的底层原理?那是因为你心里没点b树

    点击上方"后端技术精选",选择"置顶公众号" 技术文章第一时间送达! 作者:苏苏喂 cnblogs.com/sujing/p/11110292.html 题外话 ...

最新文章

  1. [Android]用架构师角度看插件化(3)-Replugin 需要占坑跳转?
  2. B06_NumPy 切片和索引
  3. Android中BindService方式使用的理解
  4. CSS基础(part13)--浮动
  5. MATLAB绘制正弦波、方波、三角波、锯齿波的mif文件
  6. java方法传对象参数_Java方法中的参数太多,第2部分:参数对象
  7. WordPress窗体化侧边栏
  8. 【转】RabbitMQ六种队列模式-3.发布订阅模式
  9. php iframe 上传文件,php+iframe 实现上传文件功能示例
  10. MySQL数据库----触发器
  11. 【设计模式】各个击破单例模式的8种写法
  12. 10php1c,PHP程序员,进阶选择C还是C++亦或者别语言
  13. 大型Web 网站 Asp.net Session过期你怎么办
  14. 计算机关机 休眠睡眠有什么区别,电脑“关机”、“睡眠”、“休眠”三者区别是什么...
  15. 手机图片怎么加水印?只需三步即可完成
  16. 74HC595芯片工作原理细致分析(以及级联拓展应用),以及芯片控制继电器原理 / 代码
  17. TabLayout 之改变 Indicator 的宽度
  18. Switchport详细用法
  19. 令牌桶算法的python实现,人人都可以玩算法
  20. Oracle计算时间差

热门文章

  1. Android 9.0 10.0 手动安装Persistent app失败的解决方案
  2. AUTOCAD——制作剪裁图块
  3. 高数 | 导数零点定理为什么导数可以不连续?
  4. 八款android日历 [Calendar] 开源项目框架分类总汇
  5. 那些著名的黑客事件 十二
  6. [视频教程]MAME画质优化hq3x
  7. 平时收集的一些有关UED的团队和个人博客
  8. jquery 的税收计算器(仅限参考)
  9. php网络图片拼接,图片处理 - PHP图片拼接如何高效的实现
  10. openid是什么意思?token是什么意思?