zoukankan      html  css  js  c++  java
  • Redis系列(八):数据结构List双向链表中阻塞版本之BLPOP、BRPOP和LINDEX、LINSERT、LRANGE命令详解

    1.BRPOP、BLPOP

    BLPOP:

    BLPOP 是阻塞式列表的弹出原语。 它是命令 LPOP 的阻塞版本,这是因为当给定列表内没有任何元素可供弹出的时候,

    连接将被 BLPOP 命令阻塞。 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

    BRPOP:

    BRPOP 是一个阻塞的列表弹出原语。 它是 RPOP 的阻塞版本,因为这个命令会在给定list无法弹出任何元素的时候阻塞连接。 该命令会按照给出的 key 顺序查看 list,并在找到的第一个非空 list 的尾部弹出一个元素。

    请在 BLPOP 文档 中查看该命令的准确语义,因为 BRPOP 和 BLPOP 基本是完全一样的,除了它们一个是从尾部弹出元素,而另一个是从头部弹出元素。

    时间复杂度 :O(1)

    127.0.0.1:6379> rpush order 10001
    (integer) 1
    127.0.0.1:6379> rpush order 10002
    (integer) 2
    127.0.0.1:6379> rpush order 10003
    (integer) 3
    127.0.0.1:6379> rpush order 10004
    (integer) 4
    127.0.0.1:6379> rpush order 10005
    (integer) 5
    127.0.0.1:6379> rpush order 10006
    (integer) 1
    127.0.0.1:6379>
    127.0.0.1:6379> brpop order 0
    1) "order"
    2) "10005"
    127.0.0.1:6379> brpop order 0
    1) "order"
    2) "10004"
    127.0.0.1:6379> brpop order 0
    1) "order"
    2) "10003"
    127.0.0.1:6379> brpop order 0
    1) "order"
    2) "10002"
    127.0.0.1:6379> brpop order 0
    1) "order"
    2) "10001"
    127.0.0.1:6379> brpop order 0
    1) "order"
    2) "10006"
    (2765.54s)
    127.0.0.1:6379>

     源码解析

    void blpopCommand(client *c) {
        blockingPopGenericCommand(c,LIST_HEAD);
    }
    
    void brpopCommand(client *c) {
        blockingPopGenericCommand(c,LIST_TAIL);
    }
    blockingPopGenericCommand
    /* Blocking RPOP/LPOP */
    void blockingPopGenericCommand(client *c, int where) {
        robj *o;
        mstime_t timeout;
        int j;
    
        if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
            != C_OK) return;
    
        for (j = 1; j < c->argc-1; j++) {
            o = lookupKeyWrite(c->db,c->argv[j]);
            if (o != NULL) {
                if (o->type != OBJ_LIST) {
                    addReply(c,shared.wrongtypeerr);
                    return;
                } else {
                    if (listTypeLength(o) != 0) {
                        /* Non empty list, this is like a non normal [LR]POP. */
                        char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
                        robj *value = listTypePop(o,where);
                        serverAssert(value != NULL);
    
                        addReplyArrayLen(c,2);
                        addReplyBulk(c,c->argv[j]);
                        addReplyBulk(c,value);
                        decrRefCount(value);
                        notifyKeyspaceEvent(NOTIFY_LIST,event,
                                            c->argv[j],c->db->id);
                        if (listTypeLength(o) == 0) {
                            dbDelete(c->db,c->argv[j]);
                            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                                                c->argv[j],c->db->id);
                        }
                        signalModifiedKey(c,c->db,c->argv[j]);
                        server.dirty++;
    
                        /* Replicate it as an [LR]POP instead of B[LR]POP. */
                        rewriteClientCommandVector(c,2,
                            (where == LIST_HEAD) ? shared.lpop : shared.rpop,
                            c->argv[j]);
                        return;
                    }
                }
            }
        }
    
        /* If we are inside a MULTI/EXEC and the list is empty the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_MULTI) {
            addReplyNullArray(c);
            return;
        }
    
        /* If the list is empty or the key does not exists we must block */
        blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
    }
    blockForKeys
    void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
        dictEntry *de;
        list *l;
        int j;
    
        c->bpop.timeout = timeout;
        c->bpop.target = target;
    
        if (target != NULL) incrRefCount(target);
    
        for (j = 0; j < numkeys; j++) {
            /* Allocate our bkinfo structure, associated to each key the client
             * is blocked for. */
            bkinfo *bki = zmalloc(sizeof(*bki));
            if (btype == BLOCKED_STREAM)
                bki->stream_id = ids[j];
    
            /* If the key already exists in the dictionary ignore it. */
            if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
                zfree(bki);
                continue;
            }
            incrRefCount(keys[j]);
    
            /* And in the other "side", to map keys -> clients */
            de = dictFind(c->db->blocking_keys,keys[j]);
            if (de == NULL) {
                int retval;
    
                /* For every key we take a list of clients blocked for it */
                l = listCreate();
                retval = dictAdd(c->db->blocking_keys,keys[j],l);
                incrRefCount(keys[j]);
                serverAssertWithInfo(c,keys[j],retval == DICT_OK);
            } else {
                l = dictGetVal(de);
            }
            listAddNodeTail(l,c);
            bki->listnode = listLast(l);
        }
        blockClient(c,btype);
    }
    blockClient
    void blockClient(client *c, int btype) {
        c->flags |= CLIENT_BLOCKED;
        c->btype = btype;
        server.blocked_clients++;
        server.blocked_clients_by_type[btype]++;
        addClientToTimeoutTable(c);
    }
    addClientToTimeoutTable
    void addClientToTimeoutTable(client *c) {
        if (c->bpop.timeout == 0) return;
        uint64_t timeout = c->bpop.timeout;
        unsigned char buf[CLIENT_ST_KEYLEN];
        encodeTimeoutKey(buf,timeout,c);
        if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
            c->flags |= CLIENT_IN_TO_TABLE;
    }

    2.LINDEX

    返回列表里的元素的索引 index 存储在 key 里面。 下标是从0开始索引的,所以 0 是表示第一个元素,

    1 表示第二个元素,并以此类推。 负数索引用于指定从列表尾部开始索引的元素。在这种方法下,-1 表示最后一个元素,-2 表示倒数第二个元素,并以此往前推。

    当 key 位置的值不是一个列表的时候,会返回一个error。

    时间复杂度:O(N)

    127.0.0.1:6379> lpush mylist "World" "Hello"
    (integer) 2
    127.0.0.1:6379> lindex mylist 0
    "Hello"
    127.0.0.1:6379> lindex mylist 1
    "World"
    127.0.0.1:6379>

    源码解析

    void lindexCommand(client *c) {
        robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
        if (o == NULL || checkType(c,o,OBJ_LIST)) return;
        long index;
        robj *value = NULL;
    
        if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
            return;
    
        if (o->encoding == OBJ_ENCODING_QUICKLIST) {
            quicklistEntry entry;
            if (quicklistIndex(o->ptr, index, &entry)) {
                if (entry.value) {
                    value = createStringObject((char*)entry.value,entry.sz);
                } else {
                    value = createStringObjectFromLongLong(entry.longval);
                }
                addReplyBulk(c,value);
                decrRefCount(value);
            } else {
                addReplyNull(c);
            }
        } else {
            serverPanic("Unknown list encoding");
        }
    }
    /* Populate 'entry' with the element at the specified zero-based index
     * where 0 is the head, 1 is the element next to head
     * and so on. Negative integers are used in order to count
     * from the tail, -1 is the last element, -2 the penultimate
     * and so on. If the index is out of range 0 is returned.
     *
     * Returns 1 if element found
     * Returns 0 if element not found */
    int quicklistIndex(const quicklist *quicklist, const long long idx,
                       quicklistEntry *entry) {
        quicklistNode *n;
        unsigned long long accum = 0;
        unsigned long long index;
        int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
    
        initEntry(entry);
        entry->quicklist = quicklist;
    
        if (!forward) {
            index = (-idx) - 1;
            n = quicklist->tail;
        } else {
            index = idx;
            n = quicklist->head;
        }
    
        if (index >= quicklist->count)
            return 0;
    
        while (likely(n)) {
            if ((accum + n->count) > index) {
                break;
            } else {
                D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
                  accum);
                accum += n->count;
                n = forward ? n->next : n->prev;
            }
        }
    
        if (!n)
            return 0;
    
        D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
          accum, index, index - accum, (-index) - 1 + accum);
    
        entry->node = n;
        if (forward) {
            /* forward = normal head-to-tail offset. */
            entry->offset = index - accum;
        } else {
            /* reverse = need negative offset for tail-to-head, so undo
             * the result of the original if (index < 0) above. */
            entry->offset = (-index) - 1 + accum;
        }
    
        quicklistDecompressNodeForUse(entry->node);
        entry->zi = ziplistIndex(entry->node->zl, entry->offset);
        ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
        /* The caller will use our result, so we don't re-compress here.
         * The caller can recompress or delete the node as needed. */
        return 1;
    }

    3.LINSERT

    把 value 插入存于 key 的列表中在基准值 pivot 的前面或后面。

    当 key 不存在时,这个list会被看作是空list,任何操作都不会发生。

    当 key 存在,但保存的不是一个list的时候,会返回error。

    时间复杂度:O(N)

    127.0.0.1:6379> linsert mylist before "World" "There"
    (integer) 3
    127.0.0.1:6379> lrange mylist 0 -1
    1) "Hello"
    2) "There"
    3) "World"
    127.0.0.1:6379>

    4.LRANGE

    返回存储在 key 的列表里指定范围内的元素。 start 和 end 偏移量都是基于0的下标,即list的第一个元素下标是0(list的表头),第二个元素下标是1,以此类推。

    偏移量也可以是负数,表示偏移量是从list尾部开始计数。 例如, -1 表示列表的最后一个元素,-2 是倒数第二个,以此类推。

    时间复杂度:O(S+N)

    127.0.0.1:6379> lrange mylist 0 -1
    1) "Hello"
    2) "There"
    3) "World"
    127.0.0.1:6379> lrange mylist 0 1
    1) "Hello"
    2) "There"
    127.0.0.1:6379>

    源码解析

    void lrangeCommand(client *c) {
        robj *o;
        long start, end, llen, rangelen;
    
        if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
            (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
    
        if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
             || checkType(c,o,OBJ_LIST)) return;
        llen = listTypeLength(o);
    
        /* convert negative indexes */
        if (start < 0) start = llen+start;
        if (end < 0) end = llen+end;
        if (start < 0) start = 0;
    
        /* Invariant: start >= 0, so this test will be true when end < 0.
         * The range is empty when start > end or start >= length. */
        if (start > end || start >= llen) {
            addReply(c,shared.emptyarray);
            return;
        }
        if (end >= llen) end = llen-1;
        rangelen = (end-start)+1;
    
        /* Return the result in form of a multi-bulk reply */
        addReplyArrayLen(c,rangelen);
        if (o->encoding == OBJ_ENCODING_QUICKLIST) {
            listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
    
            while(rangelen--) {
                listTypeEntry entry;
                listTypeNext(iter, &entry);
                quicklistEntry *qe = &entry.entry;
                if (qe->value) {
                    addReplyBulkCBuffer(c,qe->value,qe->sz);
                } else {
                    addReplyBulkLongLong(c,qe->longval);
                }
            }
            listTypeReleaseIterator(iter);
        } else {
            serverPanic("List encoding is not QUICKLIST!");
        }
    }
  • 相关阅读:
    博弈论基础与习题(未完)
    三视图求最多方块数
    二维前缀和应用
    卡特兰数
    UVa 11806 Cheerleaders(容斥定理)
    逃出升天
    求排列的逆序数
    求2进制下1的个数
    字符串哈希基础与应用
    网络流基础与应用
  • 原文地址:https://www.cnblogs.com/vic-tory/p/13214064.html
Copyright © 2011-2022 走看看