zoukankan      html  css  js  c++  java
  • redis 处理命令的过程

    redis版本:redis-3.2.9

    在客户端输入 set name zhang,调试redis服务器,得到调用栈如下:

    在dictReplace中加了断点,结果跳出来4个线程,redis还是单进程单线程吗?

    上图的调用栈漏了一个栈帧:aeProcessEvents -> (networking.c) readQueryFromClient -> (networking.c) processInputBuffer

    aeMain 事件循环

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }

    aeProcessEvents 先处理文件事件(使用 epoll 选择文件事件),再处理时间事件

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
        /* Nothing to do? return ASAP */
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
        /* Note that we want call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            aeTimeEvent *shortest = NULL;
            struct timeval tv, *tvp;
    
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                shortest = aeSearchNearestTimer(eventLoop);
            if (shortest) {
                long now_sec, now_ms;
    
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
    
                /* How many milliseconds we need to wait for the next
                 * time event to fire? */
                long long ms =
                    (shortest->when_sec - now_sec)*1000 +
                    shortest->when_ms - now_ms;
    
                if (ms > 0) {
                    tvp->tv_sec = ms/1000;
                    tvp->tv_usec = (ms % 1000)*1000;
                } else {
                    tvp->tv_sec = 0;
                    tvp->tv_usec = 0;
                }
            } else {
                /* If we have to check for events but need to return
                 * ASAP because of AE_DONT_WAIT we need to set the timeout
                 * to zero */
                if (flags & AE_DONT_WAIT) {
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    /* Otherwise we can block */
                    tvp = NULL; /* wait forever */
                }
            }
    
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
    
            /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                if (fe->mask & mask & AE_READABLE) {
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
                processed++;
            }
        }
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    
        return processed; /* return the number of processed file/time events */
    }

    readQueryFromClient 客户端的命令通过网络传输到达 server,读取命令,设置到 client 的 querybuf 中

    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        client *c = (client*) privdata;
        int nread, readlen;
        size_t qblen;
        UNUSED(el);
        UNUSED(mask);
    
        readlen = PROTO_IOBUF_LEN;
        /* If this is a multi bulk request, and we are processing a bulk reply
         * that is large enough, try to maximize the probability that the query
         * buffer contains exactly the SDS string representing the object, even
         * at the risk of requiring more read(2) calls. This way the function
         * processMultiBulkBuffer() can avoid copying buffers to create the
         * Redis Object representing the argument. */
        if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
            && c->bulklen >= PROTO_MBULK_BIG_ARG)
        {
            int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
    
            if (remaining < readlen) readlen = remaining;
        }
    
        qblen = sdslen(c->querybuf);
        if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        nread = read(fd, c->querybuf+qblen, readlen);
        if (nread == -1) {
            if (errno == EAGAIN) {
                return;
            } else {
                serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
                freeClient(c);
                return;
            }
        } else if (nread == 0) {
            serverLog(LL_VERBOSE, "Client closed connection");
            freeClient(c);
            return;
        }
    
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & CLIENT_MASTER) c->reploff += nread;
        server.stat_net_input_bytes += nread;
        if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
            sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
    
            bytes = sdscatrepr(bytes,c->querybuf,64);
            serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
            sdsfree(ci);
            sdsfree(bytes);
            freeClient(c);
            return;
        }
        processInputBuffer(c);
    }

    processInputBuffer 客户端的命令全部保存在 client 的 querybuf 属性中,可能包含多条命令,例如管道,从 querybuf 中逐条解析命令,并设置 client 的 argc 和 argv 属性。

    一条命令对应一个 argc 和 argv。例如 get name,此时 argv[0] 为 get,argv[1] 为 name。

    void processInputBuffer(client *c) {
        server.current_client = c;
        /* Keep processing while there is something in the input buffer */
        while(sdslen(c->querybuf)) {
            /* Return if clients are paused. */
            if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
    
            /* Immediately abort if the client is in the middle of something. */
            if (c->flags & CLIENT_BLOCKED) break;
    
            /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
             * written to the client. Make sure to not let the reply grow after
             * this flag has been set (i.e. don't process more commands).
             *
             * The same applies for clients we want to terminate ASAP. */
            if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
    
            /* Determine request type when unknown. */
            if (!c->reqtype) {
                if (c->querybuf[0] == '*') {
                    c->reqtype = PROTO_REQ_MULTIBULK;
                } else {
                    c->reqtype = PROTO_REQ_INLINE;
                }
            }
    
            if (c->reqtype == PROTO_REQ_INLINE) {
                if (processInlineBuffer(c) != C_OK) break;
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                if (processMultibulkBuffer(c) != C_OK) break;
            } else {
                serverPanic("Unknown request type");
            }
    
            /* Multibulk processing could see a <= 0 length. */
            if (c->argc == 0) {
                resetClient(c);
            } else {
                /* Only reset the client when the command was executed. */
                if (processCommand(c) == C_OK)
                    resetClient(c);
                /* freeMemoryIfNeeded may flush slave output buffers. This may result
                 * into a slave, that may be the active client, to be freed. */
                if (server.current_client == NULL) break;
            }
        }
        server.current_client = NULL;
    }

    processCommand 根据命令名(即 argv[0])寻找对应的命令函数,给 c->cmd 赋值,以 'get name' 为例,c->cmd = {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}

    int processCommand(client *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            addReply(c,shared.ok);
            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
            return C_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        if (!c->cmd) {
            flagTransaction(c);
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return C_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return C_OK;
        }
    
        /* Check if the user is authenticated */
        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
        {
            flagTransaction(c);
            addReply(c,shared.noautherr);
            return C_OK;
        }
    
        /* If cluster is enabled perform the cluster redirection here.
         * However we don't perform the redirection if:
         * 1) The sender of this command is our master.
         * 2) The command has no key arguments. */
        if (server.cluster_enabled &&
            !(c->flags & CLIENT_MASTER) &&
            !(c->flags & CLIENT_LUA &&
              server.lua_caller->flags & CLIENT_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
              c->cmd->proc != execCommand))
        {
            int hashslot;
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                            &hashslot,&error_code);
            if (n == NULL || n != server.cluster->myself) {
                if (c->cmd->proc == execCommand) {
                    discardTransaction(c);
                } else {
                    flagTransaction(c);
                }
                clusterRedirectClient(c,n,hashslot,error_code);
                return C_OK;
            }
        }
    
        /* Handle the maxmemory directive.
         *
         * First we try to free some memory if possible (if there are volatile
         * keys in the dataset). If there are not the only thing we can do
         * is returning an error. */
        if (server.maxmemory) {
            int retval = freeMemoryIfNeeded();
            /* freeMemoryIfNeeded may flush slave output buffers. This may result
             * into a slave, that may be the active client, to be freed. */
            if (server.current_client == NULL) return C_ERR;
    
            /* It was impossible to free enough memory, and the command the client
             * is trying to execute is denied during OOM conditions? Error. */
            if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
                flagTransaction(c);
                addReply(c, shared.oomerr);
                return C_OK;
            }
        }
    
        /* Don't accept write commands if there are problems persisting on disk
         * and if this is a master instance. */
        if (((server.stop_writes_on_bgsave_err &&
              server.saveparamslen > 0 &&
              server.lastbgsave_status == C_ERR) ||
              server.aof_last_write_status == C_ERR) &&
            server.masterhost == NULL &&
            (c->cmd->flags & CMD_WRITE ||
             c->cmd->proc == pingCommand))
        {
            flagTransaction(c);
            if (server.aof_last_write_status == C_OK)
                addReply(c, shared.bgsaveerr);
            else
                addReplySds(c,
                    sdscatprintf(sdsempty(),
                    "-MISCONF Errors writing to the AOF file: %s
    ",
                    strerror(server.aof_last_write_errno)));
            return C_OK;
        }
    
        /* Don't accept write commands if there are not enough good slaves and
         * user configured the min-slaves-to-write option. */
        if (server.masterhost == NULL &&
            server.repl_min_slaves_to_write &&
            server.repl_min_slaves_max_lag &&
            c->cmd->flags & CMD_WRITE &&
            server.repl_good_slaves_count < server.repl_min_slaves_to_write)
        {
            flagTransaction(c);
            addReply(c, shared.noreplicaserr);
            return C_OK;
        }
    
        /* Don't accept write commands if this is a read only slave. But
         * accept write commands if this is our master. */
        if (server.masterhost && server.repl_slave_ro &&
            !(c->flags & CLIENT_MASTER) &&
            c->cmd->flags & CMD_WRITE)
        {
            addReply(c, shared.roslaveerr);
            return C_OK;
        }
    
        /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
        if (c->flags & CLIENT_PUBSUB &&
            c->cmd->proc != pingCommand &&
            c->cmd->proc != subscribeCommand &&
            c->cmd->proc != unsubscribeCommand &&
            c->cmd->proc != psubscribeCommand &&
            c->cmd->proc != punsubscribeCommand) {
            addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
            return C_OK;
        }
    
        /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
         * we are a slave with a broken link with master. */
        if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
            server.repl_serve_stale_data == 0 &&
            !(c->cmd->flags & CMD_STALE))
        {
            flagTransaction(c);
            addReply(c, shared.masterdownerr);
            return C_OK;
        }
    
        /* Loading DB? Return an error if the command has not the
         * CMD_LOADING flag. */
        if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
            addReply(c, shared.loadingerr);
            return C_OK;
        }
    
        /* Lua script too slow? Only allow a limited number of commands. */
        if (server.lua_timedout &&
              c->cmd->proc != authCommand &&
              c->cmd->proc != replconfCommand &&
            !(c->cmd->proc == shutdownCommand &&
              c->argc == 2 &&
              tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
            !(c->cmd->proc == scriptCommand &&
              c->argc == 2 &&
              tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
        {
            flagTransaction(c);
            addReply(c, shared.slowscripterr);
            return C_OK;
        }
    
        /* Exec the command */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
            call(c,CMD_CALL_FULL);
            c->woff = server.master_repl_offset;
            if (listLength(server.ready_keys))
                handleClientsBlockedOnLists();
        }
        return C_OK;
    }

    redis 所有的命令存硬编码在一个数组中,数组元素包含命令名,执行函数等属性

    struct redisCommand redisCommandTable[] = {
        {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
        {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
        {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
        {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
        {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
        {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
        {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
        {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
        {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
        {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
        {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
        {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
        {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
        {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
        {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
        {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
        {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
        {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
        {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
        {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
        {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
        {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
        {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
        {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
        {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
        {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
        {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
        {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
        {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
        {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
        {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
        {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
        {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
        {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
        {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0},
        {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
        {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
        {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
        {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
        {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
        {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
        {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
        {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
        {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
        {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
        {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
        {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
        {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
        {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
        {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
        {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
        {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
        {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
        {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
        {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
        {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
        {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
        {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
        {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
        {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
        {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
        {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
        {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
        {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
        {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
        {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
        {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
        {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
        {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
        {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
        {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
        {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
        {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
        {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
        {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
        {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
        {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
        {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
        {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
        {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
        {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
        {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
        {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
        {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
        {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
        {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
        {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
        {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
        {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
        {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
        {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
        {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0},
        {"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0},
        {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0},
        {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0},
        {"bgsave",bgsaveCommand,-1,"a",0,NULL,0,0,0,0,0},
        {"bgrewriteaof",bgrewriteaofCommand,1,"a",0,NULL,0,0,0,0,0},
        {"shutdown",shutdownCommand,-1,"alt",0,NULL,0,0,0,0,0},
        {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0},
        {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
        {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
        {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
        {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
        {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
        {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0},
        {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
        {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
        {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
        {"info",infoCommand,-1,"lt",0,NULL,0,0,0,0,0},
        {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0},
        {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"touch",touchCommand,-2,"rF",0,NULL,1,1,1,0,0},
        {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
        {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
        {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
        {"debug",debugCommand,-1,"as",0,NULL,0,0,0,0,0},
        {"config",configCommand,-2,"lat",0,NULL,0,0,0,0,0},
        {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
        {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
        {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
        {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
        {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
        {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
        {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
        {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
        {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0},
        {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
        {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
        {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0},
        {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
        {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
        {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
        {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
        {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
        {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
        {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
        {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
        {"slowlog",slowlogCommand,-2,"a",0,NULL,0,0,0,0,0},
        {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0},
        {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0},
        {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
        {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
        {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
        {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0},
        {"command",commandCommand,0,"lt",0,NULL,0,0,0,0,0},
        {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
        {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0},
        {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0},
        {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
        {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
        {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
        {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0},
        {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
        {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
        {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
        {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
        {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
        {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
        {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
    };

    而这些命令会存放在一个字典中 dict *commands

    void populateCommandTable(void) {
        int j;
        int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
    
        for (j = 0; j < numcommands; j++) {
            struct redisCommand *c = redisCommandTable+j;
            char *f = c->sflags;
            int retval1, retval2;
    
            while(*f != '') {
                switch(*f) {
                case 'w': c->flags |= CMD_WRITE; break;
                case 'r': c->flags |= CMD_READONLY; break;
                case 'm': c->flags |= CMD_DENYOOM; break;
                case 'a': c->flags |= CMD_ADMIN; break;
                case 'p': c->flags |= CMD_PUBSUB; break;
                case 's': c->flags |= CMD_NOSCRIPT; break;
                case 'R': c->flags |= CMD_RANDOM; break;
                case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
                case 'l': c->flags |= CMD_LOADING; break;
                case 't': c->flags |= CMD_STALE; break;
                case 'M': c->flags |= CMD_SKIP_MONITOR; break;
                case 'k': c->flags |= CMD_ASKING; break;
                case 'F': c->flags |= CMD_FAST; break;
                default: serverPanic("Unsupported command flag"); break;
                }
                f++;
            }
    
            retval1 = dictAdd(server.commands, sdsnew(c->name), c);
            /* Populate an additional dictionary that will be unaffected
             * by rename-command statements in redis.conf. */
            retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
            serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
        }
    }

    call 执行具体命令,并且传播命令,等等。

    void call(client *c, int flags) {
        long long dirty, start, duration;
        int client_old_flags = c->flags;
    
        /* Sent the command to clients in MONITOR mode, only if the commands are
         * not generated from reading an AOF. */
        if (listLength(server.monitors) &&
            !server.loading &&
            !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
        {
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
        }
    
        /* Initialization: clear the flags that must be set by the command on
         * demand, and initialize the array for additional commands propagation. */
        c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
        redisOpArrayInit(&server.also_propagate);
    
        /* Call the command. */
        dirty = server.dirty;
        start = ustime();
        c->cmd->proc(c);
        duration = ustime()-start;
        dirty = server.dirty-dirty;
        if (dirty < 0) dirty = 0;
    
        /* When EVAL is called loading the AOF we don't want commands called
         * from Lua to go into the slowlog or to populate statistics. */
        if (server.loading && c->flags & CLIENT_LUA)
            flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
    
        /* If the caller is Lua, we want to force the EVAL caller to propagate
         * the script if the command flag or client flag are forcing the
         * propagation. */
        if (c->flags & CLIENT_LUA && server.lua_caller) {
            if (c->flags & CLIENT_FORCE_REPL)
                server.lua_caller->flags |= CLIENT_FORCE_REPL;
            if (c->flags & CLIENT_FORCE_AOF)
                server.lua_caller->flags |= CLIENT_FORCE_AOF;
        }
    
        /* Log the command into the Slow log if needed, and populate the
         * per-command statistics that we show in INFO commandstats. */
        if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
            char *latency_event = (c->cmd->flags & CMD_FAST) ?
                                  "fast-command" : "command";
            latencyAddSampleIfNeeded(latency_event,duration/1000);
            slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
        }
        if (flags & CMD_CALL_STATS) {
            c->lastcmd->microseconds += duration;
            c->lastcmd->calls++;
        }
    
        /* Propagate the command into the AOF and replication link */
        if (flags & CMD_CALL_PROPAGATE &&
            (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
        {
            int propagate_flags = PROPAGATE_NONE;
    
            /* Check if the command operated changes in the data set. If so
             * set for replication / AOF propagation. */
            if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
    
            /* If the client forced AOF / replication of the command, set
             * the flags regardless of the command effects on the data set. */
            if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
            if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
    
            /* However prevent AOF / replication propagation if the command
             * implementatino called preventCommandPropagation() or similar,
             * or if we don't have the call() flags to do so. */
            if (c->flags & CLIENT_PREVENT_REPL_PROP ||
                !(flags & CMD_CALL_PROPAGATE_REPL))
                    propagate_flags &= ~PROPAGATE_REPL;
            if (c->flags & CLIENT_PREVENT_AOF_PROP ||
                !(flags & CMD_CALL_PROPAGATE_AOF))
                    propagate_flags &= ~PROPAGATE_AOF;
    
            /* Call propagate() only if at least one of AOF / replication
             * propagation is needed. */
            if (propagate_flags != PROPAGATE_NONE)
                propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
        }
    
        /* Restore the old replication flags, since call() can be executed
         * recursively. */
        c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
        c->flags |= client_old_flags &
            (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    
        /* Handle the alsoPropagate() API to handle commands that want to propagate
         * multiple separated commands. Note that alsoPropagate() is not affected
         * by CLIENT_PREVENT_PROP flag. */
        if (server.also_propagate.numops) {
            int j;
            redisOp *rop;
    
            if (flags & CMD_CALL_PROPAGATE) {
                for (j = 0; j < server.also_propagate.numops; j++) {
                    rop = &server.also_propagate.ops[j];
                    int target = rop->target;
                    /* Whatever the command wish is, we honor the call() flags. */
                    if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                    if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                    if (target)
                        propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
                }
            }
            redisOpArrayFree(&server.also_propagate);
        }
        server.stat_numcommands++;
    }

    c->cmd->proc(c); 是调用具体的命令。以 'get name' 为例,c->cmd->proc = getCommand

    int getGenericCommand(client *c) {
        robj *o;
    
        if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
            return C_OK;
    
        if (o->type != OBJ_STRING) {
            addReply(c,shared.wrongtypeerr);
            return C_ERR;
        } else {
            addReplyBulk(c,o);
            return C_OK;
        }
    }
    
    void getCommand(client *c) {
        getGenericCommand(c);
    }

    执行命令,把响应添加到缓冲区

    client 有 2 个输出缓冲区可用,一个是 char buf[PROTO_REPLY_CHUNK_BYTES],一个是 list *reply,首先使用 buf,当 buf 不足时,使用 reply

    发送响应: (t_string.c) setGenericCommand -> (networking.c) addReply

    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;
    
        /* This is an important place where we can avoid copy-on-write
         * when there is a saving child running, avoiding touching the
         * refcount field of the object if it's not needed.
         *
         * If the encoding is RAW and there is room in the static buffer
         * we'll be able to send the object to the client without
         * messing with its page. */
        if (sdsEncodedObject(obj)) {
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyObjectToList(c,obj);
        } else if (obj->encoding == OBJ_ENCODING_INT) {
            /* Optimization: if there is room in the static buffer for 32 bytes
             * (more than the max chars a 64 bit integer can take as string) we
             * avoid decoding the object and go for the lower level approach. */
            if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
                char buf[32];
                int len;
    
                len = ll2string(buf,sizeof(buf),(long)obj->ptr);
                if (_addReplyToBuffer(c,buf,len) == C_OK)
                    return;
                /* else... continue with the normal code path, but should never
                 * happen actually since we verified there is room. */
            }
            obj = getDecodedObject(obj);
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyObjectToList(c,obj);
            decrRefCount(obj);
        } else {
            serverPanic("Wrong obj->encoding in addReply()");
        }
    }
    
    int _addReplyToBuffer(client *c, const char *s, size_t len) {
        size_t available = sizeof(c->buf)-c->bufpos;
    
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
    
        /* If there already are entries in the reply list, we cannot
         * add anything more to the static buffer. */
        if (listLength(c->reply) > 0) return C_ERR;
    
        /* Check that the buffer has enough space available for this string. */
        if (len > available) return C_ERR;
    
        memcpy(c->buf+c->bufpos,s,len);
        c->bufpos+=len;
        return C_OK;
    }

     那么客户端的响应是什么时候从缓冲区发送出去的呢?

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }
  • 相关阅读:
    html实体引用
    nginx配置奇怪问题记录
    20191025-生产事故记录
    abp审计日志功能的关闭
    mysql根据一张表更新另一张表数据
    redis-cli连接redis服务器操作
    错误记录-MySql.Data.MySqlClient.MySqlException (0x80004005): Timeout expired.
    UID,GID,口令
    【搬砖】/etc/passwd 文件结构
    《图形学》实验五:改进的Bresenham算法画直线
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8466125.html
Copyright © 2011-2022 走看看