前面几篇文章,我们完全领略了redis的string,hash,list,set数据类型的实现方法,相信对redis已经不再神秘。
本篇我们将介绍redis的最后一种数据类型: zset 的相关实现。
本篇过后,我们对redis的各种基础功能,应该不会再有疑惑。有可能的话,我们后续将会对redis的高级功能的实现做解析。(如复制、哨兵模式、集群模式)
回归本篇主题,zset。zset 又称有序集合(sorted set),即是序版本的set。经过上篇的介绍,大家可以看到,redis的读取功能相当有限,许多是基于随机数的方式进行读取,其原因就是set是无序的。当set有序之后,查询能力就会得到极大的提升。1. 可以根据下标进行定位元素; 2. 可以范围查询元素; 这是有序带来的好处。
那么,我们不妨先思考一下,如何实现有序?两种方法:1. 根据添加顺序定义,1、2、3... ; 2. 自定义排序值; 第1种方法实现简单,添加时复杂度小,但是功能受限;第2种方法相对自由,对于每次插入都可能涉及重排序问题,但是查询相对稳定,可以不必完全受限于系统实现;
同样,我们以功能列表,到数据结构,再功能实现的思路,来解析redis的zset有序集合的实现方式吧。
零、redis zset相关操作方法
zset: Redis 有序集合是string类型元素的集合,且不允许重复的成员。每个元素都会关联一个double类型的分数,通过分数来为集合中的成员进行从小到大的排序。
使用场景如: 保存任务队列,该队列由后台定时扫描; 排行榜;
从官方手册上查到相关使用方法如下:
1> ZADD key score1 member1 [score2 member2]
功能: 向有序集合添加一个或多个成员,或者更新已存在成员的分数
返回值: 添加成功的元素个数(已存在的添加不成功)2> ZCARD key
功能: 获取有序集合的成员数
返回值: 元素个数或03> ZCOUNT key min max
功能: 计算在有序集合中指定区间分数的成员数
返回值: 区间内的元素个数4> ZINCRBY key increment member
功能: 有序集合中对指定成员的分数加上增量 increment
返回值: member增加后的分数5> ZINTERSTORE destination numkeys key [key ...]
功能: 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中
返回值: 交集元素个数6> ZLEXCOUNT key min max
功能: 在有序集合中计算指定字典区间内成员数量
返回值: 区间内的元素个数7> ZRANGE key start stop [WITHSCORES]
功能: 通过索引区间返回有序集合指定区间内的成员
返回值: 区间内元素列表8> ZRANGEBYLEX key min max [LIMIT offset count]
功能: 通过字典区间返回有序集合的成员
返回值: 区间内元素列表9> ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
功能: 通过分数返回有序集合指定区间内的成员
返回值: 区间内元素列表10> ZRANK key member
功能: 返回有序集合中指定成员的索引
返回值: member的排名或者 nil11> ZREM key member [member ...]
功能: 移除有序集合中的一个或多个成员
返回值: 成功移除的元素个数12> ZREMRANGEBYLEX key min max
功能: 移除有序集合中给定的字典区间的所有成员
返回值: 成功移除的元素个数13> ZREMRANGEBYRANK key start stop
功能: 移除有序集合中给定的排名区间的所有成员
返回值: 成功移除的元素个数14> ZREMRANGEBYSCORE key min max
功能: 移除有序集合中给定的分数区间的所有成员
返回值: 成功移除的元素个数15> ZREVRANGE key start stop [WITHSCORES]
功能: 返回有序集中指定区间内的成员,通过索引,分数从高到低
返回值: 区间内元素列表及分数16> ZREVRANGEBYSCORE key max min [WITHSCORES]
功能: 返回有序集中指定分数区间内的成员,分数从高到低排序
返回值: 区间内元素列表及分数17> ZREVRANK key member
功能: 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
返回值: member排名或者 nil18> ZSCORE key member
功能: 返回有序集中,成员的分数值
返回值: member分数19> ZUNIONSTORE destination numkeys key [key ...]
功能: 计算给定的一个或多个有序集的并集,并存储在新的 key 中
返回值: 存储到新key的元素个数20> ZSCAN key cursor [MATCH pattern] [COUNT count]
功能: 迭代有序集合中的元素(包括元素成员和元素分值)
返回值: 元素列表21> ZPOPMAX/ZPOPMIN/BZPOPMAX/BZPOPMIN
一、zset 相关数据结构
zset 的实现,使用了 ziplist, zskiplist 和 dict 进行实现。
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned int span; } level[]; } zskiplistNode; // 跳跃链表 typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist; // zset 主数据结构,dict + zskiplist typedef struct zset { dict *dict; zskiplist *zsl; } zset; // zset 在合适场景下,将先使用 ziplist 存储数据 typedef struct zlentry { unsigned int prevrawlensize, prevrawlen; unsigned int lensize, len; unsigned int headersize; unsigned char encoding; unsigned char *p; } zlentry;
二、zadd 添加成员操作
从添加实现中,我们可以完整领略数据结构的运用。
// 用法: ZADD key score1 member1 [score2 member2] // t_zset.c void zaddCommand(client *c) { // zadd 的多个参数变形, 使用 flags 进行区分复用 zaddGenericCommand(c,ZADD_NONE); } void zaddGenericCommand(client *c, int flags) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL, curscore = 0.0; int j, elements; int scoreidx = 0; /* The following vars are used in order to track what the command actually * did during the execution, to reply to the client and to trigger the * notification of keyspace change. */ int added = 0; /* Number of new elements added. */ int updated = 0; /* Number of elements with updated score. */ int processed = 0; /* Number of elements processed, may remain zero with options like XX. */ /* Parse options. At the end 'scoreidx' is set to the argument position * of the score of the first score-element pair. */ // 从第三位置开始尝试解析特殊标识(用法规范) // 按位与到 flags 中 scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; // NX: 不更新已存在的元素,只做添加操作 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; // XX: 只做更新操作,不做添加操作 else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; // CH: 将返回值从添加的新元素数修改为已更改元素的总数。 更改的元素是第添加的新元素以及已为其更新分数的现有元素。 因此,命令行中指定的具有与过去相同分数的元素将不计算在内。 注意:通常,ZADD的返回值仅计算添加的新元素的数量。 else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; // INCR: 使用指定元素增加指定分数, 与 ZINCRBY 类似,此场景下,只允许操作一个元素 else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; } /* Turn options into simple to check vars. */ int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; /* After the options, we expect to have an even number of args, since * we expect any number of score-element pairs. */ // 把特殊标识去除后,剩下的参数列表应该2n数,即 score-element 一一配对的,否则语法错误 elements = c->argc-scoreidx; if (elements % 2) { addReply(c,shared.syntaxerr); return; } elements /= 2; /* Now this holds the number of score-element pairs. */ /* Check for incompatible options. */ // 互斥项 if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } // 语法检查,INCR 只能针对1个元素操作 if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } /* Start parsing all the scores, we need to emit any syntax error * before executing additions to the sorted set, as the command should * either execute fully or nothing at all. */ // 解析所有的 score 值为double类型,赋值到 scores 中 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } /* Lookup the key and create the sorted set if does not exist. */ // 语法检查 zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ // 创建原始key对象 // 默认 zset_max_ziplist_entries=OBJ_ZSET_MAX_ZIPLIST_ENTRIES: 128 // 默认 zset_max_ziplist_value=OBJ_ZSET_MAX_ZIPLIST_VALUE: 64 // 所以此处默认主要是检查 第1个member的长度是大于 64 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { // 2. 通用情况使用 dict+quicklist 型的zset zobj = createZsetObject(); } else { // 1. 元素比较小的情况下创建 ziplist 型的 zset zobj = createZsetZiplistObject(); } // 将对象添加到db中,后续所有操作针对 zobj 操作即是对db的操作 (引用传递) dbAdd(c->db,key,zobj); } else { if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } // 一个个元素循环添加 for (j = 0; j < elements; j++) { score = scores[j]; ele = c->argv[scoreidx+1+j*2]->ptr; // 分当前zobj的编码不同进行添加 (ziplist, skiplist) // 3. ZIPLIST 编码下的zset添加操作 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; // 3.1. 查找是否存在要添加的元素 (确定添加或更新) if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { if (nx) continue; if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); goto cleanup; } } /* Remove and re-insert when score changed. */ if (score != curscore) { // 3.2. 元素更新操作,先删再插入 zobj->ptr = zzlDelete(zobj->ptr,eptr); zobj->ptr = zzlInsert(zobj->ptr,ele,score); server.dirty++; updated++; } processed++; } else if (!xx) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 5. 超过一条件后,做 ziplist->skiplist 转换 // 默认 元素个数>128, 当前元素>64 // 这两个判断不会重复吗?? 两个原因: 1. 转换函数内部会重新判定; 2. 下一次循环时不会再走当前逻辑; if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); server.dirty++; added++; processed++; } } // 4. skiplist 下的zset元素添加 else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; // 判断ele是否已存在,使用hash查找,快速 de = dictFind(zs->dict,ele); if (de != NULL) { if (nx) continue; curscore = *(double*)dictGetVal(de); if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); /* Don't need to check if the sorted set is empty * because we know it has at least one element. */ goto cleanup; } } /* Remove and re-insert when score changes. */ // 先删再插入 skiplist if (score != curscore) { zskiplistNode *node; serverAssert(zslDelete(zs->zsl,curscore,ele,&node)); znode = zslInsert(zs->zsl,score,node->ele); /* We reused the node->ele SDS string, free the node now * since zslInsert created a new one. */ node->ele = NULL; zslFreeNode(node); /* Note that we did not removed the original element from * the hash table representing the sorted set, so we just * update the score. */ // 更新dict中的分数引用 dictGetVal(de) = &znode->score; /* Update score ptr. */ server.dirty++; updated++; } processed++; } else if (!xx) { ele = sdsdup(ele); znode = zslInsert(zs->zsl,score,ele); // 添加skiplist的同时,也往 dict 中添加一份数据,因为hash的查找永远是最快的 serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); server.dirty++; added++; processed++; } } else { serverPanic("Unknown sorted set encoding"); } } reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ if (processed) addReplyDouble(c,score); else addReply(c,shared.nullbulk); } else { /* ZADD. */ addReplyLongLong(c,ch ? added+updated : added); } cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } } // 1. 元素比较小的情况下创建 ziplist 型的 zset // object.c, 创建ziplist 的zset robj *createZsetZiplistObject(void) { unsigned char *zl = ziplistNew(); robj *o = createObject(OBJ_ZSET,zl); o->encoding = OBJ_ENCODING_ZIPLIST; return o; } // 2. 创建通用的 zset 实例 // object.c robj *createZsetObject(void) { zset *zs = zmalloc(sizeof(*zs)); robj *o; // zsetDictType 稍有不同 zs->dict = dictCreate(&zsetDictType,NULL); // 首次遇到 skiplist, 咱去瞅瞅是如何创建的 zs->zsl = zslCreate(); o = createObject(OBJ_ZSET,zs); o->encoding = OBJ_ENCODING_SKIPLIST; return o; } // server.c, zset创建时使用的dict类型,与hash有不同 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */ dictType zsetDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ NULL, /* Note: SDS string shared & freed by skiplist */ NULL /* val destructor */ }; // 创建 skiplist 对象 /* Create a new skiplist. */ zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; // 创建header节点,ZSKIPLIST_MAXLEVEL 32 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); // 初始化header for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; } /* Create a skiplist node with the specified number of levels. * The SDS string 'ele' is referenced by the node after the call. */ zskiplistNode *zslCreateNode(int level, double score, sds ele) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->ele = ele; return zn; } // 3. ZIPLIST 编码下的zset添加操作 // 3.1. 查找是否存在要添加的元素 (确定添加或更新) // t_zset.c, 查找指定ele unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; // 遍历所有ziplist // 可见,此时的ziplist并没有表现出有序啊 while (eptr != NULL) { // eptr 相当于是 key // sptr 相当于score sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL); if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) { /* Matching element, pull out score. */ // 找到相应的 key 后,解析下一值,即 score if (score != NULL) *score = zzlGetScore(sptr); return eptr; } /* Move to next element. */ // 移动两次对象,才会到下一元素(因为存储是 key-score 相邻存储) eptr = ziplistNext(zl,sptr); } return NULL; } // t_zset.c, 获取元素的score double zzlGetScore(unsigned char *sptr) { unsigned char *vstr; unsigned int vlen; long long vlong; char buf[128]; double score; serverAssert(sptr != NULL); serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong)); // 带小数点不带小数点 if (vstr) { memcpy(buf,vstr,vlen); buf[vlen] = '