zoukankan      html  css  js  c++  java
  • Redis5设计与源码分析 (第15章 有序集合相关命令的实现)

    介绍有序集合(SortedSet)相关命令的实现,包括基本操作,比如zadd/zrem/zscan等,批量的操作(zrange/zremrange),以及集合相关的操作(交集zinterstore和并集zunionstore)。有序集合中,用到的关键数据结构是ziplist以及dictskiplist,当服务器属性server.zset_max_ziplist_entries的值大于0且元素的member长度小于服务器属性server.zset_max_ziplist_value的值(默认为64)时,使用的是ziplist,否则使用的是dict和skiplist。

    15.1 相关命令介绍

    1.zadd命令

    格式:zadd key [NX|XX] [CH] [INCR] score member [score member ...]

    说明: 将一个或多个member元素及其分值score加入到有序集合对应的key当中。其中,分值score可以是整数值或双精度浮点数。

    ·XX:只更新已经存在的元素,不添加元素。

    ·NX:不更新已经存在的元素,总是添加新的元素。

    ·CH:将返回值从添加的新元素数量修改为更改的元素总数。

    ·INCR:当指定此选项时,zadd的行为与zincrby类似。

    注意: 当key存在但对应的类型不是有序集时,会返回一个错误。如果某个member已经是有序集的成员,那么更新这个member的score值,并通过重新插入这个member元素,来保证该member在正确的位置上。

    示例: zadd sortset 3 test

    2.zrem命令

    格式:zrem key member [member ...]

    说明: 删除有序集合key中的一个或者多个member。

    注意: 不存在的member将会被忽略;当key存在但不是有序集合时,会返回一个错误。

    示例: zrem sortset t1

    zrem sortset test

    3.zcard命令

    格式:zcard key

    说明: 获取有序集合key中的基数。注意: 不存在的key,返回0。

    示例: zcard sortset

    4.zcount命令

    格式:zcount key min max

    说明: 返回有序集key中score值在[min,max]区间的成员的数量。

    示例: zcount sortset 1 3

    5.zincrby命令

    格式:zincrby key increment member (自增/减值)

    说明: 在有序集合key的member的分值上增加increment

    注意: increment可以是负数,相当于减去相应的值;当key不是有序集合时,会返回一个错误;当key不存在时,或者member不在key中时,等同于zadd key increment member。

    示例: zincrby sortset 2 test

    6.zrank命令

    格式:zrank key member

    说明: 按照分值从小到大返回有序集合成员member的排名(第几名),排名从0开始计算。

    注意: 如果member不是有序集合key的成员,返回nil。

    示例:zrank sortset test3

    7.zrevrank命令

    格式:zrevrank key member

    说明: 跟zrank类似,唯一区别是按照从大到小返回member的排名。

    示例: zrevrank sortset test3

    8.zscore命令

    格式:zscore key member

    说明: 获取有序集合key中成员member的分值,返回值为字符串。

    注意: 对于不存在的key或者member,返回为nil。

    示例: zscore sortset test1

    9.zscan命令

    格式:zscan key cursor [MATCH pattern] [COUNT count]

    说明: 迭代有序集合中的元素成员和分值,其中cursor是游标,MATCH中可以通过正则来匹配元素,count是返回的元素数量。

    示例: zscan sortset 0

    10.zrange命令

    格式:zrange key start stop [WITHSCORES]

    说明: 获取有序集合key中指定区间的成员,成员按照分值递增排序,如果分值相同,成员按照字典序排序。

    注意: 超出范围的下标并不会引起错误。当start的值比有序集的最大下标还要大,或start>stop时,只是简单地返回一个空列表。start和stop支持使用负数下标,以-1表示最后一个成员,-2表示倒数第2个成员。如果stop参数的值比有序集的最大下标大,则返回最大下标来处理。

    示例: zrange sortset 0 10

    11.zrevrange命令

    格式:zrevrange key start stop [WITHSCORES]

    说明: 跟zrange相反,获取有序集合key中指定区间的成员,成员按照分值递减排序,如果分值相同,成员按照字典序排序。注意: 下标注意事项同zrange。

    示例:127.0.0.1:6379> zrevrange sortset 0 10

    12.zrangebyscore命令

    格式:zrangebyscore key min max [WITHSCORES] [LIMIT offset count]

    (在分值区间数据按score从小到大排列)

    说明: 返回有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score值递增(从小到大)次序排列。具有相同score值的成员按字典序排列。

    注意: 对于不存在的key或者member,返回为nil。

    示例:127.0.0.1:6379> zrangebyscore sortset 1 3

    13.zrevrangebyscore命令

    格式:zrevrangebyscore key max min [WITHSCORES] [LIMIT offset count]

    (在分值区间数据按score从大到小排列)

    说明: 除了有序集合按score值递减之外,跟zrangebyscore完全一样。

    示例:127.0.0.1:6379> zrevrangebyscore sortset 3 1

    14.zrangebylex命令

    格式:zrangebylex key min max [LIMIT offset count]

    (在值区间数据按字典序排列)

    说明: 返回给定的有序集合键key中介于min和max之间的成员,根据成员的字典序排序。合法的min和max参数必须包含"("或者"[",其中"("表示开区间,"["表示闭区间。

    示例:127.0.0.1:6379> zadd sortset 1 a 1 b 1 c 1 d 1 e 1 f

    127.0.0.1:6379> zrangebylex sortset - (d

    127.0.0.1:6379> zrangebylex sortset - [d

    15.zlexcount命令

    格式:zlexcount key min max (在值区间数据的成员数量)

    说明: 返回给定的有序集合键key中值介于min和max之间的成员数量。合法的min和max参数必须包含"("或者"[",其中"("表示开区间,"["表示闭区间。

    示例:127.0.0.1:6379> zlexcount sortset [b [d

    16.zremrangebyrank命令

    格式:zremrangebyrank key start stop

    说明: 移除有序集key中指定排名区间内的所有成员

    示例: 127.0.0.1:6379> zremrangebyrank sortset 0 1

    17.zremrangebyscore命令

    格式:zremrangebyscore key min max (移除 score区间内的成员)

    说明: 移除有序集key中所有score值介于[min,max]区间的成员

    示例: 127.0.0.1:6379> zremrangebyscore sortset 1 3

    18.zremrangebylex命令

    格式:zremrangebylex key min max (移除 值区间内的成员)

    说明: 移除该集合中成员介于min和max范围内(字典序)的所有元素。

    示例:127.0.0.1:6379>zremrangebylex sortset [b [d

    19.zunionstore命令

    格式:zunionstore destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] (按统计方式并集计算到目标集合)

    说明: 计算给定的一个或多个(数量由numkeys指定)有序集的并集,将结果存储到destination中。结果集中某个成员的score值默认是所有给定集下该成员score值之和。AGGREGATE选项可以指定并集的结果集的聚合方式,其中SUM表示score的和,MIN表示某个成员的最小score值,MAX表示某个成员的最大score值。WEIGHTS选项可以在使用聚合函数时为每个有序集分别指定一个乘法因子。

    示例:127.0.0.1:6379> zadd setA 93 ZhangSan

    127.0.0.1:6379> zadd setA 88 LiSi

    127.0.0.1:6379> zadd setA 79 WangWu

    127.0.0.1:6379> zadd setA 100 ChenLiu

    127.0.0.1:6379> zadd setB 90 LiSi

    127.0.0.1:6379> zadd setB 60 WangWu

    127.0.0.1:6379> zunionstore setC 2 setA setB WEIGHTS 1 1 AGGREGATE MIN

    127.0.0.1:6379> zrange setC 0 -1 WITHSCORES

    20.zinterstore命令

    格式:zinterstore destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] (按统计方式交集计算到目标集合)

    说明: 跟zunionstore类似,唯一的区别是本命令用来求交集。

    示例:127.0.0.1:6379> ZINTERSTORE setD 2 setA setB WEIGHTS 1 1 AGGREGATE MIN

    127.0.0.1:6379> ZRANGE setD 0 -1 WITHSCORES

    15.2 基本操作

    上面讲解的命令可分成基础操作、批量操作和集合计算,其中添加成员、删除成员、基数统计、数量计算、计数器、获取排名、获取分值和遍历划分到基础操作中;

    15.2.1 添加成员

    zadd对应的函数是zaddCommand,实际调用的是zaddGeneric-Command,

    zaddGenericCommand函数还用在了zincrby命令上,唯一的区别是第2个参数,zadd-Command是ZADD_NONE,zincrbyCommand是ZADD_INCR,这两个宏的定义如下:

    /* 输入的标记位 */

    #define ZADD_NONE 0

    #define ZADD_INCR (1<<0) /* 表示对score进行增加 */

    #define ZADD_NX (1<<1) /* 不更新已经存在的元素,总是添加新的元素,与参数NX对应 */

    #define ZADD_XX (1<<2) /* 只更新已经存在的元素,不添加元素,跟参数XX对应 */

    #define ZADD_CH (1<<16) /* 只用在了ZADD中,返回值为更改的元素总数,与CH对应*/

    在函数zaddGenericCommand中,主要分为以下5步。

    void zaddGenericCommand(client *c, int flags) {

    static char *nanerr = "resulting score is not a number (NaN)";

    robj *key = c->argv[1];

    robj *zobj;

    sds ele;

    double score = 0, *scores = NULL;

    int j, elements;

    int scoreidx = 0;

    int added = 0; /* 新加元素数量 */

    int updated = 0; /* 新加secore元素数量 */

    int processed = 0; /* 处理元素数量*/

    /* 1)解析参数,主要是对NX/XX/CH参数的解析,并使用"ZADD_*"的宏来标记*/

    scoreidx = 2;

    while(scoreidx < c->argc) {

    char *opt = c->argv[scoreidx]->ptr;

    if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;

    else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;

    else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;

    else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;

    else break;

    scoreidx++;

    }

    ...

    // 2)在redisDB中查找key是否存在,调用函数lookupKeyWrite,实质上是根据key在dict里面查找,代码如下:

    zobj = lookupKeyWrite(c->db,key);

    //3)如果key存在,则需要判断一下是不是有序集合;如果key不存在,那么需要将key插入到db中

    if (zobj == NULL) {

    if (xx) goto reply_to_client; /*设置了XX参数就不处理了,因为XX命令代表只更新已经存在的元素*/

    /同时需要初始化底层数据结构,这时候需要和参数zset_max_ziplist_value比较大小:

    if (server.zset_max_ziplist_entries == 0 ||

    server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))

    { //大于server.zset_max_ziplist_value,初始化dict和skiplist

    zobj = createZsetObject();

    } else { //小于server.zset_max_ziplist_value,初始化ziplist

    zobj = createZsetZiplistObject();

    }

    dbAdd(c->db,key,zobj);

    } else {

    if (zobj->type != OBJ_ZSET) {

    addReply(c,shared.wrongtypeerr);

    goto cleanup;

    }

    }

    //4)循环遍历elements,将element和score存入到ziplist或者skiplist中,调用函数zsetAdd:

    for (j = 0; j < elements; j++) {

    double newscore;

    score = scores[j];

    int retflags = flags;

    ele = c->argv[scoreidx+1+j*2]->ptr;

    int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);

    ...

    }

    server.dirty += (added+updated);

    //5)调用"addReply*"函数返回给客户端

    reply_to_client:

    if (incr) { /* ZINCRBY or INCR option. */

    if (processed)

    addReplyDouble(c,score);

    else

    addReplyNull(c);

    } else { /* ZADD. */

    addReplyLongLong(c,ch ? added+updated : added);

    }

     

    cleanup:

    zfree(scores);

    if (added || updated) {

    signalModifiedKey(c,c->db,key);

    notifyKeyspaceEvent(NOTIFY_ZSET,

    incr ? "zincr" : "zadd", key, c->db->id);

    }

    }

    zsetAdd函数: (redis 6)

    根据zobj->encoding分别对ziplist和dict&&skiplist进行处理:

    int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {

    ....

    /* 对ziplist处理 */

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    unsigned char *eptr;

    //如果元素存在,则更新

    if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {

    /* score改变后则更新 */

    if (score != curscore) {

    zobj->ptr = zzlDelete(zobj->ptr,eptr); //删除后再重新插入

    zobj->ptr = zzlInsert(zobj->ptr,ele,score);

    *flags |= ZADD_UPDATED;

    }

    return 1;

    } else if (!xx) {

    /* 如果不存在,插入时需要注意:长度超zset_max_ziplist_entries,需要变换skiplis方式 */

    zobj->ptr = zzlInsert(zobj->ptr,ele,score);

    if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||

    sdslen(ele) > server.zset_max_ziplist_value)

    zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);

    if (newscore) *newscore = score;

    *flags |= ZADD_ADDED;

    return 1;

    } else {

    *flags |= ZADD_NOP;

    return 1;

    }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { /* 对SKIPLIST处理 */

    ...

        //先去dict中查找元素

    de = dictFind(zs->dict,ele);

    if (de != NULL) {

    /* NX? Return, same element already exists. */

    if (nx) {

    *flags |= ZADD_NOP;

    return 1;

    }

    curscore = *(double*)dictGetVal(de);

    /* Prepare the score for the increment if needed. */

    if (incr) {

    score += curscore;

    if (isnan(score)) {

    *flags |= ZADD_NAN;

    return 0;

    }

    if (newscore) *newscore = score;

    }

    /* Remove and re-insert when score changes. */

    if (score != curscore) {

    znode = zslUpdateScore(zs->zsl,curscore,ele,score);

    /* 不移除原来的元素,而是更新score*/

    dictGetVal(de) = &znode->score; /* Update score ptr. */

    *flags |= ZADD_UPDATED;

    }

    return 1;

    } else if (!xx) { //否则直接插入

    ele = sdsdup(ele);

    znode = zslInsert(zs->zsl,score,ele);

    serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);

    *flags |= ZADD_ADDED;

    if (newscore) *newscore = score;

    return 1;

    } else {

    *flags |= ZADD_NOP;

    return 1;

    }

    } else {

    serverPanic("Unknown sorted set encoding");

    }

    return 0; /* Never reached. */

    }

    15.2.2 删除成员

    命令zrem调用函数zremCommand,该函数主要做了3件事。

    void zremCommand(client *c) {

    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||

    checkType(c,zobj,OBJ_ZSET)) return;

    for (j = 2; j < c->argc; j++) {

    // 1)调用zsetDel删除元素,分别对ziplist编码类型和skiplist编码类型区别:

    if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;

    // 2)如果一个key中的元素全部删除了,那么删除db里面的key:

    if (zsetLength(zobj) == 0) { //计算有序集合的长度,具体见zcard命令

    dbDelete(c->db,key);

    keyremoved = 1;

    break;

    }

    }

    //3)广播删除事件并返回给客户端:

    if (deleted) {

    notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);

    if (keyremoved)

    notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);

    signalModifiedKey(c,c->db,key);

    server.dirty += deleted;

    }

    addReplyLongLong(c,deleted);

    }

    分别对ziplist编码类型和skiplist编码类型区别:

    int zsetDel(robj *zobj, sds ele) {

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    unsigned char *eptr;

    //对于ziplist编码类型,直接从ziplist中删除

    if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {

    zobj->ptr = zzlDelete(zobj->ptr,eptr);

    return 1;

    }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {

    //对于skiplist编码类型,先从dict里面删除,然后从skiplist中删除

    de = dictUnlink(zs->dict,ele);

    if (de != NULL) {

    score = *(double*)dictGetVal(de);

    dictFreeUnlinkedEntry(zs->dict,de);

    /* Delete from skiplist. */

    int retval = zslDelete(zs->zsl,score,ele,NULL);

    }

    } else { ... }

    return 0; /* No such element found. */

    }

    15.2.3 基数统计

    命令zcard函数是zcardCommand

    调用zsetLength函数计算有序集合的长度,代码如下:

    void zcardCommand(client *c) {

    robj *key = c->argv[1];

    robj *zobj;

    if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||

    checkType(c,zobj,OBJ_ZSET)) return;

    //通过函数zsetLength计算有序集合中基数的值

    addReplyLongLong(c,zsetLength(zobj));

    }

    在函数zsetLength中,会根据存储的类型,分别计算length,

    对于ziplist,ziplistLen计算长度,因为ziplist中存储key和value的对,基数值是ziplistLen()/2,

    如果底层存储使用的是skiplist,直接取skiplist里面的length即可;

    unsigned long zsetLength(const robj *zobj) {

    unsigned long length = 0;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    length = zzlLength(zobj->ptr); //return ziplistLen(zl)/2;

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {

    length = ((const zset*)zobj->ptr)->zsl->length;

    } else {

    serverPanic("Unknown sorted set encoding");

    }

    return length;

    }

    15.2.4 数量计算

    命令zcount函数是zcountCommand,zcount命令会先找到区间的起始位置min,然后遍历找到结束位置max。针对ziplist和skiplist做了区分,

    对于ziplist的处理,首先通过zzlFirstInRange函数找到min位置,

    void zcountCommand(client *c) {

    robj *key = c->argv[1];

    robj *zobj;

    zrangespec range;

    unsigned long count = 0;

    /* 解析参数 */

    if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {

    addReplyError(c,"min or max is not a float");

    return;

    }

    /* 查询set */

    if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||

    checkType(c, zobj, OBJ_ZSET)) return;

    // ziplist的处理

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    unsigned char *zl = zobj->ptr;

    unsigned char *eptr, *sptr;

    double score;

     

    /* 首先通过zzlFirstInRange函数找到min位置 */

    eptr = zzlFirstInRange(zl,&range);

    /* 没有第一个元素*/

    if (eptr == NULL) {

    addReply(c, shared.czero);

    return;

    }

    /*第一个元素在范围内 */

    sptr = ziplistNext(zl,eptr);

    score = zzlGetScore(sptr);

    serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));

    /*范围内迭代元素 */

    while (eptr) {

    score = zzlGetScore(sptr);

    /* 不在范围内则停止. */

    if (!zslValueLteMax(score,&range)) {

    break;

    } else {

    count++;

    zzlNext(zl,&eptr,&sptr); //遍历对count进行+1,计算区间内的成员数量

    }

    }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { //使用skiplist存储

    zset *zs = zobj->ptr;

    zskiplist *zsl = zs->zsl;

    zskiplistNode *zn;

    unsigned long rank;

    zn = zslFirstInRange(zsl, &range); //获取区间的开始位置

    if (zn != NULL) {

    rank = zslGetRank(zsl, zn->score, zn->ele); //得到rank值

    count = (zsl->length - (rank - 1));

    /* 获取区间的结束位置 */

    zn = zslLastInRange(zsl, &range);

    if (zn != NULL) {

    rank = zslGetRank(zsl, zn->score, zn->ele);

    count -= (zsl->length - rank);

    }

    }

    } else { ... }

    addReplyLongLong(c, count);

    }

     

    判断闭区间和开区间函数zslParseRange,使用到一个数据结构叫做zrangespec;

    typedef struct {

    double min, max;

    int minex, maxex; /* 来标记是闭区间还是开区间*/*

    } zrangespec;

     

    static int zslParseRange(robj *min, robj *max, zrangespec *spec) {

    char *eptr;

    spec->minex = spec->maxex = 0;

    if (min->encoding == OBJ_ENCODING_INT) {

    spec->min = (long)min->ptr;

    } else {

    if (((char*)min->ptr)[0] == '(') { //开区间使用"("来表示

    spec->min = strtod((char*)min->ptr+1,&eptr);

    if (eptr[0] != '' || isnan(spec->min)) return C_ERR;

    spec->minex = 1;

    } else { //默认是闭区间

    spec->min = strtod((char*)min->ptr,&eptr);

    if (eptr[0] != '' || isnan(spec->min)) return C_ERR;

    }

    }

    .... //max类似上面min逻辑

    return C_OK;

    }

    //========================ziplist情况用到函数==================

    zzlFirstInRange函数: 遍历找到min的位置;

    unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {

    unsigned char *eptr = ziplistIndex(zl,0), *sptr;

        ...

    while (eptr != NULL) {

    sptr = ziplistNext(zl,eptr);

    score = zzlGetScore(sptr);

    if (zslValueGteMin(score,range)) { ... } //获取min的位置 ,开区间用>,闭区间用>=

    // zslValueGteMin : return spec->minex ? (value > spec->min) : (value >= spec->min);

    //继续遍历.

    eptr = ziplistNext(zl,sptr);

    }

    }

    //========================skiplist情况用到函数==================

    zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {

    zskiplistNode *x;

    int i; ...

    x = zsl->header;

    for (i = zsl->level-1; i >= 0; i--) { //遍历层数

    /* Go forward while *OUT* of range. */

    while (x->level[i].forward &&

    !zslValueGteMin(x->level[i].forward->score,range))

    x = x->level[i].forward;

    }

    /* This is an inner range, so the next node cannot be NULL. */

    x = x->level[0].forward;

    serverAssert(x != NULL);

    /* Check if score <= max. */

    if (!zslValueLteMax(x->score,range)) return NULL;

    return x;

    }

     

    15.2.5 计数器(增/减)

    命令zincrby对应函数zincrbyCommand,该函数调用的是zaddGenericCommand,代码如下:

    跟zadd不同的地方是flag为ZADD_INCR。

    15.2.6 获取排名

    命令zrank对应函数zrankCommand,原理很简单,底层不论是用的ziplist还是skiplist,因为本身两种结构都是有序的,遍历找到对应的成员,即可知成员的排名。

    void zrankGenericCommand(client *c, int reverse) {

    rank = zsetRank(zobj,ele->ptr,reverse);

    if (rank >= 0) { addReplyLongLong(c,rank);

    } else { addReplyNull(c); }

    }

    long zsetRank(robj *zobj, sds ele, int reverse) {

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    eptr = ziplistIndex(zl,0);

    rank = 1;

    while(eptr != NULL) { //遍历找到对应的成员

    if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))

    break;

    rank++; //排名增加

    zzlNext(zl,&eptr,&sptr);

    }

    ...

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {

    zset *zs = zobj->ptr;

    zskiplist *zsl = zs->zsl;

    dictEntry *de;

    de = dictFind(zs->dict,ele);

    if (de != NULL) {

    rank = zslGetRank(zsl,score,ele); //zslGetRank内部遍历

    }

    }

    15.2.7 获取分值

    命令zscore是用来获取分值的,基于ziplist的存储只需要遍历找到对应成员,取出score即可。

    对于底层使用skiplist的就更简单了,直接去dict里面取score即可,时间复杂度非常低。

    int zsetScore(robj *zobj, sds member, double *score) {

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {

    zset *zs = zobj->ptr;

    dictEntry *de = dictFind(zs->dict, member);

    if (de == NULL) return C_ERR;

    *score = *(double*)dictGetVal(de);

    }

    }

    unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {

    unsigned char *eptr = ziplistIndex(zl,0), *sptr;

    while (eptr != NULL) {

    sptr = ziplistNext(zl,eptr);

    if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {

    if (score != NULL) *score = zzlGetScore(sptr);

    return eptr;

    }

    eptr = ziplistNext(zl,sptr); //下一个

    }

    return NULL;

    }

    15.2.8 遍历

    命令zscan跟scan类似,调用的都是scanGenericCommand,;

    15.3 批量操作

    15.3.1 范围查找

    支持范围查找、包括按排名查找、按分值查找以及按照字典顺序查找等。

    1. 排名查找(zrange/zrevrange)

    zrange命令对应的函数为zrangeCommand,其核心步骤有以下3步。

    void zrangeGenericCommand(client *c, int reverse) {

    robj *key = c->argv[1];

    ...

    /* 1)start和end可以是负数,首先计算实际的start和end,获取真正的区间. */

    llen = zsetLength(zobj); //计算有序集合的长度

    if (start < 0) start = llen+start; //如果start小于0,则修改start为"llen+start"

    if (end < 0) end = llen+end; //如果end小于0,则修改end为"llen+end"

    if (start < 0) start = 0; //如果依然小于0,那么设置start为0

    ...

    //2)得到实际start和end之后,会根据编码类型进行不同的操作

    //首先看一下ziplist编码的实现:(遍历)

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    unsigned char *zl = zobj->ptr;

    unsigned char *eptr, *sptr;

    unsigned char *vstr;

    unsigned int vlen;

    long long vlong;

    if (reverse)

    eptr = ziplistIndex(zl,-2-(2*start));

    else //如果是正向获取,因为是key-value对,所以位置是2*start

    eptr = ziplistIndex(zl,2*start);

    sptr = ziplistNext(zl,eptr);

     

    while (rangelen--) { .....

    //如果带有withscores参数,返回客户端时带上score

    if (withscores) addReplyDouble(c,zzlGetScore(sptr));

    if (reverse)

    zzlPrev(zl,&eptr,&sptr); //反向遍历

    else

    zzlNext(zl,&eptr,&sptr); //正向遍历

    }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { //skiplist编码类型(遍历)

    zset *zs = zobj->ptr;

    zskiplist *zsl = zs->zsl;

    zskiplistNode *ln;

    sds ele;

    /* 在遍历之前先通过rank找到对应的ele */

    if (reverse) {

    ln = zsl->tail;

    if (start > 0) //反向时,rank值为有序集合长度减去start

    ln = zslGetElementByRank(zsl,llen-start);

    } else {

    ln = zsl->header->level[0].forward;

    if (start > 0) //正向时,rank值为start+1

    ln = zslGetElementByRank(zsl,start+1);

    }

    while(rangelen--) {

    ele = ln->ele;

    if (withscores && c->resp > 2) addReplyArrayLen(c,2);

    addReplyBulkCBuffer(c,ele,sdslen(ele));

    if (withscores) addReplyDouble(c,ln->score);

    //开始遍历,正向访问forward,反向访问backward

    ln = reverse ? ln->backward : ln->level[0].forward;

    }

    } else {

    serverPanic("Unknown sorted set encoding");

    }

    }

    3)遍历时得到的结果需要暂存到缓冲区中,得到的结果不是直接返回给客户端,而是暂存到缓存"c->buf"中,最终统一输出。

    zrevrange与zrange相比,唯一不同是做了反转;

     

    2.分值查找(zrangebyscore)

    基本实现跟zrange类似,不同点是根据score的区间找到遍历开始的位置,

    对于ziplist编码类型,代码如下:

    if (reverse) {

    eptr = zzlLastInRange(zl,&range);

    } else {

    eptr = zzlFirstInRange(zl,&range);

    }

    这里可以参照zcount的实现。接下来就是从开始位置往后遍历,找到结束的位置。对于zrangebyscore来说是找到比max大的位置,对于zrevrangebyscore来说是找到比min小的位置代码如下:

    while (eptr && limit--) { //从开始位置开始遍历

    score = zzlGetScore(sptr);

    /* 找到不再是区间的位置跳出遍历 */

    if (reverse) {

    if (!zslValueGteMin(score,&range)) break;

    } else {

    if (!zslValueLteMax(score,&range)) break;

    }

    ...

    if (reverse) {

    zzlPrev(zl,&eptr,&sptr);

    } else {

    zzlNext(zl,&eptr,&sptr);

    }

    }

    对于编码是skiplist的,操作原理是一样的。

    3.字典序查找(zrangebylex)

    由于该命令只对有相同分值的元素才有作用,而在插入的时候元素是按字典顺序的,因此查找的时候原理很简单。同时这个命令作用有限,这里不再详细展开。

    15.3.2 范围删除

    包括3个命令,zremrangebyscore、zremrange和zremrangebylex,3个命令都调用函数zremrangeGenericCommand,只是传入的参数不同,因此一起分析。

    其中不同参数定义如下:

    #define ZRANGE_RANK 0 //对应zremrange

    #define ZRANGE_SCORE 1 //对应zremrangebyscore

    #define ZRANGE_LEX 2 //对应zremrangebylex

    函数zremrangeGenericCommand的实现主要分为如下4步;

    void zremrangeGenericCommand(client *c, int rangetype) {

    robj *key = c->argv[1];

    robj *zobj;

    int keyremoved = 0;

    unsigned long deleted = 0;

    zrangespec range;

    zlexrangespec lexrange;

    long start, end, llen;

    /* Step 1: 解析参数获取区间 */

    if (rangetype == ZRANGE_RANK) {

    if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||

    (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))

    return;

    } else if (rangetype == ZRANGE_SCORE) {

    if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {

    addReplyError(c,"min or max is not a float");

    return;

    }

    } else if (rangetype == ZRANGE_LEX) {

    if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {

    addReplyError(c,"min or max not valid string range item");

    return;

    }

    }

     

    /* Step 2: 校验范围的合法性, 和zrange类似 */

    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||

    checkType(c,zobj,OBJ_ZSET)) goto cleanup;

    if (rangetype == ZRANGE_RANK) {

    llen = zsetLength(zobj);

    if (start < 0) start = llen+start;

    if (end < 0) end = llen+end;

    if (start < 0) start = 0;

    if (start > end || start >= llen) {

    addReply(c,shared.czero);

    goto cleanup;

    }

    if (end >= llen) end = llen-1;

    }

     

    /* Step 3: 执行范围删除操作。对编码类型进行区分,根据类型调用不同的函数. */

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {

    switch(rangetype) {

    case ZRANGE_RANK:

    zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);

    break;

    case ZRANGE_SCORE:

    zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);

    break;

    case ZRANGE_LEX:

    zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);

    break;

    }

    if (zzlLength(zobj->ptr) == 0) {

    dbDelete(c->db,key); //如果全部元素都删掉了,删除key

    keyremoved = 1;

    }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {

    ... //SKIPLIST跟上面类似

    }

     

    /* Step 4: 广播和回复客户端 */

    if (deleted) { ... }

    server.dirty += deleted;

    addReplyLongLong(c,deleted);

     

    cleanup:

    if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);

    }

    15.4 集合运算

    包括两个命令:交集zinterstore,并集zunionstore。底层调用的都是zunionstore,其实现需要经过如下几步。

    1)解析参数,读取key。

    2)将key按照对应有序集合的大小从小到大排序,用来提升算法的性能,见代码:

    qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality)

    3)分别对交集和并集单独处理。

    交集计算,思路就是遍历最小集合元素,判断是否存在其他集合中,都存在则存入目标集合中,否则不存。

    并集计算,并集肯定大于等于最大的集合,先创建一个和最大集合一样大的集合,然后遍历所有集合,插入到新目标集合中去。

     

    15.5 本章小结

    有序集合相关的命令,有序集合根据元素大小,底层实现分为两种,一种是ziplist,另一种是dict和skiplist。基于这3种数据结构,分析有序集合的基本操作,批量操作和集合的运算。

      

  • 相关阅读:
    我的Javascript之旅——对象的原型链之由来
    如何增强Scrum Teams之间的协作(一)——Feature Team的优势
    如何增强Scrum Teams之间的协作——引子
    关于Automated Acceptance Testing Tool的讨论
    请不要代替你的团队思考
    如何增强Scrum Teams之间的协作(二)——Feature Team的挑战
    爬虫 xpath
    django 一
    Python Requests库详解
    python3爬虫环境搭建
  • 原文地址:https://www.cnblogs.com/coloz/p/13812853.html
Copyright © 2011-2022 走看看