zoukankan      html  css  js  c++  java
  • 第10课:[实战] Redis 网络通信模块源码分析(3)

    redis-server 接收到客户端的第一条命令

      redis-cli 给 redis-server 发送的第一条数据是 *1 $7 COMMAND 。我们来看下对于这条数据如何处理,单步调试一下 readQueryFromClient 调用 read 函数收取完数据,接着继续处理 c→querybuf 的代码即可。经实际跟踪调试,调用的是 processInputBuffer 函数,位于 networking.c 文件中:

    /* This function is called every time, in the client structure 'c', there is
     * more query buffer to process, because we read more data from the socket
     * or because a client was blocked and later reactivated, so there could be
     * pending query buffer, already representing a full command, to process. */
    void processInputBuffer(client *c) {
        server.current_client = c;
        /* Keep processing while there is something in the input buffer */
        while(sdslen(c->querybuf)) {
            /* Return if clients are paused. */
            if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
    
            /* Immediately abort if the client is in the middle of something. */
            if (c->flags & CLIENT_BLOCKED) break;
    
            /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
             * written to the client. Make sure to not let the reply grow after
             * this flag has been set (i.e. don't process more commands).
             *
             * The same applies for clients we want to terminate ASAP. */
            if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
    
            /* Determine request type when unknown. */
            if (!c->reqtype) {
                if (c->querybuf[0] == '*') {
                    c->reqtype = PROTO_REQ_MULTIBULK;
                } else {
                    c->reqtype = PROTO_REQ_INLINE;
                }
            }
    
            if (c->reqtype == PROTO_REQ_INLINE) {
                if (processInlineBuffer(c) != C_OK) break;
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                if (processMultibulkBuffer(c) != C_OK) break;
            } else {
                serverPanic("Unknown request type");
            }
    
            /* Multibulk processing could see a <= 0 length. */
            if (c->argc == 0) {
                resetClient(c);
            } else {
                /* Only reset the client when the command was executed. */
                if (processCommand(c) == C_OK) {
                    if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                        /* Update the applied replication offset of our master. */
                        c->reploff = c->read_reploff - sdslen(c->querybuf);
                    }
    
                    /* Don't reset the client structure for clients blocked in a
                     * module blocking command, so that the reply callback will
                     * still be able to access the client argv and argc field.
                     * The client will be reset in unblockClientFromModule(). */
                    if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                        resetClient(c);
                }
                /* freeMemoryIfNeeded may flush slave output buffers. This may
                 * result into a slave, that may be the active client, to be
                 * freed. */
                if (server.current_client == NULL) break;
            }
        }
        server.current_client = NULL;
    }
    

      processInputBuffer 先判断接收到的字符串是不是以星号( * )开头,这里是以星号开头,然后设置 client 对象的 reqtype 字段值为 PROTO_REQ_MULTIBULK 类型,接着调用 processMultibulkBuffer 函数继续处理剩余的字符串。处理后的字符串被解析成 redis 命令,记录在 client 对象的 argc 和 argv 两个字段中,前者记录当前命令的数目,后者存储的是命令对应结构体对象的地址。这些命令的相关内容不是我们本课程的关注点,不再赘述。

      命令解析完成以后,从 processMultibulkBuffer 函数返回,在 processCommand 函数中处理刚才记录在 client 对象 argv 字段中的命令。

      

    //为了与原代码保持一致,代码缩进未调整
    if (c->argc == 0) {
                resetClient(c);
            } else {
                /* Only reset the client when the command was executed. */
                if (processCommand(c) == C_OK) {
                    //省略部分代码
                }
    
            }
    

      

    在 processCommand 函数中处理命令,流程大致如下:

    (1)先判断是不是 quit 命令,如果是,则往发送缓冲区中添加一条应答命令( 应答 redis 客户端 ),并给当前 client 对象设置 CLIENT_CLOSE_AFTER_REPLY 标志,这个标志见名知意,即应答完毕后关闭连接。

    (2)如果不是 quit 命令,则使用 lookupCommand 函数从全局命令字典表中查找相应的命令,如果出错,则向发送缓冲区中添加出错应答。出错不是指程序逻辑出错,有可能是客户端发送的非法命令。如果找到相应的命令,则执行命令后添加应答。

    int processCommand(client *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            addReply(c,shared.ok);
            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
            return C_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        if (!c->cmd) {
            flagTransaction(c);
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return C_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return C_OK;
        }
    
        //...省略部分代码
    
    }
    

      全局字典表是前面介绍的 server 全局变量(类型是 redisServer)的一个字段 commands 。

    struct redisServer {
        /* General */
        pid_t pid;                  /* Main process pid. */
        //无关字段省略
        dict *commands;             /* Command table */
    
        //无关字段省略
    }
    

      

      至于这个全局字典表在哪里初始化以及相关的数据结构类型,由于与本课程主题无关,这里就不分析了。

      下面重点探究如何将应答命令(包括出错的应答)添加到发送缓冲区去。我们以添加一个“ok”命令为例:

    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;
    
        /* This is an important place where we can avoid copy-on-write
         * when there is a saving child running, avoiding touching the
         * refcount field of the object if it's not needed.
         *
         * If the encoding is RAW and there is room in the static buffer
         * we'll be able to send the object to the client without
         * messing with its page. */
        if (sdsEncodedObject(obj)) {
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyObjectToList(c,obj);
        } else if (obj->encoding == OBJ_ENCODING_INT) {
            /* Optimization: if there is room in the static buffer for 32 bytes
             * (more than the max chars a 64 bit integer can take as string) we
             * avoid decoding the object and go for the lower level approach. */
            if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
                char buf[32];
                int len;
    
                len = ll2string(buf,sizeof(buf),(long)obj->ptr);
                if (_addReplyToBuffer(c,buf,len) == C_OK)
                    return;
                /* else... continue with the normal code path, but should never
                 * happen actually since we verified there is room. */
            }
            obj = getDecodedObject(obj);
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyObjectToList(c,obj);
            decrRefCount(obj);
        } else {
            serverPanic("Wrong obj->encoding in addReply()");
        }
    }
    

      addReply 函数中有两个关键的地方,一个是 prepareClientToWrite 函数调用,另外一个是 _addReplyToBuffer 函数调用。先来看 prepareClientToWrite ,这个函数中有这样一段代码:

      

    if (!clientHasPendingReplies(c) &&
            !(c->flags & CLIENT_PENDING_WRITE) &&
            (c->replstate == REPL_STATE_NONE ||
             (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
        {
            /* Here instead of installing the write handler, we just flag the
             * client and put it into a list of clients that have something
             * to write to the socket. This way before re-entering the event
             * loop, we can try to directly write to the client sockets avoiding
             * a system call. We'll only really install the write handler if
             * we'll not be able to write the whole reply at once. */
            c->flags |= CLIENT_PENDING_WRITE;
            listAddNodeHead(server.clients_pending_write,c);
        }
    

      这段代码先判断发送缓冲区中是否还有未发送的应答命令——通过判断 client 对象的 bufpos 字段( int 型 )和 reply 字段( 这是一个链表 )的长度是否大于 0 。

    /* Return true if the specified client has pending reply buffers to write to
     * the socket. */
    int clientHasPendingReplies(client *c) {
        return c->bufpos || listLength(c->reply);
    }
    

      

      如果当前 client 对象不是处于 CLIENT_PENDING_WRITE 状态,且在发送缓冲区没有剩余数据,则给该 client 对象设置 CLIENT_PENDING_WRITE 标志,并将当前 client 对象添加到全局 server 对象的名叫 clients_pending_write 链表中去。这个链表中存的是所有有数据要发送的 client 对象,注意和上面说的 reply 链表区分开来。

      关于 CLIENT_PENDING_WRITE 标志,redis 解释是:

      Client has output to send but a write handler is yet not installed

      

      翻译成中文就是:一个有数据需要发送,但是还没有注册可写事件的 client 对象。

      下面讨论 _addReplyToBuffer 函数,位于 networking.c 文件中。

    int _addReplyToBuffer(client *c, const char *s, size_t len) {
        size_t available = sizeof(c->buf)-c->bufpos;
    
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
    
        /* If there already are entries in the reply list, we cannot
         * add anything more to the static buffer. */
        if (listLength(c->reply) > 0) return C_ERR;
    
        /* Check that the buffer has enough space available for this string. */
        if (len > available) return C_ERR;
    
        memcpy(c->buf+c->bufpos,s,len);
        c->bufpos+=len;
        return C_OK;
    }
    

      在这个函数中再次确保了 client 对象的 reply 链表长度不能大于 0( if 判断,如果不满足条件,则退出该函数 )。reply 链表存储的是待发送的应答命令。应答命令被存储在 client 对象的 buf 字段中,其长度被记录在 bufpos 字段中。buf 字段是一个固定大小的字节数组:

      

    typedef struct client {
        uint64_t id;            /* Client incremental unique ID. */
        int fd;                 /* Client socket. */
        redisDb *db;            /* Pointer to currently SELECTed DB. */
        robj *name;             /* As set by CLIENT SETNAME. */
        sds querybuf;           /* Buffer we use to accumulate client queries. */
        sds pending_querybuf;   /* If this is a master, this buffer represents the
                                   yet not applied replication stream that we
                                   are receiving from the master. */
       //省略部分字段...
    
        /* Response buffer */
        int bufpos;
        char buf[PROTO_REPLY_CHUNK_BYTES];
    } client;
    

      

      PROTO_REPLY_CHUNK_BYTES 在 redis 中的定义是 16*1024 ,也就是说应答命令数据包最长是 16k 。

      回到我们上面提的命令:*1 $7 COMMAND ,通过 lookupCommand 解析之后得到 command 命令,在 GDB 中显示如下:

      

    2345        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    (gdb) n
    2346        if (!c->cmd) {
    (gdb) p c->cmd
    $23 = (struct redisCommand *) 0x742db0 <redisCommandTable+13040>
    (gdb) p *c->cmd
    $24 = {name = 0x4fda67 "command", proc = 0x42d920 <commandCommand>, arity = 0, sflags = 0x50dc3e "lt", flags = 1536, getkeys_proc = 0x0, firstkey = 0, lastkey = 0, 
      keystep = 0, microseconds = 1088, calls = 1}
    

      

    如何处理可写事件

      

      前面我们介绍了 redis-server 如何处理可读事件,整个流程就是注册可读事件回调函数,在回调函数中调用操作系统 API read 函数收取数据,然后解析数据得到 redis 命令,处理命令接着将应答数据包放到 client 对象的 buf 字段中去。那么放入 buf 字段的数据何时发给客户端呢?

      还记得我们前面课程提到的 while 事件循环吗?我们再来回顾一下它的代码:

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    

      其中,先判断 eventLoop 对象的 beforesleep 对象是否设置了,这是一个回调函数。在 redis-server 初始化时已经设置好了。

    void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
        eventLoop->beforesleep = beforesleep;
    }
    

      我们在 aeSetBeforeSleepProc 这个函数上设置一个断点,然后重启一下 redis-server 来验证在何处设置的这个回调。

    Breakpoint 2, aeSetBeforeSleepProc (eventLoop=0x7ffff083a0a0, beforesleep=beforesleep@entry=0x4294f0 <beforeSleep>) at ae.c:507
    507         eventLoop->beforesleep = beforesleep;
    (gdb) bt
    #0  aeSetBeforeSleepProc (eventLoop=0x7ffff083a0a0, beforesleep=beforesleep@entry=0x4294f0 <beforeSleep>) at ae.c:507
    #1  0x00000000004238d2 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3892
    

      使用 f 1 命令切换到堆栈 #1 ,并输入 l 显示断点附近的代码:

    (gdb) l
    3887        /* Warning the user about suspicious maxmemory setting. */
    3888        if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
    3889            serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
    3890        }
    3891
    3892        aeSetBeforeSleepProc(server.el,beforeSleep);
    3893        aeSetAfterSleepProc(server.el,afterSleep);
    3894        aeMain(server.el);
    3895        aeDeleteEventLoop(server.el);
    3896        return 0;
    

      3892 行将这个回调设置成 beforeSleep 函数,因此每一轮循环都会调用这个 beforeSleep 函数。server.el 前面也介绍过即 aeEventLoop 对象,在这个 beforeSleep 函数中有一个 handleClientsWithPendingWrites 调用( 位于文件 server.c 中 ):

    void beforeSleep(struct aeEventLoop *eventLoop) {
        //省略无关代码...
    
        /* Handle writes with pending output buffers. */
        handleClientsWithPendingWrites();
    
        //省略无关代码...
    }
    

      handleClientsWithPendingWrites 函数调用即把记录在每个 client 中的数据发送出去。我们具体看一下发送的逻辑( 位于 networking.c 文件中 ):

    /* This function is called just before entering the event loop, in the hope
     * we can just write the replies to the client output buffer without any
     * need to use a syscall in order to install the writable event handler,
     * get it called, and so forth. */
    int handleClientsWithPendingWrites(void) {
        listIter li;
        listNode *ln;
        int processed = listLength(server.clients_pending_write);
    
        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            listDelNode(server.clients_pending_write,ln);
    
            /* Try to write buffers to the client socket. */
            if (writeToClient(c->fd,c,0) == C_ERR) continue;
    
            /* If there is nothing left, do nothing. Otherwise install
             * the write handler. */
            if (clientHasPendingReplies(c) &&
                aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                    sendReplyToClient, c) == AE_ERR)
            {
                freeClientAsync(c);
            }
        }
        return processed;
    }
    

      上面的代码先从全局 server 对象的 clients_pending_write 字段( 存储 client 对象的链表 )挨个取出有数据要发送的 client 对象,然后调用 writeToClient 函数尝试将 client 中存储的应答数据发出去。

    //位于networking.c文件中
    int writeToClient(int fd, client *c, int handler_installed) {
        ssize_t nwritten = 0, totwritten = 0;
        size_t objlen;
        sds o;
    
        while(clientHasPendingReplies(c)) {
            if (c->bufpos > 0) {
                nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
                if (nwritten <= 0) break;
                c->sentlen += nwritten;
                totwritten += nwritten;
    
                /* If the buffer was sent, set bufpos to zero to continue with
                 * the remainder of the reply. */
                if ((int)c->sentlen == c->bufpos) {
                    c->bufpos = 0;
                    c->sentlen = 0;
                }
            } else {
                o = listNodeValue(listFirst(c->reply));
                objlen = sdslen(o);
    
                if (objlen == 0) {
                    listDelNode(c->reply,listFirst(c->reply));
                    continue;
                }
    
                nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
                if (nwritten <= 0) break;
                c->sentlen += nwritten;
                totwritten += nwritten;
    
                /* If we fully sent the object on head go to the next one */
                if (c->sentlen == objlen) {
                    listDelNode(c->reply,listFirst(c->reply));
                    c->sentlen = 0;
                    c->reply_bytes -= objlen;
                    /* If there are no longer objects in the list, we expect
                     * the count of reply bytes to be exactly zero. */
                    if (listLength(c->reply) == 0)
                        serverAssert(c->reply_bytes == 0);
                }
            }
            /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
             * bytes, in a single threaded server it's a good idea to serve
             * other clients as well, even if a very large request comes from
             * super fast link that is always able to accept data (in real world
             * scenario think about 'KEYS *' against the loopback interface).
             *
             * However if we are over the maxmemory limit we ignore that and
             * just deliver as much data as it is possible to deliver. */
            if (totwritten > NET_MAX_WRITES_PER_EVENT &&
                (server.maxmemory == 0 ||
                 zmalloc_used_memory() < server.maxmemory)) break;
        }
        server.stat_net_output_bytes += totwritten;
        if (nwritten == -1) {
            if (errno == EAGAIN) {
                nwritten = 0;
            } else {
                serverLog(LL_VERBOSE,
                    "Error writing to client: %s", strerror(errno));
                freeClient(c);
                return C_ERR;
            }
        }
        if (totwritten > 0) {
            /* For clients representing masters we don't count sending data
             * as an interaction, since we always send REPLCONF ACK commands
             * that take some time to just fill the socket output buffer.
             * We just rely on data / pings received for timeout detection. */
            if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
        }
        if (!clientHasPendingReplies(c)) {
            c->sentlen = 0;
            if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    
            /* Close connection after entire reply has been sent. */
            if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
                freeClient(c);
                return C_ERR;
            }
        }
        return C_OK;
    }
    

      

      writeToClient 函数先把自己处理的 client 对象的 buf 字段的数据发出去,如果出错的话则释放这个 client 。如果数据能够全部发完,发完以后则会移除对应的 fd 上的可写事件( 如果添加了 );如果当前 client 设置了 CLIENT_CLOSE_AFTER_REPLY 标志,则发送完数据立即释放这个 client 对象。

      当然,可能存在一种情况是,由于网络或者客户端的原因,redis-server 某个客户端的数据发送不出去,或者只有部分可以发出去( 例如,服务器端给客户端发数据,客户端的应用层一直不从 Tcp 内核缓冲区中取出数据,这样服务器发送一段时间的数据后,客户端内核缓冲区满了,服务器再发数据就会发不出去,由于 fd 是非阻塞的,这个时候服务器调用 send 或者 write 函数会直接返回,返回值是 −1 ,错误码是 EAGAIN ,见上面的代码。)。不管哪种情况,数据这一次发不完。这个时候就需要监听可写事件了,因为在 handleClientsWithPendingWrites 函数中有如下代码:

    /* If there is nothing left, do nothing. Otherwise install
     * the write handler. */
    if (clientHasPendingReplies(c) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                                                    sendReplyToClient, c) == AE_ERR)
    {
        freeClientAsync(c);
    }
    

      这里注册可写事件 AE_WRITABLE 的回调函数是 sendReplyToClient 。也就是说,当下一次某个触发可写事件时,调用的就是 sendReplyToClient 函数。可以猜想,sendReplyToClient 发送数据的逻辑和上面的 writeToClient 函数一模一样,不信请看( 位于 networking.c 文件中 ):

    /* Write event handler. Just send data to the client. */
    void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        UNUSED(el);
        UNUSED(mask);
        writeToClient(fd,privdata,1);
    }
    

      

      至此,redis-server 发送数据的逻辑也理清楚了。这里简单做个总结:

      如果有数据要发送给某个 client ,不需要专门注册可写事件等触发可写事件再发送。通常的做法是在应答数据产生的地方直接发送,如果是因为对端 Tcp 窗口太小引起的发送不完,则将剩余的数据存储至某个缓冲区并注册监听可写事件,等下次触发可写事件后再尝试发送,一直到数据全部发送完毕后移除可写事件。

    redis-server 数据的发送逻辑与这个稍微有点差别,就是将数据发送的时机放到了 EventLoop 的某个时间点上( 这里是在 ProcessEvents 之前 ),其他的与上面完全一样。

      之所以不注册监听可写事件,等可写事件触发再发送数据,原因是通常情况下,网络通信的两端数据一般都是正常收发的,不会出现某一端由于 Tcp 窗口太小而使另外一端发不出去的情况。如果注册监听可写事件,那么这个事件会频繁触发,而触发时不一定有数据需要发送,这样不仅浪费系统资源,同时也浪费服务器程序宝贵的 CPU 时间片。

      

  • 相关阅读:
    随笔
    3.1作业
    关于JavaDate数据返回到前端变数字的问题(并引申到前后端时间的传输)
    utf-8转换为base64
    base64转换为utf-8
    Java Web基础——jsp调用动态界面
    Java Web基础——JSP指令标记
    2020软件工程最后一次作业
    软件工程第二次结对作业
    软件工程第三次作业
  • 原文地址:https://www.cnblogs.com/wzqstudy/p/10271915.html
Copyright © 2011-2022 走看看