zoukankan      html  css  js  c++  java
  • Redis 3.0.4 客户端

      Redis服务器是典型的一对多服务器程序:一个服务器可以与多个客户端建立网络连接,每个客户端可以向服务器发送命令请求,而服务器则接受并处理客户端发送的命令请求,并向客户端返回命令回复。

      通过使用由I/O多路复用技术实现的文件事件处理器,Redis服务器使用单线程但进程的方式来处理命令请求,并由多个客户端进行网络通信。

    • 客户端属性
    1. 一类是比较通用的属性,这些属性很少与特定功能相关,无论客户端执行的是什么工作,他们都要用到这些属性
    2. 另外一种功能适合特定功能相关的属性,比如操作数据库时需要用到的db属性和dictid属性,执行事务时需要用到的mstate属性,以及执行watch命令时需要用到的watched_keys属性等等。
      • 套接字的描述符

          根据客户端的类型不同,fd的属性可以时-1或者是大于1的整数:

        1. 伪客户端:fd的属性时-1,伪客户端处理的命令请求来源于aof文件或者lua脚本,而不是网络,所以这种客户端不需要套接字链接,自然也不需要记录套接字描述符。目前Redis服务器会在两个地方用到伪客户端,一个是用于载入aof文件并还原数据库状态,而另一个是用于执行lua脚本中包含的Redis命令
        2. 普通客户端:普通客户端的套接字用来与服务器进行通信,fd的属性大于-1,所以服务器会用fd属性来记录客户端套接字的描述符,因为合法的套接字描述符不能是-1,所以普通客户端的套接字描述符的值必然是大于-1的整数。

          执行client list命令可以列出目前所有连接到服务器的普通客户端,命令输出中的fd显示了服务器连接客户端所使用的套接字描述符。

    • 客户端的结构

        在redis.h 中定义了redisclient的结构

     * Clients are taken in a linked list. */
    typedef struct redisClient {
        uint64_t id;            /* Client incremental unique ID. */ //服务器对于每一个链接进来的都会创建ID
        int fd;                 //当前客户端状态连接符,分为无网络链接的客户端和网络连接的客户端 无网络链接的客户端fd = -1 主要用于执行lua脚本
        redisDb *db;            //当前client的数据库 
        int dictid;
        robj *name;             /* As set by CLIENT SETNAME */ //客户端名字 
        sds querybuf;           //保存客户端发送命令请求的输入缓冲区
        size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */ //保存输入缓冲区的峰值
        int argc;               //命令参数个数
        robj **argv;            //命令参数列表
        struct redisCommand *cmd, *lastcmd;
        int reqtype;            //请求协议的类型
        int multibulklen;       /* number of multi bulk arguments left to read */   //缓冲区剩余未读命令的数量
        long bulklen;           /* length of bulk argument in multi bulk request */ //读入参数长度
        list *reply;
        unsigned long reply_bytes; /* Tot bytes of objects in reply list */
        int sentlen;            /* Amount of bytes already sent in the current //已发送回复的字节数
                                   buffer or object being sent. */
        time_t ctime;           /* Client creation time */
        time_t lastinteraction; /* time of the last interaction, used for timeout */
        time_t obuf_soft_limit_reached_time;
        int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */  //客户端状态
        int authenticated;      /* when requirepass is non-NULL */
        int replstate;          /* replication state if this is a slave */
        int repl_put_online_on_ack; /* Install slave write handler on ACK. */
        int repldbfd;           /* replication DB file descriptor */
        off_t repldboff;        /* replication DB file offset */
        off_t repldbsize;       /* replication DB file size */
        sds replpreamble;       /* replication DB preamble. */
        long long reploff;      /* replication offset if this is our master */
        long long repl_ack_off; /* replication ack offset, if this is a slave */
        long long repl_ack_time;/* replication ack time, if this is a slave */
        long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                           copying this slave output buffer
                                           should use. */
        char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
        int slave_listening_port; /* As configured with: SLAVECONF listening-port */
        int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
        multiState mstate;      /* MULTI/EXEC state */
        int btype;              /* Type of blocking op if REDIS_BLOCKED. */
        blockingState bpop;     /* blocking state */
        long long woff;         /* Last write global replication offset. */
        list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
        dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
        list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
        sds peerid;             /* Cached peer ID. */
    
        /* Response buffer */
        int bufpos;
        char buf[REDIS_REPLY_CHUNK_BYTES]; //保存执行完命令所得命令回复信息的静态缓冲区,主要保存固定长度的命令回复,档处理一些返回大量回复的命令,则会将命令回复以链表形式连接起来
    } redisClient;
    • 一条命令的执行完成并且返回数据一共涉及三部分

        第一步是建立连接阶段,响应socket的建立,并创建了client对象;

        第二步是处理阶段,从socket读取数据到输入缓冲区,然后解析并获得命令,执行命令并将返回值存储到输出缓冲区; 

        第三步是数据返回阶段,将返回值从输入缓冲区写到socket中,返回给客户端,最后关闭client。

      这三个阶段之间是通过事件机制串联了,在Redis启动阶段首先要注册socket连接建立事件处理器:

      1. 当客户端发来socket的连接请求时,对应的处理器方法会被执行,建立连接阶段的相关处理就会进行,然后注册socket写事件处理器;
      2. 当客户端发来命令时,读取事件处理器的方法会被执行,对应处理阶段的相关逻辑都会被执行,然后注册socket写事件处理器;
      3. 当写事件处理器被执行时,就是将返回值写回到socket中。  

       1.启动时监听socket

          redis服务器启动时,会调用initServer方法,首先建立Redis自己的事件机制,eventLoop,然后在其上注册周期时间事件处理器,最后再坚挺socket上创建文件事件处理器,监听socket建立 连接事件,处理函数时acceptTcpHandler。    

    void initServer(void) {
        int j;
    
        signal(SIGHUP, SIG_IGN);
        signal(SIGPIPE, SIG_IGN);
        setupSignalHandlers();
    
        if (server.syslog_enabled) {
            openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
                server.syslog_facility);
        }
    
        server.pid = getpid();
        server.current_client = NULL;
        server.clients = listCreate();
        server.clients_to_close = listCreate();
        server.slaves = listCreate();
        server.monitors = listCreate();
        server.slaveseldb = -1; /* Force to emit the first SELECT command. */
        server.unblocked_clients = listCreate();
        server.ready_keys = listCreate();
        server.clients_waiting_acks = listCreate();
        server.get_ack_from_slaves = 0;
        server.clients_paused = 0;
    
        createSharedObjects();
        adjustOpenFilesLimit();
        //创建eventLoop
        server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
        server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    
        /* Open the TCP listening socket for the user commands. */
        if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
            exit(1);
    
        /* Open the listening Unix domain socket. */
        if (server.unixsocket != NULL) {
            unlink(server.unixsocket); /* don't care if this fails */
            server.sofd = anetUnixServer(server.neterr,server.unixsocket,
                server.unixsocketperm, server.tcp_backlog);
            if (server.sofd == ANET_ERR) {
                redisLog(REDIS_WARNING, "Opening Unix socket: %s", server.neterr);
                exit(1);
            }
            anetNonBlock(NULL,server.sofd);
        }
    
        /* Abort if there are no listening sockets at all. */
        if (server.ipfd_count == 0 && server.sofd < 0) {
            redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
            exit(1);
        }
    
        /* Create the Redis databases, and initialize other internal state. */
        for (j = 0; j < server.dbnum; j++) {
            server.db[j].dict = dictCreate(&dbDictType,NULL);
            server.db[j].expires = dictCreate(&keyptrDictType,NULL);
            server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
            server.db[j].ready_keys = dictCreate(&setDictType,NULL);
            server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
            server.db[j].eviction_pool = evictionPoolAlloc();
            server.db[j].id = j;
            server.db[j].avg_ttl = 0;
        }
        server.pubsub_channels = dictCreate(&keylistDictType,NULL);
        server.pubsub_patterns = listCreate();
        listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
        listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
        server.cronloops = 0;
        server.rdb_child_pid = -1;
        server.aof_child_pid = -1;
        server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
        aofRewriteBufferReset();
        server.aof_buf = sdsempty();
        server.lastsave = time(NULL); /* At startup we consider the DB saved. */
        server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
        server.rdb_save_time_last = -1;
        server.rdb_save_time_start = -1;
        server.dirty = 0;
        resetServerStats();
        /* A few stats we don't want to reset: server startup time, and peak mem. */
        server.stat_starttime = time(NULL);
        server.stat_peak_memory = 0;
        server.resident_set_size = 0;
        server.lastbgsave_status = REDIS_OK;
        server.aof_last_write_status = REDIS_OK;
        server.aof_last_write_errno = 0;
        server.repl_good_slaves_count = 0;
        updateCachedTime();
        /* Create the serverCron() time event, that's our main way to process
         * background operations. */
        //注册时间事件,处理后台操作,比如客户端操作,过期键等
        if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            redisPanic("Can't create the serverCron time event.");
            exit(1);
        }
    
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        //为所有监听的socket创建文件事件,监听刻度事件,事件处理函数为acceptTcpHandler
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    redisPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
    
        /* Open the AOF file if needed. */
        if (server.aof_state == REDIS_AOF_ON) {
            server.aof_fd = open(server.aof_filename,
                                   O_WRONLY|O_APPEND|O_CREAT,0644);
            if (server.aof_fd == -1) {
                redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                    strerror(errno));
                exit(1);
            }
        }
    
        /* 32 bit instances are limited to 4GB of address space, so if there is
         * no explicit limit in the user provided configuration we set a limit
         * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
         * useless crashes of the Redis instance for out of memory. */
        if (server.arch_bits == 32 && server.maxmemory == 0) {
            redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
            server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
            server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
        }
    
        if (server.cluster_enabled) clusterInit();
        replicationScriptCacheInit();
        scriptingInit();
        slowlogInit();
        latencyMonitorInit();
        bioInit();
    }

          2.建立连接和client

        当客户端向Redis建立socket时,aeeventLoop会调用acceptTcpHandler处理函数,服务器会为每个连接创建一个client对象,并创建相应的文件事件来监听socket刻度事件,并制定事件处理函数。

        acceptTcpHandler函数会首先调用anetTcpAccept方法,他底层会调用socket的accept方法,也就是接受客户端来的建立连接请求,然后调用acceptCommonHandler方法,继续后续的逻辑处理。

    #define MAX_ACCEPTS_PER_CALL 1000
    static void acceptCommonHandler(int fd, int flags) {
        redisClient *c;
        //创建client 
        if ((c = createClient(fd)) == NULL) {
            redisLog(REDIS_WARNING,
                "Error registering fd event for the new client: %s (fd=%d)",
                strerror(errno),fd);
            close(fd); /* May be already closed, just ignore errors */
            return;
        }
        /* If maxclient directive is set and this is one client more... close the
         * connection. Note that we create the client instead to check before
         * for this condition, since now the socket is already set in non-blocking
         * mode and we can send an error for free using the Kernel I/O */
        //判断当前client的数量是否超出了配置的最大连接 maxclients
        if (listLength(server.clients) > server.maxclients) {
            char *err = "-ERR max number of clients reached
    ";
    
            /* That's a best effort error message, don't check write errors */
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
        server.stat_numconnections++;
        c->flags |= flags;
    }
    
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
        char cip[REDIS_IP_STR_LEN];
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
        REDIS_NOTUSED(privdata);
        //最多可以处理MAX_ACCEPTS_PER_CALL=1000次的新连接接入
        while(max--) {
            //底层调用socker 的sccept方法 接收客户端来的建立连接请求
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
            if (cfd == ANET_ERR) {
                if (errno != EWOULDBLOCK)
                    redisLog(REDIS_WARNING,
                        "Accepting client connection: %s", server.neterr);
                return;
            }
            redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
            //处理后续逻辑
            acceptCommonHandler(cfd,0);
        }
    }

        当客户端连接上服务器,服务器就会创建一个redisClient结构,用来保存客户端状态,每个客户端都有各自的输入缓冲区和输出缓冲区,输入缓冲区存储存储客户端通过socket发送过来的数据,输出缓冲区则存储着Redis对客户端的响应数据,

    redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(redisClient));
    
        /* passing -1 as fd it is possible to create a non connected client.
         * This is useful since all the Redis commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        //fd = -1 表示创建的是一个无网络链接的伪客户端 用于执行lua脚本
        //如果fd != -1 表示创建一个有网络链接的客户端
        if (fd != -1) {
            //设置fd为非阻塞
            anetNonBlock(NULL,fd);
            //禁止使用 Nagle算法 client向内核提交的没个数据包都会理解发送到server TCP_NODELAY
    
            anetEnableTcpNoDelay(NULL,fd);
            //开启tcpkeepalive 设置SO_KEEPALIVE
            if (server.tcpkeepalive)
                //设置tcp链接的keep alive选项
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
            //创建一个文件事件状态el 且监听读事件 开始接受命令的输入
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
        selectDb(c,0);
        c->id = server.next_client_id++;
        c->fd = fd;
        c->name = NULL;
        c->bufpos = 0;
        c->querybuf = sdsempty();
        c->querybuf_peak = 0;
        c->reqtype = 0;
        c->argc = 0;
        c->argv = NULL;
        c->cmd = c->lastcmd = NULL;
        c->multibulklen = 0;
        c->bulklen = -1;
        c->sentlen = 0;
        c->flags = 0;
        c->ctime = c->lastinteraction = server.unixtime;
        c->authenticated = 0;
        c->replstate = REDIS_REPL_NONE;
        c->repl_put_online_on_ack = 0;
        c->reploff = 0;
        c->repl_ack_off = 0;
        c->repl_ack_time = 0;
        c->slave_listening_port = 0;
        c->slave_capa = SLAVE_CAPA_NONE;
        c->reply = listCreate();
        c->reply_bytes = 0;
        c->obuf_soft_limit_reached_time = 0;
        listSetFreeMethod(c->reply,decrRefCountVoid);
        listSetDupMethod(c->reply,dupClientReplyValue);
        c->btype = REDIS_BLOCKED_NONE;
        c->bpop.timeout = 0;
        c->bpop.keys = dictCreate(&setDictType,NULL);
        c->bpop.target = NULL;
        c->bpop.numreplicas = 0;
        c->bpop.reploffset = 0;
        c->woff = 0;
        c->watched_keys = listCreate();
        c->pubsub_channels = dictCreate(&setDictType,NULL);
        c->pubsub_patterns = listCreate();
        c->peerid = NULL;
        listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
        listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
        if (fd != -1) listAddNodeTail(server.clients,c);
        initClientMultiState(c);
        return c;
    }

         3.读取socket数据到输入缓冲区

         readQueryFromClient会调用read方法读取数据到输入缓冲区。

    //读取client的输入缓冲区的内容
    //
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        redisClient *c = (redisClient*) privdata;
        int nread, readlen;
        size_t qblen;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
    
        server.current_client = c;
        //读入长度默认16MB
        readlen = REDIS_IOBUF_LEN;
        /* If this is a multi bulk request, and we are processing a bulk reply
         * that is large enough, try to maximize the probability that the query
         * buffer contains exactly the SDS string representing the object, even
         * at the risk of requiring more read(2) calls. This way the function
         * processMultiBulkBuffer() can avoid copying buffers to create the
         * Redis Object representing the argument. */
        //如果是多条请求 根据请求的大小 设置读入的长度readlen
        //bulklen 标识读入参数的长度
        if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
            && c->bulklen >= REDIS_MBULK_BIG_ARG)
        {
            int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
    
            if (remaining < readlen) readlen = remaining;
        }
        //输入缓冲区的长度
        qblen = sdslen(c->querybuf);
        //更新缓冲区的峰值
        if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
        //扩展缓冲区的大小
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        //将client发来的命令  读入到输入缓冲区中
        nread = read(fd, c->querybuf+qblen, readlen);
        if (nread == -1) {
            if (errno == EAGAIN) {
                nread = 0;
            } else {
                redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
                freeClient(c);
                return;
            }
        } else if (nread == 0) {
            redisLog(REDIS_VERBOSE, "Client closed connection");
            freeClient(c);
            return;
        }
        //读操作完成
        if (nread) {
            //更新输入缓冲区的已用大小和未用大小
            sdsIncrLen(c->querybuf,nread);
            //设置最后一次服务器与client的交互时间
            c->lastinteraction = server.unixtime;
            //如果是主节点,则更新复制操作的偏移量
            if (c->flags & REDIS_MASTER) c->reploff += nread;
            //更新从网络输入的字节数
            server.stat_net_input_bytes += nread;
        } else {
            server.current_client = NULL;
            return;
        }
        //如果输入缓冲区长度超过服务器设置的最大缓冲区长度
        if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
            //将client信息转换成sds
            sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
            //输入缓冲区保存在bytes中
            bytes = sdscatrepr(bytes,c->querybuf,64);
            //打印到日志
            redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
            //释放空间
            sdsfree(ci);
            sdsfree(bytes);
            freeClient(c);
            return;
        }
        processInputBuffer(c);
        server.current_client = NULL;
    }

        函数readQueryFromClient从文件描述符fd中读取客户端的数据,read读取的最大字节数是16k,并保留在querybuf中,并更新缓冲区的峰值,如果输入缓冲区长度超过服务器设置的最大缓冲区长度,则会退出,server.client_max_querybuf_len在服务器初始化时设置为PROTO_MAX_QUERYBUF_LEN (1024*1024*1024)也就是1G大小。

        将数据读取到输入缓冲区后,在redis版本小于4.0的版本中,读取到数据不会根据client的类型做不同的处理,版本大于等于4.0的redis,会根据client的不同类型进行处理,如果时普通类型直接调用processInputBuffer,如果client时master的连接,还需将自服务器传来的命令执行后,继续将命令同步给自己的从服务器。

    //redis 4.0.14 部分代码
    /* Time to process the buffer. If the client is a master we need to
         * compute the difference between the applied offset before and after
         * processing the buffer, to understand how much of the replication stream
         * was actually applied to the master state: this quantity, and its
         * corresponding part of the replication stream, will be propagated to
         * the sub-slaves and to the replication backlog. */
        if (!(c->flags & CLIENT_MASTER)) {
            processInputBuffer(c);
        } else {
            //如果client时master的连接
            size_t prev_offset = c->reploff;
            processInputBuffer(c);
            //判断是否同步偏移量发生变化,则通知slave
            size_t applied = c->reploff - prev_offset;
            if (applied) {
                replicationFeedSlavesFromMasterStream(server.slaves,
                        c->pending_querybuf, applied);
                sdsrange(c->pending_querybuf,applied,-1);
            }
        }

        4.解析输入缓冲区的命令

          processInputBuffer主要是将输入缓冲区中的数据解析成相应的命令,根据命令是REDIS_REQ_MULTIBULK(redis-cli)或REDIS_REQ_INLINE(telnet)分别调用函数processInlineBuffer或processMultibulkBuffer

          

    void processInputBuffer(redisClient *c) {
        /* Keep processing while there is something in the input buffer */
        //一直读输入缓冲区的内容
        while(sdslen(c->querybuf)) {
            /* Return if clients are paused. */
            //如果处于暂停状态  直接返回
            if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;
            
            /* Immediately abort if the client is in the middle of something. */
            //如果client处于被阻塞状态 直接返回
            if (c->flags & REDIS_BLOCKED) return;
            
            /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
             * written to the client. Make sure to not let the reply grow after
             * this flag has been set (i.e. don't process more commands). */
            //如果client处于关闭状态 直接返回
            if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
            
            /* Determine request type when unknown. */
            //如果是未知请求类型 则判定请求类型
            if (!c->reqtype) {
                //如果是*开头 则是多条请求 是client发来的
                if (c->querybuf[0] == '*') {
                    c->reqtype = REDIS_REQ_MULTIBULK;
                } else {
                //否则就是内联请求  是telnet发来的
                    c->reqtype = REDIS_REQ_INLINE;
                }   
            }   
            //如果是内联请求
            if (c->reqtype == REDIS_REQ_INLINE) {
                //处理telnet发来的内联请求 并创建成对象 保存在client的参数列表中
                if (processInlineBuffer(c) != REDIS_OK) break;
            } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
                //将client的querybuf中的协议内容转换成client的参数列表中的对象
                if (processMultibulkBuffer(c) != REDIS_OK) break;
            } else {
                redisPanic("Unknown request type");
            }   
            
            /* Multibulk processing could see a <= 0 length. */
            //如果参数是0  则重置client 
           if (c->argc == 0) {
                resetClient(c);
            } else {
                //执行命令成功后重置
                //Only reset the client when the command was executed. */
                if (processCommand(c) == REDIS_OK)
                    resetClient(c);
            }       
        }   
    }

        5.执行命令

          processCommand方法会处理很多逻辑,大致分为三部分,首先是调用lookupCommand方法获得对应的redisCommand,接着时检测当前redis是否可以执行命令,最后是调用call方法真正执行命令。

        processCommand会做如下逻辑处理:

      1. 如果命令是quit,则直接返回,并设置客户端标志位;
      2. 根据argv[0]从数据库的字典中查找对应的redisCommand,所有的命令都存储在命令字典redisCommandTable中,根据命令名称可以获取对应的命令;
      3. 进行用户权限校验;
      4. 如果是集群模式,处理集群重定向。当命令发送者是master或者命令没有任何key的参数时,可以不重定向;
      5. 预防maxmemory情况,先尝试回首一下,如果不行,则返回异常;
      6. 当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,且配置bgsave参数和stop_writes_on_bgsave_err,禁止执行写命令;
      7. 当此服务器是master时,如果配置了server.repl_min_slaves_to_write,当slave数目小于时,禁止执行写命令;
      8. 当时只读slave时,除了master不接受其他写命令;
      9. 当客户端正在订阅频道时,只会执行部分命令;
      10. 当服务器为slave,但是没有连接master时,只会执行带有REDIS_CMD_STALE标志的命令,如info;
      11. 正在加载数据库时,只会执行带有REDIS_CMD_LOADING标志的命令,其他都会拒绝;
      12. 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其他都会拒绝;
      13. 如果是事务命令,则开启事务,命令进入等待队列,否则直接执行命令。
    /* If this function gets called we already read a whole
     * command, arguments are in the client argv/argc fields.
     * processCommand() execute the command or prepare the
     * server for a bulk read from the client.
     *
     * If 1 is returned the client is still alive and valid and
     * other operations can be performed by the caller. Otherwise
     * if 0 is returned the client was destroyed (i.e. after QUIT). */
    int processCommand(redisClient *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        //1.如果命令是quit,则直接返回,并设置客户端标志位;
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            addReply(c,shared.ok);
            c->flags |= REDIS_CLOSE_AFTER_REPLY;
            return REDIS_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        //2.从数据库的字典中查找命令
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        //命令不存在
        if (!c->cmd) {
            flagTransaction(c);   //如果是事务状态的命令 则设置事务为失败
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return REDIS_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);  //如果是事务状态的命令 则设置事务为失败
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return REDIS_OK;
        }
    
        /* Check if the user is authenticated */
        //3.如果实例设置了密码 但是校验失败
        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
        {
            flagTransaction(c);
            addReply(c,shared.noautherr);
            return REDIS_OK;
        }
        
        /* If cluster is enabled perform the cluster redirection here.
         * However we don't perform the redirection if:
         * 1) The sender of this command is our master.
         * 2) The command has no key arguments. */
        //4.如果是集群模式,处理集群重定向
        //如果开启了集群模式 则执行集群的定向操作 下面两种情况例外
        //1.命令的发送是主节点服务器
        //2.命令没有key
        if (server.cluster_enabled &&
            !(c->flags & REDIS_MASTER) &&
            !(c->flags & REDIS_LUA_CLIENT &&
              server.lua_caller->flags & REDIS_MASTER) &&
            !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
        {
            int hashslot;
    
            if (server.cluster->state != REDIS_CLUSTER_OK) {
                flagTransaction(c);
                clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
                return REDIS_OK;
            } else {
                int error_code;
                //从集群中返回一个能够执行命令的节点
                clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
                if (n == NULL || n != server.cluster->myself) {
                    flagTransaction(c);
                    //将事务状态置为失败
                    //执行client重定向操作
                    clusterRedirectClient(c,n,hashslot,error_code);
                    return REDIS_OK;
                }
            }
        }
    
        /* Handle the maxmemory directive.
         *
         * First we try to free some memory if possible (if there are volatile
         * keys in the dataset). If there are not the only thing we can do
         * is returning an error. */
        //5.按需释放一部分内存
        if (server.maxmemory) {
            
            int retval = freeMemoryIfNeeded();
            if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
                flagTransaction(c);
                addReply(c, shared.oomerr);
                return REDIS_OK;
            }
        }
        /* Don't accept write commands if there are problems persisting on disk
         * and if this is a master instance. */
         //6.当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,且配置bgsave参数和stop_writes_on_bgsave_err,禁止执行写命令;
        if (((server.stop_writes_on_bgsave_err &&
              server.saveparamslen > 0 &&
              server.lastbgsave_status == REDIS_ERR) ||
              server.aof_last_write_status == REDIS_ERR) &&
            server.masterhost == NULL &&
            (c->cmd->flags & REDIS_CMD_WRITE ||
             c->cmd->proc == pingCommand))
        {
            flagTransaction(c);
            if (server.aof_last_write_status == REDIS_OK)
                addReply(c, shared.bgsaveerr);
            else
                addReplySds(c,
                    sdscatprintf(sdsempty(),
                    "-MISCONF Errors writing to the AOF file: %s
    ",
                    strerror(server.aof_last_write_errno)));
            return REDIS_OK;
        }
    
        /* Don't accept write commands if there are not enough good slaves and
         * user configured the min-slaves-to-write option. */
        //7.当此服务器是master,并且配置了repl_min_slaves_to_write 当slave数小于,禁止写
        if (server.masterhost == NULL &&
            server.repl_min_slaves_to_write &&
            server.repl_min_slaves_max_lag &&
            c->cmd->flags & REDIS_CMD_WRITE &&
            server.repl_good_slaves_count < server.repl_min_slaves_to_write)
        {
            flagTransaction(c);
            addReply(c, shared.noreplicaserr);
            return REDIS_OK;
        }
    
        /* Don't accept write commands if this is a read only slave. But
         * accept write commands if this is our master. */
        //8.当是只读,除了master不接受其他写命令
        if (server.masterhost && server.repl_slave_ro &&
            !(c->flags & REDIS_MASTER) &&
            c->cmd->flags & REDIS_CMD_WRITE)
        {
            addReply(c, shared.roslaveerr);
            return REDIS_OK;
        }
        /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
        //9.当客户端正在订阅频道,只会执行部分命令
        if (c->flags & REDIS_PUBSUB &&
            c->cmd->proc != pingCommand &&
            c->cmd->proc != subscribeCommand &&
            c->cmd->proc != unsubscribeCommand &&
            c->cmd->proc != psubscribeCommand &&
            c->cmd->proc != punsubscribeCommand) {
            addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
            return REDIS_OK;
        }
    
        /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
         * we are a slave with a broken link with master. */
        //10.服务器是slave 但是没有连接master 只会执行带有REDIS_CMD_STALE的命令,如info  其他命令会被拒绝
        if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
            server.repl_serve_stale_data == 0 &&
            !(c->cmd->flags & REDIS_CMD_STALE))
        {
            flagTransaction(c);
            addReply(c, shared.masterdownerr);
            return REDIS_OK;
        }
    
        /* Loading DB? Return an error if the command has not the
         * REDIS_CMD_LOADING flag. */
        //11. 正在加载数据库时,只会执行代有REDIS_CMD_LOADING标志的命令 其他都会拒绝
        if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
            addReply(c, shared.loadingerr);
            return REDIS_OK;
        }
    
        /* Lua script too slow? Only allow a limited number of commands. */
        //12 服务器因为执行lua脚本阻塞时 只会执行部分命令 其他都会拒绝
        if (server.lua_timedout &&
              c->cmd->proc != authCommand &&
              c->cmd->proc != replconfCommand &&
            !(c->cmd->proc == shutdownCommand &&
              c->argc == 2 &&
              tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
            !(c->cmd->proc == scriptCommand &&
              c->argc == 2 &&
              tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
        {
            flagTransaction(c);
            addReply(c, shared.slowscripterr);
            return REDIS_OK;
        }
        /* Exec the command */
        //13 如果时事务命令,则开启事务,命令进入等待队列,都则直接执行命令
        if (c->flags & REDIS_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
            call(c,REDIS_CALL_FULL);
            c->woff = server.master_repl_offset;
            if (listLength(server.ready_keys))
                handleClientsBlockedOnLists();
        }
        return REDIS_OK;
    }
  • 相关阅读:
    Populating Next Right Pointers in Each Node II
    Populating Next Right Pointers in Each Node
    Construct Binary Tree from Preorder and Inorder Traversal
    Construct Binary Tree from Inorder and Postorder Traversal
    Path Sum
    Symmetric Tree
    Solve Tree Problems Recursively
    632. Smallest Range(priority_queue)
    609. Find Duplicate File in System
    poj3159最短路spfa+邻接表
  • 原文地址:https://www.cnblogs.com/chenyang920/p/13369383.html
Copyright © 2011-2022 走看看