zoukankan      html  css  js  c++  java
  • Redis源码解析:26集群(二)键的分配与迁移

             Redis集群通过分片的方式来保存数据库中的键值对:一个集群中,每个键都通过哈希函数映射到一个槽位,整个集群共分16384个槽位,集群中每个主节点负责其中的一部分槽位。

             当数据库中的16384个槽位都有节点在处理时,集群处于上线状态;相反,如果数据库中有任何一个槽没有得到处理,那么集群处于下线状态。

             所谓键的分配,实际上就是指槽位在集群节点中的分配;所谓键的迁移,实际上指槽位在集群节点间的迁移。

     

    一:数据结构   

             在集群最主要的数据结构,记录集群状态的clusterState结构体中,与槽位相关的属性有:

    clusterNode *slots[16384];
    clusterNode *migrating_slots_to[16384];
    clusterNode *importing_slots_from[16384];
    zskiplist *slots_to_keys;

             slots数组记录了16384个槽位,分别由哪个集群节点负责:比如server->cluster.slots[0] = node,这说明0号槽位由node节点负责;

             migrating_slots_to数组记录了16384个槽位中,当前节点所负责的槽位正在迁出到哪个节点。比如server.cluster->migrating_slots_to[0] = node,这说明当前节点负责的0号槽位,正在迁出到node节点;

             importing_slots_from数组记录了16384个槽位中,当前节点正在从哪个节点将某个槽位迁入到本节点中;比如server.cluster->importing_slots_from[0] = node,这说明当前节点正在从node节点处迁入0号槽位;

             通过以上这些属性,可以快速得到某个槽位由哪个节点负责,以及该槽位正在迁出或迁入到哪个节点。

             slots_to_keys是个跳跃表,该跳跃表中,以槽位号为分数进行排序。每个跳跃表节点保存了槽位号(分数),以及该槽位上的某个key。通过该跳跃表,可以快速得到当前节点所负责的每一个槽位中,都有哪些key。

     

             在表示集群节点的clusterNode结构体中,与槽位相关的属性有:

    unsigned char slots[16384/8];
    int numslots;

             slots记录了节点负责处理哪些槽位。它是个位数组,其中每一个比特位表示一个槽位号,如果该比特位置为1,则说明该槽位由该节点负责;

             numslots表示该节点负责的槽位总数;

             通过以上这些属性,可以快速得到某个节点负责哪些槽位。

     

    二:分配槽位

             在集群刚建立时,需要手动为每个集群主节点分配其负责的槽位。这主要是通过向节点发送”CLUSTER  ADDSLOTS”命令实现的。该命令的格式是:”CLUSTER  ADDSLOTS  <slot>  [slot]  ...”。

             “CLUSTER”命令的处理函数是clusterCommand。在该函数中,处理” CLUSTER ADDSLOTS”部分的代码是:

    else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
                   !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
        {
            /* CLUSTER ADDSLOTS <slot> [slot] ... */
            /* CLUSTER DELSLOTS <slot> [slot] ... */
            int j, slot;
            unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
            int del = !strcasecmp(c->argv[1]->ptr,"delslots");
    
            memset(slots,0,REDIS_CLUSTER_SLOTS);
            /* Check that all the arguments are parseable and that all the
             * slots are not already busy. */
            for (j = 2; j < c->argc; j++) {
                if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
                    zfree(slots);
                    return;
                }
                if (del && server.cluster->slots[slot] == NULL) {
                    addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
                    zfree(slots);
                    return;
                } else if (!del && server.cluster->slots[slot]) {
                    addReplyErrorFormat(c,"Slot %d is already busy", slot);
                    zfree(slots);
                    return;
                }
                if (slots[slot]++ == 1) {
                    addReplyErrorFormat(c,"Slot %d specified multiple times",
                        (int)slot);
                    zfree(slots);
                    return;
                }
            }
            for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
                if (slots[j]) {
                    int retval;
    
                    /* If this slot was set as importing we can clear this
                     * state as now we are the real owner of the slot. */
                    if (server.cluster->importing_slots_from[j])
                        server.cluster->importing_slots_from[j] = NULL;
    
                    retval = del ? clusterDelSlot(j) :
                                   clusterAddSlot(myself,j);
                    redisAssertWithInfo(c,NULL,retval == REDIS_OK);
                }
            }
            zfree(slots);
            clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
            addReply(c,shared.ok);
        } 

             这里” CLUSTER  ADDSLOTS”和” CLUSTER  DELSLOTS”命令,采用类似的代码进行处理。ADDSLOTS和DELSLOTS,分别用于将槽位分配给节点,以及将槽位从节点中删除。ADDSLOTS命令常用于新建集群时,给每个主节点分配槽位;DELSLOTS常用于手动修改集群配置,或者用于DEBUG操作,实际中很少用到。

     

             在代码中,首先,依次检查命令参数中的槽位号:如果是DELSLOTS操作,但是数组server.cluster->slots中,记录负责该槽位号的节点为NULL,则反馈给客户端"unassigned"错误;如果是ADDSLOTS操作,但是数组server.cluster->slots中,记录已经有节点负责该槽位号了,则反馈给客户端"busy"错误;然后将参数中的槽位号记录到数组slots中,如果slots中该槽位已经设置过了,说明发来的命令中,该槽位号出现了多次,因此反馈给客户端"multiple times"错误;

             然后,依次轮训slots中记录的每一个槽位号进行处理:首先如果该槽位号在数组server.cluster->importing_slots_from中不为NULL,则将其置为NULL,因为该槽位已经由本节点负责了;然后根据是ADDSLOTS,还是DELSLOTS操作,调用clusterAddSlot或clusterDelSlot处理;

             最后,反馈给客户端"OK";

     

             因此,clusterAddSlot才是是实际用于分配槽位的函数,该函数的实现如下:

    int clusterAddSlot(clusterNode *n, int slot) {
        if (server.cluster->slots[slot]) return REDIS_ERR;
        clusterNodeSetSlotBit(n,slot);
        server.cluster->slots[slot] = n;
        return REDIS_OK;
    }

             该函数的实现很简单,就是要设置位数组n->slots中的相应位,以及server.cluster->slots[slot]。

             首先,根据server.cluster->slots[slot]的值,判断该槽位是否已经分配给其他节点了,若是,则直接返回REDIS_ERR;

             然后调用clusterNodeSetSlotBit,在位数组n->slots中设置相应的位;

             最后,将server.cluster->slots[slot]置为n;

             以上,就相当于把slot槽位分配给了节点n。

     

             顺便看一下删除槽位的函数clusterDelSlot的实现:

    int clusterDelSlot(int slot) {
        clusterNode *n = server.cluster->slots[slot];
    
        if (!n) return REDIS_ERR;
        redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
        server.cluster->slots[slot] = NULL;
        return REDIS_OK;
    }

             该函数清除slot槽位的信息,将其置为未分配的。成功返回REDIS_OK;否则若该槽位已经被置为未分配的了,则返回REDIS_ERR;

             该函数的实现很简单,就是清除位数组n->slots中的相应位,以及将server.cluster->slots[slot]置为NULL。

             首先从server.cluster->slots[slot]取得当前负责该槽位的节点n;如果n为NULL,则返回REDIS_ERR;

             然后调用clusterNodeClearSlotBit,将该槽位从位数组n->slots中清除;

             最后置server.cluster->slots[slot]为NULL;

             以上,就相当于把slot槽位置为未分配状态了。


             集群节点在发送心跳包时,会附带自己当前记录的槽位信息(clusterNode结构中的位数组slots),这样,最终集群中的每个节点都会知道所有槽位的分配情况。


    三:槽位迁移(重新分片)

             在集群稳定一段时间之后,如果有新的集群节点加入,或者某个集群节点下线了。此时就涉及到将某个节点上的槽位迁移到另一个节点上的问题。

             该过程也是需要手动完成的,Redis提供了辅助脚本redis-trib.rb,以”reshard”参数调用该脚本就可以实现重新分片的操作。但是本质上,该脚本就是通过向迁入节点和迁出节点发送一些命令实现的。

             槽位迁移的步骤是:

     

    1:向迁入节点发送” CLUSTER  SETSLOT  <slot>  IMPORTING  <node>”命令

             其中<slot>是要迁入的槽位号,<node>是当前负责该槽位的节点。在函数clusterCommand中,处理该命令的代码如下:

        else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
            /* SETSLOT 10 MIGRATING <node ID> */
            /* SETSLOT 10 IMPORTING <node ID> */
            /* SETSLOT 10 STABLE */
            /* SETSLOT 10 NODE <node ID> */
            int slot;
            clusterNode *n;
    
            if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
    
            if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
                ...
            } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
                if (server.cluster->slots[slot] == myself) {
                    addReplyErrorFormat(c,
                        "I'm already the owner of hash slot %u",slot);
                    return;
                }
                if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                    addReplyErrorFormat(c,"I don't know about node %s",
                        (char*)c->argv[3]->ptr);
                    return;
                }
                server.cluster->importing_slots_from[slot] = n;
            } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
                ...
            } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
                ...
            } else {
                addReplyError(c,
                    "Invalid CLUSTER SETSLOT action or number of arguments");
                return;
            }
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
            addReply(c,shared.ok);
        }

             针对"CLUSTER SETSLOT"命令,首先从命令参数中取得槽位号slot,如果解析错误,则回复给客户端错误信息,然后直接返回;

             如果收到的是" CLUSTER  SETSLOT <SLOT>  IMPORTING  <node>"命令,说明本节点需要迁入槽位。

             因此,首先判断server.cluster->slots[slot]是否等于myself,若是,说明slot槽位已由本节点负责,因此回复客户端错误信息后直接返回;然后根据参数<node>在字典server.cluster->nodes中查询迁入槽位的源节点n,若找不到,则回复客户端错误信息后返回;最后,置server.cluster->importing_slots_from[slot]为n;

     

    2:向迁出节点发送” CLUSTER  SETSLOT  <slot>  MIGRATING  <node>”命令

             其中<slot>是要迁出的槽位号,<node>是迁出槽位的目的地节点。在函数clusterCommand中,处理该命令的代码如下:

        else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
            /* SETSLOT 10 MIGRATING <node ID> */
            /* SETSLOT 10 IMPORTING <node ID> */
            /* SETSLOT 10 STABLE */
            /* SETSLOT 10 NODE <node ID> */
            int slot;
            clusterNode *n;
    
            if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
    
            if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
                if (server.cluster->slots[slot] != myself) {
                    addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                    return;
                }
                if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                    addReplyErrorFormat(c,"I don't know about node %s",
                        (char*)c->argv[4]->ptr);
                    return;
                }
                server.cluster->migrating_slots_to[slot] = n;
            } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
                ...
            } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
                ...
            } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
                ...
            } else {
                addReplyError(c,
                    "Invalid CLUSTER SETSLOT action or number of arguments");
                return;
            }
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
            addReply(c,shared.ok);
        }

             如果收到的是"CLUSTER  SETSLOT <SLOT>  MIGRATING  <node>"命令,说明本节点需要迁出槽位。

             因此,首先判断server.cluster->slots[slot]是否等于myself,若不是,说明slot槽位不由本节点负责,因此回复客户端错误信息后直接返回;然后根据参数<node>在字典server.cluster->nodes中查询迁出的目的地节点n,若找不到,则回复客户端错误信息后返回;最后,置server.cluster->migrating_slots_to[slot]为n;

     

    3:向迁出节点发送”CLUSTER  GETKEYSINSLOT  <slot>  <count>”命令

             该命令主要用于获得迁出槽位<slot>中的<count>个key,以便下一步能够执行key的迁移操作。该命令以及下一步的key迁移操作需要执行多次,直到槽位<slot>中没有剩余的key为止。

     

             这里就需要用到之前介绍过的,clusterState结构体中的slots_to_keys跳跃表,该跳跃表中,以槽位号为分数进行排序。每个跳跃表节点保存了槽位号(分数),以及该槽位上的某个key。通过该跳跃表,可以快速得到当前节点所负责的每一个槽位中,都有哪些key。

             每当向数据库中添加或删除key时,同时也会向该跳跃表中添加和删除节点:当调用dbAdd函数向数据库添加key时,在dbAdd中,判断如果当前处于集群模式下,就会调用slotToKeyAdd函数,向slots_to_keys跳跃表中添加节点。slotToKeyAdd函数的代码如下:

    void slotToKeyAdd(robj *key) {
        unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
    
        zslInsert(server.cluster->slots_to_keys,hashslot,key);
        incrRefCount(key);
    }

             该函数很简单,首先计算该key对应的槽位号hashslot;然后以槽位号hashslot为分数,将hashslot和key插入到跳跃表server.cluster->slots_to_keys中;

     

             当调用dbDelete函数从数据库删除key时,在dbDelete中,判断如果当前处于集群模式下,就会调用slotToKeyDel函数,从slots_to_keys跳跃表中删除节点。slotToKeyDel函数的代码如下:

    void slotToKeyDel(robj *key) {
        unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
        zslDelete(server.cluster->slots_to_keys,hashslot,key);
    }

             该函数很简单,首先计算该key对应的槽位号hashslot;然后将该key,及其对应的槽位号,从跳跃表server.cluster->slots_to_keys中删除。

     

             回到”CLUSTER  GETKEYSINSLOT“命令,在函数clusterCommand中,处理该命令的代码如下:

        else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
            /* CLUSTER GETKEYSINSLOT <slot> <count> */
            long long maxkeys, slot;
            unsigned int numkeys, j;
            robj **keys;
    
            if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
                return;
            if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
                != REDIS_OK)
                return;
            if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0) {
                addReplyError(c,"Invalid slot or number of keys");
                return;
            }
    
            keys = zmalloc(sizeof(robj*)*maxkeys);
            numkeys = getKeysInSlot(slot, keys, maxkeys);
            addReplyMultiBulkLen(c,numkeys);
            for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
            zfree(keys);
        }

             首先从命令参数中解析出槽位号slot,以及要获取的key的数量maxkeys。如果解析失败,或者得到的值不正常,则回复客户端错误信息后直接返回;

             然后调用getKeysInSlot,从跳跃表server.cluster->slots_to_keys中取出slot槽位中最多maxkeys个key,取出的key存入数组keys中;getKeysInSlot函数返回实际取得的key的数量;

             最后,将取得的所有key及数量回复给客户端;

     

             getKeysInSlot函数的代码如下:

    unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
        zskiplistNode *n;
        zrangespec range;
        int j = 0;
    
        range.min = range.max = hashslot;
        range.minex = range.maxex = 0;
    
        n = zslFirstInRange(server.cluster->slots_to_keys, &range);
        while(n && n->score == hashslot && count--) {
            keys[j++] = n->obj;
            n = n->level[0].forward;
        }
        return j;
    }

             根据槽位号,得到要查找的范围是[hashslot,hashslot],首先调用zslFirstInRange,在跳跃表中找到第一个处于该范围的节点;然后依次轮训该节点及其在level0上的后继节点,只要节点的分数为hashslot,就将该节点的key填充到keys中;

             最后返回实际获取的key的个数。

     

    4:向迁出节点发送”MIGRATE <target_host> <target_port> <key> <target_database> <timeout>”命令

             针对上一步得到的每一个key,向迁出节点发送该命令,用于将<key>迁出到目标节点的<target_database>数据库中,迁出过程的超时时间为<timeout>,一旦超时,则回复客户端错误信息。

             该命令不仅可以用于集群节点间的key迁移,还能用于普通节点间的key迁移。如果是在集群模式下,则<target_database>固定为0。

             该命令是原子性的将key从A迁移到B,迁移过程中,节点A和节点B都会阻塞(很小的时间),从而避免了竞争的发生。

     

             4.1、缓存连接

             因为一般情况下,是需要将多个key从A迁移到B中,为了避免A和B之间需要多次TCP建链,这里采用了缓存连接的实现方法。具体而言,当迁移第一个key时,节点A向节点B建链,并将该TCP链接缓存起来,一定时间内,当需要迁移下一个key时,可以直接使用缓存的链接,而无需重复建链。缓存的链接如果长时间不用,则会自动释放。

             源码中使用migrateCachedSocket结构体表示缓存的TCP连接,该结构体的定义如下:

    typedef struct migrateCachedSocket {
        int fd;
        long last_dbid;
        time_t last_use_time;
    } migrateCachedSocket;

             该结构中保存了socket描述符fd,上一次使用的目的节点的数据库ID,以及该链接上一次被使用的时间。

     

             migrateGetSocket就是用于建链并缓存的函数,该函数的代码如下:

    migrateCachedSocket* migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
        int fd;
        sds name = sdsempty();
        migrateCachedSocket *cs;
    
        /* Check if we have an already cached socket for this ip:port pair. */
        name = sdscatlen(name,host->ptr,sdslen(host->ptr));
        name = sdscatlen(name,":",1);
        name = sdscatlen(name,port->ptr,sdslen(port->ptr));
        cs = dictFetchValue(server.migrate_cached_sockets,name);
        if (cs) {
            sdsfree(name);
            cs->last_use_time = server.unixtime;
            return cs;
        }
    
        /* No cached socket, create one. */
        if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
            /* Too many items, drop one at random. */
            dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
            cs = dictGetVal(de);
            close(cs->fd);
            zfree(cs);
            dictDelete(server.migrate_cached_sockets,dictGetKey(de));
        }
    
        /* Create the socket */
        fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
                                    atoi(c->argv[2]->ptr));
        if (fd == -1) {
            sdsfree(name);
            addReplyErrorFormat(c,"Can't connect to target node: %s",
                server.neterr);
            return NULL;
        }
        anetEnableTcpNoDelay(server.neterr,fd);
    
        /* Check if it connects within the specified timeout. */
        if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
            sdsfree(name);
            addReplySds(c,
                sdsnew("-IOERR error or timeout connecting to the client
    "));
            close(fd);
            return NULL;
        }
    
        /* Add to the cache and return it to the caller. */
        cs = zmalloc(sizeof(*cs));
        cs->fd = fd;
        cs->last_dbid = -1;
        cs->last_use_time = server.unixtime;
        dictAdd(server.migrate_cached_sockets,name,cs);
        return cs;
    }

             字典server.migrate_cached_sockets表示一个缓存连接池,该字典以目的节点的"<ip>:<port>"为key,以migrateCachedSocket结构为value。该字典中就保存了当前节点所有已经建链的TCP连接;

     

             函数中,首先根据参数host和port,组成key,使用该key查询字典server.migrate_cached_sockets中是否已经缓存了到该地址的连接cs,若找到了缓存的cs,则更新cs->last_use_time为当前时间,然后直接返回cs即可;

             若找不到相应的连接cs,则判断字典当前的大小是否已经达到了阈值64,若是,则从字典中随机选择一个字典项de,取出其中的连接cs,关闭cs->fd,释放cs结构,并将de从字典中删除;

             接下来,调用anetTcpNonBlockConnect,根据地址信息,向远端Redis发起TCP建链,如果anetTcpNonBlockConnect返回-1,则回复给客户端错误信息后,直接返回NULL;

             然后设置socket描述符fd的NO_DELAY选项;然后调用aeWait,等待可写事件的触发,等待时间为timeout,如果在该时间段内没有触发可写事件,则建链超时,因此回复给客户端错误信息,关闭socket描述符,返回NULL;否则,表示建链成功(实际上并没有检查建链是否真的成功,若建链失败,后续调用者在写消息时会发生错误,从而释放连接);

             接下来,构建一个migrateCachedSocket结构的cs,保存socket描述符,置其中的last_dbid为-1,置last_use_time属性为当前时间;然后将cs插入到字典server.migrate_cached_sockets中。

     

             当某个连接长时间不用时,需要断开连接,删除缓存的migrateCachedSocket结构。这是通过migrateCloseTimedoutSockets函数实现的。该函数每隔1秒在定时器函数serverCron中调用一次。该函数的代码如下:

    void migrateCloseTimedoutSockets(void) {
        dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
        dictEntry *de;
    
        while((de = dictNext(di)) != NULL) {
            migrateCachedSocket *cs = dictGetVal(de);
    
            if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
                close(cs->fd);
                zfree(cs);
                dictDelete(server.migrate_cached_sockets,dictGetKey(de));
            }
        }
        dictReleaseIterator(di);
    }

             轮训字典server.migrate_cached_sockets,针对其中的每一个migrateCachedSocket结构的cs,如果该cs的最后一次使用时间,距离当前时间已经超过10s,则关闭socket描述符,释放cs结构,并将其从字典中删除。

     

             4.2、MIGRATE命令

             MIGRATE命令的格式是:”MIGRATE <target_host> <target_port> <key> <target_database> <timeout>  [COPY |REPLACE]",如果最后一个参数是REPLACE,则发送成功之后,还要在当前实例中删除该key;如果是COPY,则无需删除key;默认参数就是REPLACE。

             MIGRATE命令的处理函数是migrateCommand,该函数的代码如下:

    void migrateCommand(redisClient *c) {
        migrateCachedSocket *cs;
        int copy, replace, j;
        long timeout;
        long dbid;
        long long ttl, expireat;
        robj *o;
        rio cmd, payload;
        int retry_num = 0;
    
    try_again:
        /* Initialization */
        copy = 0;
        replace = 0;
        ttl = 0;
    
        /* Parse additional options */
        for (j = 6; j < c->argc; j++) {
            if (!strcasecmp(c->argv[j]->ptr,"copy")) {
                copy = 1;
            } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
                replace = 1;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }
    
        /* Sanity check */
        if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
            return;
        if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
            return;
        if (timeout <= 0) timeout = 1000;
    
        /* Check if the key is here. If not we reply with success as there is
         * nothing to migrate (for instance the key expired in the meantime), but
         * we include such information in the reply string. */
        if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
            addReplySds(c,sdsnew("+NOKEY
    "));
            return;
        }
    
        /* Connect */
        cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
        if (cs == NULL) return; /* error sent to the client by migrateGetSocket() */
    
        rioInitWithBuffer(&cmd,sdsempty());
    
        /* Send the SELECT command if the current DB is not already selected. */
        int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
        if (select) {
            redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
            redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
            redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
        }
    
        /* Create RESTORE payload and generate the protocol to call the command. */
        expireat = getExpire(c->db,c->argv[3]);
        if (expireat != -1) {
            ttl = expireat-mstime();
            if (ttl < 1) ttl = 1;
        }
        redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
        if (server.cluster_enabled)
            redisAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
        redisAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3]));
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,
                sdslen(c->argv[3]->ptr)));
        redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
    
        /* Emit the payload argument, that is the serialized object using
         * the DUMP format. */
        createDumpPayload(&payload,o);
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                                    sdslen(payload.io.buffer.ptr)));
        sdsfree(payload.io.buffer.ptr);
    
        /* Add the REPLACE option to the RESTORE command if it was specified
         * as a MIGRATE option. */
        if (replace)
            redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
    
        /* Transfer the query to the other node in 64K chunks. */
        errno = 0;
        {
            sds buf = cmd.io.buffer.ptr;
            size_t pos = 0, towrite;
            int nwritten = 0;
    
            while ((towrite = sdslen(buf)-pos) > 0) {
                towrite = (towrite > (64*1024) ? (64*1024) : towrite);
                nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
                if (nwritten != (signed)towrite) goto socket_wr_err;
                pos += nwritten;
            }
        }
    
        /* Read back the reply. */
        {
            char buf1[1024];
            char buf2[1024];
    
            /* Read the two replies */
            if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
                goto socket_rd_err;
            if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0)
                goto socket_rd_err;
            if ((select && buf1[0] == '-') || buf2[0] == '-') {
                /* On error assume that last_dbid is no longer valid. */
                cs->last_dbid = -1;
                addReplyErrorFormat(c,"Target instance replied with error: %s",
                    (select && buf1[0] == '-') ? buf1+1 : buf2+1);
            } else {
                /* Update the last_dbid in migrateCachedSocket */
                cs->last_dbid = dbid;
                robj *aux;
    
                addReply(c,shared.ok);
    
                if (!copy) {
                    /* No COPY option: remove the local key, signal the change. */
                    dbDelete(c->db,c->argv[3]);
                    signalModifiedKey(c->db,c->argv[3]);
                    server.dirty++;
    
                    /* Translate MIGRATE as DEL for replication/AOF. */
                    aux = createStringObject("DEL",3);
                    rewriteClientCommandVector(c,2,aux,c->argv[3]);
                    decrRefCount(aux);
                }
            }
        }
    
        sdsfree(cmd.io.buffer.ptr);
        return;
    
    socket_wr_err:
        sdsfree(cmd.io.buffer.ptr);
        migrateCloseSocket(c->argv[1],c->argv[2]);
        if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
        addReplySds(c,
            sdsnew("-IOERR error or timeout writing to target instance
    "));
        return;
    
    socket_rd_err:
        sdsfree(cmd.io.buffer.ptr);
        migrateCloseSocket(c->argv[1],c->argv[2]);
        if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
        addReplySds(c,
            sdsnew("-IOERR error or timeout reading from target node
    "));
        return;
    }

             首先检查最后一个命令参数,如果该参数既不是COPY,也不是REPLACE,则直接回复给客户端语法错误信息;然后从命令中解析出timeout和dbid,若解析错误,则直接回复给客户端错误信息。如果解析得到的timeout小于等于0,则将其置为1000,也就是1秒;

             然后从客户端当前连接的数据库中,查找key,得到其值对象o。如果找不到key,则回复给客户端"+NOKEY",这不算是错误,因为可能该key刚好超时被删除了;

             接下来,根据参数中的host和port,调用migrateGetSocket函数,得到与远端Redis的连接。如果之前已经与该Redis建链了,则该函数会返回之前缓存的连接,否则,直接向该Redis发起TCP同步建链,建链超时时间为timeout。如果建链失败,则在migrateGetSocket中回复给客户端错误信息后,直接返回;

     

             接下来,开始构建要发送给远端Redis的RESTORE命令:首先初始化rio结构的cmd,该结构中记录要发送的命令;如果命令参数中的dbid,与上次迁移时的dbid不同,则需要首先向cmd中填充"SELECT  <dbid>"命令;然后取得该key的超时时间expireat,将其转换为相对时间ttl;如果当前处于集群模式下,则向cmd中填充"RESTORE-ASKING"命令,否则填充"RESTORE"命令;然后向cmd中填充key,以及ttl;然后调用createDumpPayload函数,将值对象o,按照DUMP的格式填充到payload中,然后再将payload填充到cmd中;如果最后一个命令参数是REPLACE,则还需要填充"REPLACE"到cmd中;

     

             接下来,开始向远端Redis发送命令:循环调用syncWrite函数,向远端Redis同步发送cmd中的内容,每次最多发送64k个字节;

             发送完成后,开始读取远端Redis的回复:如果之前发送了"SELECT"命令,则首先读取"SELECT"命令的回复到buf1中;然后读取"RESTORE"命令的回复到buf2中。读取超时时间为timeout;

             如果buf1或buf2首字符为'-',说明远端Redis回复了错误信息,则先设置cs->last_dbid为-1,这样下次迁移时会强制发送"SELECT"命令,然后回复错误信息给客户端;否则,说明迁移成功了,先设置cs->last_dbid为dbid,然后回复客户端"OK"信息。

             如果客户端命令最后一个参数不是"COPY",则先将该key从数据库中删除,然后调用rewriteClientCommandVector函数,将当前客户端的命令修改为"DEL <key>",这样接下来在propagate函数中,会将该DEL命令传递给AOF文件或从节点;

     

             如果写命令或者读回复发生错误,则调用migrateCloseSocket关闭与远端Redis的连接,如果不是超时错误的话,则重试一次,否则回复给客户端相应的错误信息;

            

             注意:为了避免出现竞争条件(同一个key出现在两个节点中),在本函数中,涉及到向其他Redis服务器建链、发送命令和等待回复的过程,以上过程都是同步的,因此如果网络异常,并且超时时间又设置的比较大,则该函数有可能会阻塞Redis对于其他事件的处理,导致其他客户端无法操作当前Redis服务器(亲测)!!!

     

             4.3、RESTORE-ASKING(或RESTORE)命令

             key迁移的目的节点收到源节点发来的RESTORE-ASKING或RESTORE命令后,将命令中的key和value保存到本地数据库中。命令格式是:"RESTORE <key> <ttl> <serialized-value> [REPLACE]"或"RESTORE-ASKING  <key>  <ttl>  <serialized-value>  [REPLACE]"

             这两个命令的区别是:RESTORE-ASKING命令用于集群节点间的key迁移,RESTORE命令用于普通节点间的key迁移。RESTORE-ASKING命令对应的redisCommand结构标志位中带有'k'标记,这样在键迁移时,就不会返回ASK重定向错误;

            

             这两个命令都通过调用restoreCommand函数处理。该函数的代码如下:

    void restoreCommand(redisClient *c) {
        long long ttl;
        rio payload;
        int j, type, replace = 0;
        robj *obj;
    
        /* Parse additional options */
        for (j = 4; j < c->argc; j++) {
            if (!strcasecmp(c->argv[j]->ptr,"replace")) {
                replace = 1;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }
    
        /* Make sure this key does not already exist here... */
        if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
            addReply(c,shared.busykeyerr);
            return;
        }
    
        /* Check if the TTL value makes sense */
        if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
            return;
        } else if (ttl < 0) {
            addReplyError(c,"Invalid TTL value, must be >= 0");
            return;
        }
    
        /* Verify RDB version and data checksum. */
        if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR)
        {
            addReplyError(c,"DUMP payload version or checksum are wrong");
            return;
        }
    
        rioInitWithBuffer(&payload,c->argv[3]->ptr);
        if (((type = rdbLoadObjectType(&payload)) == -1) ||
            ((obj = rdbLoadObject(type,&payload)) == NULL))
        {
            addReplyError(c,"Bad data format");
            return;
        }
    
        /* Remove the old key if needed. */
        if (replace) dbDelete(c->db,c->argv[1]);
    
        /* Create the key and set the TTL if any */
        dbAdd(c->db,c->argv[1],obj);
        if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
        signalModifiedKey(c->db,c->argv[1]);
        addReply(c,shared.ok);
        server.dirty++;
    }

             首先,解析命令中第四个参数是否为"REPLACE",若是则置replace为1,否则,直接回复客户端语法错误信息;

             如果replace为1,则从数据库中查找相应的key,如果查不到,则直接回复客户端错误信息;

             然后从命令中解析ttl参数,如果解析错误,或者解析出的ttl小于0,则直接回复客户端错误信息;

             然后调用verifyDumpPayload函数,验证远端Redis发来的命令参数中,DUMP格式的值对象参数中的验证码是否正确,验证失败则回复客户端错误信息;

             接下来,从命令参数中解析出值对象的类型和值对象本身,将值对象保存在obj中,如果解析错误,则回复客户端错误信息;

             如果replace为1,则将该key从数据库中删除;然后将key和obj添加到数据库中;

             如果ttl不为0,则设置该key的超时时间;最后,回复客户端"OK"信息;

     

             以上,就完成了一个key的迁移过程。

     

    5:向所有节点发送”CLUSTER  SETSLOT  <slot>  NODE  <nodeid>”命令

             当槽位中的所有key都迁移完成之后,需要向集群中所有节点,包括迁移的源节点以及目的节点,发送”CLUSTER  SETSLOT  <slot> NODE  <nodeid>”命令,以便通知所有节点,更新槽位<slot> 新的负责节点为<nodeid>。

             在函数clusterCommand中,处理该命令的代码如下:

        else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
            /* SETSLOT 10 MIGRATING <node ID> */
            /* SETSLOT 10 IMPORTING <node ID> */
            /* SETSLOT 10 STABLE */
            /* SETSLOT 10 NODE <node ID> */
            int slot;
            clusterNode *n;
    
            if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
    
            if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
                ...
            } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
                ...
            } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
                ...
            } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
                /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
                clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
    
                if (!n) {
                    addReplyErrorFormat(c,"Unknown node %s",
                        (char*)c->argv[4]->ptr);
                    return;
                }
                /* If this hash slot was served by 'myself' before to switch
                 * make sure there are no longer local keys for this hash slot. */
                if (server.cluster->slots[slot] == myself && n != myself) {
                    if (countKeysInSlot(slot) != 0) {
                        addReplyErrorFormat(c,
                            "Can't assign hashslot %d to a different node "
                            "while I still hold keys for this hash slot.", slot);
                        return;
                    }
                }
                /* If this slot is in migrating status but we have no keys
                 * for it assigning the slot to another node will clear
                 * the migratig status. */
                if (countKeysInSlot(slot) == 0 &&
                    server.cluster->migrating_slots_to[slot])
                    server.cluster->migrating_slots_to[slot] = NULL;
    
                /* If this node was importing this slot, assigning the slot to
                 * itself also clears the importing status. */
                if (n == myself &&
                    server.cluster->importing_slots_from[slot])
                {
                    /* This slot was manually migrated, set this node configEpoch
                     * to a new epoch so that the new version can be propagated
                     * by the cluster.
                     *
                     * Note that if this ever results in a collision with another
                     * node getting the same configEpoch, for example because a
                     * failover happens at the same time we close the slot, the
                     * configEpoch collision resolution will fix it assigning
                     * a different epoch to each node. */
                    if (clusterBumpConfigEpochWithoutConsensus() == REDIS_OK) {
                        redisLog(REDIS_WARNING,
                            "configEpoch updated after importing slot %d", slot);
                    }
                    server.cluster->importing_slots_from[slot] = NULL;
                }
                clusterDelSlot(slot);
                clusterAddSlot(n,slot);
            } else {
                addReplyError(c,
                    "Invalid CLUSTER SETSLOT action or number of arguments");
                return;
            }
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
            addReply(c,shared.ok);
        }

             如果收到的是"CLUSTER  SETSLOT  <SLOT>  NODE  <nodeID>"命令,说明需要更新负责相应槽位的节点。

             首先根据参数<node ID>在字典server.cluster->nodes中查询新的负责该槽位的节点n,若找不到,则回复客户端错误信息后返回;

             如果目前负责该槽位的节点为当前节点myself,并且myself不等于n,说明当前节点正在将该槽位迁出到节点n中,调用countKeysInSlot函数计算该槽位中尚存多少个key,如果该函数返回值不为0,说明该槽位中还有未迁出的key,因此回复客户端错误信息后返回;

             如果当前节点正在迁出该槽位,并且该槽位中所有的key都已经迁出,则置server.cluster->migrating_slots_to[slot]为NULL;

             如果当前节点正在迁入该槽位,并且n就是myself,则首先调用函数clusterBumpConfigEpochWithoutConsensus增加纪元configEpoch的值,然后置server.cluster->importing_slots_from[slot]为NULL;

             最后,调用clusterDelSlot清空该slot相关的信息,然后调用clusterAddSlot,将该槽位的负责人改为节点n;

     

             至此,就完成了一次槽位迁移(重新分片)流程。

     

    四:集群节点执行命令

             在集群模式下,数据库的key分布在多个集群节点中。因此当某个集群节点收到客户端的命令时,与普通模式下稍有不同。这不同主要体现在:

             a:若命令中涉及到多个key,而这些key处于不同的槽位中,则该命令不能被执行,直接返回错误;

             b:某个集群节点收到客户端发来的命令后,会判断命令中的key是否由本节点负责,若是,则直接处理命令;若不是,则反馈给客户端MOVED重定向错误,错误中指明了该key真正的负责节点。客户端收到MOVED重定向错误之后,需要重新向真正的负责节点再次发送命令;

             c:如果节点A正在迁出槽位,此时收到了客户端的命令,而命令中的key已经迁入到了B节点,则节点A返回给客户端ASK重定向错误,该错误中指明了该key的迁入目的地节点。客户端收到ASK错误之后,需要先向B节点发送”ASKING”命令,然后在向B节点发送该命令。

     

             ASK错误和MOVED错误都会导致客户端转向,它们的区别在于:

             a:MOVED错误代表槽位的负责权已经从一个节点转移到了另一个节点:在客户端收到  关于槽位i的MOVED错误之后,会更新槽位i及其负责节点的对应关系,这样下次遇到关于槽位i的命令请求时,就可以直接将命令请求发送新的负责节点。

             b:ASK错误只是两个节点在迁移槽的过程中使用的一种临时措施:客户端收到关于槽位i的ASK错误之后,客户端只会在接下来的一次命令请求中将关于槽位i的命令请求发送至ASK错误所指示的节点,但这种重定向不会对客户端今后发送关于槽位i的命令请求产生任何影响,客户端之后仍然会将关于槽位i的命令请求发送至目前负责处理该槽位的节点,除非ASK错误再次出现。

     

             在处理客户端命令的函数processCommand中,如果Redis服务器处于集群模式下,在实际执行命令处理函数之前,需要判断当前节点是否能处理该命令中的key,若本节点不能处理该命令,则回复给客户端重定向错误,表示该命令应由其他集群节点处理。

             以下情况下,可以无需判断命令,本节点可以直接处理该命令:

             a:本节点为从节点,该命令是主节点发来的消息;

             b:该命令中不包含key;

             c:LUA客户端发来的命令;

     

             processCommand中的这部分代码如下:

        /* If cluster is enabled perform the cluster redirection here.
         * However we don't perform the redirection if:
         * 1) The sender of this command is our master.
         * 2) The command has no key arguments. */
        if (server.cluster_enabled &&
            !(c->flags & REDIS_MASTER) &&
            !(c->flags & REDIS_LUA_CLIENT &&
              server.lua_caller->flags & REDIS_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
        {
            int hashslot;
    
            if (server.cluster->state != REDIS_CLUSTER_OK) {
                flagTransaction(c);
                clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
                return REDIS_OK;
            } else {
                int error_code;
                clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
                if (n == NULL || n != server.cluster->myself) {
                    flagTransaction(c);
                    clusterRedirectClient(c,n,hashslot,error_code);
                    return REDIS_OK;
                }
            }
        }

             判断本节点是否能执行该命令的步骤是:

             如果当前集群的状态不是REDIS_CLUSTER_OK,则直接回复给客户端REDIS_CLUSTER_REDIR_DOWN_STATE错误,直接返回;

             否则,调用getNodeByQuery函数,查询能够处理该命令的节点n,如果n为NULL,或者n不是当前节点,则直接回复给客户端相应的错误,直接返回;

             其他情况,说明本节点可以处理该命令;

     

             getNodeByQuery函数是集群模式下,判断当前节点是否能处理客户端命令的函数,本函数还会查找能够处理客户端命令的节点。该函数的代码如下:

    clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
        clusterNode *n = NULL;
        robj *firstkey = NULL;
        int multiple_keys = 0;
        multiState *ms, _ms;
        multiCmd mc;
        int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
    
        /* Set error code optimistically for the base case. */
        if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE;
    
        /* We handle all the cases as if they were EXEC commands, so we have
         * a common code path for everything */
        if (cmd->proc == execCommand) {
            /* If REDIS_MULTI flag is not set EXEC is just going to return an
             * error. */
            if (!(c->flags & REDIS_MULTI)) return myself;
            ms = &c->mstate;
        } else {
            /* In order to have a single codepath create a fake Multi State
             * structure if the client is not in MULTI/EXEC state, this way
             * we have a single codepath below. */
            ms = &_ms;
            _ms.commands = &mc;
            _ms.count = 1;
            mc.argv = argv;
            mc.argc = argc;
            mc.cmd = cmd;
        }
    
        /* Check that all the keys are in the same hash slot, and obtain this
         * slot and the node associated. */
        for (i = 0; i < ms->count; i++) {
            struct redisCommand *mcmd;
            robj **margv;
            int margc, *keyindex, numkeys, j;
    
            mcmd = ms->commands[i].cmd;
            margc = ms->commands[i].argc;
            margv = ms->commands[i].argv;
    
            keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
            for (j = 0; j < numkeys; j++) {
                robj *thiskey = margv[keyindex[j]];
                int thisslot = keyHashSlot((char*)thiskey->ptr,
                                           sdslen(thiskey->ptr));
    
                if (firstkey == NULL) {
                    /* This is the first key we see. Check what is the slot
                     * and node. */
                    firstkey = thiskey;
                    slot = thisslot;
                    n = server.cluster->slots[slot];
    
                    /* Error: If a slot is not served, we are in "cluster down"
                     * state. However the state is yet to be updated, so this was
                     * not trapped earlier in processCommand(). Report the same
                     * error to the client. */
                    if (n == NULL) {
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND;
                        return NULL;
                    }
    
                    /* If we are migrating or importing this slot, we need to check
                     * if we have all the keys in the request (the only way we
                     * can safely serve the request, otherwise we return a TRYAGAIN
                     * error). To do so we set the importing/migrating state and
                     * increment a counter for every missing key. */
                    if (n == myself &&
                        server.cluster->migrating_slots_to[slot] != NULL)
                    {
                        migrating_slot = 1;
                    } else if (server.cluster->importing_slots_from[slot] != NULL) {
                        importing_slot = 1;
                    }
                } else {
                    /* If it is not the first key, make sure it is exactly
                     * the same key as the first we saw. */
                    if (!equalStringObjects(firstkey,thiskey)) {
                        if (slot != thisslot) {
                            /* Error: multiple keys from different slots. */
                            getKeysFreeResult(keyindex);
                            if (error_code)
                                *error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT;
                            return NULL;
                        } else {
                            /* Flag this request as one with multiple different
                             * keys. */
                            multiple_keys = 1;
                        }
                    }
                }
    
                /* Migarting / Improrting slot? Count keys we don't have. */
                if ((migrating_slot || importing_slot) &&
                    lookupKeyRead(&server.db[0],thiskey) == NULL)
                {
                    missing_keys++;
                }
            }
            getKeysFreeResult(keyindex);
        }
    
        /* No key at all in command? then we can serve the request
         * without redirections or errors. */
        if (n == NULL) return myself;
    
        /* Return the hashslot by reference. */
        if (hashslot) *hashslot = slot;
    
        /* This request is about a slot we are migrating into another instance?
         * Then if we have all the keys. */
    
        /* If we don't have all the keys and we are migrating the slot, send
         * an ASK redirection. */
        if (migrating_slot && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
            return server.cluster->migrating_slots_to[slot];
        }
    
        /* If we are receiving the slot, and the client correctly flagged the
         * request as "ASKING", we can serve the request. However if the request
         * involves multiple keys and we don't have them all, the only option is
         * to send a TRYAGAIN error. */
        if (importing_slot &&
            (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
        {
            if (multiple_keys && missing_keys) {
                if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
                return NULL;
            } else {
                return myself;
            }
        }
    
        /* Handle the read-only client case reading from a slave: if this
         * node is a slave and the request is about an hash slot our master
         * is serving, we can reply without redirection. */
        if (c->flags & REDIS_READONLY &&
            cmd->flags & REDIS_CMD_READONLY &&
            nodeIsSlave(myself) &&
            myself->slaveof == n)
        {
            return myself;
        }
    
        /* Base case: just return the right node. However if this node is not
         * myself, set error_code to MOVED since we need to issue a rediretion. */
        if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
        return n;
    }

             参数c、cmd、argv和argc表示客户端及其发来的命令;参数hashslot为出参,返回命令中key所属的槽位号;参数error_code为出参,出错时设置为相应错误码,成功时设置为REDIS_CLUSTER_REDIR_NONE。该函数返回能够处理该命令的节点,若返回NULL,说明该命令目前无法在集群中执行。

             需要注意的是,如果当前处于事务模式下,则事务中的所有命令中的所有key,需要一起进行判断。对于非事务模式下的命令,也按照事务的方式进行处理,只不过本事务只包含当前一条命令;

     

             首先,如果命令执行函数为execCommand,则说明当前处于事务模式下,并且本条命令是事务中的最后一条命令"EXEC"。事务模式下,在c->mstate中保存了事务中之前的所有命令,因此将ms指向c->mstate。如果客户端没有设置REDIS_MULTI标志,则直接返回myself,表示当前节点能够处理该命令,但是实际上这种情况下,在命令处理函数execCommand中,会直接反馈给客户端"EXEC  without  MULTI"错误;

             如果命令处理函数不是execCommand,则构造伪事务结构ms,其中只包含当前命令这一条命令;

     

             接下来,针对ms中的每一条命令进行判断:调用getKeysFromCommand函数,从命令中得到所有key的索引,保存在数组keyindex中,以及key的个数numkeys;

             接下来就循环处理本条命令中的所有key:

             首先调用keyHashSlot函数,计算该key所属的槽位号thisslot;

             如果该key是命令中的第一个key,则用firstkey记录该key,用slot记录该key所属的槽位号;然后从server.cluster->slots中取得负责该槽位的节点n,如果n为NULL,则说明该槽位没有节点负责,集群目前处于下线状态,因此设置error_code为REDIS_CLUSTER_REDIR_DOWN_UNBOUND,并且返回NULL;如果节点n就是当前节点,并且当前节点正在迁出该槽位,则设置migrating_slot为1;否则如果当前节点正在迁入该槽位,则设置importing_slot为1;

             如果该key不是命令中的第一个key,则只要该key与第一个key内容不同,就比较该key所属的槽位是否与第一个key的槽位一致,若不一致,则设置错误码为REDIS_CLUSTER_REDIR_CROSS_SLOT,并返回NULL;若一致,则置multiple_keys为1;

             如果当前节点正在迁入或者迁出该槽位,并且在0号数据库中找不到该key,则增加missing_keys的值;

     

             遍历完所有命令的所有key后,走到现在,能保证所有key都属于同一个槽位slot,该槽位由节点n负责处理。接下来接着进行判断:

             如果n为NULL,说明所有命令中都不包含任何key,因此返回myself,表示当前节点可以处理该命令;

             将slot保存到出参hashslot中;

             如果当前节点正在迁出槽位,并且命令中的key有的已经不再当前节点中了,则设置错误码为REDIS_CLUSTER_REDIR_ASK,并返回该槽位所迁出的目的地节点;

             如果当前节点正在迁入槽位,并且客户端具有ASKING标记(客户端之前发来过”ASKING”命令)或者该命令本身就具有ASKING标记(”RESTORE-ASKING”命令),则只有在涉及多个key,并且有的key不在当前节点中的情况下,才设置错误码为REDIS_CLUSTER_REDIR_UNSTABLE,并返回NULL;否则,返回当前节点;

             以上两条判断条件,保证了当命令中只有一个key时,写(新增key)命令需直接写入到迁入节点中,读命令需在具有key的节点中读取;当涉及多个key时,写(新增key)命令既无法在迁出节点中执行,也无法在迁入节点中执行,读命令需在具有所有key的节点中读取;(亲测)

     

             如果当前节点正好为n节点的从节点,而且客户端是只读客户端,并且该命令是只读命令,则返回当前节点;

             其他情况下,如果当前节点不是n节点,则设置错误码为REDIS_CLUSTER_REDIR_MOVED,并返回节点n。

     

  • 相关阅读:
    Struts2:<s:action>的使用
    Struts2:Struts2在jsp中使用标签时值的获取
    jsp:useBean的使用
    关于Filter的一点误解
    Strust2: 工作流程
    java程序连接MySQL数据库
    python 开发工具简介
    NCEP CFSR数据下载
    美国NOAA/AVHRR遥感数据
    气象网站
  • 原文地址:https://www.cnblogs.com/gqtcgq/p/7247043.html
Copyright © 2011-2022 走看看