zoukankan      html  css  js  c++  java
  • Redis源码分析(九)--- t_list,t_string的分析

              今天我是看完了t_list和t_string的代码,从名字知道,这也是和之前的t_hash非常类似的,无非就是各种形式,转化来转化去的。先讲讲t_list,对比于昨天的t_hash,t_hash是ziplist和dict之间的转换,t_list则是描述的是ziplist压缩表和linedlist普通链表,换句话说,当client这个robj作为参数传入的时候,都分为ENCODING_ZIIPLIST和ENCODING_LINKEDLIST 2类编码方式处理。还有一个在t_list出现的一个 比较新颖的东西是出现了锁的概念,操作系统的东西在这里也会碰到了,这里的代码也非常值得学习。下面,我们看看一般的t_list中包括的一些方法和命令:

    /* List方法 */
    /* list的操作分为2种,LINKEDLIST普通链表和ZIPLIST压缩列表的相关操作 */
    void listTypeTryConversion(robj *subject, robj *value) /* 判断是否需要将ziplist转为linkedlist */
    void listTypePush(robj *subject, robj *value, int where) /* 在头部或尾部插入value元素 */
    robj *listTypePop(robj *subject, int where)  /* 在列表的头部或尾弹出元素 */
    unsigned long listTypeLength(robj *subject) /* 列表的长度 */
    listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) /* 返回列表迭代器,方向有头尾之分 */
    void listTypeReleaseIterator(listTypeIterator *li) /* 释放列表迭代器 */
    int listTypeNext(listTypeIterator *li, listTypeEntry *entry) /* 根据列表迭代器,获取下一个元素 */
    robj *listTypeGet(listTypeEntry *entry) /* 获取listType元素,有ziplist和linkedlist */
    void listTypeInsert(listTypeEntry *entry, robj *value, int where) /* listType了类型插入元素操作 */
    int listTypeEqual(listTypeEntry *entry, robj *o) /* 判断2个元素是否相等 */
    void listTypeDelete(listTypeEntry *entry) /* listType类型删除元素 */
    void listTypeConvert(robj *subject, int enc) /* listType类型的转换操作,这里指的是往linkedList上转 */
    	
    /* List的相关命令 */
    void pushGenericCommand(redisClient *c, int where) /* 插入操作命令的原始操作 */
    void lpushCommand(redisClient *c) /* 左边插入元素命令 */
    void rpushCommand(redisClient *c) /* 右边插入元素命令 */
    void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) /* 有返回状态的插入操作命令,假设首先都是能够实现插入命令的 */
    void lpushxCommand(redisClient *c) /* 左边插入元素有返回消息的命令 */
    void rpushxCommand(redisClient *c) /* 右边插入元素有返回消息的命令 */
    void linsertCommand(redisClient *c) /* 列表指定位置插入元素操作命令 */
    void llenCommand(redisClient *c) /* 列表返回长度命令 */
    void lindexCommand(redisClient *c) /* 获取index位置的上的元素 */
    void lsetCommand(redisClient *c) /* listType类型设置value命令 */
    void popGenericCommand(redisClient *c, int where) /* 实现弹出操作的原始命令 */
    void lpopCommand(redisClient *c) /* 左边弹出元素命令 */
    void rpopCommand(redisClient *c) /* 右边弹出操作命令 */
    void lrangeCommand(redisClient *c) /* 移动listType位置操作 */
    void ltrimCommand(redisClient *c) /* listType实现截取操作,把多余范围的元素删除 */
    void lremCommand(redisClient *c) /* 移除在listType中出现的与指定的元素相等的元素 */
    void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) /* 元素从一个obj右边弹出,在从左侧推入另一个obj列表操作 */
    void rpoplpushCommand(redisClient *c) /* 右边弹出,左边推入元素的命令 */
    void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) >/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
    void unblockClientWaitingData(redisClient *c) /* 客户端解锁操作 */
    void signalListAsReady(redisDb *db, robj *key) /* 将key存入server中,后续可以用于客户端的存取 */
    int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) /* 根据server,Client的key,value情况,判断server此时能否服务于Client,否则Client将被阻塞 */
    void handleClientsBlockedOnLists(void) /* 服务端解除阻塞住的Client */
    int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) /* 获取超时时间 */ 
    void blockingPopGenericCommand(redisClient *c, int where) /* 阻塞弹出命令的原始操作 */
    void blpopCommand(redisClient *c) /* 左边弹出数据的阻塞式命令 */
    void brpopCommand(redisClient *c) /* 右边弹出数据的阻塞式命令 */
    void brpoplpushCommand(redisClient *c) /* 弹出推入阻塞式命令 */
    

    看到这么多API,是否有被吓到的感觉,但里面其实很多的方法都很简单,我只挑几个比较典型的方法做一下分析,向什么返回长度了,获取,设置键值的大家可以自己看具体的方法,不难的。一般的t_list的处理流程,比如获取迭代器的方法:

    /* Stores pointer to current the entry in the provided entry structure
     * and advances the position of the iterator. Returns 1 when the current
     * entry is in fact an entry, 0 otherwise. */
    int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
        /* Protect from converting when iterating */
        redisAssert(li->subject->encoding == li->encoding);
    
        entry->li = li;
        if (li->encoding == REDIS_ENCODING_ZIPLIST) {
            entry->zi = li->zi;
            if (entry->zi != NULL) {
                if (li->direction == REDIS_TAIL)
                	//根据方向调用pre或next的方法
                    li->zi = ziplistNext(li->subject->ptr,li->zi);
                else
                    li->zi = ziplistPrev(li->subject->ptr,li->zi);
                return 1;
            }
        } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
            entry->ln = li->ln;
            if (entry->ln != NULL) {
                if (li->direction == REDIS_TAIL)
                	//普通的列表的调用方式同上
                    li->ln = li->ln->next;
                else
                    li->ln = li->ln->prev;
                return 1;
            }
        } else {
            redisPanic("Unknown list encoding");
        }
        return 0;
    }
    

    很多API和t_hash的方法都是极其类似的,我就不多说了,我们将关注点,放在列表的锁控制上,先讲个前提,在redis客户端执行操作的时候,会有个请求超时时间,在这个请求的时间内,客户端如果没有找到key对应的数据,是会被阻塞的,什么意思呢,比如:

    /* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
    /* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */
    /* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */
    /* 只有当有这个key被写入的时候,客户端才会被解锁 *
    这个其实就是操作系统中的PV操作类似的原理,跟java里的wait(),signal()方法的模型是一样的,他的实现代码如下:

    /* This is how the current blocking POP works, we use BLPOP as example:
     * - If the user calls BLPOP and the key exists and contains a non empty list
     *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
     *   if blocking is not required.
     * - If instead BLPOP is called and the key does not exists or the list is
     *   empty we need to block. In order to do so we remove the notification for
     *   new data to read in the client socket (so that we'll not serve new
     *   requests if the blocking request is not served). Also we put the client
     *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
     *   blocking for this keys.
     * - If a PUSH operation against a key with blocked clients waiting is
     *   performed, we mark this key as "ready", and after the current command,
     *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
     *   for this list, from the one that blocked first, to the last, accordingly
     *   to the number of elements we have in the ready list.
     */
    
    /* Set a client in blocking mode for the specified key, with the specified
     * timeout */
    /* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
    /* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */
    /* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */
    /* 只有当有这个key被写入的时候,客户端才会被解锁 */		
    void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
        dictEntry *de;
        list *l;
        int j;
    
        //设置超时时间
        c->bpop.timeout = timeout;
        c->bpop.target = target;
    
        if (target != NULL) incrRefCount(target);
    
        for (j = 0; j < numkeys; j++) {
        	//下面为为找到的key上锁
            /* If the key already exists in the dict ignore it. */
            //如果此时,某些key已经存在了,直接忽略
            if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
            incrRefCount(keys[j]);
    
            /* And in the other "side", to map keys -> clients */
            //根据key找到对于请求该key的客户端,也就是将要被阻塞的Client
            de = dictFind(c->db->blocking_keys,keys[j]);
            if (de == NULL) {
                int retval;
    
                /* For every key we take a list of clients blocked for it */
                l = listCreate();
                //为c客户端添加一个阻塞的key
                retval = dictAdd(c->db->blocking_keys,keys[j],l);
                //增加key的引用次数
                incrRefCount(keys[j]);
                redisAssertWithInfo(c,keys[j],retval == DICT_OK);
            } else {
                l = dictGetVal(de);
            }
            listAddNodeTail(l,c);
        }
    
        /* Mark the client as a blocked client */
        /* 标记Client为阻塞的客户端 */
        c->flags |= REDIS_BLOCKED;
        //服务端的阻塞客户端计数递增
        server.bpop_blocked_clients++;
    }
    

    所有的key未来都会存在于server.readykeys里面,客户端判断自己所请求的key是否存在于服务端中的readykeys中,如果不存在就会被阻塞了。将key存入服务单的数据库的操作:

    /* If the specified key has clients blocked waiting for list pushes, this
     * function will put the key reference into the server.ready_keys list.
     * Note that db->ready_keys is a hash table that allows us to avoid putting
     * the same key again and again in the list in case of multiple pushes
     * made by a script or in the context of MULTI/EXEC.
     *
     * The list will be finally processed by handleClientsBlockedOnLists() */
    /* 将key存入server中,后续可以用于客户端的存取 */
    void signalListAsReady(redisDb *db, robj *key) {
        readyList *rl;
    
        /* No clients blocking for this key? No need to queue it. */
        /* 如果没有客户端为此key所阻塞的,直接不添加 */
        if (dictFind(db->blocking_keys,key) == NULL) return;
    
        /* Key was already signaled? No need to queue it again. */
        /* 如果key已经存在,不不要重复添加 */
        if (dictFind(db->ready_keys,key) != NULL) return;
    
        /* Ok, we need to queue this key into server.ready_keys. */
        rl = zmalloc(sizeof(*rl));
        rl->key = key;
        rl->db = db;
        incrRefCount(key);
        //添加到list列表尾部
        listAddNodeTail(server.ready_keys,rl);
    
        /* We also add the key in the db->ready_keys dictionary in order
         * to avoid adding it multiple times into a list with a simple O(1)
         * check. */
        incrRefCount(key);
        redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
    }
    

    其实这里我不免会有个小问题,为什么list出现Client被阻塞的操作,而在t_hash中不会出现类似的操作呢?略疑惑。

          下面说说t_string的用法,t_string里面没有涉及什么特定的结构体,就是最直接的键值对的设置,直接改变数据库中的键值,先列出里面的主要方法:

    /* 方法 API */
    static int checkStringLength(redisClient *c, long long size) /* 检查字符串长度是否超出限制最大长度,512*1024*1024个字节长度 */
    void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) /* 设置的泛型指令操作 */
    void setCommand(redisClient *c) /* 设置的综合方法,设置都是根据Client中的参数而定 */
    void setnxCommand(redisClient *c) /* key不存在的时候才设置值,flag为REDIS_SET_NX时 */
    void setexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */
    void psetexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */
    int getGenericCommand(redisClient *c) /* 获取命令的泛型命令 */
    void getCommand(redisClient *c) /* 获取命令的操作 */
    void getsetCommand(redisClient *c) /* 获取set命令的操作 */
    void setrangeCommand(redisClient *c) /* 设置rangge命令的操作 */
    void getrangeCommand(redisClient *c) /* 获取range命令的操作 */
    void mgetCommand(redisClient *c) /* 执行多次getCommand指令 */
    void msetGenericCommand(redisClient *c, int nx) /* 一次运行多个设置操作泛型命令 */
    void msetCommand(redisClient *c) /* 多次设置指令的非NX模式 */
    void msetnxCommand(redisClient *c)  /* 多次设置指令的NX模式,即只在不存在的时候才设置 */
    void incrDecrCommand(redisClient *c, long long incr) /* 增加或减少固定值的操作,减少其实就是增加-1 */
    void incrCommand(redisClient *c) /* 递增1操作,incr递增设置为1 */
    void decrCommand(redisClient *c) /* 递减1操作,incr递增设置为-1 */
    void incrbyCommand(redisClient *c) /* 增加指令,从Client中获取递增值 */
    void decrbyCommand(redisClient *c) /* 减少指令,从Client中获取减少值 */
    void incrbyfloatCommand(redisClient *c) /* 增加float类型值的操作命令,在获取原始值的时候,获取的方法不一样,为getLongDoubleFromObjectOrReply*/
    void appendCommand(redisClient *c) /* 追加命令操作,其实也是key:value的形式 */
    void strlenCommand(redisClient *c) /* 获取命令长度 */
    

    里面都是一些键值对的简单操作,有一个不同点是,里面的设置操作分为3种:

    /* SET操作的一些FLAG */
    #define REDIS_SET_NO_FLAGS 0
    #define REDIS_SET_NX (1<<0)     /* Set if key not exists. */
    #define REDIS_SET_XX (1<<1)     /* Set if key exists. */

    如上面所说,nx模式指的是key不存在的时候才设置值,xx为存在的时候设置,这种模式设置有命令到期时间的限制,分为单位为秒级和毫秒级,而nx模式下,没有时间上的限制,调用的泛型方法:

    void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)
    注意其中的expire,和时间单位unit:

    /* key不存在的时候才设置值,flag为REDIS_SET_NX时 */
    void setnxCommand(redisClient *c) {
        c->argv[2] = tryObjectEncoding(c->argv[2]);
        setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
    }
    
    /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */
    void setexCommand(redisClient *c) {
        c->argv[3] = tryObjectEncoding(c->argv[3]);
        setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
    }
    
    /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */
    void psetexCommand(redisClient *c) {
        c->argv[3] = tryObjectEncoding(c->argv[3]);
        setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
    }

    nx模式时expire和unit参数分别为NULL,0,所以我做出了如下猜想,可能是因为,当键值对存在的时候,考虑多并发设置的情况,有些Client可能被阻塞,所以有超时时间的存在,而key不存在的时候,就看谁更快了吧,就是直接设置。下面还有一个这个文件比较有意思的方法,有点批处理的味道,换句话说,一个方法里执行多次set操作:

    /* 一次运行多个设置操作泛型命令 */
    void msetGenericCommand(redisClient *c, int nx) {
        int j, busykeys = 0;
    
        /* 设置操作一定是成对出现的 */
        if ((c->argc % 2) == 0) {
            addReplyError(c,"wrong number of arguments for MSET");
            return;
        }
        /* Handle the NX flag. The MSETNX semantic is to return zero and don't
         * set nothing at all if at least one already key exists. */
        if (nx) {
            for (j = 1; j < c->argc; j += 2) {
                if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
                    busykeys++;
                }
            }
            
            //如果key存在,返回0
            if (busykeys) {
                addReply(c, shared.czero);
                return;
            }
        }
    
    	//隔2个执行设置key操作指令一次
        for (j = 1; j < c->argc; j += 2) {
            c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
            setKey(c->db,c->argv[j],c->argv[j+1]);
            notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);
        }
        server.dirty += (c->argc-1)/2;
        addReply(c, nx ? shared.cone : shared.ok);
    }

    也不是什么特别的操作,应该redis编写者为了方便效率的提高吧。其他的一些方法,请读者可以学习t_string.c的文件,还有提醒一点,在redis数据库中,存储的形式都是K-V形式的,所有的获取,设置都是通过KEY的形式操作,你会看到很多这样的方法:

    dbAdd(c->db,c->argv[1],new);
    setKey(c->db,c->argv[j],c->argv[j+1]);






  • 相关阅读:
    VC 6.0 编译器的使用帮助
    生成可执行jar文件(转)
    DataGridView填充、更新、删除(多行)Sql Express 2005数据库
    访问域服务器修改密码,报“未知的身份验证机制”的错误搞定!
    尝试用Word2007发布博客
    iBatisNet Tutorial
    Asp.net 技巧集合
    TableAdapter 无法插入或更新Access数据库
    OrcasBeta1 installed 玩一玩Silverlight
    Microsoft .NET Compact Framework 开发常见问题解答从msdn上找到。
  • 原文地址:https://www.cnblogs.com/bianqi/p/12184272.html
Copyright © 2011-2022 走看看