  • Redis Source Code Walkthrough (1): The Event-Driven Mechanism and Command Processing

    eventloop 

    Let's start from the main function in server.c:

    int main(int argc, char **argv) {
    .......
    
        aeSetBeforeSleepProc(server.el,beforeSleep);
        aeSetAfterSleepProc(server.el,afterSleep);
        aeMain(server.el);
        aeDeleteEventLoop(server.el);
        return 0;
    }

    ae.c

    // Calls aeProcessEvents in an infinite loop to handle the time events and file events that are ready to run
    // Called from main() in server.c
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    /* Process every pending time event, then every pending file event
     * (that may be registered by time event callbacks just processed).
     * Without special flags the function sleeps until some file event
     * fires, or when the next time event occurs (if any).
     *
     * If flags is 0, the function does nothing and returns.
     * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
     * if flags has AE_FILE_EVENTS set, file events are processed.
     * if flags has AE_TIME_EVENTS set, time events are processed.
     * if flags has AE_DONT_WAIT set the function returns ASAP until all
     * the events that can be processed without waiting are processed.
     * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
     *
     * The function returns the number of events processed. */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    ........
    
        /* Note that we want call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
        // First use the nearest time event to decide how long the poll below may block
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            aeTimeEvent *shortest = NULL;
            struct timeval tv, *tvp;
    
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                // Find the nearest time event in the time event list
                shortest = aeSearchNearestTimer(eventLoop);
            // Compute how long to wait from now until that time event is due
            if (shortest) {
                long now_sec, now_ms;
    
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
    
                /* How many milliseconds we need to wait for the next
                 * time event to fire? */
                long long ms =
                    (shortest->when_sec - now_sec)*1000 +
                    shortest->when_ms - now_ms;
    
                if (ms > 0) {
                    tvp->tv_sec = ms/1000;
                    tvp->tv_usec = (ms % 1000)*1000;
                } else {
                    tvp->tv_sec = 0;
                    tvp->tv_usec = 0;
                }
            } else {
                /* If we have to check for events but need to return
                 * ASAP because of AE_DONT_WAIT we need to set the timeout
                 * to zero */
                if (flags & AE_DONT_WAIT) {
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    /* Otherwise we can block */
                    tvp = NULL; /* wait forever */
                }
            }
    
            /* Call the multiplexing API, will return only on timeout or when
             * some event fires. */
            // Call the I/O multiplexing layer to find the file events that are ready to read or write
            numevents = aeApiPoll(eventLoop, tvp);
    
            /* After sleep callback. */
            if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
                eventLoop->aftersleep(eventLoop);
    
            // Iterate over the fds recorded in the event loop's fired array
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask; // the event types that fired: read/write
                int fd = eventLoop->fired[j].fd; // the fd of the event
                int fired = 0; /* Number of events fired for current fd. */
    
                /* Normally we execute the readable event first, and the writable
                 * event later. This is useful as sometimes we may be able
                 * to serve the reply of a query immediately after processing the
                 * query.
                 *
                 * However if AE_BARRIER is set in the mask, our application is
                 * asking us to do the reverse: never fire the writable event
                 * after the readable. In such a case, we invert the calls.
                 * This is useful when, for instance, we want to do things
                 * in the beforeSleep() hook, like fsynching a file to disk,
                 * before replying to a client. */
                int invert = fe->mask & AE_BARRIER;
    
                /* Note the "fe->mask & mask & ..." code: maybe an already
                 * processed event removed an element that fired and we still
                 * didn't process, so we check if the event is still valid.
                 *
                 * Fire the readable event if the call sequence is not
                 * inverted. */
                if (!invert && fe->mask & mask & AE_READABLE) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
    
                /* Fire the writable event. */
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
    
                /* If we have to invert the call, fire the readable event now
                 * after the writable one. */
                if (invert && fe->mask & mask & AE_READABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
    
                processed++;
            }
        }
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    
        return processed; /* return the number of processed file/time events */
    }

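    This is a standard event-driven design: aeMain calls aeProcessEvents in an infinite loop until eventLoop->stop is set, running the beforesleep hook before each iteration.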
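    The aeMain pattern can be reduced to a few lines. The sketch below is a simplified illustration, not Redis code: miniLoop, miniMain, miniProcessEvents and stopAfterThree are hypothetical names, and the processing step only counts iterations instead of polling.

```c
#include <stddef.h>

/* Hypothetical, heavily simplified loop type; not Redis's aeEventLoop. */
typedef struct miniLoop miniLoop;
typedef void miniBeforeSleep(miniLoop *loop);

struct miniLoop {
    int stop;                     /* set to 1 to leave the loop */
    miniBeforeSleep *beforesleep; /* optional hook run before each iteration */
    int iterations;               /* counts calls to the processing step */
};

/* Stand-in for aeProcessEvents: here it only counts iterations. */
static int miniProcessEvents(miniLoop *loop) {
    loop->iterations++;
    return 0;
}

/* Same shape as aeMain: keep going until someone sets loop->stop. */
static void miniMain(miniLoop *loop) {
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL)
            loop->beforesleep(loop);
        miniProcessEvents(loop);
    }
}

/* Example hook: request a stop once three iterations have been processed. */
static void stopAfterThree(miniLoop *loop) {
    if (loop->iterations >= 3) loop->stop = 1;
}
```

    Because the hook runs before the processing step, the iteration in which stopAfterThree sets stop still calls miniProcessEvents once more, so the counter ends at 4; aeMain behaves the same way, since it only checks eventLoop->stop at the top of the loop.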

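    aeProcessEvents is fairly long and handles two kinds of events, time events and file events; this article focuses on file events. Note the order in the code: the nearest time event only determines how long aeApiPoll may block, the ready file events are dispatched first, and processTimeEvents runs afterward.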
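    The timeout calculation can be isolated into two small functions. This is a sketch that mirrors the arithmetic in aeProcessEvents and in the epoll version of aeApiPoll; the names timeoutUntil and timevalToEpollMs are hypothetical.

```c
#include <stddef.h>
#include <sys/time.h>

/* Mirrors aeProcessEvents: given when the nearest timer is due and the
 * current time, fill *tv with how long to wait (never negative). */
static void timeoutUntil(long when_sec, long when_ms, long now_sec, long now_ms,
                         struct timeval *tv) {
    long long ms = (long long)(when_sec - now_sec) * 1000 + when_ms - now_ms;
    if (ms > 0) {
        tv->tv_sec = ms / 1000;
        tv->tv_usec = (ms % 1000) * 1000;
    } else {
        /* Timer already due: poll without blocking. */
        tv->tv_sec = 0;
        tv->tv_usec = 0;
    }
}

/* Mirrors the epoll aeApiPoll conversion: a NULL timeout means block forever (-1). */
static int timevalToEpollMs(const struct timeval *tvp) {
    return tvp ? (int)(tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1;
}
```

    For example, a timer due at 10 s + 250 ms checked at 8 s + 900 ms gives a 1350 ms timeout, an overdue timer gives a zero timeout, and a NULL timeout becomes epoll_wait's -1.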

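    aeProcessEvents calls aeApiPoll to find which of the monitored fds are ready. For each ready fd, it uses the event type to decide whether to call wfileProc or rfileProc. Since we care about how a command sent by a client gets processed, the relevant handler is rfileProc; how rfileProc is set is covered later.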
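    The per-fd dispatch rules in the loop body (read before write, AE_BARRIER inverting the order, and a handler shared by both directions being called only once) can be checked in isolation. This is a simplified sketch: dispatch, onRead and onWrite are hypothetical, the flag macros are local copies, and each handler just appends 'R' or 'W' to a trace buffer.

```c
#include <stddef.h>

/* Local copies of the event flags used in the loop above. */
#define AE_READABLE 1
#define AE_WRITABLE 2
#define AE_BARRIER 4

typedef void fileProc(int fd, char *trace, size_t *len);

/* Hypothetical handlers: record which one ran. */
static void onRead(int fd, char *trace, size_t *len) { (void)fd; trace[(*len)++] = 'R'; }
static void onWrite(int fd, char *trace, size_t *len) { (void)fd; trace[(*len)++] = 'W'; }

/* registered plays the role of fe->mask, fired the mask reported by the poller.
 * trace must have room for at least 3 chars (2 calls + terminator). */
static void dispatch(int fd, int registered, int fired,
                     fileProc *rproc, fileProc *wproc, char *trace, size_t *len) {
    int invert = registered & AE_BARRIER;
    int calls = 0;
    /* Normal order: readable first. */
    if (!invert && (registered & fired & AE_READABLE)) {
        rproc(fd, trace, len);
        calls++;
    }
    /* Writable, unless the same handler already ran for this fd. */
    if (registered & fired & AE_WRITABLE) {
        if (!calls || wproc != rproc) {
            wproc(fd, trace, len);
            calls++;
        }
    }
    /* With AE_BARRIER, readable runs after writable. */
    if (invert && (registered & fired & AE_READABLE)) {
        if (!calls || wproc != rproc) {
            rproc(fd, trace, len);
            calls++;
        }
    }
    trace[*len] = '\0';
}
```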

    aeApiPoll has several implementations in different files, and Redis picks one with conditional compilation, which is quite neat:

    /* Include the best multiplexing layer supported by this system.
     * The following should be ordered by performances, descending. */
     // Macros pick the implementation at compile time, a kind of compile-time overloading
    #ifdef HAVE_EVPORT
    #include "ae_evport.c"
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c"
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c"
            #else
            #include "ae_select.c"
            #endif
        #endif
    #endif
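    The same nested #ifdef technique can be tried without the Redis sources by selecting a string instead of a file. In this sketch the HAVE_* detection is an assumption made for illustration (Redis derives these macros in its own config.h), and BACKEND_NAME and backendName are hypothetical.

```c
/* Assumed, simplified platform detection; only for illustration. */
#if defined(__linux__)
#define HAVE_EPOLL 1
#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

/* Same preference order as the #include chain: evport > epoll > kqueue > select. */
#ifdef HAVE_EVPORT
#define BACKEND_NAME "evport"
#else
    #ifdef HAVE_EPOLL
    #define BACKEND_NAME "epoll"
    #else
        #ifdef HAVE_KQUEUE
        #define BACKEND_NAME "kqueue"
        #else
        #define BACKEND_NAME "select"
        #endif
    #endif
#endif

/* The choice is fixed when the file is compiled; no runtime check is involved. */
static const char *backendName(void) { return BACKEND_NAME; }
```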

    Let's look at the classic epoll implementation:

    typedef struct aeApiState {
        int epfd;
        struct epoll_event *events;
    } aeApiState;
    
    // Create the epoll state for the event loop
    static int aeApiCreate(aeEventLoop *eventLoop) {
        aeApiState *state = zmalloc(sizeof(aeApiState));
    
        if (!state) return -1;
        state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
        if (!state->events) {
            zfree(state);
            return -1;
        }
        state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
        if (state->epfd == -1) {
            zfree(state->events);
            zfree(state);
            return -1;
        }
        eventLoop->apidata = state;
        return 0;
    }
    
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        aeApiState *state = eventLoop->apidata;
        struct epoll_event ee = {0}; /* avoid valgrind warning */
        /* If the fd was already monitored for some event, we need a MOD
         * operation. Otherwise we need an ADD operation. */
        int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD; // possible epoll_ctl ops: EPOLL_CTL_ADD (register), EPOLL_CTL_MOD (modify), EPOLL_CTL_DEL (remove)
    
        ee.events = 0;
        // Merge in the events already registered for this fd so the epoll interest set keeps them; fe->mask itself is updated by the caller, aeCreateFileEvent
        mask |= eventLoop->events[fd].mask; /* Merge old events */
        if (mask & AE_READABLE) ee.events |= EPOLLIN;
        if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
        ee.data.fd = fd;
        if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
        return 0;
    }
    
    // tvp is the epoll timeout; if tvp is NULL, block indefinitely
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, numevents = 0;
    
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;
    
            numevents = retval;
            // Iterate over the ready fds
            for (j = 0; j < numevents; j++) {
                int mask = 0;
                struct epoll_event *e = state->events+j;
    
                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
    
            // Fill eventLoop->fired: each entry is a ready fd and its event mask
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
        return numevents;
    }

    The code is not complicated; it is essentially a thin wrapper around the system calls:

    epoll_ctl registers an fd to be monitored.

    epoll_wait waits until an event occurs on a monitored fd, or until the timeout expires.
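    The two system calls can be exercised directly. This Linux-only sketch registers the read end of a pipe with EPOLL_CTL_ADD (EPOLLIN, as aeApiAddEvent does for AE_READABLE), writes one byte to the other end, and calls epoll_wait with a millisecond timeout, as aeApiPoll does. epollPipeDemo is a hypothetical helper, not part of Redis.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Returns the number of ready fds (or -1 on error). *matched is set to 1 if the
 * first ready fd is the pipe's read end, *readable to 1 if EPOLLIN was reported. */
static int epollPipeDemo(int timeoutMs, int *matched, int *readable) {
    int p[2];
    if (pipe(p) == -1) return -1;

    int epfd = epoll_create(1024); /* the size is only a hint, as in aeApiCreate */
    if (epfd == -1) { close(p[0]); close(p[1]); return -1; }

    struct epoll_event ee = {0};
    ee.events = EPOLLIN;   /* AE_READABLE maps to EPOLLIN */
    ee.data.fd = p[0];     /* same trick as Redis: store the fd in the event data */

    int n = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ee) == 0 &&
        write(p[1], "x", 1) == 1) {
        struct epoll_event out[4];
        n = epoll_wait(epfd, out, 4, timeoutMs); /* blocks at most timeoutMs */
        if (n > 0) {
            *matched = (out[0].data.fd == p[0]);
            *readable = (out[0].events & EPOLLIN) != 0;
        }
    }
    close(epfd);
    close(p[0]);
    close(p[1]);
    return n;
}
```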

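    An interesting detail is the mask field of the aeFileEvent struct, which records the events this fd is being monitored for. It is consulted later: aeApiAddEvent uses it to choose between EPOLL_CTL_ADD and EPOLL_CTL_MOD and to merge the old events, and aeProcessEvents checks fe->mask & mask to skip events that were removed in the meantime.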
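    The mask bookkeeping in aeApiAddEvent boils down to a small pure function. In this sketch (addEventPlan is a hypothetical name and the AE_* values are local copies) the epoll_ctl op is derived from the mask already stored for the fd, and the old and new events are merged into the epoll event set.

```c
#include <sys/epoll.h>

#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2

/* oldMask plays the role of eventLoop->events[fd].mask.
 * Returns the epoll_ctl op and stores the merged epoll event set in *epollEvents. */
static int addEventPlan(int oldMask, int addMask, unsigned int *epollEvents) {
    /* First registration needs ADD; an fd epoll already knows needs MOD. */
    int op = (oldMask == AE_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    int mask = addMask | oldMask; /* merge old events */
    unsigned int ev = 0;
    if (mask & AE_READABLE) ev |= EPOLLIN;
    if (mask & AE_WRITABLE) ev |= EPOLLOUT;
    *epollEvents = ev;
    return op;
}
```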

    A new client connects

    networking.c

    client *createClient(int fd) {
        client *c = zmalloc(sizeof(client));
    
        // fd == -1 means a connectionless pseudo-client, used e.g. to run Lua scripts; it skips the socket setup below
        /* passing -1 as fd it is possible to create a non connected client.
         * This is useful since all the commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        if (fd != -1) {
            anetNonBlock(NULL,fd); // put this fd in non-blocking mode
            anetEnableTcpNoDelay(NULL,fd); // setsockopt(TCP_NODELAY): disable Nagle's algorithm so packets go out as quickly as possible
            if (server.tcpkeepalive)
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
            // Register the read handler readQueryFromClient for this client's fd (defined later in networking.c)
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
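    The two socket options set in createClient correspond to standard POSIX calls. This sketch shows the usual way to make an fd non-blocking with fcntl and to disable Nagle's algorithm with setsockopt(TCP_NODELAY); setNonBlock and enableTcpNoDelay are hypothetical helpers written in the spirit of anetNonBlock and anetEnableTcpNoDelay, not their actual code.

```c
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Add O_NONBLOCK to the fd's status flags, keeping the existing ones.
 * Returns -1 on error. */
static int setNonBlock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Disable Nagle's algorithm so small writes are sent immediately.
 * Returns 0 on success, -1 on error. */
static int enableTcpNoDelay(int fd) {
    int yes = 1;
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
}
```

    With a non-blocking fd, a read on a socket that has no data returns immediately with EAGAIN instead of stalling the single-threaded event loop.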

    aeCreateFileEvent registers the read handler readQueryFromClient for this fd, i.e. stores it in the rfileProc field of the fd's aeFileEvent:

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
        aeFileEvent *fe = &eventLoop->events[fd];
    
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
        fe->clientData = clientData;
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
        return AE_OK;
    }

    When the client sends a command, the event loop sees that this fd is readable and calls readQueryFromClient to handle it.
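    The registration path can be modeled as a table of handlers indexed by fd. This sketch mimics what aeCreateFileEvent stores (mask, rfileProc, wfileProc, clientData) without touching epoll; createFileEvent, fireReadable, countReads and the small SETSIZE capacity are hypothetical.

```c
#include <stddef.h>

#define AE_READABLE 1
#define AE_WRITABLE 2
#define SETSIZE 64 /* hypothetical capacity; Redis sizes its array when the loop is created */

typedef void fileProc(int fd, void *clientData, int mask);

typedef struct {
    int mask;            /* events registered for this fd */
    fileProc *rfileProc; /* called when the fd is readable */
    fileProc *wfileProc; /* called when the fd is writable */
    void *clientData;    /* e.g. the client struct */
} fileEvent;

static fileEvent events[SETSIZE]; /* zero-initialized: no handlers yet */

/* Like aeCreateFileEvent minus the epoll_ctl call: store the handler by fd. */
static int createFileEvent(int fd, int mask, fileProc *proc, void *clientData) {
    if (fd < 0 || fd >= SETSIZE) return -1;
    fileEvent *fe = &events[fd];
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    return 0;
}

/* What the loop does once the poller reports fd as readable. */
static void fireReadable(int fd) {
    fileEvent *fe = &events[fd];
    if (fe->mask & AE_READABLE) fe->rfileProc(fd, fe->clientData, AE_READABLE);
}

/* Hypothetical stand-in for readQueryFromClient: counts how often it ran. */
static void countReads(int fd, void *clientData, int mask) {
    (void)fd;
    (void)mask;
    (*(int *)clientData)++;
}
```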

    Processing the command sent by the client

    // Read-event callback: when it fires, the client's fd has become readable
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    .....
        /* Time to process the buffer. If the client is a master we need to
         * compute the difference between the applied offset before and after
         * processing the buffer, to understand how much of the replication stream
         * was actually applied to the master state: this quantity, and its
         * corresponding part of the replication stream, will be propagated to
         * the sub-slaves and to the replication backlog. */
        if (!(c->flags & CLIENT_MASTER)) {
            processInputBuffer(c); // ordinary client (not our master)
        } else {
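            // This client is our master, i.e. this instance is a replica: besides running the commands, propagate the applied part of the replication stream to our sub-replicas and the backlog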
            size_t prev_offset = c->reploff;
            processInputBuffer(c);
            size_t applied = c->reploff - prev_offset;
            if (applied) {
                replicationFeedSlavesFromMasterStream(server.slaves,
                        c->pending_querybuf, applied);
                sdsrange(c->pending_querybuf,applied,-1);
            }
        }
    }

    When the fd becomes readable, the event loop invokes the readQueryFromClient callback, which in turn calls processInputBuffer:

    /* This function is called every time, in the client structure 'c', there is
     * more query buffer to process, because we read more data from the socket
     * or because a client was blocked and later reactivated, so there could be
     * pending query buffer, already representing a full command, to process. */
    void processInputBuffer(client *c) {
    .....
    
            if (c->reqtype == PROTO_REQ_INLINE) {
                if (processInlineBuffer(c) != C_OK) break;
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                if (processMultibulkBuffer(c) != C_OK) break;
            } else {
                serverPanic("Unknown request type");
            }
    
            /* Multibulk processing could see a <= 0 length. */
            if (c->argc == 0) {
                resetClient(c);
            } else {
                /* Only reset the client when the command was executed. */
                // Finally, execute the command
                if (processCommand(c) == C_OK) {
                    if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                        /* Update the applied replication offset of our master. */
                        c->reploff = c->read_reploff - sdslen(c->querybuf);
                    }
    
                    /* Don't reset the client structure for clients blocked in a
                     * module blocking command, so that the reply callback will
                     * still be able to access the client argv and argc field.
                     * The client will be reset in unblockClientFromModule(). */
                    if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                        resetClient(c);
                }
                /* freeMemoryIfNeeded may flush slave output buffers. This may
                 * result into a slave, that may be the active client, to be
                 * freed. */
                if (server.current_client == NULL) break;
            }
        }
        server.current_client = NULL;
    }
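    processMultibulkBuffer turns the RESP multibulk format (*<argc>\r\n, then $<len>\r\n<bytes>\r\n per argument) into argc/argv. Below is a minimal parser sketch under simplifying assumptions: parseMultibulk, readLine and parsedCommand are hypothetical, arguments point into the input buffer, anything not starting with '*' is rejected (Redis would switch to the inline protocol instead), and a return value of 0 means "wait for more data", similar to breaking out of the loop above when a command is still incomplete.

```c
#include <stddef.h>
#include <string.h>

#define MAX_ARGS 16

typedef struct {
    int argc;
    const char *argv[MAX_ARGS]; /* point into the input buffer, not NUL-terminated */
    size_t argvlen[MAX_ARGS];
} parsedCommand;

/* Parse a non-negative decimal number ending in "\r\n" at buf[*pos].
 * Returns 1 on success, 0 if the line is incomplete, -1 on a protocol error. */
static int readLine(const char *buf, size_t len, size_t *pos, long *out) {
    const char *cr = memchr(buf + *pos, '\r', len - *pos);
    if (cr == NULL || (size_t)(cr - buf) + 1 >= len) return 0; /* no full line yet */
    if (cr[1] != '\n') return -1;
    const char *p = buf + *pos;
    if (p == cr) return -1; /* empty number */
    long v = 0;
    for (; p < cr; p++) {
        if (*p < '0' || *p > '9') return -1;
        v = v * 10 + (*p - '0');
        if (v > 512L * 1024 * 1024) return -1; /* arbitrary sanity cap */
    }
    *out = v;
    *pos = (size_t)(cr - buf) + 2;
    return 1;
}

/* Returns the bytes consumed by one complete command, 0 if incomplete, -1 on error. */
static long parseMultibulk(const char *buf, size_t len, parsedCommand *cmd) {
    size_t pos = 1;
    long n, blen;
    int r;
    if (len == 0) return 0;
    if (buf[0] != '*') return -1;
    if ((r = readLine(buf, len, &pos, &n)) != 1) return r;
    if (n > MAX_ARGS) return -1;
    cmd->argc = 0;
    for (long i = 0; i < n; i++) {
        if (pos >= len) return 0;
        if (buf[pos] != '$') return -1;
        pos++;
        if ((r = readLine(buf, len, &pos, &blen)) != 1) return r;
        if (len - pos < (size_t)blen + 2) return 0; /* payload + CRLF not here yet */
        if (buf[pos + blen] != '\r' || buf[pos + blen + 1] != '\n') return -1;
        cmd->argv[cmd->argc] = buf + pos;
        cmd->argvlen[cmd->argc] = (size_t)blen;
        cmd->argc++;
        pos += (size_t)blen + 2;
    }
    return (long)pos;
}
```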

    processCommand, as the name suggests, handles the command sent by the client.

    It is implemented in server.c:

    /* If this function gets called we already read a whole
     * command, arguments are in the client argv/argc fields.
     * processCommand() execute the command or prepare the
     * server for a bulk read from the client.
     *
     * If C_OK is returned the client is still alive and valid and
     * other operations can be performed by the caller. Otherwise
     * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
    int processCommand(client *c) {
    ......
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        // Look up the command implementation in the command dict
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        // Check that the command exists and that the argument count is correct
        if (!c->cmd) {
            flagTransaction(c);
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return C_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return C_OK;
        }
    .....
    
        // Everything above validates arguments and handles various error conditions
        /* Exec the command */
        // If the client is inside a transaction opened by MULTI
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            // queue the command instead of executing it
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
            // Not in a transaction: execute an ordinary command (call() is around line 2200 of this file)
            call(c,CMD_CALL_FULL);
            c->woff = server.master_repl_offset;
            if (listLength(server.ready_keys))
                handleClientsBlockedOnKeys();
        }
        return C_OK;
    }

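    This function has two key points:

    1. It calls lookupCommand to find the implementation of the command the client submitted. When the Redis server starts, it builds a dict (populated from the redisCommandTable array) that maps command names to their implementation functions, so the lookup is just a dict query.

    2. It executes the command. Ignoring transactions and looking only at the simplest case, an ordinary command, it calls call.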
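    The lookup and the arity rule from processCommand can be sketched with a tiny command table. This is an illustration under assumptions: the table, the client struct and the names lookupCmd, checkCmd, getCmd and setCmd are hypothetical and much smaller than their Redis counterparts, and a linear scan replaces the dict. The arity convention is the one checked in the code above (a positive arity means exactly that many arguments including the command name, a negative one means at least -arity); GET's arity of 2 matches the table entry shown later, and SET's -3 follows the Redis command table.

```c
#include <stddef.h>
#include <strings.h>

typedef struct client client;
typedef void commandProc(client *c);

/* Hypothetical, trimmed-down command table entry. */
typedef struct {
    const char *name;
    commandProc *proc;
    int arity; /* > 0: exact argc; < 0: at least -arity (argc includes the name) */
} command;

struct client {
    int argc;
    const char **argv;
    const command *cmd;
};

static void getCmd(client *c) { (void)c; }
static void setCmd(client *c) { (void)c; }

static const command commandTable[] = {
    {"get", getCmd, 2},
    {"set", setCmd, -3},
};

/* Linear scan for brevity; command names are matched case-insensitively. */
static const command *lookupCmd(const char *name) {
    for (size_t i = 0; i < sizeof(commandTable) / sizeof(commandTable[0]); i++)
        if (strcasecmp(commandTable[i].name, name) == 0) return &commandTable[i];
    return NULL;
}

/* Returns 0 if runnable, -1 for an unknown command, -2 for a wrong argument count. */
static int checkCmd(client *c) {
    c->cmd = lookupCmd(c->argv[0]);
    if (!c->cmd) return -1;
    if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
        (c->argc < -c->cmd->arity))
        return -2;
    return 0;
}
```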

    call is implemented in server.c as well:

    void call(client *c, int flags) {
    ......
        /* Call the command. */
        dirty = server.dirty;
        start = ustime();
        c->cmd->proc(c); // execute the command
        duration = ustime()-start; // measure how long the command took
        dirty = server.dirty-dirty;
        if (dirty < 0) dirty = 0;
    ....
    }

    The actual work is done through cmd's proc field, a function pointer.

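    c->cmd (and with it the proc field) is set in processCommand from the return value of lookupCommand in the previous step; proc itself comes from the command table entry.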
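    The core of call() can be reproduced in a few lines: snapshot the dirty counter, time the call made through the proc function pointer, then compute the deltas. This sketch uses hypothetical names (dirtyCounter standing in for server.dirty, miniClient, callSketch, fakeSet, fakeGet) and a CLOCK_MONOTONIC clock instead of Redis's ustime().

```c
#define _POSIX_C_SOURCE 200809L
#include <time.h>

/* Hypothetical stand-in for server.dirty (number of changes since the last save). */
static long long dirtyCounter = 0;

typedef struct miniClient miniClient;
typedef void procFn(miniClient *c);

struct miniClient {
    procFn *proc;           /* like c->cmd->proc */
    long long lastDuration; /* microseconds spent in proc */
    long long lastDirty;    /* changes made by this call */
};

/* Microsecond timestamp from a monotonic clock. */
static long long nowUs(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

static void callSketch(miniClient *c) {
    long long dirty = dirtyCounter;
    long long start = nowUs();
    c->proc(c); /* dispatch through the function pointer */
    c->lastDuration = nowUs() - start;
    dirty = dirtyCounter - dirty;
    if (dirty < 0) dirty = 0; /* same guard as call() */
    c->lastDirty = dirty;
}

static void fakeSet(miniClient *c) { (void)c; dirtyCounter++; } /* a write: 1 change */
static void fakeGet(miniClient *c) { (void)c; }                 /* a read: no change */
```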

    For example, the simplest command, GET, maps to the getCommand function:

        {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},

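    Its implementation lives in t_string.c; we won't follow it further here. In the table entry, 2 is the arity (the command name plus one key), "rF" marks GET as read-only and fast, and 1,1,1 give the first key, the last key and the key step.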

    With this, we have a rough picture of how a command sent by a client flows through the server.

  • Original article: https://www.cnblogs.com/stevenczp/p/9383515.html