zoukankan      html  css  js  c++  java
  • Redis 3.0.4 客户端

      Redis服务器是典型的一对多服务器程序:一个服务器可以与多个客户端建立网络连接,每个客户端可以向服务器发送命令请求,而服务器则接受并处理客户端发送的命令请求,并向客户端返回命令回复。

      通过使用由I/O多路复用技术实现的文件事件处理器,Redis服务器使用单线程但进程的方式来处理命令请求,并由多个客户端进行网络通信。

    • 客户端属性
    1. 一类是比较通用的属性,这些属性很少与特定功能相关,无论客户端执行的是什么工作,他们都要用到这些属性
    2. 另外一种功能适合特定功能相关的属性,比如操作数据库时需要用到的db属性和dictid属性,执行事务时需要用到的mstate属性,以及执行watch命令时需要用到的watched_keys属性等等。
      • 套接字的描述符

          根据客户端的类型不同,fd的属性可以时-1或者是大于1的整数:

        1. 伪客户端:fd的属性时-1,伪客户端处理的命令请求来源于aof文件或者lua脚本,而不是网络,所以这种客户端不需要套接字链接,自然也不需要记录套接字描述符。目前Redis服务器会在两个地方用到伪客户端,一个是用于载入aof文件并还原数据库状态,而另一个是用于执行lua脚本中包含的Redis命令
        2. 普通客户端:普通客户端的套接字用来与服务器进行通信,fd的属性大于-1,所以服务器会用fd属性来记录客户端套接字的描述符,因为合法的套接字描述符不能是-1,所以普通客户端的套接字描述符的值必然是大于-1的整数。

          执行client list命令可以列出目前所有连接到服务器的普通客户端,命令输出中的fd显示了服务器连接客户端所使用的套接字描述符。

    • 客户端的结构

        在redis.h 中定义了redisclient的结构

     * Clients are taken in a linked list. */
    typedef struct redisClient {
        uint64_t id;            /* Client incremental unique ID. */ //服务器对于每一个链接进来的都会创建ID
        int fd;                 //当前客户端状态连接符,分为无网络链接的客户端和网络连接的客户端 无网络链接的客户端fd = -1 主要用于执行lua脚本
        redisDb *db;            //当前client的数据库 
        int dictid;
        robj *name;             /* As set by CLIENT SETNAME */ //客户端名字 
        sds querybuf;           //保存客户端发送命令请求的输入缓冲区
        size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */ //保存输入缓冲区的峰值
        int argc;               //命令参数个数
        robj **argv;            //命令参数列表
        struct redisCommand *cmd, *lastcmd;
        int reqtype;            //请求协议的类型
        int multibulklen;       /* number of multi bulk arguments left to read */   //缓冲区剩余未读命令的数量
        long bulklen;           /* length of bulk argument in multi bulk request */ //读入参数长度
        list *reply;
        unsigned long reply_bytes; /* Tot bytes of objects in reply list */
        int sentlen;            /* Amount of bytes already sent in the current //已发送回复的字节数
                                   buffer or object being sent. */
        time_t ctime;           /* Client creation time */
        time_t lastinteraction; /* time of the last interaction, used for timeout */
        time_t obuf_soft_limit_reached_time;
        int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */  //客户端状态
        int authenticated;      /* when requirepass is non-NULL */
        int replstate;          /* replication state if this is a slave */
        int repl_put_online_on_ack; /* Install slave write handler on ACK. */
        int repldbfd;           /* replication DB file descriptor */
        off_t repldboff;        /* replication DB file offset */
        off_t repldbsize;       /* replication DB file size */
        sds replpreamble;       /* replication DB preamble. */
        long long reploff;      /* replication offset if this is our master */
        long long repl_ack_off; /* replication ack offset, if this is a slave */
        long long repl_ack_time;/* replication ack time, if this is a slave */
        long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                           copying this slave output buffer
                                           should use. */
        char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
        int slave_listening_port; /* As configured with: SLAVECONF listening-port */
        int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
        multiState mstate;      /* MULTI/EXEC state */
        int btype;              /* Type of blocking op if REDIS_BLOCKED. */
        blockingState bpop;     /* blocking state */
        long long woff;         /* Last write global replication offset. */
        list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
        dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
        list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
        sds peerid;             /* Cached peer ID. */
    
        /* Response buffer */
        int bufpos;
        char buf[REDIS_REPLY_CHUNK_BYTES]; //保存执行完命令所得命令回复信息的静态缓冲区,主要保存固定长度的命令回复,档处理一些返回大量回复的命令,则会将命令回复以链表形式连接起来
    } redisClient;
    • 一条命令的执行完成并且返回数据一共涉及三部分

        第一步是建立连接阶段,响应socket的建立,并创建了client对象;

        第二步是处理阶段,从socket读取数据到输入缓冲区,然后解析并获得命令,执行命令并将返回值存储到输出缓冲区; 

        第三步是数据返回阶段,将返回值从输入缓冲区写到socket中,返回给客户端,最后关闭client。

      这三个阶段之间是通过事件机制串联了,在Redis启动阶段首先要注册socket连接建立事件处理器:

      1. 当客户端发来socket的连接请求时,对应的处理器方法会被执行,建立连接阶段的相关处理就会进行,然后注册socket写事件处理器;
      2. 当客户端发来命令时,读取事件处理器的方法会被执行,对应处理阶段的相关逻辑都会被执行,然后注册socket写事件处理器;
      3. 当写事件处理器被执行时,就是将返回值写回到socket中。  

       1.启动时监听socket

          redis服务器启动时,会调用initServer方法,首先建立Redis自己的事件机制,eventLoop,然后在其上注册周期时间事件处理器,最后再坚挺socket上创建文件事件处理器,监听socket建立 连接事件,处理函数时acceptTcpHandler。    

    void initServer(void) {
        int j;
    
        signal(SIGHUP, SIG_IGN);
        signal(SIGPIPE, SIG_IGN);
        setupSignalHandlers();
    
        if (server.syslog_enabled) {
            openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
                server.syslog_facility);
        }
    
        server.pid = getpid();
        server.current_client = NULL;
        server.clients = listCreate();
        server.clients_to_close = listCreate();
        server.slaves = listCreate();
        server.monitors = listCreate();
        server.slaveseldb = -1; /* Force to emit the first SELECT command. */
        server.unblocked_clients = listCreate();
        server.ready_keys = listCreate();
        server.clients_waiting_acks = listCreate();
        server.get_ack_from_slaves = 0;
        server.clients_paused = 0;
    
        createSharedObjects();
        adjustOpenFilesLimit();
        //创建eventLoop
        server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
        server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    
        /* Open the TCP listening socket for the user commands. */
        if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
            exit(1);
    
        /* Open the listening Unix domain socket. */
        if (server.unixsocket != NULL) {
            unlink(server.unixsocket); /* don't care if this fails */
            server.sofd = anetUnixServer(server.neterr,server.unixsocket,
                server.unixsocketperm, server.tcp_backlog);
            if (server.sofd == ANET_ERR) {
                redisLog(REDIS_WARNING, "Opening Unix socket: %s", server.neterr);
                exit(1);
            }
            anetNonBlock(NULL,server.sofd);
        }
    
        /* Abort if there are no listening sockets at all. */
        if (server.ipfd_count == 0 && server.sofd < 0) {
            redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
            exit(1);
        }
    
        /* Create the Redis databases, and initialize other internal state. */
        for (j = 0; j < server.dbnum; j++) {
            server.db[j].dict = dictCreate(&dbDictType,NULL);
            server.db[j].expires = dictCreate(&keyptrDictType,NULL);
            server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
            server.db[j].ready_keys = dictCreate(&setDictType,NULL);
            server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
            server.db[j].eviction_pool = evictionPoolAlloc();
            server.db[j].id = j;
            server.db[j].avg_ttl = 0;
        }
        server.pubsub_channels = dictCreate(&keylistDictType,NULL);
        server.pubsub_patterns = listCreate();
        listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
        listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
        server.cronloops = 0;
        server.rdb_child_pid = -1;
        server.aof_child_pid = -1;
        server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
        aofRewriteBufferReset();
        server.aof_buf = sdsempty();
        server.lastsave = time(NULL); /* At startup we consider the DB saved. */
        server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
        server.rdb_save_time_last = -1;
        server.rdb_save_time_start = -1;
        server.dirty = 0;
        resetServerStats();
        /* A few stats we don't want to reset: server startup time, and peak mem. */
        server.stat_starttime = time(NULL);
        server.stat_peak_memory = 0;
        server.resident_set_size = 0;
        server.lastbgsave_status = REDIS_OK;
        server.aof_last_write_status = REDIS_OK;
        server.aof_last_write_errno = 0;
        server.repl_good_slaves_count = 0;
        updateCachedTime();
        /* Create the serverCron() time event, that's our main way to process
         * background operations. */
        //注册时间事件,处理后台操作,比如客户端操作,过期键等
        if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            redisPanic("Can't create the serverCron time event.");
            exit(1);
        }
    
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        //为所有监听的socket创建文件事件,监听刻度事件,事件处理函数为acceptTcpHandler
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    redisPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
    
        /* Open the AOF file if needed. */
        if (server.aof_state == REDIS_AOF_ON) {
            server.aof_fd = open(server.aof_filename,
                                   O_WRONLY|O_APPEND|O_CREAT,0644);
            if (server.aof_fd == -1) {
                redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                    strerror(errno));
                exit(1);
            }
        }
    
        /* 32 bit instances are limited to 4GB of address space, so if there is
         * no explicit limit in the user provided configuration we set a limit
         * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
         * useless crashes of the Redis instance for out of memory. */
        if (server.arch_bits == 32 && server.maxmemory == 0) {
            redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
            server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
            server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
        }
    
        if (server.cluster_enabled) clusterInit();
        replicationScriptCacheInit();
        scriptingInit();
        slowlogInit();
        latencyMonitorInit();
        bioInit();
    }

          2.建立连接和client

        当客户端向Redis建立socket时,aeeventLoop会调用acceptTcpHandler处理函数,服务器会为每个连接创建一个client对象,并创建相应的文件事件来监听socket刻度事件,并制定事件处理函数。

        acceptTcpHandler函数会首先调用anetTcpAccept方法,他底层会调用socket的accept方法,也就是接受客户端来的建立连接请求,然后调用acceptCommonHandler方法,继续后续的逻辑处理。

    #define MAX_ACCEPTS_PER_CALL 1000
    static void acceptCommonHandler(int fd, int flags) {
        redisClient *c;
        //创建client 
        if ((c = createClient(fd)) == NULL) {
            redisLog(REDIS_WARNING,
                "Error registering fd event for the new client: %s (fd=%d)",
                strerror(errno),fd);
            close(fd); /* May be already closed, just ignore errors */
            return;
        }
        /* If maxclient directive is set and this is one client more... close the
         * connection. Note that we create the client instead to check before
         * for this condition, since now the socket is already set in non-blocking
         * mode and we can send an error for free using the Kernel I/O */
        //判断当前client的数量是否超出了配置的最大连接 maxclients
        if (listLength(server.clients) > server.maxclients) {
            char *err = "-ERR max number of clients reached
    ";
    
            /* That's a best effort error message, don't check write errors */
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
        server.stat_numconnections++;
        c->flags |= flags;
    }
    
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
        char cip[REDIS_IP_STR_LEN];
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
        REDIS_NOTUSED(privdata);
        //最多可以处理MAX_ACCEPTS_PER_CALL=1000次的新连接接入
        while(max--) {
            //底层调用socker 的sccept方法 接收客户端来的建立连接请求
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
            if (cfd == ANET_ERR) {
                if (errno != EWOULDBLOCK)
                    redisLog(REDIS_WARNING,
                        "Accepting client connection: %s", server.neterr);
                return;
            }
            redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
            //处理后续逻辑
            acceptCommonHandler(cfd,0);
        }
    }

        当客户端连接上服务器,服务器就会创建一个redisClient结构,用来保存客户端状态,每个客户端都有各自的输入缓冲区和输出缓冲区,输入缓冲区存储存储客户端通过socket发送过来的数据,输出缓冲区则存储着Redis对客户端的响应数据,

    redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(redisClient));
    
        /* passing -1 as fd it is possible to create a non connected client.
         * This is useful since all the Redis commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        //fd = -1 表示创建的是一个无网络链接的伪客户端 用于执行lua脚本
        //如果fd != -1 表示创建一个有网络链接的客户端
        if (fd != -1) {
            //设置fd为非阻塞
            anetNonBlock(NULL,fd);
            //禁止使用 Nagle算法 client向内核提交的没个数据包都会理解发送到server TCP_NODELAY
    
            anetEnableTcpNoDelay(NULL,fd);
            //开启tcpkeepalive 设置SO_KEEPALIVE
            if (server.tcpkeepalive)
                //设置tcp链接的keep alive选项
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
            //创建一个文件事件状态el 且监听读事件 开始接受命令的输入
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
        selectDb(c,0);
        c->id = server.next_client_id++;
        c->fd = fd;
        c->name = NULL;
        c->bufpos = 0;
        c->querybuf = sdsempty();
        c->querybuf_peak = 0;
        c->reqtype = 0;
        c->argc = 0;
        c->argv = NULL;
        c->cmd = c->lastcmd = NULL;
        c->multibulklen = 0;
        c->bulklen = -1;
        c->sentlen = 0;
        c->flags = 0;
        c->ctime = c->lastinteraction = server.unixtime;
        c->authenticated = 0;
        c->replstate = REDIS_REPL_NONE;
        c->repl_put_online_on_ack = 0;
        c->reploff = 0;
        c->repl_ack_off = 0;
        c->repl_ack_time = 0;
        c->slave_listening_port = 0;
        c->slave_capa = SLAVE_CAPA_NONE;
        c->reply = listCreate();
        c->reply_bytes = 0;
        c->obuf_soft_limit_reached_time = 0;
        listSetFreeMethod(c->reply,decrRefCountVoid);
        listSetDupMethod(c->reply,dupClientReplyValue);
        c->btype = REDIS_BLOCKED_NONE;
        c->bpop.timeout = 0;
        c->bpop.keys = dictCreate(&setDictType,NULL);
        c->bpop.target = NULL;
        c->bpop.numreplicas = 0;
        c->bpop.reploffset = 0;
        c->woff = 0;
        c->watched_keys = listCreate();
        c->pubsub_channels = dictCreate(&setDictType,NULL);
        c->pubsub_patterns = listCreate();
        c->peerid = NULL;
        listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
        listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
        if (fd != -1) listAddNodeTail(server.clients,c);
        initClientMultiState(c);
        return c;
    }

         3.读取socket数据到输入缓冲区

         readQueryFromClient会调用read方法读取数据到输入缓冲区。

    //读取client的输入缓冲区的内容
    //
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        redisClient *c = (redisClient*) privdata;
        int nread, readlen;
        size_t qblen;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
    
        server.current_client = c;
        //读入长度默认16MB
        readlen = REDIS_IOBUF_LEN;
        /* If this is a multi bulk request, and we are processing a bulk reply
         * that is large enough, try to maximize the probability that the query
         * buffer contains exactly the SDS string representing the object, even
         * at the risk of requiring more read(2) calls. This way the function
         * processMultiBulkBuffer() can avoid copying buffers to create the
         * Redis Object representing the argument. */
        //如果是多条请求 根据请求的大小 设置读入的长度readlen
        //bulklen 标识读入参数的长度
        if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
            && c->bulklen >= REDIS_MBULK_BIG_ARG)
        {
            int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
    
            if (remaining < readlen) readlen = remaining;
        }
        //输入缓冲区的长度
        qblen = sdslen(c->querybuf);
        //更新缓冲区的峰值
        if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
        //扩展缓冲区的大小
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        //将client发来的命令  读入到输入缓冲区中
        nread = read(fd, c->querybuf+qblen, readlen);
        if (nread == -1) {
            if (errno == EAGAIN) {
                nread = 0;
            } else {
                redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
                freeClient(c);
                return;
            }
        } else if (nread == 0) {
            redisLog(REDIS_VERBOSE, "Client closed connection");
            freeClient(c);
            return;
        }
        //读操作完成
        if (nread) {
            //更新输入缓冲区的已用大小和未用大小
            sdsIncrLen(c->querybuf,nread);
            //设置最后一次服务器与client的交互时间
            c->lastinteraction = server.unixtime;
            //如果是主节点,则更新复制操作的偏移量
            if (c->flags & REDIS_MASTER) c->reploff += nread;
            //更新从网络输入的字节数
            server.stat_net_input_bytes += nread;
        } else {
            server.current_client = NULL;
            return;
        }
        //如果输入缓冲区长度超过服务器设置的最大缓冲区长度
        if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
            //将client信息转换成sds
            sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
            //输入缓冲区保存在bytes中
            bytes = sdscatrepr(bytes,c->querybuf,64);
            //打印到日志
            redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
            //释放空间
            sdsfree(ci);
            sdsfree(bytes);
            freeClient(c);
            return;
        }
        processInputBuffer(c);
        server.current_client = NULL;
    }

        函数readQueryFromClient从文件描述符fd中读取客户端的数据,read读取的最大字节数是16k,并保留在querybuf中,并更新缓冲区的峰值,如果输入缓冲区长度超过服务器设置的最大缓冲区长度,则会退出,server.client_max_querybuf_len在服务器初始化时设置为PROTO_MAX_QUERYBUF_LEN (1024*1024*1024)也就是1G大小。

        将数据读取到输入缓冲区后,在redis版本小于4.0的版本中,读取到数据不会根据client的类型做不同的处理,版本大于等于4.0的redis,会根据client的不同类型进行处理,如果时普通类型直接调用processInputBuffer,如果client时master的连接,还需将自服务器传来的命令执行后,继续将命令同步给自己的从服务器。

    //redis 4.0.14 部分代码
    /* Time to process the buffer. If the client is a master we need to
         * compute the difference between the applied offset before and after
         * processing the buffer, to understand how much of the replication stream
         * was actually applied to the master state: this quantity, and its
         * corresponding part of the replication stream, will be propagated to
         * the sub-slaves and to the replication backlog. */
        if (!(c->flags & CLIENT_MASTER)) {
            processInputBuffer(c);
        } else {
            //如果client时master的连接
            size_t prev_offset = c->reploff;
            processInputBuffer(c);
            //判断是否同步偏移量发生变化,则通知slave
            size_t applied = c->reploff - prev_offset;
            if (applied) {
                replicationFeedSlavesFromMasterStream(server.slaves,
                        c->pending_querybuf, applied);
                sdsrange(c->pending_querybuf,applied,-1);
            }
        }

        4.解析输入缓冲区的命令

          processInputBuffer主要是将输入缓冲区中的数据解析成相应的命令,根据命令是REDIS_REQ_MULTIBULK(redis-cli)或REDIS_REQ_INLINE(telnet)分别调用函数processInlineBuffer或processMultibulkBuffer

          

    void processInputBuffer(redisClient *c) {
        /* Keep processing while there is something in the input buffer */
        //一直读输入缓冲区的内容
        while(sdslen(c->querybuf)) {
            /* Return if clients are paused. */
            //如果处于暂停状态  直接返回
            if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;
            
            /* Immediately abort if the client is in the middle of something. */
            //如果client处于被阻塞状态 直接返回
            if (c->flags & REDIS_BLOCKED) return;
            
            /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
             * written to the client. Make sure to not let the reply grow after
             * this flag has been set (i.e. don't process more commands). */
            //如果client处于关闭状态 直接返回
            if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
            
            /* Determine request type when unknown. */
            //如果是未知请求类型 则判定请求类型
            if (!c->reqtype) {
                //如果是*开头 则是多条请求 是client发来的
                if (c->querybuf[0] == '*') {
                    c->reqtype = REDIS_REQ_MULTIBULK;
                } else {
                //否则就是内联请求  是telnet发来的
                    c->reqtype = REDIS_REQ_INLINE;
                }   
            }   
            //如果是内联请求
            if (c->reqtype == REDIS_REQ_INLINE) {
                //处理telnet发来的内联请求 并创建成对象 保存在client的参数列表中
                if (processInlineBuffer(c) != REDIS_OK) break;
            } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
                //将client的querybuf中的协议内容转换成client的参数列表中的对象
                if (processMultibulkBuffer(c) != REDIS_OK) break;
            } else {
                redisPanic("Unknown request type");
            }   
            
            /* Multibulk processing could see a <= 0 length. */
            //如果参数是0  则重置client 
           if (c->argc == 0) {
                resetClient(c);
            } else {
                //执行命令成功后重置
                //Only reset the client when the command was executed. */
                if (processCommand(c) == REDIS_OK)
                    resetClient(c);
            }       
        }   
    }

        5.执行命令

          processCommand方法会处理很多逻辑,大致分为三部分,首先是调用lookupCommand方法获得对应的redisCommand,接着时检测当前redis是否可以执行命令,最后是调用call方法真正执行命令。

        processCommand会做如下逻辑处理:

      1. 如果命令是quit,则直接返回,并设置客户端标志位;
      2. 根据argv[0]从数据库的字典中查找对应的redisCommand,所有的命令都存储在命令字典redisCommandTable中,根据命令名称可以获取对应的命令;
      3. 进行用户权限校验;
      4. 如果是集群模式,处理集群重定向。当命令发送者是master或者命令没有任何key的参数时,可以不重定向;
      5. 预防maxmemory情况,先尝试回首一下,如果不行,则返回异常;
      6. 当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,且配置bgsave参数和stop_writes_on_bgsave_err,禁止执行写命令;
      7. 当此服务器是master时,如果配置了server.repl_min_slaves_to_write,当slave数目小于时,禁止执行写命令;
      8. 当时只读slave时,除了master不接受其他写命令;
      9. 当客户端正在订阅频道时,只会执行部分命令;
      10. 当服务器为slave,但是没有连接master时,只会执行带有REDIS_CMD_STALE标志的命令,如info;
      11. 正在加载数据库时,只会执行带有REDIS_CMD_LOADING标志的命令,其他都会拒绝;
      12. 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其他都会拒绝;
      13. 如果是事务命令,则开启事务,命令进入等待队列,否则直接执行命令。
    /* If this function gets called we already read a whole
     * command, arguments are in the client argv/argc fields.
     * processCommand() execute the command or prepare the
     * server for a bulk read from the client.
     *
     * If 1 is returned the client is still alive and valid and
     * other operations can be performed by the caller. Otherwise
     * if 0 is returned the client was destroyed (i.e. after QUIT). */
    int processCommand(redisClient *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        //1.如果命令是quit,则直接返回,并设置客户端标志位;
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            addReply(c,shared.ok);
            c->flags |= REDIS_CLOSE_AFTER_REPLY;
            return REDIS_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        //2.从数据库的字典中查找命令
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        //命令不存在
        if (!c->cmd) {
            flagTransaction(c);   //如果是事务状态的命令 则设置事务为失败
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return REDIS_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);  //如果是事务状态的命令 则设置事务为失败
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return REDIS_OK;
        }
    
        /* Check if the user is authenticated */
        //3.如果实例设置了密码 但是校验失败
        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
        {
            flagTransaction(c);
            addReply(c,shared.noautherr);
            return REDIS_OK;
        }
        
        /* If cluster is enabled perform the cluster redirection here.
         * However we don't perform the redirection if:
         * 1) The sender of this command is our master.
         * 2) The command has no key arguments. */
        //4.如果是集群模式,处理集群重定向
        //如果开启了集群模式 则执行集群的定向操作 下面两种情况例外
        //1.命令的发送是主节点服务器
        //2.命令没有key
        if (server.cluster_enabled &&
            !(c->flags & REDIS_MASTER) &&
            !(c->flags & REDIS_LUA_CLIENT &&
              server.lua_caller->flags & REDIS_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
        {
            int hashslot;
    
            if (server.cluster->state != REDIS_CLUSTER_OK) {
                flagTransaction(c);
                clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
                return REDIS_OK;
            } else {
                int error_code;
                //从集群中返回一个能够执行命令的节点
                clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
                if (n == NULL || n != server.cluster->myself) {
                    flagTransaction(c);
                    //将事务状态置为失败
                    //执行client重定向操作
                    clusterRedirectClient(c,n,hashslot,error_code);
                    return REDIS_OK;
                }
            }
        }
    
        /* Handle the maxmemory directive.
         *
         * First we try to free some memory if possible (if there are volatile
         * keys in the dataset). If there are not the only thing we can do
         * is returning an error. */
        //5.按需释放一部分内存
        if (server.maxmemory) {
            
            int retval = freeMemoryIfNeeded();
            if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
                flagTransaction(c);
                addReply(c, shared.oomerr);
                return REDIS_OK;
            }
        }
        /* Don't accept write commands if there are problems persisting on disk
         * and if this is a master instance. */
         //6.当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,且配置bgsave参数和stop_writes_on_bgsave_err,禁止执行写命令;
        if (((server.stop_writes_on_bgsave_err &&
              server.saveparamslen > 0 &&
              server.lastbgsave_status == REDIS_ERR) ||
              server.aof_last_write_status == REDIS_ERR) &&
            server.masterhost == NULL &&
            (c->cmd->flags & REDIS_CMD_WRITE ||
             c->cmd->proc == pingCommand))
        {
            flagTransaction(c);
            if (server.aof_last_write_status == REDIS_OK)
                addReply(c, shared.bgsaveerr);
            else
                addReplySds(c,
                    sdscatprintf(sdsempty(),
                    "-MISCONF Errors writing to the AOF file: %s
    ",
                    strerror(server.aof_last_write_errno)));
            return REDIS_OK;
        }
    
        /* Don't accept write commands if there are not enough good slaves and
         * user configured the min-slaves-to-write option. */
        //7.当此服务器是master,并且配置了repl_min_slaves_to_write 当slave数小于,禁止写
        if (server.masterhost == NULL &&
            server.repl_min_slaves_to_write &&
            server.repl_min_slaves_max_lag &&
            c->cmd->flags & REDIS_CMD_WRITE &&
            server.repl_good_slaves_count < server.repl_min_slaves_to_write)
        {
            flagTransaction(c);
            addReply(c, shared.noreplicaserr);
            return REDIS_OK;
        }
    
        /* Don't accept write commands if this is a read only slave. But
         * accept write commands if this is our master. */
        //8.当是只读,除了master不接受其他写命令
        if (server.masterhost && server.repl_slave_ro &&
            !(c->flags & REDIS_MASTER) &&
            c->cmd->flags & REDIS_CMD_WRITE)
        {
            addReply(c, shared.roslaveerr);
            return REDIS_OK;
        }
        /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
        //9.当客户端正在订阅频道,只会执行部分命令
        if (c->flags & REDIS_PUBSUB &&
            c->cmd->proc != pingCommand &&
            c->cmd->proc != subscribeCommand &&
            c->cmd->proc != unsubscribeCommand &&
            c->cmd->proc != psubscribeCommand &&
            c->cmd->proc != punsubscribeCommand) {
            addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
            return REDIS_OK;
        }
    
        /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
         * we are a slave with a broken link with master. */
        //10.服务器是slave 但是没有连接master 只会执行带有REDIS_CMD_STALE的命令,如info  其他命令会被拒绝
        if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
            server.repl_serve_stale_data == 0 &&
            !(c->cmd->flags & REDIS_CMD_STALE))
        {
            flagTransaction(c);
            addReply(c, shared.masterdownerr);
            return REDIS_OK;
        }
    
        /* Loading DB? Return an error if the command has not the
         * REDIS_CMD_LOADING flag. */
        //11. 正在加载数据库时,只会执行代有REDIS_CMD_LOADING标志的命令 其他都会拒绝
        if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
            addReply(c, shared.loadingerr);
            return REDIS_OK;
        }
    
        /* Lua script too slow? Only allow a limited number of commands. */
        //12 服务器因为执行lua脚本阻塞时 只会执行部分命令 其他都会拒绝
        if (server.lua_timedout &&
              c->cmd->proc != authCommand &&
              c->cmd->proc != replconfCommand &&
            !(c->cmd->proc == shutdownCommand &&
              c->argc == 2 &&
              tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
            !(c->cmd->proc == scriptCommand &&
              c->argc == 2 &&
              tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
        {
            flagTransaction(c);
            addReply(c, shared.slowscripterr);
            return REDIS_OK;
        }
        /* Exec the command */
        //13 如果时事务命令,则开启事务,命令进入等待队列,都则直接执行命令
        if (c->flags & REDIS_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
            call(c,REDIS_CALL_FULL);
            c->woff = server.master_repl_offset;
            if (listLength(server.ready_keys))
                handleClientsBlockedOnLists();
        }
        return REDIS_OK;
    }
  • 相关阅读:
    inputstream和outputstream读写数据模板代码
    如何显示包的上一层包
    我的cnblogs设置代码
    myeclipse ctrl+shift+F失效
    数据包加密解密
    用VisualSVN Server创建版本库,以及TortoiseSVN的使用
    权限验证MVC
    Asp.net MVC23 使用Areas功能的常见错误
    MVC基础知识
    最全的Resharper快捷键汇总
  • 原文地址:https://www.cnblogs.com/chenyang920/p/13369383.html
Copyright © 2011-2022 走看看