Redis中的Stream的实现Radix Tree源码解读

这篇文章,我们继续从底层数据结构的视角出发,来聊聊 Redis 中的 Stream 数据类型是如何保存消息的。

Redis 从 5.0 版本开始支持提供 Stream 数据类型,它可以用来保存消息数据,进而能帮助我们实现一个带有消息读写基本功能的消息队列,并用于日常的分布式程序通信当中。我在讲如何使用 Redis 实现消息队列的时候,曾介绍过 Stream。当时,有不少同学就说想学习了解下 Stream 的实现,以便掌握 Stream 内部结构的操作特点,但自己对 Stream 类型不太熟悉,不知道 Stream 底层是采用怎样的数据结构来保存消息数据的。

其实,为了节省内存空间,在 Stream 数据类型的底层数据结构中,采用了 Radix Tree 和 listpack 两种数据结构来保存消息。我在之前已经介绍过了 listpack,它是一个紧凑型列表,在保存数据时会非常节省内存。

所以今天这节课,我就来介绍下 Stream 用到的另一个数据结构 Radix Tree。这个数据结构的最大特点是适合保存具有相同前缀的数据,从而实现节省内存空间的目标,以及支持范围查询。

同时,和常见的 B 树或 B+ 树类似,Radix Tree 也是一种重要的树型结构,在操作系统内核和数据库中也有应用。所以,了解 Radix Tree 的设计与实现,既可以帮助我们掌握 Stream 的实现思路,还可以让我们把 Radix Tree 应用到需要节省内存的有序树型索引场景中,进一步解决具有公共前缀的大量数据保存时的内存开销问题。

好,那么接下来,我们先来了解下 Stream 保存的消息数据的特征,这也是 Redis 使用 Radix Tree 和 listpack 作为底层结构保存消息的重要考虑因素。

Stream 消息数据的特征

首先,Stream 作为消息队列,它保存的消息通常具有以下两个特征:

  • 一条消息由一个或多个键值对组成;
  • 每插入一条消息,这条消息都会对应一个消息 ID。

我们一般会让 Redis 服务器自动生成递增的消息 ID。此时,消息 ID 由时间戳和序号组成。其中,时间戳是消息插入时,以毫秒为单位的服务器当时时间,序号是插入消息在当前毫秒内的序号。

比如,我在 Redis 实例中执行以下操作,可以向名为 devmsg 的消息流中,连续插入 5 条消息。其中,每条消息记录的是某个设备 ID 对应的设备温度信息。

从上面的插入数据和返回结果中,我们可以看到,对应 Stream 类型来说,它需要保存的数据也具有两个特征:

  • 连续插入的消息 ID,其前缀有较多部分是相同的。比如,刚才插入的 5 条消息,它们消息 ID 的前 8 位都是 16362968。

  • 连续插入的消息,它们对应键值对中的键通常是相同的。比如,刚才插入的 5 条消息,它们消息中的键都是 dev 和 temp。

那么,针对 Stream 的这两个数据特征,我们该设计使用什么样的数据结构来保存这些消息数据呢?

你可能会想到使用哈希表,一个消息 ID 对应哈希表中的一个 key,消息内容对应这个 key 的 value。但是,就像刚才介绍的数据特征一样,消息 ID 和消息中的键经常会有重复的部分。如果使用哈希表,就会导致有不少冗余数据,这会浪费 Redis 宝贵的内存空间。

因此,为了充分节省内存空间,Stream 使用了两种内存友好的数据结构:listpack 和 Radix Tree。其中,消息 ID 是作为 Radix Tree 中的 key,消息具体数据是使用 listpack 保存,并作为 value 和消息 ID 一起保存到 Radix Tree 中

你可以看看下面的 Stream 结构体定义,其中,消息就是使用 Radix Tree 类型的结构*rax来保存的。

stream.h文件中可以找到

typedef struct stream {//保存消息的Radix Treerax *rax;               /* The radix tree holding the stream. *///消息流中的消息个数uint64_t length;        /* Number of elements inside this stream. *///当前消息流中最后插入的消息的IDstreamID last_id;       /* Zero if there are yet no items. *///当前消息流的消费组信息,也是用Radix Tree保存rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

好了,那么 Radix Tree 的结构到底是怎样的呢?下面我们就来学习下 Radix Tree 的基本结构。

Radix Tree 的基本结构

Radix Tree 是属于前缀树的一种类型。前缀树也称为 Trie Tree,它的特点是,保存在树上的每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当我们把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。下面这张图展示了一个简单的前缀树,你可以看下。图中的前缀树有两个叶子节点,将根节点到这两个叶子节点的路径上,对应的字符拼接起来后,就得到了两个 key:ab和 ac、AB、AC。

此图来源于LeetCode「宫水三叶」写的,https://leetcode-cn.com/problems/implement-trie-prefix-tree/solution/gong-shui-san-xie-yi-ti-shuang-jie-er-we-esm9/,觉得好可以关注他写的。

另外从图中,我们还可以看到,前缀树是把保存的 key 的公共前缀(即 a,A)独立出来共享使用的。这样一来,就可以避免在树中对相同的字符做重复存储。而如果不采用这种方法,只是把这两个 key 保存在哈希表中,那么 key 的相同前缀就会被单独存储,这样就会导致内存空间的浪费。所以,相比哈希表的保存方式,前缀树能够很好地节省内存空间,这对于 Redis 来说是非常重要的。

前缀树的不足和 Radix Tree 的改进

当然,前缀树在每个节点中只保存一个字符,这样做的好处就是可以尽可能地共享不同 key 的公共前缀。但是,这也会导致 key 中的某些字符串,虽然不再被共享,可仍然会按照每个节点一个字符的形式来保存,这样反而会造成空间的浪费和查询性能的降低。

我来给你举个例子,假设有 5 个 key,分别是 radix、race、read、real 和 redis,它们在前缀树上的布局如下图所示。

对于“redis”来说,因为它和“read”“real”共享“r”和“e”,和“radix”“race”共享“r”,也就是说“r”和“e”节点都分别指向多个子节点。类似的,“real”和“read”共享了“r”“e”和“a”前缀,“a”节点也指向了多个子节点。所以,在前缀树的节点中单独保存“r”“e”“a”是很有必要的。

但是,我们还是看“redis”这个 key,除了“r”“e”字符和其他 key 有共享外,“re”后面的“dis”没有再被其他 key 共享了。所以此时,其实并没有必要再对“dis”进行拆分,将其分成单个字符“d”“i”和“s”来保存,而是可以把它们合并在一起保存。那么到这里,你就可以发现,在前缀树上,确实有的字符需要单独保存,用来作为不同 key 的公共前缀进行共享,但其实有的单字符节点可以和其他单字符节点进行合并,这样能进一步节省空间。

而从一个更加通用的角度来说,在前缀树的某个节点开始,如果从该节点到另外一个节点之间,每一个节点都只有一个子节点,那就表明这些节点对应的字符,并没有和其他节点共享了。那么如果我们还是按照前缀树的方式,为每一个字符创建一个节点进行保存的话,一是会浪费内存空间,二是在进行查询时,还需要逐一匹配每个节点表示的字符,对查询性能也会造成影响。

所以,在前缀树中,如果一系列单字符节点之间的分支连接是唯一的,那么这些单字符节点就可以合并成一个节点,而这种结构的树,就正是 Radix Tree,也被称为基数树。相比前缀树来说,Radix Tree 既可以节约内存的使用,同时还可以提高查询访问的效率。

我画了下面这张图,展示了刚才介绍的前缀树上的 5 个 key(radix、race、read、real 和 redis),在 Radix Tree 上的布局,你可以对照着看下它们在前缀树布局上的不同之处。

Radix Tree 数据结构

好了,从刚才介绍的 Radix Tree 的结构中,我们其实可以发现,在 Radix Tree 中存在两类节点。

  • 第一类节点是非压缩节点,这类节点会包含多个指向不同子节点的指针,以及多个子节点所对应的字符,比如前面 Radix Tree 例子中的节点“r”,这个节点就包含了指向子节点“a”和“e”的指针。同时,如果从根节点到一个非压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么这个非压缩节点中还包含了指向这个 key 对应的 value 的指针。比如,下面这张图就显示了刚才例子中的节点 r,它是一个非压缩节点,指向了两个子节点,这两个子节点对应的字符分别是“a”和“e”,这个非压缩节点包含了指向子节点 a 和 e 的指针。此外,非压缩节点头部保存的 HDR,是 Radix Tree 节点数据结构中的元数据,我一会儿会给你具体介绍它。

  • 第二类节点是压缩节点,这类节点会包含一个指向子节点的指针,以及子节点所代表的合并的字符串。比如前面 Radix Tree 例子中的节点 e,这个节点指向的子节点包含的字符串就是合并的字符串“dis”。和非压缩节点类似,如果从根节点到一个压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么,这个压缩节点中还包含指向这个 key 对应的 value 的指针。

既然,这两类节点的头部 HDR 中都保存了元数据,下面我们就来看看,这些元数据都包括了什么内容。首先,我们需要了解下 Radix Tree 的节点数据结构。Radix Tree 节点的数据结构是由rax.h文件中的 raxNode 定义的,如下所示:

typedef struct raxNode {//节点是否包含keyuint32_t iskey:1;     /* Does this node contain a key? *///节点的值是否为NULLuint32_t isnull:1;    /* Associated value is NULL (don't store it). *///节点是否被压缩uint32_t iscompr:1;   /* Node is compressed. *///节点大小uint32_t size:29;     /* Number of children, or compressed string len. *//* Data layout is as follows:** If node is not compressed we have 'size' bytes, one for each children* character, and 'size' raxNode pointers, point to each child node.* Note how the character is not stored in the children but in the* edge of the parents:** [header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)** if node is compressed (iscompr bit is 1) the node has 1 children.* In that case the 'size' bytes of the string stored immediately at* the start of the data section, represent a sequence of successive* nodes linked one after the other, for which only the last one in* the sequence is actually represented as a node, and pointed to by* the current compressed node.** [header iscompr=1][xyz][z-ptr](value-ptr?)** Both compressed and not compressed nodes can represent a key* with associated data in the radix tree at any level (not just terminal* nodes).** If the node has an associated key (iskey=1) and is not NULL* (isnull=0), then after the raxNode pointers pointing to the* children, an additional value pointer is present (as you can see* in the representation above as "value-ptr" field).*///节点的实际存储数据unsigned char data[];
} raxNode;

该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。

  • iskey:表示从 Radix Tree 的根节点到当前节点路径上的字符组成的字符串,是否表示了一个完整的 key。如果是的话,那么 iskey 的值为 1。否则,iskey 的值为 0。不过,这里需要注意的是,当前节点所表示的 key,并不包含该节点自身的内容。
  • isnull:表示当前节点是否为空节点。如果当前节点是空节点,那么该节点就不需要为指向 value 的指针分配内存空间了。
  • iscompr:表示当前节点是非压缩节点,还是压缩节点。
  • size:表示当前节点的大小,具体值会根据节点是压缩节点还是非压缩节点而不同。如果当前节点是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示该节点指向的子节点个数。

这 4 个元数据就对应了刚才介绍的压缩节点和非压缩节点头部的 HDR,其中,iskey、isnull 和 iscompr 分别用 1 bit 表示,而 size 占用 29 bit。另外,从 raxNode 结构体中,我们还可以看到,除了元数据,该结构体中还有 char 类型数组 data。我们知道,data 是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:

  • 对于非压缩节点来说,data 数组包括子节点对应的字符、指向子节点的指针,以及节点表示 key 时对应的 value 指针;
  • 对于压缩节点来说,data 数组包括子节点对应的合并字符串、指向子节点的指针,以及节点为 key 时的 value 指针。

好了,到这里,你可能已经发现,在 raxNode 的实现中,无论是非压缩节点还是压缩节点,其实具有两个特点:

  • 它们所代表的 key,是从根节点到当前节点路径上的字符串,但并不包含当前节点;
  • 它们本身就已经包含了子节点代表的字符或合并字符串。而对于它们的子节点来说,也都属于非压缩或压缩节点,所以,子节点本身又会保存,子节点的子节点所代表的字符或合并字符串。

而这两个特点就给 Radix Tree 实际保存数据时的结构,带来了两个方面的变化。一方面,Radix Tree 非叶子节点,要不然是压缩节点,只指向单个子节点,要不然是非压缩节点,指向多个子节点,但每个子节点只表示一个字符。所以,非叶子节点无法同时指向表示单个字符的子节点和表示合并字符串的子节点

Rax Insert

以下用几个示例来详解rax tree插入的流程。假设j是遍历已有节点的游标,i是遍历新增节点的游标。

1. 只插入redis

z-ptr指向的叶子节点iskey=1,使用了压缩前缀。

2. 在redis之后插入redission

从redis父节点的每个压缩前缀字符比较,遍历完所有redis节点后指向了其空子节点,j = 0, i < len(redission)。
查找到redis的空子节点,直接将sion赋值到子节点上,成为redis的子节点。sion节点被标记为iskey=1,用来标识redis这个key。sion节点下再创建一个空子节点,iskey=1来表示redission这个key。

3. 在redis之后插入re

re在redis能找到前两位的前缀,也就是i=len(re),j < len(redis)。
将redis分割成re和dis两个子节点,dis也是一个压缩前缀节点,dis同时被标记为iskey=1,来表示re这个key。
dis下挂着一个空子节点,来标记redis这个key。

具体过程可以参考: https://blog.csdn.net/yunqiinsight/article/details/89394215

我给你举个例子,在下图的左半部分,节点 r 的子节点 a,它的两个子节点表示的都是合并字符串“dix”和“ce”。因此,节点 a 的 raxNode 结构,无法同时指向 dix 子节点和 ce 子节点。类似的,r 节点的子节点 e,它的两个子节点,一个表示的是单字符“a”,另一个表示的是合并字符串“dis”,节点 e 的 raxNode 结构也无法同时指向这两个子节点。所以,在实际使用 raxNode 结构保存数据时,节点 dix 会被拆为节点 d 和 ix,节点 ce 会被拆为节点 c 和 e,节点 dis 会被拆为节点 d 和 is,如下图的右半部分所示。这样一来,节点 r 的子节点 a 和 e,就可以用非压缩节点的结构来保存了。

我们再来看另一方面,对于 Radix Tree 的叶子节点来说,因为它没有子节点了,所以,Redis 会用一个不包含子节点指针的 raxNode 节点来表示叶子节点,也就是说,叶子节点的 raxNode 元数据 size 为 0,没有子节点指针。如果叶子节点代表了一个 key,那么它的 raxNode 中是会保存这个 key 的 value 指针的。

为了便于你理解非压缩节点、压缩节点和叶子节点的 raxNode 结构内容,我画了下面这张图,你可以看下。

这张图上显示了 Radix Tree 最右侧分支的 4 个节点 r、e、d、is 和它们各自的 raxNode 内容。其中,节点 r、e 和 d 都不代表 key,所以它们的 iskey 值为 0,isnull 值为 1,没有为 value 指针分配空间。

节点 r 和 e 指向的子节点都是单字符节点,所以它们不是压缩节点,iscompr 值为 0。而节点 d 的子节点包含了合并字符串“is”,所以该节点是压缩节点,iscompr 值为 1。最后的叶子节点 is,它的 raxNode 的 size 为 0,没有子节点指针。不过,因为从根节点到节点 is 路径上的字符串代表了 key“redis”,所以,节点 is 的 value 指针指向了“redis”对应的 value 数据。

这里,你需要注意的是,为了满足内存对齐的需要,raxNode 会根据保存的字符串长度,在字符串后面填充一些字节,也就是图中的 padding 部分

好了,到这里,你应该就理解了 Radix Tree 中不同节点的 raxNode 结构内容。那么接下来,我们再来了解下 Radix Tree 的基本操作函数。

Radix Tree 的操作函数

Radix Tree 的基本操作函数都是在rax.c文件中实现的,主要有以下几种。

raxNew函数

该函数的原型如下,它会调用 rax_malloc 函数分配一个新的 rax 结构体空间。

rax.c文件中可以查看

/* Allocate a new rax and return its pointer. On out of memory the function* returns NULL. */
rax *raxNew(void) {rax *rax = rax_malloc(sizeof(*rax));if (rax == NULL) return NULL;rax->numele = 0;rax->numnodes = 1;rax->head = raxNewNode(0,0);if (rax->head == NULL) {rax_free(rax);return NULL;} else {return rax;}
}

rax 结构体的定义如下所示,其中包含了 Radix Tree 中的 key 个数、节点个数,以及指向头节点的指针,而 raxNew 函数会调用 raxNewNode 函数来创建头节点。

rax.h文件中可以查看

typedef struct rax {// Radix Tree的头指针raxNode *head;// Radix Tree中key的个数uint64_t numele;// Radix Tree中raxNode的个数uint64_t numnodes;
} rax;

raxNewNode 函数

该函数的原型如下,用来创建一个新的非压缩节点。它的参数 children 表示该非压缩节点的子节点个数,参数 datafield 表示是否要为 value 指针分配空间。

/* Allocate a new non compressed node with the specified number of children.* If datafiled is true, the allocation is made large enough to hold the* associated data pointer.* Returns the new node pointer. On out of memory NULL is returned. */
raxNode *raxNewNode(size_t children, int datafield) {size_t nodesize = sizeof(raxNode)+children+raxPadding(children)+sizeof(raxNode*)*children;if (datafield) nodesize += sizeof(void*);raxNode *node = rax_malloc(nodesize);if (node == NULL) return NULL;node->iskey = 0;node->isnull = 0;node->iscompr = 0;node->size = children;return node;
}

这里,你需要注意的是,压缩节点的创建并不是通过 raxNewNode 函数来完成的,而是通过 raxCompressNode 函数来实现的。

raxGenericInsert 函数

该函数原型如下,用来向 Radix Tree 中插入一个长度为 len 的字符串 s。

/* Insert the element 's' of size 'len', setting as auxiliary data* the pointer 'data'. If the element is already present, the associated* data is updated (only if 'overwrite' is set to 1), and 0 is returned,* otherwise the element is inserted and 1 is returned. On out of memory the* function returns 0 as well but sets errno to ENOMEM, otherwise errno will* be set to 0.*/
/*
插入大小为“len”的元素“s”,设置为辅助数据指针“data”。
- 如果元素已存在,则关联的更新数据(仅当“overwrite”设置为1时),并返回0.
- 否则将插入元素并返回1。
- 内存不足函数也返回0,但将errno设置为ENOMEM,
- 否则errno将设置为0。
*/
int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite) {size_t i;int j = 0; /* Split position. If raxLowWalk() stops in a compressednode, the index 'j' represents the char we stopped within thecompressed node, that is, the position where to split thenode for insertion. */raxNode *h, **parentlink;debugf("### Insert %.*s with value %p\n", (int)len, s, data);i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);/* If i == len we walked following the whole string. If we are not* in the middle of a compressed node, the string is either already* inserted or this middle node is currently not a key, but can represent* our key. We have just to reallocate the node and make space for the* data pointer. */if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {debugf("### Insert: node representing key exists\n");/* Make space for the value pointer if needed. */if (!h->iskey || (h->isnull && overwrite)) {h = raxReallocForData(h,data);if (h) memcpy(parentlink,&h,sizeof(h));}if (h == NULL) {errno = ENOMEM;return 0;}/* Update the existing key if there is already one. */if (h->iskey) {if (old) *old = raxGetData(h);if (overwrite) raxSetData(h,data);errno = 0;return 0; /* Element already exists. */}/* Otherwise set the node as a key. Note that raxSetData()* will set h->iskey. */raxSetData(h,data);rax->numele++;return 1; /* Element inserted. */}/* If the node we stopped at is a compressed node, we need to* split it before to continue.** Splitting a compressed node have a few possible cases.* Imagine that the node 'h' we are currently at is a compressed* node containing the string "ANNIBALE" (it means that it represents* nodes A -> N -> N -> I -> B -> A -> L -> E with the only child* pointer of this node pointing at the 'E' node, because remember that* we have characters at the edges of the graph, not inside the nodes* themselves.** In order to show a real case imagine our node to also point to* another compressed node, that finally points at the node without* children, representing 'O':**     "ANNIBALE" -> "SCO" -> []** When inserting we may face the following cases. Note that all the cases* require the insertion of a non compressed node with exactly two* children, except for the last case which just requires splitting a* compressed node.** 1) Inserting "ANNIENTARE"**               |B| -> "ALE" -> "SCO" -> []*     "ANNI" -> |-|*               |E| -> (... continue algo ...) "NTARE" -> []** 2) Inserting "ANNIBALI"**                  |E| -> "SCO" -> []*     "ANNIBAL" -> |-|*                  |I| -> (... continue algo ...) []** 3) Inserting "AGO" (Like case 1, but set iscompr = 0 into original node)**            |N| -> "NIBALE" -> "SCO" -> []*     |A| -> |-|*            |G| -> (... continue algo ...) |O| -> []** 4) Inserting "CIAO"**     |A| -> "NNIBALE" -> "SCO" -> []*     |-|*     |C| -> (... continue algo ...) "IAO" -> []** 5) Inserting "ANNI"**     "ANNI" -> "BALE" -> "SCO" -> []** The final algorithm for insertion covering all the above cases is as* follows.** ============================= ALGO 1 =============================** For the above cases 1 to 4, that is, all cases where we stopped in* the middle of a compressed node for a character mismatch, do:** Let $SPLITPOS be the zero-based index at which, in the* compressed node array of characters, we found the mismatching* character. For example if the node contains "ANNIBALE" and we add* "ANNIENTARE" the $SPLITPOS is 4, that is, the index at which the* mismatching character is found.** 1. Save the current compressed node $NEXT pointer (the pointer to the*    child element, that is always present in compressed nodes).** 2. Create "split node" having as child the non common letter*    at the compressed node. The other non common letter (at the key)*    will be added later as we continue the normal insertion algorithm*    at step "6".** 3a. IF $SPLITPOS == 0:*     Replace the old node with the split node, by copying the auxiliary*     data if any. Fix parent's reference. Free old node eventually*     (we still need its data for the next steps of the algorithm).** 3b. IF $SPLITPOS != 0:*     Trim the compressed node (reallocating it as well) in order to*     contain $splitpos characters. Change child pointer in order to link*     to the split node. If new compressed node len is just 1, set*     iscompr to 0 (layout is the same). Fix parent's reference.** 4a. IF the postfix len (the length of the remaining string of the*     original compressed node after the split character) is non zero,*     create a "postfix node". If the postfix node has just one character*     set iscompr to 0, otherwise iscompr to 1. Set the postfix node*     child pointer to $NEXT.** 4b. IF the postfix len is zero, just use $NEXT as postfix pointer.** 5. Set child[0] of split node to postfix node.** 6. Set the split node as the current node, set current index at child[1]*    and continue insertion algorithm as usually.** ============================= ALGO 2 =============================** For case 5, that is, if we stopped in the middle of a compressed* node but no mismatch was found, do:** Let $SPLITPOS be the zero-based index at which, in the* compressed node array of characters, we stopped iterating because* there were no more keys character to match. So in the example of* the node "ANNIBALE", addig the string "ANNI", the $SPLITPOS is 4.** 1. Save the current compressed node $NEXT pointer (the pointer to the*    child element, that is always present in compressed nodes).** 2. Create a "postfix node" containing all the characters from $SPLITPOS*    to the end. Use $NEXT as the postfix node child pointer.*    If the postfix node length is 1, set iscompr to 0.*    Set the node as a key with the associated value of the new*    inserted key.** 3. Trim the current node to contain the first $SPLITPOS characters.*    As usually if the new node length is just 1, set iscompr to 0.*    Take the iskey / associated value as it was in the orignal node.*    Fix the parent's reference.** 4. Set the postfix node as the only child pointer of the trimmed*    node created at step 1.*//* ------------------------- ALGORITHM 1 --------------------------- */if (h->iscompr && i != len) {debugf("ALGO 1: Stopped at compressed node %.*s (%p)\n",h->size, h->data, (void*)h);debugf("Still to insert: %.*s\n", (int)(len-i), s+i);debugf("Splitting at %d: '%c'\n", j, ((char*)h->data)[j]);debugf("Other (key) letter is '%c'\n", s[i]);/* 1: Save next pointer. */raxNode **childfield = raxNodeLastChildPtr(h);raxNode *next;memcpy(&next,childfield,sizeof(next));debugf("Next is %p\n", (void*)next);debugf("iskey %d\n", h->iskey);if (h->iskey) {debugf("key value is %p\n", raxGetData(h));}/* Set the length of the additional nodes we will need. */size_t trimmedlen = j;size_t postfixlen = h->size - j - 1;int split_node_is_key = !trimmedlen && h->iskey && !h->isnull;size_t nodesize;/* 2: Create the split node. Also allocate the other nodes we'll need*    ASAP, so that it will be simpler to handle OOM. */raxNode *splitnode = raxNewNode(1, split_node_is_key);raxNode *trimmed = NULL;raxNode *postfix = NULL;if (trimmedlen) {nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+sizeof(raxNode*);if (h->iskey && !h->isnull) nodesize += sizeof(void*);trimmed = rax_malloc(nodesize);}if (postfixlen) {nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+sizeof(raxNode*);postfix = rax_malloc(nodesize);}/* OOM? Abort now that the tree is untouched. */if (splitnode == NULL ||(trimmedlen && trimmed == NULL) ||(postfixlen && postfix == NULL)){rax_free(splitnode);rax_free(trimmed);rax_free(postfix);errno = ENOMEM;return 0;}splitnode->data[0] = h->data[j];if (j == 0) {/* 3a: Replace the old node with the split node. */if (h->iskey) {void *ndata = raxGetData(h);raxSetData(splitnode,ndata);}memcpy(parentlink,&splitnode,sizeof(splitnode));} else {/* 3b: Trim the compressed node. */trimmed->size = j;memcpy(trimmed->data,h->data,j);trimmed->iscompr = j > 1 ? 1 : 0;trimmed->iskey = h->iskey;trimmed->isnull = h->isnull;if (h->iskey && !h->isnull) {void *ndata = raxGetData(h);raxSetData(trimmed,ndata);}raxNode **cp = raxNodeLastChildPtr(trimmed);memcpy(cp,&splitnode,sizeof(splitnode));memcpy(parentlink,&trimmed,sizeof(trimmed));parentlink = cp; /* Set parentlink to splitnode parent. */rax->numnodes++;}/* 4: Create the postfix node: what remains of the original* compressed node after the split. */if (postfixlen) {/* 4a: create a postfix node. */postfix->iskey = 0;postfix->isnull = 0;postfix->size = postfixlen;postfix->iscompr = postfixlen > 1;memcpy(postfix->data,h->data+j+1,postfixlen);raxNode **cp = raxNodeLastChildPtr(postfix);memcpy(cp,&next,sizeof(next));rax->numnodes++;} else {/* 4b: just use next as postfix node. */postfix = next;}/* 5: Set splitnode first child as the postfix node. */raxNode **splitchild = raxNodeLastChildPtr(splitnode);memcpy(splitchild,&postfix,sizeof(postfix));/* 6. Continue insertion: this will cause the splitnode to* get a new child (the non common character at the currently* inserted key). */rax_free(h);h = splitnode;} else if (h->iscompr && i == len) {/* ------------------------- ALGORITHM 2 --------------------------- */debugf("ALGO 2: Stopped at compressed node %.*s (%p) j = %d\n",h->size, h->data, (void*)h, j);/* Allocate postfix & trimmed nodes ASAP to fail for OOM gracefully. */size_t postfixlen = h->size - j;size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+sizeof(raxNode*);if (data != NULL) nodesize += sizeof(void*);raxNode *postfix = rax_malloc(nodesize);nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*);if (h->iskey && !h->isnull) nodesize += sizeof(void*);raxNode *trimmed = rax_malloc(nodesize);if (postfix == NULL || trimmed == NULL) {rax_free(postfix);rax_free(trimmed);errno = ENOMEM;return 0;}/* 1: Save next pointer. */raxNode **childfield = raxNodeLastChildPtr(h);raxNode *next;memcpy(&next,childfield,sizeof(next));/* 2: Create the postfix node. */postfix->size = postfixlen;postfix->iscompr = postfixlen > 1;postfix->iskey = 1;postfix->isnull = 0;memcpy(postfix->data,h->data+j,postfixlen);raxSetData(postfix,data);raxNode **cp = raxNodeLastChildPtr(postfix);memcpy(cp,&next,sizeof(next));rax->numnodes++;/* 3: Trim the compressed node. */trimmed->size = j;trimmed->iscompr = j > 1;trimmed->iskey = 0;trimmed->isnull = 0;memcpy(trimmed->data,h->data,j);memcpy(parentlink,&trimmed,sizeof(trimmed));if (h->iskey) {void *aux = raxGetData(h);raxSetData(trimmed,aux);}/* Fix the trimmed node child pointer to point to* the postfix node. */cp = raxNodeLastChildPtr(trimmed);memcpy(cp,&postfix,sizeof(postfix));/* Finish! We don't need to continue with the insertion* algorithm for ALGO 2. The key is already inserted. */rax->numele++;rax_free(h);return 1; /* Key inserted. */}/* We walked the radix tree as far as we could, but still there are left* chars in our string. We need to insert the missing nodes. */while(i < len) {raxNode *child;/* If this node is going to have a single child, and there* are other characters, so that that would result in a chain* of single-childed nodes, turn it into a compressed node. */if (h->size == 0 && len-i > 1) {debugf("Inserting compressed node\n");size_t comprsize = len-i;if (comprsize > RAX_NODE_MAX_SIZE)comprsize = RAX_NODE_MAX_SIZE;raxNode *newh = raxCompressNode(h,s+i,comprsize,&child);if (newh == NULL) goto oom;h = newh;memcpy(parentlink,&h,sizeof(h));parentlink = raxNodeLastChildPtr(h);i += comprsize;} else {debugf("Inserting normal node\n");raxNode **new_parentlink;raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink);if (newh == NULL) goto oom;h = newh;memcpy(parentlink,&h,sizeof(h));parentlink = new_parentlink;i++;}rax->numnodes++;h = child;}raxNode *newh = raxReallocForData(h,data);if (newh == NULL) goto oom;h = newh;if (!h->iskey) rax->numele++;raxSetData(h,data);memcpy(parentlink,&h,sizeof(h));return 1; /* Element inserted. */oom:/* This code path handles out of memory after part of the sub-tree was* already modified. Set the node as a key, and then remove it. However we* do that only if the node is a terminal node, otherwise if the OOM* happened reallocating a node in the middle, we don't need to free* anything. */if (h->size == 0) {h->isnull = 1;h->iskey = 1;rax->numele++; /* Compensate the next remove. */assert(raxRemove(rax,s,i,NULL) != 0);}errno = ENOMEM;return 0;
}

raxLowWalk 函数

该函数原型如下,当需要在 Radix Tree 中查找、插入或是删除节点时,都会调用该函数。

/* Low level function that walks the tree looking for the string* 's' of 'len' bytes. The function returns the number of characters* of the key that was possible to process: if the returned integer* is the same as 'len', then it means that the node corresponding to the* string was found (however it may not be a key in case the node->iskey is* zero or if simply we stopped in the middle of a compressed node, so that* 'splitpos' is non zero).** Otherwise if the returned integer is not the same as 'len', there was an* early stop during the tree walk because of a character mismatch.** The node where the search ended (because the full string was processed* or because there was an early stop) is returned by reference as* '*stopnode' if the passed pointer is not NULL. This node link in the* parent's node is returned as '*plink' if not NULL. Finally, if the* search stopped in a compressed node, '*splitpos' returns the index* inside the compressed node where the search ended. This is useful to* know where to split the node for insertion.** Note that when we stop in the middle of a compressed node with* a perfect match, this function will return a length equal to the* 'len' argument (all the key matched), and will return a *splitpos which is* always positive (that will represent the index of the character immediately* *after* the last match in the current compressed node).** When instead we stop at a compressed node and *splitpos is zero, it* means that the current node represents the key (that is, none of the* compressed node characters are needed to represent the key, just all* its parents nodes). */
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) {raxNode *h = rax->head;raxNode **parentlink = &rax->head;size_t i = 0; /* Position in the string. */size_t j = 0; /* Position in the node children (or bytes if compressed).*/while(h->size && i < len) {debugnode("Lookup current node",h);unsigned char *v = h->data;if (h->iscompr) {for (j = 0; j < h->size && i < len; j++, i++) {if (v[j] != s[i]) break;}if (j != h->size) break;} else {/* Even when h->size is large, linear scan provides good* performances compared to other approaches that are in theory* more sounding, like performing a binary search. */for (j = 0; j < h->size; j++) {if (v[j] == s[i]) break;}if (j == h->size) break;i++;}if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */raxNode **children = raxNodeFirstChildPtr(h);if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */memcpy(&h,children+j,sizeof(h));parentlink = children+j;j = 0; /* If the new node is non compressed and we do notiterate again (since i == len) set the splitposition to 0 to signal this node representsthe searched key. */}debugnode("Lookup stop node is",h);if (stopnode) *stopnode = h;if (plink) *plink = parentlink;if (splitpos && h->iscompr) *splitpos = j;return i;
}

raxGetData/raxSetData 函数

这两个函数的原型如下所示,它们分别用来获得 raxNode 中保存的 value 指针,以及设置 raxNode 中保存的 value 指针。

  • raxGetData
/* Get the node auxiliary data. */
void *raxGetData(raxNode *n) {if (n->isnull) return NULL;void **ndata =(void**)((char*)n+raxNodeCurrentLength(n)-sizeof(void*));void *data;memcpy(&data,ndata,sizeof(data));return data;
}
  • raxSetData
/* Set the node auxiliary data to the specified pointer. */
/* 将节点辅助数据设置为指定的指针 */
void raxSetData(raxNode *n, void *data) {n->iskey = 1;if (data != NULL) {n->isnull = 0;void **ndata = (void**) ((char*)n+raxNodeCurrentLength(n)-sizeof(void*));memcpy(ndata,&data,sizeof(data));} else {n->isnull = 1;}
}

好了,了解了 Radix Tree 的基本操作函数后,我们最后再来看下,Stream 是如何把 Radix Tree 和 listpack 组合起来使用的。

Stream 如何组合使用 Radix Tree 和 listpack?

我们知道,Stream 保存的消息数据,按照 key-value 形式来看的话,消息 ID 就相当于 key,而消息内容相当于是 value。也就是说,Stream 会使用 Radix Tree 来保存消息 ID,然后将消息内容保存在 listpack 中,并作为消息 ID 的 value,用 raxNode 的 value 指针指向对应的 listpack。

这里我放了一张图,展示了 Stream 结构、rax、raxNode 以及 listpack 相互之间的关系。注意,在这张图中,我们假设就只有一个 streamID 作为 key。

我们可以看到,stream 结构体中的 rax 指针,指向了 Radix Tree 的头节点,也就是 rax 结构体。rax 结构体中的头指针进一步指向了第一个 raxNode。因为我们假设就只有一个 streamID,暂时没有其他 streamID 和该 streamID 共享前缀,所以,当前这个 streamID 就可以用压缩节点保存。

然后,第一个 raxNode 指向了下一个 raxNode,也是 Radix Tree 的叶子节点。这个节点的 size 为 0,它的 value 指针指向了实际的消息内容。

而在消息内容这里,是使用了 listpack 进行保存的。你可以看到,listpack 中是使用了 master entry 来保存键值对类型消息中的键,而值会在 master entry 后面保存。这种保存方式其实也是为了节省内存空间,这是因为很多消息的键是相同的,保存一份就行。关于在 Stream 中,将消息的键和值分开保存到 listpack 中的这种设计方法,我会在后面的课程中继续给你详细介绍。

另外,为了方便你更好地掌握非压缩节点和压缩节点,我再给你总结下它们的相同之处和区别,你也可以来整体回顾下。

它们的相同之处在于:

  • 都有保存元数据的节点头 HDR;都会包含指向子节点的指针,以及子节点所代表的字符串。
  • 从根节点到当前节点路径上的字符串如果是 Radix Tree 的一个 key,它们都会包含指向 key 对应 value 的指针。

不同之处在于:

  • 非压缩节点指向的子节点,每个子节点代表一个字符,非压缩节点可以指向多个子节点;
  • 压缩节点指向的子节点,代表的是一个合并字符串,压缩节点只能指向一个子节点。

而除了学习 raxNode,我还给你介绍了下 Radix Tree 中几个基本操作函数的作用,并展示了 Stream 类型是如何把消息 ID 和消息内容,分别保存在 Radix Tree 和 listpack 中的。这里你要注意的是,因为 Radix Tree 在保存具有公共前缀的数据时,能有效节省内存开销。同时,Radix Tree 本身也是有序的树型索引,可以支持单点和范围查询。所以,Redis 把消息 ID 保存在 Radix Tree 中,既可以节省内存空间,也能高效支持消息 ID 的查询。而 listpack 本身是紧凑列表,在保存大量消息内容的同时,也能有效节省内存。所以我希望,你能通过 Stream 对 Radix Tree 和 listpack 的使用,举一反三,把它们用在相应的消息存取或是大量字符串存取的场景中。

Radix Tree 优劣势

作为有序索引,Radix Tree 也能提供范围查询,和我们日常使用的 B+ 树,以及第5讲中介绍的跳表相比,你觉得 Radix Tree 有什么优势和不足么?

1、Radix Tree 优势

  • 本质上是前缀树,所以存储有「公共前缀」的数据时,比 B+ 树、跳表节省内存
  • 没有公共前缀的数据项,压缩存储,value 用 listpack 存储,也可以节省内存
  • 查询复杂度是 O(K),只与「目标长度」有关,与总数据量无关
  • 这种数据结构也经常用在搜索引擎提示、文字自动补全等场景

Stream 在存消息时,推荐使用默认自动生成的「时间戳+序号」作为消息 ID,不建议自己指定消息 ID,这样才能发挥 Radix Tree 公共前缀的优势。

2、Radix Tree 不足

  • 如果数据集公共前缀较少,会导致内存占用多
  • 增删节点需要处理其它节点的「分裂、合并」,跳表只需调整前后指针即可
  • B+ 树、跳表范围查询友好,直接遍历链表即可,Radix Tree 需遍历树结构
  • 实现难度高比 B+ 树、跳表复杂

每种数据结构都是在面对不同问题场景下,才被设计出来的,结合各自场景中的数据特点,使用优势最大的数据结构才是正解。

Redis中的Stream的实现Radix Tree源码解读相关推荐

  1. Redis radix tree源码解析

    Redis实现了不定长压缩前缀的radix tree,用在集群模式下存储slot对应的的所有key信息.本文将详述在Redis中如何实现radix tree. 核心数据结构 raxNode是radix ...

  2. 【页高速缓存】radix tree 源码解析

    项目要在内核做和页高速缓存相类似缓存机制,在写内核代码之前必须先搞清楚页高速缓存源码是什么情况. 之前有一篇博客分析过了页高速缓存的基础,但是远远没有达到动手写代码的基础.这几天端午节假期集中精力,搞 ...

  3. 从plugin路径中读取依赖并构造对象——Azkaban源码解读之Alert plugin实现(一)

    第一步加载类路径:azkaban.executor.AlerterHolder allAlerters 是一个HashMap ,key为String,value为Alerter mailAlerter ...

  4. Redis在Java中的使用及连接数据库(附源码)

    Redis在Java中的使用及连接数据库(附源码) 引言: 本文主要分享了Redis如何在IDEA中部署,运行:模拟加入Redis的操作: 文章目录 Redis在Java中的使用及连接数据库(附源码) ...

  5. python处理回显_Python中getpass模块无回显输入源码解析

    本文主要讨论了python中getpass模块的相关内容,具体如下. getpass模块 昨天跟学弟吹牛b安利Python标准库官方文档的时候偶然发现了这个模块.仔细一看内容挺少的,只有两个主要api ...

  6. redis源码解读二

    上一篇解读了一下SDS,本来觉得完了,但之后想想感觉少点什么,现在我们从使用的角度去梳理一下,大家想想对于字符串, 我们经常使用的有哪几个方法呢?这些方法又是怎么实现的? 在研究上面的几个方法之前我们 ...

  7. Redis 源码解读之 Rehash 的调用时机

    Redis 源码解读之 Rehash 的调用时机 背景和问题 本文想要解决的问题 什么时机触发 Rehash 操作? 什么时机实际执行 Rehash 函数? 结论 什么时机触发 Rehash 操作? ...

  8. Hive中lateral view的应用到源码解读

    对于从事大数据开发的同学,经常会应用到explode(炸裂函数)和lateral view(侧输出流). Explode(炸裂函数) 参数必须是array或者map格式(通常跟split函数使用): ...

  9. mac通过tree源码编译安装tree

    通过tree源码编译安装  下载源码:curl -O ftp://mama.indstate.edu/linux/tree/tree-1.6.0.tgz  解压源码:tar xzvf tree-1.6 ...

  10. python删除链表中重复的节点_Java编程删除链表中重复的节点问题解决思路及源码分享...

    一. 题目 在一个排序的链表中,存在重复的结点,请删除该链表中重复的结点,重复的结点不保留,返回链表头指针. 二. 例子 输入链表:1->2->3->3->4->4-&g ...

最新文章

  1. 一款让你轻松在IDEA画图的插件!
  2. Redis在Linux系统的配置优化
  3. 数字新写法3_000_000,简单明了
  4. 插件不既有Chrome版也有飞鸽传书
  5. RTX5 | 时间延时
  6. 餐饮后台UI模板有这个就够了!
  7. 100台服务器分发文件,通过简单shell脚本+rsync实现单一文件分发到多台服务器
  8. 2017.5.12PM
  9. js-JavaScript常见的创建对象的几种方式
  10. [网络安全自学篇] 九十二.《Windows黑客编程技术详解》之病毒启动技术创建进程API、突破SESSION0隔离、内存加载详解(3)
  11. 利用DDS IP实现线性调频信号(二)
  12. 苏教版四年级下册计算机说课稿,苏教版四年级下册认识多位数说课稿
  13. 大学生性价比计算机推荐,快开学了 大学生该如何选择一款高性价比电脑?
  14. 10 场年薪 60W 的 DBA 面试,被问到最多的 10 道题
  15. CSS3 变形:平移、旋转与缩放
  16. Java实现一个简单的日历表
  17. 《大富翁8》中智力问答的题目、答案
  18. mysql 二级什么意思_MySQL二级等级考试归纳——概念篇
  19. 给未来的你 — 李开复在2011级大学新生学习规划讲座上的演讲
  20. 计算机专业方面主要有哪些证书

热门文章

  1. 服务器安装linux后一直停留在光标,Ubuntu14更新后无法进入系统卡在光标界面解怎么办?...
  2. 手机上日程应该怎么设置提醒
  3. Linux中etc目录etc是什么单词的缩写
  4. 【规范】C/C++注释格式
  5. Java的sort用法深究,compare按照姓氏排序
  6. 支付宝"手机网站支付"主域名申请了,二级域名还要申请吗
  7. SpringBoot+Vue项目小区物业管理系统
  8. Fishermen Gym - 101964E(二分+前缀数组)
  9. 2015中国国内元器件分销商10亿俱乐部20强榜单
  10. 能上QQ不能打开网页的情况之一