zoukankan      html  css  js  c++  java
  • redis学习笔记(六): processCommand

    在看它的command处理之前,先说一下redis中C/S交互的流程(不知道怎么用图来表示流程,先码在这里):

    1. 在initServer中调用aeCreateFileEvent给tcp listen socket注册 acceptTcpHandler 作为rfileProc
    2. 有客户端连接过来时,在aeApiPoll中,listen套接字上来了可读事件,调用其注册的rfileProc,也就是acceptTcpHandler
    3. 在acceptTcpHandler的处理当中会调用createClient,它除了分配新的redisClient结构之外,还会调用aeCreateFileEvent为新的fd注册可读事件上的rfileProc: readQueryFromClient
    4. 当客户端发过来请求时,aeApiPoll中,client套接字上来了可读事件,调用rfileProc也就是readQueryFromClient读取client的请求
    5. 在处理完请求之后(一般是执行相应的命令),将需要返回给client的内容放在redisClient结构中的buf成员中,然后将它对应的fd以可写事件加入poll中,对应的callback为sendReplyToClient
    6. 下一次aeApiPoll时,可写事件就绪,就会调用sendReplyToClient发回给客户
    所以命令的接收跟命令的执行(queue也算是执行)是按顺序执行的,给client的回复是异步做的
    所以对于同一个client,尽可能多地收集回复再一次性发出去能减少网络I/O的次数

    命令交互的主要入口函数是readQueryFromClient,它从套接字中读取内容、再处理成argc/argv的格式、最后去执行:

    /* Readable-event handler installed on each client socket.
     *
     * Appends whatever read(2) returns on 'fd' to c->querybuf, drops the
     * client on read errors, EOF, or an oversized query buffer, and otherwise
     * hands the buffer to processInputBuffer() to parse and run commands.
     *
     * el and mask are unused; privdata is the redisClient that owns 'fd'. */
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        redisClient *c = (redisClient*) privdata;
        int nread, readlen;
        size_t qblen;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
    
        /* Remember which client is being served. processCommand() checks
         * for this becoming NULL to detect that the client was freed as a
         * side effect of freeing memory. */
        server.current_client = c;
        readlen = REDIS_IOBUF_LEN;
        /* If this is a multi bulk request, and we are processing a bulk reply
         * that is large enough, try to maximize the probability that the query
         * buffer contains exactly the SDS string representing the object, even
         * at the risk of requiring more read(2) calls. This way the function
         * processMultiBulkBuffer() can avoid copying buffers to create the
         * Redis Object representing the argument.
         *
         * The number of missing bytes is computed in size_t so it cannot wrap
         * into a bogus (negative) int. If nothing is missing, keep the default
         * readlen. This can happen when processInputBuffer() returned early
         * for a paused or blocked client with the whole argument buffered.
         * A 0-byte read(2) would return 0, and the code below would take that
         * as the peer closing the connection. */
        if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
            && c->bulklen >= REDIS_MBULK_BIG_ARG)
        {
            size_t wanted = (size_t)(c->bulklen+2);
            size_t have = sdslen(c->querybuf);
    
            if (have < wanted && wanted - have < (size_t)readlen)
                readlen = (int)(wanted - have);
        }
    
        /* New bytes are appended after the current content; also track the
         * largest query buffer length seen for this client. */
        qblen = sdslen(c->querybuf);
        if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
        /* Make sure there is room for readlen more bytes. */
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        nread = read(fd, c->querybuf+qblen, readlen);
        if (nread == -1) {
            if (errno == EAGAIN) {
                /* No data available right now: treat as an empty read. */
                nread = 0;
            } else {
                redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
                freeClient(c);
                return;
            }
        } else if (nread == 0) {
            /* read(2) returned 0: end of stream. */
            redisLog(REDIS_VERBOSE, "Client closed connection");
            freeClient(c);
            return;
        }
        if (nread) {
            /* Commit the bytes read directly into the sds buffer to its length. */
            sdsIncrLen(c->querybuf,nread);
            c->lastinteraction = server.unixtime;
            /* For a client flagged REDIS_MASTER, advance reploff by the bytes received. */
            if (c->flags & REDIS_MASTER) c->reploff += nread;
            server.stat_net_input_bytes += nread;
        } else {
            server.current_client = NULL;
            return;
        }
        /* Query buffer over the configured limit: log the client info and the
         * first bytes of the buffer, then drop the client. */
        if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
            sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
    
            bytes = sdscatrepr(bytes,c->querybuf,64);
            redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
            sdsfree(ci);
            sdsfree(bytes);
            freeClient(c);
            return;
        }
        processInputBuffer(c);
        server.current_client = NULL;
    }

    它最后调用processInputBuffer对接收到的内容进行处理

    /* Parse and execute every complete request currently held in c->querybuf.
     *
     * Processing stops, leaving the remaining bytes in the buffer, when:
     *  - clients are paused (slaves are exempt);
     *  - the client is blocked;
     *  - the client is flagged REDIS_CLOSE_AFTER_REPLY, whose reply must not
     *    grow any further;
     *  - the parser reports that no complete request is available yet. */
    void processInputBuffer(redisClient *c) {
        while (sdslen(c->querybuf) != 0) {
            /* Parser verdict for this iteration. */
            int status = REDIS_OK;
    
            /* clientsArePaused() is only consulted for non-slave clients. */
            if ((!(c->flags & REDIS_SLAVE) && clientsArePaused()) ||
                (c->flags & (REDIS_BLOCKED|REDIS_CLOSE_AFTER_REPLY)))
                return;
    
            /* While reqtype is 0, the first byte selects the protocol:
             * '*' starts a multibulk request, anything else is inline.
             * A non-zero reqtype is kept as-is. */
            if (c->reqtype == 0)
                c->reqtype = (c->querybuf[0] == '*') ? REDIS_REQ_MULTIBULK
                                                     : REDIS_REQ_INLINE;
    
            if (c->reqtype == REDIS_REQ_INLINE)
                status = processInlineBuffer(c);
            else if (c->reqtype == REDIS_REQ_MULTIBULK)
                status = processMultibulkBuffer(c);
            else
                redisPanic("Unknown request type");
    
            /* Incomplete request or protocol error: wait for more input. */
            if (status != REDIS_OK) break;
    
            /* Nothing to run when argc is 0 (the multibulk parser yields that
             * for a count <= 0). Otherwise the client is reset only when
             * processCommand() reports REDIS_OK. */
            if (c->argc == 0 || processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }

    processMultibulkBuffer就是主要的解析过程。对于set a 1这条命令,server端收到的内容应该是:*3\r\n$3\r\nset\r\n$1\r\na\r\n$1\r\n1\r\n(其中\r\n表示回车换行)。其实现如下:

    /* Incrementally parse a multibulk request from c->querybuf. For example,
     * "SET a 1" arrives as "*3\r\n$3\r\nset\r\n$1\r\na\r\n$1\r\n1\r\n".
     *
     * Parser state survives across calls so the function can be re-entered
     * as more bytes arrive:
     *  - c->multibulklen: arguments still to read (0 means start a new request);
     *  - c->bulklen: length of the current argument, or -1 when its
     *    "$<len>\r\n" header has not been parsed yet.
     * Consumed bytes are trimmed from the front of c->querybuf.
     *
     * Returns REDIS_OK when all arguments are in c->argv/c->argc. argc stays
     * 0 for a multibulk count <= 0.
     * Returns REDIS_ERR when more data is needed, or on a protocol error. In
     * the error case an error reply has been added and setProtocolError()
     * called. */
    int processMultibulkBuffer(redisClient *c) {
        char *newline = NULL;
        int pos = 0, ok;
        long long ll;
    
        /* Start of a new request: parse the "*<count>\r\n" header. */
        if (c->multibulklen == 0) {
            /* The client should have been reset */
            redisAssertWithInfo(c,NULL,c->argc == 0);
    
            /* Multi bulk length cannot be read without a \r\n */
            newline = strchr(c->querybuf,'\r');
            if (newline == NULL) {
                /* No terminator yet: wait for more data, unless the header is
                 * already longer than any valid count line. */
                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                    addReplyError(c,"Protocol error: too big mbulk count string");
                    setProtocolError(c,0);
                }
                return REDIS_ERR;
            }
    
            /* Buffer should also contain \n. This only checks that at least
             * one byte follows the '\r'. That byte is assumed to be '\n' and
             * is skipped by the "+2" below without being verified. */
            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                return REDIS_ERR;
    
            /* We know for sure there is a whole line since newline != NULL,
             * so go ahead and find out the multi bulk length. */
            redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
            /* The digits after '*' give the number of arguments. */
            ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
            if (!ok || ll > 1024*1024) {
                addReplyError(c,"Protocol error: invalid multibulk length");
                setProtocolError(c,pos);
                return REDIS_ERR;
            }
    
            /* pos = offset of the first byte after the header's "\r\n". */
            pos = (newline-c->querybuf)+2;
            if (ll <= 0) {
                /* Empty request: consume the header and leave argc at 0. */
                sdsrange(c->querybuf,pos,-1);
                return REDIS_OK;
            }
    
            c->multibulklen = ll;
    
            /* Setup argv array on client structure, replacing any previous one. */
            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
        }
    
        redisAssertWithInfo(c,NULL,c->multibulklen > 0);
        /* Read the remaining arguments, one "$<len>\r\n<data>\r\n" at a time. */
        while(c->multibulklen) {
            /* Read bulk length if unknown */
            if (c->bulklen == -1) {
                /* pos is the start of the next unparsed line. */
                newline = strchr(c->querybuf+pos,'\r');
                if (newline == NULL) {
                    if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                        addReplyError(c,
                            "Protocol error: too big bulk count string");
                        setProtocolError(c,0);
                        return REDIS_ERR;
                    }
                    /* Header line incomplete: wait for more data. */
                    break;
                }
    
                /* Buffer should also contain \n */
                if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                    break;
    
                if (c->querybuf[pos] != '$') {
                    addReplyErrorFormat(c,
                        "Protocol error: expected '$', got '%c'",
                        c->querybuf[pos]);
                    setProtocolError(c,pos);
                    return REDIS_ERR;
                }
    
                /* The digits after '$' give the argument's length in bytes. */
                ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
                if (!ok || ll < 0 || ll > 512*1024*1024) {
                    addReplyError(c,"Protocol error: invalid bulk length");
                    setProtocolError(c,pos);
                    return REDIS_ERR;
                }
    
                /* Skip past the "$<len>\r\n" line. */
                pos += newline-(c->querybuf+pos)+2;
                if (ll >= REDIS_MBULK_BIG_ARG) {
                    size_t qblen;
    
                    /* If we are going to read a large object from network
                     * try to make it likely that it will start at c->querybuf
                     * boundary so that we can optimize object creation
                     * avoiding a large copy of data. */
                    sdsrange(c->querybuf,pos,-1);
                    pos = 0;
                    qblen = sdslen(c->querybuf);
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. If the argument is not fully buffered yet,
                     * reserve room for the rest of it plus the trailing \r\n. */
                    if (qblen < (size_t)ll+2)
                        c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
                }
                c->bulklen = ll;
            }
    
            /* Read bulk argument */
            if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
                /* Not enough data (+2 == trailing \r\n) */
                break;
            } else {
                /* Optimization: if the buffer contains JUST our bulk element
                 * instead of creating a new object by *copying* the sds we
                 * just use the current sds string. */
                if (pos == 0 &&
                    c->bulklen >= REDIS_MBULK_BIG_ARG &&
                    (signed) sdslen(c->querybuf) == c->bulklen+2)
                {
                    c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
                    sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                    /* The old buffer now belongs to the argument object, so
                     * start a fresh query buffer. */
                    c->querybuf = sdsempty();
                    /* Assume that if we saw a fat argument we'll see another one
                     * likely... */
                    c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
                    pos = 0;
                } else {
                    c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen);
                    pos += c->bulklen+2;
                }
                /* Argument complete: the next one starts with a length header. */
                c->bulklen = -1;
                c->multibulklen--;
            }
        }
    
        /* Trim to pos */
        if (pos) sdsrange(c->querybuf,pos,-1);
    
        /* We're done when c->multibulk == 0 */
        if (c->multibulklen == 0) return REDIS_OK;
    
        /* Still not read to process the command. REDIS_ERR here means "need
         * more data": processInputBuffer() stops its loop and the parser state
         * above is resumed on the next call. */
        return REDIS_ERR;
    }

    processMultibulkBuffer解析完成之后,redisClient中argc和argv就已经有了所有参数的信息了。

    解析完了命令内容之后,接下来,processInputBuffer会调用processCommand进行处理。processCommand的大部分工作是做一些检查工作,以确保当前的命令是可以被执行的。

    每一步检查工作的目的,代码中的注释也写得比较详细了,其实现如下:

    /* Execute the command already parsed into c->argv/c->argc, or queue it
     * when the client is inside MULTI.
     *
     * Before running it, a series of checks may reject the command with an
     * error reply: unknown command or wrong arity, authentication, cluster
     * redirection, maxmemory, persistence errors, min-slaves, read-only
     * slave, Pub/Sub context, stale master link, loading, and slow Lua script.
     *
     * Returns REDIS_OK when the caller may reset the client for the next
     * command. This covers a command that ran, was queued, or was rejected
     * with an error reply.
     * Returns REDIS_ERR when the client must not be reset. That happens after
     * QUIT (the client is flagged REDIS_CLOSE_AFTER_REPLY), or when the
     * current client was freed while trying to free memory. */
    int processCommand(redisClient *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        /* REDIS_ERR keeps the caller from resetting the client, and
         * REDIS_CLOSE_AFTER_REPLY stops any further commands from being
         * processed for it. */
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            addReply(c,shared.ok);
            c->flags |= REDIS_CLOSE_AFTER_REPLY;
            return REDIS_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth.
         * arity > 0 means exactly that many arguments, counting argv[0]
         * (the command name); arity < 0 means at least -arity arguments. */
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        if (!c->cmd) {
            flagTransaction(c);
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return REDIS_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return REDIS_OK;
        }
    
        /* Check if the user is authenticated */
        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
        {
            flagTransaction(c);
            addReply(c,shared.noautherr);
            return REDIS_OK;
        }
    
        /* If cluster is enabled perform the cluster redirection here.
         * However we don't perform the redirection if:
         * 1) The sender of this command is our master.
         * 2) The command has no key arguments. */
        if (server.cluster_enabled &&
            !(c->flags & REDIS_MASTER) &&
            !(c->flags & REDIS_LUA_CLIENT &&
              server.lua_caller->flags & REDIS_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
        {
            int hashslot;
    
            if (server.cluster->state != REDIS_CLUSTER_OK) {
                flagTransaction(c);
                clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
                return REDIS_OK;
            } else {
                int error_code;
                clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
                if (n == NULL || n != server.cluster->myself) {
                    flagTransaction(c);
                    clusterRedirectClient(c,n,hashslot,error_code);
                    return REDIS_OK;
                }
            }
        }
    
        /* Handle the maxmemory directive.
         *
         * First we try to free some memory if possible (if there are volatile
         * keys in the dataset). If there are not the only thing we can do
         * is returning an error. */
        if (server.maxmemory) {
            int retval = freeMemoryIfNeeded();
            /* freeMemoryIfNeeded may flush slave output buffers. This may result
             * into a slave, that may be the active client, to be freed. */
            if (server.current_client == NULL) return REDIS_ERR;
    
            /* It was impossible to free enough memory, and the command the client
             * is trying to execute is denied during OOM conditions? Error. */
            if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
                flagTransaction(c);
                addReply(c, shared.oomerr);
                return REDIS_OK;
            }
        }
    
        /* Don't accept write commands if there are problems persisting on disk
         * and if this is a master instance. */
        if (((server.stop_writes_on_bgsave_err &&
              server.saveparamslen > 0 &&
              server.lastbgsave_status == REDIS_ERR) ||
              server.aof_last_write_status == REDIS_ERR) &&
            server.masterhost == NULL &&
            (c->cmd->flags & REDIS_CMD_WRITE ||
             c->cmd->proc == pingCommand))
        {
            flagTransaction(c);
            if (server.aof_last_write_status == REDIS_OK)
                addReply(c, shared.bgsaveerr);
            else
                addReplySds(c,
                    sdscatprintf(sdsempty(),
                    "-MISCONF Errors writing to the AOF file: %s\r\n",
                    strerror(server.aof_last_write_errno)));
            return REDIS_OK;
        }
    
        /* Don't accept write commands if there are not enough good slaves and
         * user configured the min-slaves-to-write option. */
        if (server.masterhost == NULL &&
            server.repl_min_slaves_to_write &&
            server.repl_min_slaves_max_lag &&
            c->cmd->flags & REDIS_CMD_WRITE &&
            server.repl_good_slaves_count < server.repl_min_slaves_to_write)
        {
            flagTransaction(c);
            addReply(c, shared.noreplicaserr);
            return REDIS_OK;
        }
    
        /* Don't accept write commands if this is a read only slave. But
         * accept write commands if this is our master. */
        /* This check and the Pub/Sub and loading checks below call
         * flagTransaction() like every other rejection path in this function,
         * so a command rejected inside MULTI is handled uniformly. */
        if (server.masterhost && server.repl_slave_ro &&
            !(c->flags & REDIS_MASTER) &&
            c->cmd->flags & REDIS_CMD_WRITE)
        {
            flagTransaction(c);
            addReply(c, shared.roslaveerr);
            return REDIS_OK;
        }
    
        /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
        if (c->flags & REDIS_PUBSUB &&
            c->cmd->proc != pingCommand &&
            c->cmd->proc != subscribeCommand &&
            c->cmd->proc != unsubscribeCommand &&
            c->cmd->proc != psubscribeCommand &&
            c->cmd->proc != punsubscribeCommand) {
            flagTransaction(c);
            addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
            return REDIS_OK;
        }
    
        /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
         * we are a slave with a broken link with master. */
        if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
            server.repl_serve_stale_data == 0 &&
            !(c->cmd->flags & REDIS_CMD_STALE))
        {
            flagTransaction(c);
            addReply(c, shared.masterdownerr);
            return REDIS_OK;
        }
    
        /* Loading DB? Return an error if the command has not the
         * REDIS_CMD_LOADING flag. */
        if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
            flagTransaction(c);
            addReply(c, shared.loadingerr);
            return REDIS_OK;
        }
    
        /* Lua script too slow? Only allow a limited number of commands.
         * The first byte of argv[1] is cast to unsigned char before tolower():
         * passing a negative plain char is undefined behavior. */
        if (server.lua_timedout &&
              c->cmd->proc != authCommand &&
              c->cmd->proc != replconfCommand &&
            !(c->cmd->proc == shutdownCommand &&
              c->argc == 2 &&
              tolower((unsigned char)((char*)c->argv[1]->ptr)[0]) == 'n') &&
            !(c->cmd->proc == scriptCommand &&
              c->argc == 2 &&
              tolower((unsigned char)((char*)c->argv[1]->ptr)[0]) == 'k'))
        {
            flagTransaction(c);
            addReply(c, shared.slowscripterr);
            return REDIS_OK;
        }
    
        /* Exec the command. Inside MULTI, everything except EXEC, DISCARD,
         * MULTI and WATCH is queued and answered with +QUEUED instead. */
        if (c->flags & REDIS_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
            call(c,REDIS_CALL_FULL);
            c->woff = server.master_repl_offset;
            /* Keys became ready during the call: serve clients blocked on lists. */
            if (listLength(server.ready_keys))
                handleClientsBlockedOnLists();
        }
        return REDIS_OK;
    }
  • 相关阅读:
    HDU 1312 Red and Black(经典DFS)
    POJ 1274 The Perfect Stall(二分图 && 匈牙利 && 最小点覆盖)
    POJ 3041 Asteroids(二分图 && 匈牙利算法 && 最小点覆盖)
    HDU 1016 素数环(dfs + 回溯)
    HDU 1035 Robot Motion(dfs + 模拟)
    vjudge Trailing Zeroes (III) (二分答案 && 数论)
    openjudge 和为给定数(二分答案)
    图的存储
    二分查找
    快速选择算法
  • 原文地址:https://www.cnblogs.com/flypighhblog/p/7763392.html
Copyright © 2011-2022 走看看