上一章,笔者和大家一起学习了如何把一个节点插入到跳跃表中。现在,再让我们学习如何将一个元素从跳跃表中移除。如果大家对zset还有印象,应该会记得zset这个结构体有两个字段:zsl(跳跃表)和dict(字典)。这个字典是用于存储元素和分值用的,当我们需要查询或删除一个元素的分值,如果没有这个字典,我们遍历跳跃表需要O(N)的时间复杂度,即从L0层开始逐个递进遍历,因为我们并不知道这个节点的分值,但是有这个字典,我们通过字典获取元素对应的分值,可以在O(logN)的时间复杂度内定位到元素在跳跃表中的位置。
在删除有序集合中的元素时,就用到从字典中获取元素对应分值,再根据元素和分值移除跳跃表中的节点:
/* Delete the element 'ele' from the sorted set, returning 1 if the element * existed and was deleted, 0 otherwise (the element was not there). */ int zsetDel(robj *zobj, sds ele) { if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {//如果有序集合编码是ziplist则走这个分支的删除逻辑 //do something…… } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//如果有序集合的编码是跳跃表则走这个分支的删除逻辑 zset *zs = zobj->ptr; dictEntry *de; double score; de = dictUnlink(zs->dict, ele);//<1>把元素从字典中移除,并返回被移除元素 if (de != NULL) {//如果元素不为NULL,则进入到删除跳跃表节点逻辑 /* * Get the score in order to delete from the skiplist later. * de(dictEntry)对象会记录元素及分支,这里从de对象获取元素分值, * 以便后续根据元素、分值删除跳跃表中的节点。 * */ score = *(double *) dictGetVal(de); /* * Delete from the hash table and later from the skiplist. * Note that the order is important: deleting from the skiplist * actually releases the SDS string representing the element, * which is shared between the skiplist and the hash table, so * we need to delete from the skiplist as the final step. * 我们在代码<1>处将元素从字典中删除,下面要释放de的内存。 * */ dictFreeUnlinkedEntry(zs->dict, de); /* * Delete from skiplist. * 从跳跃表中移除元素 * */ int retval = zslDelete(zs->zsl, score, ele, NULL);//<2> serverAssert(retval); if (htNeedsResize(zs->dict)) dictResize(zs->dict); return 1; } } else { serverPanic("Unknown sorted set encoding"); } return 0; /* No such element found. */ }
上面的代码并没有完全展示如何从跳跃表移除节点,我们在追溯到上述代码<2>处的zslDelete()方法:
/* Delete an element with matching score/element from the skiplist. * The function returns 1 if the node was found and deleted, otherwise * 0 is returned. * 从跳跃表中找到匹配分值和元素的节点,如果节点存在并删除则返回1,否则返回0。 * * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise * it is not freed (but just unlinked) and *node is set to the node pointer, * so that it is possible for the caller to reuse the node (including the * referenced SDS string at node->ele). * 如果node为空,则节点从跳跃表中移除后会调用zslFreeNode()释放节点占用的内存,如果 * node不为空,则仅仅把节点从跳跃表中移除,但不释放节点所占据的内存空间,并且把节点的 * 地址存放到node引用,因为有些调用方希望在将节点从跳跃表移除后,还需要根据节点的数据 * 做一些额外操作,如果这时候释放节点占据的内存,会对调用方产生限制。 * */ int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; //这段代码就是找到每一层删除节点的后置节点 x = zsl->header; for (i = zsl->level - 1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele, ele) < 0))) { x = x->level[i].forward; } update[i] = x; } /* * We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. * 由于可能存在多个元素都是同一分值,所以我们找到与元素和分值相匹配的节点。 * */ x = x->level[0].forward; if (x && score == x->score && sdscmp(x->ele, ele) == 0) { //从跳跃表中删除节点 zslDeleteNode(zsl, x, update);//<1> //如果node为NULL,则释放删除节点的空间,否则将删除节点的引用赋值给node if (!node) zslFreeNode(x); else *node = x; return 1; } return 0; /* not found */ }
从上面的代码我们可以知道,zslDelete()依旧不是将节点从跳跃表移除的方法,在<1>处的zslDeleteNode()方法才是,但zslDelete()方法会在跳跃表移除元素后,根据传入的node指针是否为空,判断是否要释放被移除节点的内存空间。我们继续追溯到zslDeleteNode()方法:
/* Internal function used by zslDelete, zslDeleteRangeByScore and * zslDeleteRangeByRank. * 内部方法, zslDelete、zslDeleteRangeByScore、zslDeleteRangeByRank都是 * 通过此方法完成节点的移除。 * */ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; //从最高层开始逐层移除对删除节点x的引用 for (i = 0; i < zsl->level; i++) { /* * 如果第i层的后继节点指向的前驱为x,则将后继节点在i层的跨度更新 * 为后继节点指向x在i层前驱的跨度,再把后继节点在i层的前驱更新为x在i层指向的前驱。 */ if (update[i]->level[i].forward == x) { update[i]->level[i].span += x->level[i].span - 1; update[i]->level[i].forward = x->level[i].forward; } else { /* * 如果在i层后继节点的前驱并不指向x,则代表x的层高<i,只要 * 简单把后继节点的跨度-1即可。 */ update[i]->level[i].span -= 1; } } /* * 如果被删除节点的前驱节点在L0层有前驱节点,则更新前驱节点的backward为 * 被删除节点的backward,否则被删除节点为跳跃表的末尾节点,则将跳跃表的 * 末尾指针指向被删除节点的backward指向的节点。 */ if (x->level[0].forward) { x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } /* * 在删除节点时,我们有可能删除跳跃表中层高最高的节点,所以这里要更新跳跃表当前 * 最高的层高。如果跳跃表层高大于1,则代表跳跃表中一定有节点,于是我们从头结点的 * 最高层出发,判断头节点在zsl->level - 1层是否有前驱节点,如果没有的话则代表 * 跳跃表最高层的节点被删除,这里要更新跳跃表的层高,直到判断头节点从第i层出发, * 前驱节点不为NULL,则跳跃表的层高为i+1。 */ while (zsl->level > 1 && zsl->header->level[zsl->level - 1].forward == NULL) zsl->level--; //更新跳跃表节点数。 zsl->length--; }
除了可以移除跳跃表中的元素,我们还可以根据索引区间、分值区间来移除跳跃表的元素,这两个操作都是由zremrangeGenericCommand()完成:
/* * Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. * 如果rangetype传入ZRANGE_RANK,执行:ZREMRANGEBYRANK key start stop * 如果rangetype传入ZRANGE_SCORE,执行:ZREMRANGEBYSCORE key min max * 如果rangetype传入ZRANGE_LEX,执行:ZREMRANGEBYLEX key min max * */ #define ZRANGE_RANK 0 #define ZRANGE_SCORE 1 #define ZRANGE_LEX 2 void zremrangeGenericCommand(client *c, int rangetype) { robj *key = c->argv[1]; robj *zobj; int keyremoved = 0; unsigned long deleted = 0; zrangespec range; zlexrangespec lexrange; long start, end, llen; /* * Step 1: Parse the range. * 解析删除区间 * */ if (rangetype == ZRANGE_RANK) { if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; } else if (rangetype == ZRANGE_SCORE) { if (zslParseRange(c->argv[2], c->argv[3], &range) != C_OK) { addReplyError(c, "min or max is not a float"); return; } } else if (rangetype == ZRANGE_LEX) { if (zslParseLexRange(c->argv[2], c->argv[3], &lexrange) != C_OK) { addReplyError(c, "min or max not valid string range item"); return; } } /* * Step 2: Lookup & range sanity checks if needed. * 检查key是否存在,类型是否为zset * */ if ((zobj = lookupKeyWriteOrReply(c, key, shared.czero)) == NULL || checkType(c, zobj, OBJ_ZSET)) goto cleanup; if (rangetype == ZRANGE_RANK) { /* * Sanitize indexes. * 检查索引是否有效。 * */ llen = zsetLength(zobj); if (start < 0) start = llen + start; if (end < 0) end = llen + end; if (start < 0) start = 0; /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { addReply(c, shared.czero); goto cleanup; } if (end >= llen) end = llen - 1; } /* * Step 3: Perform the range deletion operation. * 根据区间删除元素。 * */ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { //压缩列表删除逻辑... } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; switch (rangetype) { case ZRANGE_RANK: /* * 如果是根据索引删除元素,这里会把索引转成跨度,比如我们要删除跳跃表索引落在 * [0,5]区间的元素,跳跃表索引0的节点,为头节点L0层的前驱节点,跨度为1,start+1=0+1=1, * 同理索引为5的节点,在跳跃表的跨度为end+1=5+1=6。 */ deleted = zslDeleteRangeByRank(zs->zsl, start + 1, end + 1, zs->dict); break; case ZRANGE_SCORE://根据分值删除元素 deleted = zslDeleteRangeByScore(zs->zsl, &range, zs->dict); break; case ZRANGE_LEX://ZREMRANGEBYLEX使用场景较少,就不介绍了 deleted = zslDeleteRangeByLex(zs->zsl, &lexrange, zs->dict); break; } if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) { dbDelete(c->db, key); keyremoved = 1; } } else { serverPanic("Unknown sorted set encoding"); } /* Step 4: Notifications and reply. */ if (deleted) { char *event[3] = {"zremrangebyrank", "zremrangebyscore", "zremrangebylex"}; signalModifiedKey(c, c->db, key); notifyKeyspaceEvent(NOTIFY_ZSET, event[rangetype], key, c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); } server.dirty += deleted; addReplyLongLong(c, deleted); cleanup: if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange); }
从上面的代码我们可以知道zremrangeGenericCommand()是根据传入的rangetype来决定是执行索引区间删除还是执行分值区间删除,但真正的删除工作还是交给zslDeleteRangeByRank()和zslDeleteRangeByScore()两个方法来做的,所以我们还要追溯到这两个方法里,一探删除的逻辑。
我们先来看看zslDeleteRangeByRank()是如何根据跨度删除节点的:
/* Delete all the elements with rank between start and end from the skiplist. * Start and end are inclusive. Note that start and end need to be 1-based */ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned long traversed = 0, removed = 0; int i; //找到跨度为start的节点的后继节点 x = zsl->header; for (i = zsl->level - 1; i >= 0; i--) { while (x->level[i].forward && (traversed + x->level[i].span) < start) { traversed += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* * x原先是L0层跨度为start的节点的后继节点,x前进一个节点到 * 跨度为start的节点,即删除区间最开始的节点,这里对traversed+1, * 此时traversed等于start,traversed本质上为跨度,代表删除节点的进度, * 如果traversed<=end则表明跳跃表中还存在待删除区间的节点。 * 在删除跨度落在[start,end]区间的节点时,会先声明一个next指针,指向x * 在L0层的前驱节点,之后调用zslDeleteNode()从跳跃表中移除x,再删除x元素 * 及分值在字典上的映射关系,然后回收x指向的内存区域。然后对删除数量removed+1, * 此时跨度为traversed的节点已删,对traversed+1,将next指向的内存地址赋值给x, * 如果对traversed<=end,则继续重复上面的删除操作,直到删除跳跃表最后一个元素,x * 为NULL,或者删除进度traversed>end。 */ traversed++; x = x->level[0].forward; while (x && traversed <= end) { //next指向x的前驱节点 zskiplistNode *next = x->level[0].forward; //从跳跃表中移除x zslDeleteNode(zsl, x, update); //从字典里删除x dictDelete(dict, x->ele); //释放x占用的内存空间 zslFreeNode(x); //删除数量+1 removed++; //跨度+1 traversed++; //x指向next x = next; } return removed; }
在了解如何根据分值删除元素之前,我们先来了解一个结构体zrangespec,我们知道,Redis允许我们传入一个分值区间进行节点的查询和删除,这个区间可以是开区间也可以是闭区间,Redis用zrangespec.min和zrangespec.max两个字段存储分值的范围,用zrangespec.minex和zrangespec.maxex是否为0表示区间的开闭:
/* * Struct to hold a inclusive/exclusive range spec by score comparison. * 此结构体用于表示一个指定区间,minex为0时表示在进行最小值比较时,要包含最小值本身 * 同理maxex为0时表示进行最大值比较时,要包含最大值本身。 * 比如:min=2,max=9 * 当minex=0,maxex=0时,区间为:[2,9] * 当minex=1,maxex=0时,区间为:(2,9] * 当minex=0,maxex=1时,区间为:[2,9) * 当minex=1,maxex=1时,区间为:(2,9) * */ typedef struct { double min, max; int minex, maxex; /* are min or max exclusive? */ } zrangespec;
了解了结构体zrangespec后,我们再来看看zslDeleteRangeByScore()是如何根据分值区间删除节点的:
/* Delete all the elements with score between min and max from the skiplist. * Min and max are inclusive, so a score >= min || score <= max is deleted. * Note that this function takes the reference to the hash table view of the * sorted set, in order to remove the elements from the hash table too. */ unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned long removed = 0; int i; /* * 查找删除区间范围内第一个节点的每一层后继节点,如果range->minex不为0, * 我们要删除分值大于min,且不包含min的元素,所以要找到分值小于等于range->min的后继节点。 * 如果range->minex为0,我们要删除分值大于等于min的元素,所以要找到分值小于range->min的 * 后继节点。 */ x = zsl->header; for (i = zsl->level - 1; i >= 0; i--) { while (x->level[i].forward && (range->minex ? x->level[i].forward->score <= range->min : x->level[i].forward->score < range->min)) x = x->level[i].forward; update[i] = x; } /* * Current node is the last with score < or <= min. * x在L0层前进一个节点即到达删除区间的第一个节点。 * */ x = x->level[0].forward; /* * Delete nodes while in range. * 如果range->maxex不为0,我们要删除分值小于range->max的节点,如果range->maxex * 为0,则删除分值小于等于range->max的元素。 * 下面的移除逻辑和上面根据跨度移除的逻辑比较类似,依旧是先持当前待移除节点的前驱节点, * 从跳跃表移除当前节点,移除元素和分值在字典上的映射,释放节点的内存,然后对累计删除 * removed+1,在把x指向前驱节点,如果x不为NULL,且分值依旧落在区间内,则执行移除操作, * 直到尾节点被移除,x为NULL,或者x的分值不在区间内。 * */ while (x && (range->maxex ? x->score < range->max : x->score <= range->max)) { zskiplistNode *next = x->level[0].forward; zslDeleteNode(zsl, x, update); dictDelete(dict, x->ele); zslFreeNode(x); /* Here is where x->ele is actually released. */ removed++; x = next; } return removed; }
至此,我们了解了跳跃表是如何来移除元素的。
我们已经学习了如何将元素插入到跳跃表,如何从跳跃表中移除元素。那么下一步,就是如何修改跳跃表的元素。我们不能保证跳跃表元素的分值永远不变,比如笔者在第一章举的一个例子,用一个跳跃表存储不同人的体重,如果体重变了,那么要修改跳跃表的分值又该怎么做呢?更新元素分值的步骤简单到让人大跌眼镜,笔者之前特意先介绍新增节点再介绍删除节点也是为了更好的介绍如何更新节点,可能有人已经猜到了,跳跃表更新节点时,就是把原先的节点删除,然后再根据元素和当前分值创建一个节点,插入到跳跃表中:
/* Update the score of an elmenent inside the sorted set skiplist. * Note that the element must exist and must match 'score'. * This function does not update the score in the hash table side, the * caller should take care of it. * 更新跳跃表中元素的分值,元素必须存在在跳跃表中,且传入的curscore必须与当前分值 * 在跳跃表中的分值相等,一般从zset->dict获取元素对应的分值,作为curscore传入。 * 调用方要注意的是:这个方法不会去更新该元素在zset->dict中的分值。 * * Note that this function attempts to just update the node, in case after * the score update, the node would be exactly at the same position. * Otherwise the skiplist is modified by removing and re-adding a new * element, which is more costly. * 此方法只是尝试更新节点,如果节点在跳跃表中的分值和newscore,则节点在跳跃表的位置不变, * 则退出更新。如果分值与newscore不等,则会先移除该节点,再把节点重新加入到跳跃表中。 * * The function returns the updated element skiplist node pointer. * 此方法会返回更新节点的指针。 * * */ zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; /* We need to seek to element to update to start: this is useful anyway, * we'll have to update or remove it. */ x = zsl->header; //查找每一层更新节点的后继节点 for (i = zsl->level - 1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < curscore || (x->level[i].forward->score == curscore && sdscmp(x->level[i].forward->ele, ele) < 0))) { x = x->level[i].forward; } update[i] = x; } /* * Jump to our element: note that this function assumes that the * element with the matching score exists. * x->level[0].forward会前进到待更新节点,待更新节点必须不为NULL,且分值和元素 * 内容完全匹配curscore和ele,才能进行下一步操作。 * */ x = x->level[0].forward; serverAssert(x && curscore == x->score && sdscmp(x->ele, ele) == 0); /* If the node, after the score update, would be still exactly * at the same position, we can just update the score without * actually removing and re-inserting the element in the skiplist. * * 如果分值修改后, 排序未发生变化,则直接修改分数即可,不需要移除元素再插入元素。 * 这里分四种情况讨论什么时候仅更新分值,不需要移除再新增: * 1.(x->backward == NULL&&x->level[0].forward == NULL)为true: * x->backward为NULL,代表x是跳跃表中除头节点外第一个节点,x->level[0].forward为NULL, * 代表当前跳跃表只有一个节点,即x是尾节点,此时仅更新分值即可。 * 2.(x->backward->score < newscore&&x->level[0].forward == NULL)为true: * x->level[0].forward == NULL代表x是末尾节点,而x->backward->score < newscore如果成立 * 则表明倒数第二个节点的分值依旧小于末尾节点的新分值,即便更新x分值后依旧为末尾节点。 * 3.(x->backward == NULL&&x->x->level[0].forward->score > newscore)为true: * x为跳跃表中除头节点外第一个节点,且除头结点外的第二个节点的分值大于x的新分值,即便更新x * 分值后x节点也不能前进。 * 4.(x->backward->score < newscore&&x->level[0].forward->score > newscore)为true: * 跳跃表中最靠近x的后继节点的分值小于x的新分值,最靠近x的前驱节点的分值大于x的新分值,如果这两个 * 条件成立,即便更新x的分值后x节点也不能前进或者后退。 * */ if ((x->backward == NULL || x->backward->score < newscore) && (x->level[0].forward == NULL || x->level[0].forward->score > newscore)) { x->score = newscore; return x; } /* No way to reuse the old node: we need to remove and insert a new * one at a different place. * * 更新节点在跳跃表中的位置发生变化,先删除节点,再插入节点 * */ zslDeleteNode(zsl, x, update); // 添加新的节点 zskiplistNode *newnode = zslInsert(zsl, newscore, x->ele); /* * We reused the old node x->ele SDS string, free the node now * since zslInsert created a new one. * 释放原先节点占据的空点 * */ x->ele = NULL; zslFreeNode(x); return newnode; }
我们知道,不管我们是要往跳跃表新增节点亦或是更新节点,只要调用ZADD命令即可:ZADD key score1 member1 [score2 member2],先前我们只是看了跳跃表的新增zslInsert()和修改zslUpdateScore()方法,但我们并不知道ZADD命令是如何来编排这两个方法的。下面就让我们看看ZADD命令的执行流程,ZADD命令由zaddCommand()进而调用到zaddGenericCommand()方法。这里笔者仅节选了zaddGenericCommand()方法的主要流程,当我们要将若干个元素和分值加入到跳跃表中,Redis会先根据我们选定的db判断我们输入的key是否存在,如果不存在则根据<1>处的条件决定是创建跳跃表还是压缩列表;如果根据key取出来的redisObject对象不为NULL,则判断其类型是否为zset,不是的话则进入<2>处的分支报错。如果为zobj创建好zset对象,或者确认好已存在的zobj的类型为zset,则执行<3>处的for循环将我们输入的元素和分值添加或更新到zset中,可以看到添加/更新的操作是在<4>处的zsetAdd()方法完成的、
//当执行ZADD命令会调用到zaddCommand()方法 void zaddCommand(client *c) { zaddGenericCommand(c, ZADD_NONE); } void zaddGenericCommand(client *c, int flags) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL; int j, elements; int scoreidx = 0; /* The following vars are used in order to track what the command actually * did during the execution, to reply to the client and to trigger the * notification of keyspace change. */ int added = 0; /* 新增节点数量 */ int updated = 0; /* 更新节点分值数量 */ int processed = 0; /* Number of elements processed, may remain zero with options like XX. */ …… /* * Start parsing all the scores, we need to emit any syntax error * before executing additions to the sorted set, as the command should * either execute fully or nothing at all. * 在将新节点加入或更新到有序集合前会用语法分析解析所有分值,以保证待加入或者待更新的 * 节点要嘛全部加入或全部更新,要嘛既不加入也不更新。 * */ scores = zmalloc(sizeof(double) * elements); for (j = 0; j < elements; j++) { if (getDoubleFromObjectOrReply(c, c->argv[scoreidx + j * 2], &scores[j], NULL) != C_OK) goto cleanup; } /* * Lookup the key and create the sorted set if does not exist. */ zobj = lookupKeyWrite(c->db, key); if (zobj == NULL) {//如果key不存在我们选定的db中,则创建一个有序集合 if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ /* * 这里会根据最大的压缩列表节点大小,或者插入元素的长度判断有序集合的编码是跳跃表 * 还是压缩列表。 */ if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx + 1]->ptr)) {//<1> zobj = createZsetObject(); } else { zobj = createZsetZiplistObject(); } //在选定的db上存储key和zset的关系映射 dbAdd(c->db, key, zobj); } else { if (zobj->type != OBJ_ZSET) {//<2>如果key在选定的db已存在,但类型不是zset则报错 addReply(c, shared.wrongtypeerr); goto cleanup; } } for (j = 0; j < elements; j++) {//<3> double newscore; //分值 score = scores[j]; int retflags = flags; //元素 ele = c->argv[scoreidx + 1 + j * 2]->ptr; int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);//<4>往zobj添加元素 if (retval == 0) { addReplyError(c, nanerr); goto cleanup; } //如果条件成立,代表当前操作为新增节点 if (retflags & ZADD_ADDED) added++; //如果条件成立,代表当前操作仅为更新节点分值 if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; score = newscore; } server.dirty += (added + updated); reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ if (processed) addReplyDouble(c, score); else addReplyNull(c); } else { /* ZADD. */ addReplyLongLong(c, ch ? added + updated : added); } cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c, c->db, key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } }
上面的zaddGenericCommand()方法并没有完整展示ZADD命令是如何把节点插入或更新到zset,我们继续追溯到zsetAdd()方法,这里笔者依旧仅节选了部分主要代码,如果传入的zobj对象编码为压缩列表,则判断传入的元素是否在压缩列表中,在的话则取出其在压缩列表中节点,从节点中获取分值,判断要更新的分值和当前分值是否相等,只有不等才移除压缩列表原先的节点,再把当前元素和分值插入到压缩列表中。如果压缩列表不存在此元素,则将元素和分值插入到压缩列表,插入之后,再判断压缩列表的长度是否可以转换为跳跃表,或是新插入节点元素的长度达到压缩列表允许转化为跳跃表的长度。
如果zobj对象的编码为跳跃表,则判断元素是否在跳跃表的字典上,如果存在的话代表当前操作为更新操作,这里会获取元素当前的分值,调用zslUpdateScore()方法将跳跃表、当前分值、元素和新分值传入,更新节点在跳跃表中的分值。如果元素不存在字典上,则需要执行新增节点操作,调用zslInsert()方法会创建一个节点存储元素及分值,并插入到跳跃表中,需要注意的一点是,zslInsert()方法仅仅是将节点插入到跳跃表,不会保存元素和分值在zset->dict的映射,所以在调用完zslInsert()方法后,需要手动在zset->dict上建立元素和分值的映射。
/* Add a new element or update the score of an existing element in a sorted * set, regardless of its encoding. * 在不考虑有序集合的编码是压缩列表(ziplist)或是跳跃表(skillist)的情况下,往有序集合 * 新增一个节点或者更新已有节点分值, * * The command as a side effect of adding a new element may convert the sorted * set internal encoding from ziplist to hashtable+skiplist. * 这个方法可能在加入元素时将有序集合内部实现由压缩列表转换为哈希表+跳跃表 * */ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { /* Turn options into simple to check vars. 可选参数解析 */ int incr = (*flags & ZADD_INCR) != 0; int nx = (*flags & ZADD_NX) != 0; int xx = (*flags & ZADD_XX) != 0; /* * We'll return our response flags. * flags的结果会告诉调用方当前节点在跳跃表是新增还是更新或是其他状态。 * */ *flags = 0; double curscore; /* * NaN as input is an error regardless of all the other parameters. * 是否分值不是有效数值则返回。 */ if (isnan(score)) { *flags = ZADD_NAN; return 0; } /* Update the sorted set according to its encoding. * 根据有序集合的编码更新有序集合 */ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {//如果有序集合的编码是压缩列表 unsigned char *eptr; //如果在压缩列表中能找到该元素,则把元素的分值存储到curscore指针指向的内存 if ((eptr = zzlFind(zobj->ptr, ele, &curscore)) != NULL) { …… /* Remove and re-insert when score changed. 如果元素当前的分值与要更新的分值不相等,则从压缩列表删除节点再重新插入 */ if (score != curscore) { zobj->ptr = zzlDelete(zobj->ptr, eptr); zobj->ptr = zzlInsert(zobj->ptr, ele, score); *flags |= ZADD_UPDATED; } return 1; } else if (!xx) {//如果是往压缩列表新增元素 /* * Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. * 插入元素到跳跃表后,如果发现压缩列表过长或者元素过大,则要转换 * 有序集合编码从压缩列表到跳跃表。 * */ zobj->ptr = zzlInsert(zobj->ptr, ele, score); if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries || sdslen(ele) > server.zset_max_ziplist_value)//如果条件成立则将压缩列表转换为跳跃表 zsetConvert(zobj, OBJ_ENCODING_SKIPLIST); if (newscore) *newscore = score; *flags |= ZADD_ADDED; return 1; } else { *flags |= ZADD_NOP; return 1; } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//如果有序集合的编码是跳跃表 zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; //如果元素在跳跃表的字典里已存在,这里做更新分值操作,否则做新增节点操作 de = dictFind(zs->dict, ele); if (de != NULL) {//更新分值 …… //获取元素当前分值 curscore = *(double *) dictGetVal(de); …… /* Remove and re-insert when score changes. * 如果当前分值和要更新的分值不等,则做移除-重新插入的操作 */ if (score != curscore) { znode = zslUpdateScore(zs->zsl, curscore, ele, score); /* Note that we did not removed the original element from * the hash table representing the sorted set, so we just * update the score. * 哈希表中不需要移除元素,仅修改分值即可 * */ dictGetVal(de) = &znode->score; /* Update score ptr. */ *flags |= ZADD_UPDATED; } return 1; // 元素不存在 } else if (!xx) {//往跳跃表新增节点 ele = sdsdup(ele); //插入新元素,并断言元素及其分值是否成功加入到字典 znode = zslInsert(zs->zsl, score, ele); serverAssert(dictAdd(zs->dict, ele, &znode->score) == DICT_OK); *flags |= ZADD_ADDED; return 1; } else { *flags |= ZADD_NOP; return 1; } } else {//如果有序集合的编码既不是压缩列表也不是跳跃表则进入此分支,一般不会进入 serverPanic("Unknown sorted set encoding"); } return 0; /* Never reached. */ }