介绍有序集合(SortedSet)相关命令的实现,包括基本操作,比如zadd/zrem/zscan等,批量的操作(zrange/zremrange),以及集合相关的操作(交集zinterstore和并集zunionstore)。有序集合中,用到的关键数据结构是ziplist以及dict和skiplist,当服务器属性server.zset_max_ziplist_entries的值大于0且元素的member长度小于服务器属性server.zset_max_ziplist_value的值(默认为64)时,使用的是ziplist,否则使用的是dict和skiplist。
15.1 相关命令介绍
1.zadd命令
格式:zadd key [NX|XX] [CH] [INCR] score member [score member ...]
说明: 将一个或多个member元素及其分值score加入到有序集合对应的key当中。其中,分值score可以是整数值或双精度浮点数。
·XX:只更新已经存在的元素,不添加元素。
·NX:不更新已经存在的元素,总是添加新的元素。
·CH:将返回值从添加的新元素数量修改为更改的元素总数。
·INCR:当指定此选项时,zadd的行为与zincrby类似。
注意: 当key存在但对应的类型不是有序集时,会返回一个错误。如果某个member已经是有序集的成员,那么更新这个member的score值,并通过重新插入这个member元素,来保证该member在正确的位置上。
示例: zadd sortset 3 test
2.zrem命令
格式:zrem key member [member ...]
说明: 删除有序集合key中的一个或者多个member。
注意: 不存在的member将会被忽略;当key存在但不是有序集合时,会返回一个错误。
示例: zrem sortset t1
zrem sortset test
3.zcard命令
格式:zcard key
说明: 获取有序集合key中的基数。注意: 不存在的key,返回0。
示例: zcard sortset
4.zcount命令
格式:zcount key min max
说明: 返回有序集key中score值在[min,max]区间的成员的数量。
示例: zcount sortset 1 3
5.zincrby命令
格式:zincrby key increment member (自增/减值)
说明: 在有序集合key的member的分值上增加increment。
注意: increment可以是负数,相当于减去相应的值;当key不是有序集合时,会返回一个错误;当key不存在时,或者member不在key中时,等同于zadd key increment member。
示例: zincrby sortset 2 test
6.zrank命令
格式:zrank key member
说明: 按照分值从小到大返回有序集合成员member的排名(第几名),排名从0开始计算。
注意: 如果member不是有序集合key的成员,返回nil。
示例:zrank sortset test3
7.zrevrank命令
格式:zrevrank key member
说明: 跟zrank类似,唯一区别是按照从大到小返回member的排名。
示例: zrevrank sortset test3
8.zscore命令
格式:zscore key member
说明: 获取有序集合key中成员member的分值,返回值为字符串。
注意: 对于不存在的key或者member,返回为nil。
示例: zscore sortset test1
9.zscan命令
格式:zscan key cursor [MATCH pattern] [COUNT count]
说明: 迭代有序集合中的元素成员和分值,其中cursor是游标,MATCH中可以通过正则来匹配元素,count是返回的元素数量。
示例: zscan sortset 0
10.zrange命令
格式:zrange key start stop [WITHSCORES]
说明: 获取有序集合key中指定区间的成员,成员按照分值递增排序,如果分值相同,成员按照字典序排序。
注意: 超出范围的下标并不会引起错误。当start的值比有序集的最大下标还要大,或start>stop时,只是简单地返回一个空列表。start和stop支持使用负数下标,以-1表示最后一个成员,-2表示倒数第2个成员。如果stop参数的值比有序集的最大下标大,则返回最大下标来处理。
示例: zrange sortset 0 10
11.zrevrange命令
格式:zrevrange key start stop [WITHSCORES]
说明: 跟zrange相反,获取有序集合key中指定区间的成员,成员按照分值递减排序,如果分值相同,成员按照字典序排序。注意: 下标注意事项同zrange。
示例:127.0.0.1:6379> zrevrange sortset 0 10
12.zrangebyscore命令
格式:zrangebyscore key min max [WITHSCORES] [LIMIT offset count]
(在分值区间数据按score从小到大排列)
说明: 返回有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score值递增(从小到大)次序排列。具有相同score值的成员按字典序排列。
注意: 对于不存在的key或者member,返回为nil。
示例:127.0.0.1:6379> zrangebyscore sortset 1 3
13.zrevrangebyscore命令
格式:zrevrangebyscore key max min [WITHSCORES] [LIMIT offset count]
(在分值区间数据按score从大到小排列)
说明: 除了有序集合按score值递减之外,跟zrangebyscore完全一样。
示例:127.0.0.1:6379> zrevrangebyscore sortset 3 1
14.zrangebylex命令
格式:zrangebylex key min max [LIMIT offset count]
(在值区间数据按字典序排列)
说明: 返回给定的有序集合键key中值介于min和max之间的成员,根据成员的字典序排序。合法的min和max参数必须包含"("或者"[",其中"("表示开区间,"["表示闭区间。
示例:127.0.0.1:6379> zadd sortset 1 a 1 b 1 c 1 d 1 e 1 f
127.0.0.1:6379> zrangebylex sortset - (d
127.0.0.1:6379> zrangebylex sortset - [d
15.zlexcount命令
格式:zlexcount key min max (在值区间数据的成员数量)
说明: 返回给定的有序集合键key中值介于min和max之间的成员数量。合法的min和max参数必须包含"("或者"[",其中"("表示开区间,"["表示闭区间。
示例:127.0.0.1:6379> zlexcount sortset [b [d
16.zremrangebyrank命令
格式:zremrangebyrank key start stop
说明: 移除有序集key中指定排名区间内的所有成员。
示例: 127.0.0.1:6379> zremrangebyrank sortset 0 1
17.zremrangebyscore命令
格式:zremrangebyscore key min max (移除 score区间内的成员)
说明: 移除有序集key中所有score值介于[min,max]区间的成员。
示例: 127.0.0.1:6379> zremrangebyscore sortset 1 3
18.zremrangebylex命令
格式:zremrangebylex key min max (移除 值区间内的成员)
说明: 移除该集合中成员介于min和max范围内(字典序)的所有元素。
示例:127.0.0.1:6379>zremrangebylex sortset [b [d
19.zunionstore命令
格式:zunionstore destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] (按统计方式并集计算到目标集合)
说明: 计算给定的一个或多个(数量由numkeys指定)有序集的并集,将结果存储到destination中。结果集中某个成员的score值默认是所有给定集下该成员score值之和。AGGREGATE选项可以指定并集的结果集的聚合方式,其中SUM表示score的和,MIN表示某个成员的最小score值,MAX表示某个成员的最大score值。WEIGHTS选项可以在使用聚合函数时为每个有序集分别指定一个乘法因子。
示例:127.0.0.1:6379> zadd setA 93 ZhangSan
127.0.0.1:6379> zadd setA 88 LiSi
127.0.0.1:6379> zadd setA 79 WangWu
127.0.0.1:6379> zadd setA 100 ChenLiu
127.0.0.1:6379> zadd setB 90 LiSi
127.0.0.1:6379> zadd setB 60 WangWu
127.0.0.1:6379> zunionstore setC 2 setA setB WEIGHTS 1 1 AGGREGATE MIN
127.0.0.1:6379> zrange setC 0 -1 WITHSCORES
20.zinterstore命令
格式:zinterstore destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] (按统计方式交集计算到目标集合)
说明: 跟zunionstore类似,唯一的区别是本命令用来求交集。
示例:127.0.0.1:6379> ZINTERSTORE setD 2 setA setB WEIGHTS 1 1 AGGREGATE MIN
127.0.0.1:6379> ZRANGE setD 0 -1 WITHSCORES
15.2 基本操作
上面讲解的命令可分成基础操作、批量操作和集合计算,其中添加成员、删除成员、基数统计、数量计算、计数器、获取排名、获取分值和遍历划分到基础操作中;
15.2.1 添加成员
zadd对应的函数是zaddCommand,实际调用的是zaddGeneric-Command,
zaddGenericCommand函数还用在了zincrby命令上,唯一的区别是第2个参数,zadd-Command是ZADD_NONE,zincrbyCommand是ZADD_INCR,这两个宏的定义如下:
/* 输入的标记位 */
#define ZADD_NONE 0
#define ZADD_INCR (1<<0) /* 表示对score进行增加 */
#define ZADD_NX (1<<1) /* 不更新已经存在的元素,总是添加新的元素,与参数NX对应 */
#define ZADD_XX (1<<2) /* 只更新已经存在的元素,不添加元素,跟参数XX对应 */
#define ZADD_CH (1<<16) /* 只用在了ZADD中,返回值为更改的元素总数,与CH对应*/
在函数zaddGenericCommand中,主要分为以下5步。
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements;
int scoreidx = 0;
int added = 0; /* 新加元素数量 */
int updated = 0; /* 新加secore元素数量 */
int processed = 0; /* 处理元素数量*/
/* 1)解析参数,主要是对NX/XX/CH参数的解析,并使用"ZADD_*"的宏来标记*/
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
else break;
scoreidx++;
}
...
// 2)在redisDB中查找key是否存在,调用函数lookupKeyWrite,实质上是根据key在dict里面查找,代码如下:
zobj = lookupKeyWrite(c->db,key);
//3)如果key存在,则需要判断一下是不是有序集合;如果key不存在,那么需要将key插入到db中
if (zobj == NULL) {
if (xx) goto reply_to_client; /*设置了XX参数就不处理了,因为XX命令代表只更新已经存在的元素*/
/同时需要初始化底层数据结构,这时候需要和参数zset_max_ziplist_value比较大小:
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{ //大于server.zset_max_ziplist_value,初始化dict和skiplist
zobj = createZsetObject();
} else { //小于server.zset_max_ziplist_value,初始化ziplist
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
//4)循环遍历elements,将element和score存入到ziplist或者skiplist中,调用函数zsetAdd:
for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = flags;
ele = c->argv[scoreidx+1+j*2]->ptr;
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
...
}
server.dirty += (added+updated);
//5)调用"addReply*"函数返回给客户端
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
zsetAdd函数: (redis 6)
根据zobj->encoding分别对ziplist和dict&&skiplist进行处理:
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
....
/* 对ziplist处理 */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
//如果元素存在,则更新
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* score改变后则更新 */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr); //删除后再重新插入
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* 如果不存在,插入时需要注意:长度超zset_max_ziplist_entries,需要变换skiplis方式 */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { /* 对SKIPLIST处理 */
...
//先去dict中查找元素
de = dictFind(zs->dict,ele);
if (de != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
curscore = *(double*)dictGetVal(de);
/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}
/* Remove and re-insert when score changes. */
if (score != curscore) {
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* 不移除原来的元素,而是更新score*/
dictGetVal(de) = &znode->score; /* Update score ptr. */
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) { //否则直接插入
ele = sdsdup(ele);
znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*flags |= ZADD_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}
15.2.2 删除成员
命令zrem调用函数zremCommand,该函数主要做了3件事。
void zremCommand(client *c) {
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
for (j = 2; j < c->argc; j++) {
// 1)调用zsetDel删除元素,分别对ziplist编码类型和skiplist编码类型区别:
if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
// 2)如果一个key中的元素全部删除了,那么删除db里面的key:
if (zsetLength(zobj) == 0) { //计算有序集合的长度,具体见zcard命令
dbDelete(c->db,key);
keyremoved = 1;
break;
}
}
//3)广播删除事件并返回给客户端:
if (deleted) {
notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
signalModifiedKey(c,c->db,key);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}
分别对ziplist编码类型和skiplist编码类型区别:
int zsetDel(robj *zobj, sds ele) {
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
//对于ziplist编码类型,直接从ziplist中删除
if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
//对于skiplist编码类型,先从dict里面删除,然后从skiplist中删除
de = dictUnlink(zs->dict,ele);
if (de != NULL) {
score = *(double*)dictGetVal(de);
dictFreeUnlinkedEntry(zs->dict,de);
/* Delete from skiplist. */
int retval = zslDelete(zs->zsl,score,ele,NULL);
}
} else { ... }
return 0; /* No such element found. */
}
15.2.3 基数统计
命令zcard函数是zcardCommand,
调用zsetLength函数计算有序集合的长度,代码如下:
void zcardCommand(client *c) {
robj *key = c->argv[1];
robj *zobj;
if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
//通过函数zsetLength计算有序集合中基数的值
addReplyLongLong(c,zsetLength(zobj));
}
在函数zsetLength中,会根据存储的类型,分别计算length,
对于ziplist,ziplistLen计算长度,因为ziplist中存储key和value的对,基数值是ziplistLen()/2,
如果底层存储使用的是skiplist,直接取skiplist里面的length即可;
unsigned long zsetLength(const robj *zobj) {
unsigned long length = 0;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
length = zzlLength(zobj->ptr); //return ziplistLen(zl)/2;
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
length = ((const zset*)zobj->ptr)->zsl->length;
} else {
serverPanic("Unknown sorted set encoding");
}
return length;
}
15.2.4 数量计算
命令zcount函数是zcountCommand,zcount命令会先找到区间的起始位置min,然后遍历找到结束位置max。针对ziplist和skiplist做了区分,
对于ziplist的处理,首先通过zzlFirstInRange函数找到min位置,
void zcountCommand(client *c) {
robj *key = c->argv[1];
robj *zobj;
zrangespec range;
unsigned long count = 0;
/* 解析参数 */
if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
addReplyError(c,"min or max is not a float");
return;
}
/* 查询set */
if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
checkType(c, zobj, OBJ_ZSET)) return;
// ziplist的处理
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
double score;
/* 首先通过zzlFirstInRange函数找到min位置 */
eptr = zzlFirstInRange(zl,&range);
/* 没有第一个元素*/
if (eptr == NULL) {
addReply(c, shared.czero);
return;
}
/*第一个元素在范围内 */
sptr = ziplistNext(zl,eptr);
score = zzlGetScore(sptr);
serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
/*范围内迭代元素 */
while (eptr) {
score = zzlGetScore(sptr);
/* 不在范围内则停止. */
if (!zslValueLteMax(score,&range)) {
break;
} else {
count++;
zzlNext(zl,&eptr,&sptr); //遍历对count进行+1,计算区间内的成员数量
}
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { //使用skiplist存储
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *zn;
unsigned long rank;
zn = zslFirstInRange(zsl, &range); //获取区间的开始位置
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->ele); //得到rank值
count = (zsl->length - (rank - 1));
/* 获取区间的结束位置 */
zn = zslLastInRange(zsl, &range);
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->ele);
count -= (zsl->length - rank);
}
}
} else { ... }
addReplyLongLong(c, count);
}
判断闭区间和开区间函数zslParseRange,使用到一个数据结构叫做zrangespec;
typedef struct {
double min, max;
int minex, maxex; /* 来标记是闭区间还是开区间*/*
} zrangespec;
static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
char *eptr;
spec->minex = spec->maxex = 0;
if (min->encoding == OBJ_ENCODING_INT) {
spec->min = (long)min->ptr;
} else {
if (((char*)min->ptr)[0] == '(') { //开区间使用"("来表示
spec->min = strtod((char*)min->ptr+1,&eptr);
if (eptr[0] != '