zoukankan      html  css  js  c++  java
  • Redis之ZSet命令

    0.前言

      Redis有序集合ZSet可以按分数进行排序, 存储结构可能使用ziplist,skiplist和hash表, zset_max_ziplist_entries和zset_max_ziplist_value两个字段控制zset采用何种存储方式, zset_max_ziplist_entries表示ziplist中存储score和member占用的内存空间超过该值, 则存储结构会转变为skiplist和hash表; zset_max_ziplist_value表示ziplist中存储的member值占用的内存空间超过该值, 则存储结构会转变为skiplist和hash表. 存储使用ziplist时, ziplist存储格式为[member, score, member, score....], 以score值升序进行排序.存储使用skiplist时, 需要hash表配合使用, hash表存储以member为key, score为值, 加快member检索score速度; skiplist存储score和member, 并以score值进行升序排序.

    1.目录

    1.ZADD命令
    2.ZCOUNT命令
    3.ZRANGE命令
    4.交集并集命令

    2.ZADD命令

    添加元素到有序集合中, 命令格式 : ZADD key score member [[score member] [score member] ...], 入口函数zaddCommand

    void zaddCommand(redisClient *c) {
        zaddGenericCommand(c,0);
    }
    /*函数向有序集合中添加一个元素, 在incr值设置时, 同时可以实现对score值进行累加操作*/
    void zaddGenericCommand(redisClient *c, int incr) {
        static char *nanerr = "resulting score is not a number (NaN)";
        robj *key = c->argv[1];
        robj *ele;
        robj *zobj;
        robj *curobj;
        double score = 0, *scores = NULL, curscore = 0.0;
        int j, elements = (c->argc-2)/2;
        int added = 0, updated = 0;
    
        if (c->argc % 2) {
            addReply(c,shared.syntaxerr);
            return;
        }
    
        /* 获取scores值, score必须为数字, 否则直接返回错误*/
        scores = zmalloc(sizeof(double)*elements);
        for (j = 0; j < elements; j++) {
            if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
                != REDIS_OK) goto cleanup;
        }
    
        /* 如果有序集合不存在, 直接进行创建 */
        zobj = lookupKeyWrite(c->db,key);
        if (zobj == NULL) {
              /*对限制条件进行判断,选择存储结构*/
            if (server.zset_max_ziplist_entries == 0 ||
                server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
            {
                   /*创建有序集合, 存储结构式skiplist*/
                zobj = createZsetObject();
            } else {
                   /*创建有序集合, 存储结构式ziplist*/
                zobj = createZsetZiplistObject();
            }
            dbAdd(c->db,key,zobj);
        } else {
            if (zobj->type != REDIS_ZSET) {
                addReply(c,shared.wrongtypeerr);
                goto cleanup;
            }
        }
    
        for (j = 0; j < elements; j++) {
            score = scores[j];
    
            if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
                unsigned char *eptr;
    
                /* 在skiplist中进行查找, 找到则删除原来的, 插入新的, 否则直接进行插入操作*/
                ele = c->argv[3+j*2];
                if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
                       /*incr值设置, 则需要进行累加*/
                    if (incr) {
                        score += curscore;
                        if (isnan(score)) {
                            addReplyError(c,nanerr);
                            goto cleanup;
                        }
                    }
                    /* 如果member和score都没有变化, 则不进行任何操作*/
                    if (score != curscore) {
                        zobj->ptr = zzlDelete(zobj->ptr,eptr);
                        zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                        server.dirty++;
                        updated++;
                    }
                } else {
                    /* 同样插入元素时进行检测ziplist转skiplist的阀值*/
                    zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                    if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
                        zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
                    if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
                        zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
                    server.dirty++;
                    added++;
                }
            } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
                zset *zs = zobj->ptr;
                zskiplistNode *znode;
                dictEntry *de;
                   /*存储结构为skiplist时, 首先从hash表中通过member查找到score, 同样找到删除原来的, 找不到则直接插入*/
                ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
                de = dictFind(zs->dict,ele);
                if (de != NULL) {
                    curobj = dictGetKey(de);
                    curscore = *(double*)dictGetVal(de);
                    if (incr) {
                        score += curscore;
                        if (isnan(score)) {
                            addReplyError(c,nanerr);
                            goto cleanup;
                        }
                    }
                    /* member和score完全一样, 则不进行任何操作*/
                    if (score != curscore) {
                        redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
                        znode = zslInsert(zs->zsl,score,curobj);
                        incrRefCount(curobj); /* Re-inserted in skiplist. */
                        dictGetVal(de) = &znode->score; /* Update score ptr. */
                        server.dirty++;
                        updated++;
                    }
                } else {
                    znode = zslInsert(zs->zsl,score,ele);
                    incrRefCount(ele); /* Inserted in skiplist. */
                    redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
                    incrRefCount(ele); /* Added to dictionary. */
                    server.dirty++;
                    added++;
                }
            } else {
                redisPanic("Unknown sorted set encoding");
            }
        }
        if (incr) /* ZINCRBY */
            addReplyDouble(c,score);
        else /* ZADD */
            addReplyLongLong(c,added);
    
    cleanup:
        zfree(scores);
        if (added || updated) {
            signalModifiedKey(c->db,key);
            notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
                incr ? "zincr" : "zadd", key, c->db->id);
        }
    }
    

    ZCOUNT命令

    统计score值在一个范围内的元素数量, 命令格式: ZCOUNT key min max, zcount操作其实很简单, ziplist存储结构, 只需要依次遍历然后比较score值是否在范围内, 并记录满足条件的元素个数即可. skiplist可以对score值进行快速检索, 因此可以找到落入范围内开始元素和结束元素排名, 通过简单运算可以得出满足条件的元素数量.

    void zcountCommand(redisClient *c) {
        robj *key = c->argv[1];
        robj *zobj;
        zrangespec range;
        int count = 0;
    
        /* 解析min和max参数值, 并放入range中 */
        if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
            addReplyError(c,"min or max is not a float");
            return;
        }
        
         /*查找有序集合*/
        if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
            checkType(c, zobj, REDIS_ZSET)) return;
    
        if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
            unsigned char *zl = zobj->ptr;
            unsigned char *eptr, *sptr;
            double score;
    
            /* 查找第一个位于range范围内的元素*/
            eptr = zzlFirstInRange(zl,&range);
    
            /* 找不到直接返回空 */
            if (eptr == NULL) {
                addReply(c, shared.czero);
                return;
            }
    
            /* 找到第一个符号条件元素, 然后依次遍历ziplist对符合条件的元素进行计数*/
            sptr = ziplistNext(zl,eptr);
            score = zzlGetScore(sptr);
            redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
            while (eptr) {
                score = zzlGetScore(sptr);
    
                /* score必须小于给定返回的最大值max, 否则计数结束 */
                if (!zslValueLteMax(score,&range)) {
                    break;
                } else {
                    count++;
                    zzlNext(zl,&eptr,&sptr);
                }
            }
        } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            zskiplist *zsl = zs->zsl;
            zskiplistNode *zn;
            unsigned long rank;
    
            /* skiplist中查找第一个落入范围的元素 */
            zn = zslFirstInRange(zsl, &range);
            if (zn != NULL) {
                  /*首先计算出大于min的元素个数count, rank获取的是大于min值第一个元素排名*/
                rank = zslGetRank(zsl, zn->score, zn->obj);
                count = (zsl->length - (rank - 1));
    
                /* skiplist中查找最后一个落入范围内的元素 */
                zn = zslLastInRange(zsl, &range);
                if (zn != NULL) {
                        /*rank获取的是最后一个落入返回内的元素排名*/
                    rank = zslGetRank(zsl, zn->score, zn->obj);
                        /*(zsl->length-rank)表示所有大于max元素数量, 与count做减法计算出结果*/
                    count -= (zsl->length - rank);
                }
            }
        } else {
            redisPanic("Unknown sorted set encoding");
        }
        addReplyLongLong(c, count);
    }
    

    ZRANGE命令

    获取一个位置范围内的元素, 命令格式: ZRANGE key start stop [WITHSCORES], start, stop元素代表位置下标, 从0开始. 这里只对range操作进行讲述, 其他的range操作大同小异, 只是对增加了一些判断的条件参数, 不在展开一一说明.

    void zrangeCommand(redisClient *c) {
        zrangeGenericCommand(c,0);
    }
    /*求range范围内元素*/
    void zrangeGenericCommand(redisClient *c, int reverse) {
        robj *key = c->argv[1];
        robj *zobj;
        int withscores = 0;
        long start;
        long end;
        int llen;
        int rangelen;
        
         /*取出start和stop值*/
        if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
            (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
         /*设置withscores标志位*/
        if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
            withscores = 1;
        } else if (c->argc >= 5) {
            addReply(c,shared.syntaxerr);
            return;
        }
        if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
             || checkType(c,zobj,REDIS_ZSET)) return;
    
        /*由于start和end可以是负值, 全部进行转换为正值*/
        llen = zsetLength(zobj);
        if (start < 0) start = llen+start;
        if (end < 0) end = llen+end;
        if (start < 0) start = 0;
    
        /* 判断range范围是否符合条件,不合条件直接返回空 */
        if (start > end || start >= llen) {
            addReply(c,shared.emptymultibulk);
            return;
        }
         /*下标超出范围则置为为集合结尾元素位置*/
        if (end >= llen) end = llen-1;
        rangelen = (end-start)+1;
    
        /* Return the result in form of a multi-bulk reply */
        addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
    
        if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
            unsigned char *zl = zobj->ptr;
            unsigned char *eptr, *sptr;
            unsigned char *vstr;
            unsigned int vlen;
            long long vlong;
             
              /* ziplist首先找到start位置的元素, 然后依次遍历rangelen个元素, 返回给客户端*/
            if (reverse)
                eptr = ziplistIndex(zl,-2-(2*start));
            else
                eptr = ziplistIndex(zl,2*start);
    
            redisAssertWithInfo(c,zobj,eptr != NULL);
            sptr = ziplistNext(zl,eptr);
             
            while (rangelen--) {
                redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
                redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
                if (vstr == NULL)
                    addReplyBulkLongLong(c,vlong);
                else
                    addReplyBulkCBuffer(c,vstr,vlen);
    
                if (withscores)
                    addReplyDouble(c,zzlGetScore(sptr));
    
                if (reverse)
                    zzlPrev(zl,&eptr,&sptr);
                else
                    zzlNext(zl,&eptr,&sptr);
            }
    
        } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            zskiplist *zsl = zs->zsl;
            zskiplistNode *ln;
            robj *ele;
    
            /* skiplist同样根据start位置, 找到相应的元素, 遍历rangelen个元素返回给客户端*/
            if (reverse) {
                ln = zsl->tail;
                if (start > 0)
                    ln = zslGetElementByRank(zsl,llen-start);
            } else {
                ln = zsl->header->level[0].forward;
                if (start > 0)
                    ln = zslGetElementByRank(zsl,start+1);
            }
    
            while(rangelen--) {
                redisAssertWithInfo(c,zobj,ln != NULL);
                ele = ln->obj;
                addReplyBulk(c,ele);
                if (withscores)
                    addReplyDouble(c,ln->score);
                ln = reverse ? ln->backward : ln->level[0].forward;
            }
        } else {
            redisPanic("Unknown sorted set encoding");
        }
    }
    

    交集并集命令

    求交集zinterstore, 求并集zunionstore, 两个命令操作相对比较复杂, 操作使用的是同一个函数, 命令格式如下, 非常类似.


    zinterstor命令格式 : ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
    zunionstore命令格式: ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]

    /*有序集合求并集入口函数*/
    void zunionstoreCommand(redisClient *c) {
        zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
    }
    /*有序集合求交集入口函数*/
    void zinterstoreCommand(redisClient *c) {
        zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
    }
    
    #define REDIS_AGGR_SUM 1  //求和操作
    #define REDIS_AGGR_MIN 2  //取最小值
    #define REDIS_AGGR_MAX 3  //取最大值
    #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
    
    /*聚合操作函数, 比较大小和求和操作*/
    inline static void zunionInterAggregate(double *target, double val, int aggregate) {
        if (aggregate == REDIS_AGGR_SUM) {
            *target = *target + val;
            /* The result of adding two doubles is NaN when one variable
             * is +inf and the other is -inf. When these numbers are added,
             * we maintain the convention of the result being 0.0. */
            if (isnan(*target)) *target = 0.0;
        } else if (aggregate == REDIS_AGGR_MIN) {
            *target = val < *target ? val : *target;
        } else if (aggregate == REDIS_AGGR_MAX) {
            *target = val > *target ? val : *target;
        } else {
            /* safety net */
            redisPanic("Unknown ZUNION/INTER aggregate type");
        }
    }
    /*具体进行并集和交集操作的函数*/
    void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
        int i, j;
        long setnum;
        int aggregate = REDIS_AGGR_SUM;
        zsetopsrc *src;
        zsetopval zval;
        robj *tmp;
        unsigned int maxelelen = 0;
        robj *dstobj;
        zset *dstzset;
        zskiplistNode *znode;
        int touched = 0;
    
        /* 获取表示key数量的numkeys字段 */
        if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
            return;
    
        if (setnum < 1) {
            addReplyError(c,
                "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
            return;
        }
    
        /* numkeys字段大于实际输入的key数量, 直接返回语法错误提示 */
        if (setnum > c->argc-3) {
            addReply(c,shared.syntaxerr);
            return;
        }
    
        /* 读取所有的key对应的集合 */
        src = zcalloc(sizeof(zsetopsrc) * setnum);
        for (i = 0, j = 3; i < setnum; i++, j++) {
            robj *obj = lookupKeyWrite(c->db,c->argv[j]);
            if (obj != NULL) {
                if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
                    zfree(src);
                    addReply(c,shared.wrongtypeerr);
                    return;
                }
    
                src[i].subject = obj;
                src[i].type = obj->type;
                src[i].encoding = obj->encoding;
            } else {
                src[i].subject = NULL;
            }
            /*weight默认为1*/
            src[i].weight = 1.0;
        }
    
        /* 如果后面还有参数, 解析剩余参数weights和aggregate字段 */
        if (j < c->argc) {
            int remaining = c->argc - j;
    
            while (remaining) {
                if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
                    j++; remaining--;
                    for (i = 0; i < setnum; i++, j++, remaining--) {
                        if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
                                "weight value is not a float") != REDIS_OK)
                        {
                            zfree(src);
                            return;
                        }
                    }
                } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
                    j++; remaining--;
                    if (!strcasecmp(c->argv[j]->ptr,"sum")) {
                        aggregate = REDIS_AGGR_SUM;
                    } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
                        aggregate = REDIS_AGGR_MIN;
                    } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
                        aggregate = REDIS_AGGR_MAX;
                    } else {
                        zfree(src);
                        addReply(c,shared.syntaxerr);
                        return;
                    }
                    j++; remaining--;
                } else {
                    zfree(src);
                    addReply(c,shared.syntaxerr);
                    return;
                }
            }
        }
    
        /* 对集合按集合元素多少进行升序排列 */
        qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
        
         /*创建一个新的集合存放计算结果*/
        dstobj = createZsetObject();
        dstzset = dstobj->ptr;
        memset(&zval, 0, sizeof(zval));
    
        if (op == REDIS_OP_INTER) {
            /* 最少元素集合为空直接跳过不执行 */
            if (zuiLength(&src[0]) > 0) {
                /* 类似于无序集合求交集, 遍历第一个集合, 并在剩余的集合中查找, 查找不到则跳过该元素, 全部查找到则将该元素放入结果集合dstzset中*/
                zuiInitIterator(&src[0]);
                while (zuiNext(&src[0],&zval)) {
                    double score, value;
    
                    score = src[0].weight * zval.score;
                    if (isnan(score)) score = 0;
    
                    for (j = 1; j < setnum; j++) {
                        /* 如果后面集合中有和第一个集合和第一个集合是同一个集合, 则特殊判断, 因为迭代操作不安全 */
                        if (src[j].subject == src[0].subject) {
                            value = zval.score*src[j].weight;
                            zunionInterAggregate(&score,value,aggregate);
                        } else if (zuiFind(&src[j],&zval,&value)) {
                                  /* 找到元素, 然后score值与weight值做乘积, 最后进行聚合操作*/
                            value *= src[j].weight;
                            zunionInterAggregate(&score,value,aggregate);
                        } else {
                            break;
                        }
                    }
                       
                        /*只有待查元素在所有集合中都出现,才将此元素添加进结果集合中*/
                    if (j == setnum) {
                        tmp = zuiObjectFromValue(&zval);
                        znode = zslInsert(dstzset->zsl,score,tmp);
                        incrRefCount(tmp); /* added to skiplist */
                        dictAdd(dstzset->dict,tmp,&znode->score);
                        incrRefCount(tmp); /* added to dictionary */
                             /*判断并存储最大元素长度, 后面判断是否需要转换数据结构*/
                        if (tmp->encoding == REDIS_ENCODING_RAW)
                            if (sdslen(tmp->ptr) > maxelelen)
                                maxelelen = sdslen(tmp->ptr);
                    }
                }
                zuiClearIterator(&src[0]);
            }
        } else if (op == REDIS_OP_UNION) {
            dict *accumulator = dictCreate(&setDictType,NULL);
            dictIterator *di;
            dictEntry *de;
            double score;
    
            if (setnum) {
                /*为了尽可能的减少rehash操作, 扩展存放结果字典空间为最后一个集合的大小, 上面已经排序过, 最后一个是最大的集合*/
                dictExpand(accumulator,zuiLength(&src[setnum-1]));
            }
    
            /* 下面开始循环所有集合, 并在accumulator中查找, 如果找到则进行相应的运算, 否则直接插入accumulator中*/
            for (i = 0; i < setnum; i++) {
                if (zuiLength(&src[i]) == 0) continue;
    
                zuiInitIterator(&src[i]);
                while (zuiNext(&src[i],&zval)) {
                    /* Initialize value */
                    score = src[i].weight * zval.score;
                    if (isnan(score)) score = 0;
    
                    /* 查找元素是否已经在accumulator字典中 */
                    de = dictFind(accumulator,zuiObjectFromValue(&zval));
                    if (de == NULL) {
                        tmp = zuiObjectFromValue(&zval);
                        /* 记录元素最长的值, 后面用于判断是否需要对集合进行转换*/
                        if (tmp->encoding == REDIS_ENCODING_RAW) {
                            if (sdslen(tmp->ptr) > maxelelen)
                                maxelelen = sdslen(tmp->ptr);
                        }
                        /* 直接添加到字典中 */
                        de = dictAddRaw(accumulator,tmp);
                        incrRefCount(tmp);
                        dictSetDoubleVal(de,score);
                    } else {
                        /* 元素存在,按照指定的规则进行运算 */
                        zunionInterAggregate(&de->v.d,score,aggregate);
                    }
                }
                zuiClearIterator(&src[i]);
            }
             
              /*遍历将accumulator字典转化为有序集合*/
             
            di = dictGetIterator(accumulator);
            dictExpand(dstzset->dict,dictSize(accumulator));
            while((de = dictNext(di)) != NULL) {
                robj *ele = dictGetKey(de);
                score = dictGetDoubleVal(de);
                znode = zslInsert(dstzset->zsl,score,ele);
                incrRefCount(ele); /* added to skiplist */
                dictAdd(dstzset->dict,ele,&znode->score);
                incrRefCount(ele); /* added to dictionary */
            }
            dictReleaseIterator(di);
    
            /* We can free the accumulator dictionary now. */
            dictRelease(accumulator);
        } else {
            redisPanic("Unknown operator");
        }
        
         /*存储目标key存在,则删除原来的集合*/
        if (dbDelete(c->db,dstkey)) {
            signalModifiedKey(c->db,dstkey);
            touched = 1;
            server.dirty++;
        }
        if (dstzset->zsl->length) {
            /* 判断是否需要将存储结构转换为ziplist */
            if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
                maxelelen <= server.zset_max_ziplist_value)
                    zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
    
            dbAdd(c->db,dstkey,dstobj);
            addReplyLongLong(c,zsetLength(dstobj));
            if (!touched) signalModifiedKey(c->db,dstkey);
            notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
                (op == REDIS_OP_UNION) ? "zunionstore" : "zinterstore",
                dstkey,c->db->id);
            server.dirty++;
        } else {
            decrRefCount(dstobj);
            addReply(c,shared.czero);
            if (touched)
                notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",dstkey,c->db->id);
        }
        zfree(src);
    }
    
  • 相关阅读:
    makefile
    xcode
    centos
    debug
    服务器开发
    socket
    xcode
    调用cpp库更基本
    nodejs cpp动态库
    html5图片裁剪法--
  • 原文地址:https://www.cnblogs.com/ourroad/p/4900061.html
Copyright © 2011-2022 走看看