Redis Source Code Walkthrough (Part 1): The Event Loop and Command Processing

    eventloop 

    Let's start from the main function in server.c:

    int main(int argc, char **argv) {
    .......
    
        aeSetBeforeSleepProc(server.el,beforeSleep);
        aeSetAfterSleepProc(server.el,afterSleep);
        aeMain(server.el);
        aeDeleteEventLoop(server.el);
        return 0;
    }

    ae.c

    // Calls aeProcessEvents in an endless loop to handle whichever time events and file events are ready.
    // Called from main() in server.c.
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    /* Process every pending time event, then every pending file event
     * (that may be registered by time event callbacks just processed).
     * Without special flags the function sleeps until some file event
     * fires, or when the next time event occurs (if any).
     *
     * If flags is 0, the function does nothing and returns.
     * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
     * if flags has AE_FILE_EVENTS set, file events are processed.
     * if flags has AE_TIME_EVENTS set, time events are processed.
     * if flags has AE_DONT_WAIT set the function returns ASAP until all
     * the events that are possible to process without waiting are processed.
     * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
     *
     * The function returns the number of events processed. */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    ........
    
        /* Note that we want call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
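        // Use the nearest time event to bound how long the poll below may block.
        // File events are handled in this block; time events are processed after it.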
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            aeTimeEvent *shortest = NULL;
            struct timeval tv, *tvp;
    
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                // Find the time event in the list that is due soonest
                shortest = aeSearchNearestTimer(eventLoop);
            // Compute how long to wait from now until that time event is due
            if (shortest) {
                long now_sec, now_ms;
    
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
    
                /* How many milliseconds we need to wait for the next
                 * time event to fire? */
                long long ms =
                    (shortest->when_sec - now_sec)*1000 +
                    shortest->when_ms - now_ms;
    
                if (ms > 0) {
                    tvp->tv_sec = ms/1000;
                    tvp->tv_usec = (ms % 1000)*1000;
                } else {
                    tvp->tv_sec = 0;
                    tvp->tv_usec = 0;
                }
            } else {
                /* If we have to check for events but need to return
                 * ASAP because of AE_DONT_WAIT we need to set the timeout
                 * to zero */
                if (flags & AE_DONT_WAIT) {
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    /* Otherwise we can block */
                    tvp = NULL; /* wait forever */
                }
            }
    
            /* Call the multiplexing API, will return only on timeout or when
             * some event fires. */
            // Call the I/O multiplexing layer to find the file events that are ready for reading or writing
            numevents = aeApiPoll(eventLoop, tvp);
    
            /* After sleep callback. */
            if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
                eventLoop->aftersleep(eventLoop);
    
            // Walk the fds recorded in the event loop's fired array
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask; // event type: readable and/or writable
                int fd = eventLoop->fired[j].fd;     // the fd the event fired on
                int fired = 0; /* Number of events fired for current fd. */
    
                /* Normally we execute the readable event first, and the writable
                 * event later. This is useful as sometimes we may be able
                 * to serve the reply of a query immediately after processing the
                 * query.
                 *
                 * However if AE_BARRIER is set in the mask, our application is
                 * asking us to do the reverse: never fire the writable event
                 * after the readable. In such a case, we invert the calls.
                 * This is useful when, for instance, we want to do things
                 * in the beforeSleep() hook, like fsynching a file to disk,
                 * before replying to a client. */
                int invert = fe->mask & AE_BARRIER;
    
                /* Note the "fe->mask & mask & ..." code: maybe an already
                 * processed event removed an element that fired and we
                 * haven't processed yet, so we check if the event is still valid.
                 *
                 * Fire the readable event if the call sequence is not
                 * inverted. */
                if (!invert && fe->mask & mask & AE_READABLE) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
    
                /* Fire the writable event. */
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
    
                /* If we have to invert the call, fire the readable event now
                 * after the writable one. */
                if (invert && fe->mask & mask & AE_READABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
    
                processed++;
            }
        }
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    
        return processed; /* return the number of processed file/time events */
    }

    This is a standard event-driven design: aeMain calls aeProcessEvents over and over until the loop's stop flag is set.
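    The loop shape reduces to a few lines. The sketch below is a toy, not Redis code: toyLoop, toyMain and toyProcessEvents are hypothetical names, and "processing events" is faked by counting rounds so the loop can end on its own (in Redis, the stop flag is set by aeStop()).

```c
#include <stddef.h>

/* Toy event loop with the same shape as aeMain(): run the optional
 * beforesleep hook, then process events, until stop is set.
 * All names here are illustrative, not Redis's. */
typedef struct toyLoop toyLoop;
typedef void toyHook(toyLoop *loop);

struct toyLoop {
    int stop;             /* the loop exits once this is non-zero */
    toyHook *beforesleep; /* optional hook, may be NULL */
    int iterations;       /* rounds of "event processing" so far */
    int before_calls;     /* how often the hook ran */
    int max_iterations;   /* fake workload: stop after this many rounds */
};

/* Stand-in for aeProcessEvents(): counts a round and requests a stop
 * after max_iterations, the way a handler might call aeStop(). */
static int toyProcessEvents(toyLoop *loop) {
    loop->iterations++;
    if (loop->iterations >= loop->max_iterations)
        loop->stop = 1;
    return 0;
}

static void toyMain(toyLoop *loop) {
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL)
            loop->beforesleep(loop);
        toyProcessEvents(loop);
    }
}

/* Example hook: records that it ran. */
static void countBeforeSleep(toyLoop *loop) { loop->before_calls++; }
```

    With max_iterations set to 3 and countBeforeSleep installed, the hook runs exactly once before each of the three rounds, just as beforeSleep runs before every aeProcessEvents call.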

    aeProcessEvents is fairly long and handles two kinds of events, time events and file events. This article focuses on file events.
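    Before polling, aeProcessEvents turns the deadline of the nearest time event into a poll timeout. This is how timers still fire on time while the loop is waiting on sockets. Below is a minimal sketch of that arithmetic, assuming the seconds/milliseconds pair used by aeTimeEvent. timeoutUntil and timevalToEpollMs are hypothetical helper names. The second helper repeats the conversion done in the epoll backend's aeApiPoll, where a NULL timeout means "wait forever".

```c
#include <stddef.h>
#include <sys/time.h>

/* Same arithmetic as aeProcessEvents(): a deadline already in the past
 * becomes a zero timeout (poll and return immediately). */
static struct timeval timeoutUntil(long when_sec, long when_ms,
                                   long now_sec, long now_ms) {
    struct timeval tv;
    long long ms = (long long)(when_sec - now_sec) * 1000 + (when_ms - now_ms);
    if (ms > 0) {
        tv.tv_sec = ms / 1000;
        tv.tv_usec = (ms % 1000) * 1000;
    } else {
        tv.tv_sec = 0;
        tv.tv_usec = 0;
    }
    return tv;
}

/* epoll_wait takes milliseconds; -1 means block until an event arrives. */
static int timevalToEpollMs(const struct timeval *tvp) {
    return tvp ? (int)(tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1;
}
```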

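    aeProcessEvents calls aeApiPoll to find out which of the monitored fds are ready. For each ready fd, it calls wfileProc or rfileProc depending on the event type. Since this article is about how a command sent by a client gets processed, the handler we care about is rfileProc. How rfileProc gets set is covered later.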
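    The per-fd dispatch order is worth isolating. This sketch makes some simplifying assumptions. The flag values match Redis's ae.h. However, fileEvent, dispatch and the handlers that append a letter to a log string are hypothetical stand-ins for aeFileEvent and the real callbacks. Readable runs before writable, the order flips when AE_BARRIER is set, and a handler registered for both directions runs only once.

```c
#include <string.h>

#define AE_READABLE 1
#define AE_WRITABLE 2
#define AE_BARRIER  4

typedef void fileProc(int fd, char *log);

typedef struct {
    int mask;             /* events of interest, plus AE_BARRIER if set */
    fileProc *rfileProc;
    fileProc *wfileProc;
} fileEvent;

/* Mirrors the dispatch block of aeProcessEvents(); returns how many
 * handlers ran for this fd. */
static int dispatch(fileEvent *fe, int fd, int fired_mask, char *log) {
    int fired = 0;
    int invert = fe->mask & AE_BARRIER;

    if (!invert && (fe->mask & fired_mask & AE_READABLE)) {
        fe->rfileProc(fd, log);
        fired++;
    }
    if (fe->mask & fired_mask & AE_WRITABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->wfileProc(fd, log);
            fired++;
        }
    }
    if (invert && (fe->mask & fired_mask & AE_READABLE)) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->rfileProc(fd, log);
            fired++;
        }
    }
    return fired;
}

/* Example handlers that record the call order in a string. */
static void onRead(int fd, char *log)  { (void)fd; strcat(log, "R"); }
static void onWrite(int fd, char *log) { (void)fd; strcat(log, "W"); }
static void onBoth(int fd, char *log)  { (void)fd; strcat(log, "B"); }
```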

    aeApiPoll has several implementations, each in its own file. Redis picks one with conditional compilation, which is a neat trick:

    /* Include the best multiplexing layer supported by this system.
     * The following should be ordered by performances, descending. */
    // Pick the implementation at compile time with preprocessor conditionals
    #ifdef HAVE_EVPORT
    #include "ae_evport.c"
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c"
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c"
            #else
            #include "ae_select.c"
            #endif
        #endif
    #endif
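    The same pattern can be tried outside Redis. In this sketch, the HAVE_* macros are derived directly from platform macros, which simplifies what Redis's config.h does. backendName() is a hypothetical helper. Redis itself reports the chosen backend through aeGetApiName(), which shows up as multiplexing_api in INFO output.

```c
/* Simplified platform detection; Redis does this in config.h with
 * extra version checks. */
#if defined(__sun)
#define HAVE_EVPORT 1
#elif defined(__linux__)
#define HAVE_EPOLL 1
#elif defined(__APPLE__) || defined(__FreeBSD__) || \
      defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

/* Same preference order as the include chain: evport, epoll, kqueue,
 * then select as the portable fallback. */
static const char *backendName(void) {
#ifdef HAVE_EVPORT
    return "evport";
#elif defined(HAVE_EPOLL)
    return "epoll";
#elif defined(HAVE_KQUEUE)
    return "kqueue";
#else
    return "select";
#endif
}
```

    Because the choice is made by the preprocessor, only one backend is compiled into the binary, and no dispatch happens at runtime.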

    Let's look at the classic one, epoll:

    typedef struct aeApiState {
        int epfd;
        struct epoll_event *events;
    } aeApiState;
    
    // Create the epoll-specific state of the event loop
    static int aeApiCreate(aeEventLoop *eventLoop) {
        aeApiState *state = zmalloc(sizeof(aeApiState));
    
        if (!state) return -1;
        state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
        if (!state->events) {
            zfree(state);
            return -1;
        }
        state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
        if (state->epfd == -1) {
            zfree(state->events);
            zfree(state);
            return -1;
        }
        eventLoop->apidata = state;
        return 0;
    }
    
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        aeApiState *state = eventLoop->apidata;
        struct epoll_event ee = {0}; /* avoid valgrind warning */
        /* If the fd was already monitored for some event, we need a MOD
         * operation. Otherwise we need an ADD operation. */
        // epoll_ctl's op argument: EPOLL_CTL_ADD registers, EPOLL_CTL_MOD modifies, EPOLL_CTL_DEL removes
        int op = eventLoop->events[fd].mask == AE_NONE ?
                EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
        ee.events = 0;
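        // Merge in the events already registered for this fd and build the epoll interest set from the result
        // (eventLoop->events[fd].mask itself is updated by the caller, aeCreateFileEvent)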
        mask |= eventLoop->events[fd].mask; /* Merge old events */
        if (mask & AE_READABLE) ee.events |= EPOLLIN;
        if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
        ee.data.fd = fd;
        if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
        return 0;
    }
    
    // tvp is the epoll timeout; if tvp is NULL, block indefinitely
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, numevents = 0;
    
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;
    
            numevents = retval;
            // Walk the fds that are ready for reading or writing
            for (j = 0; j < numevents; j++) {
                int mask = 0;
                struct epoll_event *e = state->events+j;
    
                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
    
                // Record each ready fd and its event mask in eventLoop->fired
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
        return numevents;
    }

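    The code is not complicated. It is essentially a thin wrapper around two system calls:

    epoll_ctl registers an fd to be monitored.

    epoll_wait blocks until an event occurs on one of the monitored fds, or until the timeout expires.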
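    To see those two calls in isolation, here is a Linux-only sketch that is not taken from Redis. It registers the read end of a pipe with epoll_ctl, makes it readable, and collects it with epoll_wait. pollPipeOnce is a hypothetical name.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Returns the number of ready fds reported by epoll_wait, or -1 on error.
 * *matched is set to 1 if the first event carries the pipe's read end,
 * *readable to 1 if that event reports EPOLLIN. */
static int pollPipeOnce(int timeout_ms, int *matched, int *readable) {
    int fds[2];
    if (pipe(fds) == -1) return -1;

    int epfd = epoll_create(1024);        /* the size argument is only a hint */
    if (epfd == -1) { close(fds[0]); close(fds[1]); return -1; }

    struct epoll_event ee = {0};
    ee.events = EPOLLIN;                  /* what AE_READABLE maps to */
    ee.data.fd = fds[0];

    int n = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ee) == 0 &&
        write(fds[1], "x", 1) == 1) {     /* make the read end readable */
        struct epoll_event fired[4];
        n = epoll_wait(epfd, fired, 4, timeout_ms);
        if (n > 0) {
            *matched = (fired[0].data.fd == fds[0]);
            *readable = (fired[0].events & EPOLLIN) != 0;
        }
    }
    close(epfd);
    close(fds[0]);
    close(fds[1]);
    return n;
}
```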

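    An interesting detail is that the aeFileEvent struct has a mask field recording which events the fd is being monitored for. aeApiAddEvent, for example, uses it to choose between EPOLL_CTL_ADD and EPOLL_CTL_MOD and to merge the old events into the new mask.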
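    A pure-logic sketch of how aeApiAddEvent uses that mask follows. OP_ADD and OP_MOD stand in for EPOLL_CTL_ADD and EPOLL_CTL_MOD, and planAddEvent is a hypothetical name. The AE_* values match Redis's ae.h.

```c
#define AE_NONE     0
#define AE_READABLE 1
#define AE_WRITABLE 2

enum ctlOp { OP_ADD, OP_MOD };   /* stand-ins for the epoll_ctl ops */

typedef struct {
    enum ctlOp op;   /* which epoll_ctl operation would be issued */
    int mask;        /* merged interest set to register */
} ctlPlan;

/* ADD for an fd that is not watched yet, MOD otherwise; the old mask is
 * OR-ed in so previously registered events are kept. */
static ctlPlan planAddEvent(int old_mask, int new_mask) {
    ctlPlan p;
    p.op = (old_mask == AE_NONE) ? OP_ADD : OP_MOD;
    p.mask = new_mask | old_mask;
    return p;
}
```

    The merge matters because EPOLL_CTL_MOD replaces the fd's whole interest set. If the old mask were not OR-ed in, registering AE_WRITABLE on a client fd would silently drop its AE_READABLE registration.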

    New client connections

    networking.c

    client *createClient(int fd) {
        client *c = zmalloc(sizeof(client));
    
        // fd == -1: create a pseudo client with no connection, as used for executing Lua scripts
        /* passing -1 as fd it is possible to create a non connected client.
         * This is useful since all the commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        if (fd != -1) {
            anetNonBlock(NULL,fd); // put this fd into non-blocking mode
            anetEnableTcpNoDelay(NULL,fd); // setsockopt(TCP_NODELAY): disable Nagle's algorithm so packets go out as quickly as possible
            if (server.tcpkeepalive)
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
            // Register readQueryFromClient (defined further down in networking.c) as the read handler for this client's fd
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
    ......
    }

    aeCreateFileEvent registers readQueryFromClient as the read handler for this fd, that is, it stores the function in the fd's rfileProc field:

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
        aeFileEvent *fe = &eventLoop->events[fd];
    
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
        fe->clientData = clientData;
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
        return AE_OK;
    }
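    Without the aeApiAddEvent call, aeCreateFileEvent is plain bookkeeping on an array indexed by fd. The following is a minimal sketch. It assumes a fixed SETSIZE of 16, and its names (eventLoop, fileEvent, createFileEvent, countRead) are hypothetical and only mirror the real structures.

```c
#include <errno.h>
#include <stddef.h>

#define AE_OK 0
#define AE_ERR -1
#define AE_READABLE 1
#define AE_WRITABLE 2
#define SETSIZE 16

typedef void fileProc(int fd, void *clientData);

typedef struct {
    int mask;             /* events registered for this fd */
    fileProc *rfileProc;  /* read handler, e.g. readQueryFromClient */
    fileProc *wfileProc;  /* write handler */
    void *clientData;     /* passed back to the handler (the client *) */
} fileEvent;

typedef struct {
    int setsize;
    int maxfd;                  /* highest registered fd, -1 if none */
    fileEvent events[SETSIZE];  /* indexed directly by fd */
} eventLoop;

static void loopInit(eventLoop *el) {
    el->setsize = SETSIZE;
    el->maxfd = -1;
    for (int i = 0; i < SETSIZE; i++) {
        el->events[i].mask = 0;
        el->events[i].rfileProc = NULL;
        el->events[i].wfileProc = NULL;
        el->events[i].clientData = NULL;
    }
}

/* Same checks and updates as aeCreateFileEvent(), minus the OS call. */
static int createFileEvent(eventLoop *el, int fd, int mask,
                           fileProc *proc, void *clientData) {
    if (fd >= el->setsize) { errno = ERANGE; return AE_ERR; }
    fileEvent *fe = &el->events[fd];
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > el->maxfd) el->maxfd = fd;
    return AE_OK;
}

/* Example handler: increments an int passed as clientData. */
static void countRead(int fd, void *clientData) {
    (void)fd;
    (*(int *)clientData)++;
}
```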

    When the client sends a command, the event loop sees that the fd is readable and calls readQueryFromClient to handle it.
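    This does not stall the single-threaded loop because createClient put the fd in non-blocking mode. A read handler can drain whatever has arrived, and when nothing is left it gets EAGAIN instead of blocking. Below is a POSIX sketch that uses a socketpair in place of a real TCP connection. setNonBlock does roughly what anetNonBlock does with fcntl, and demoNonBlockingRead is a hypothetical name.

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Add O_NONBLOCK to the fd's status flags. */
static int setNonBlock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Sends an inline request on one end, then reads the non-blocking end
 * twice: the first read gets the bytes, the second finds nothing and
 * fails instead of blocking. Returns 0 on success, -1 on setup errors. */
static int demoNonBlockingRead(char *buf, size_t cap,
                               ssize_t *first, ssize_t *second, int *err) {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return -1;

    int rc = -1;
    const char req[] = "PING\r\n";
    if (setNonBlock(sv[1]) == 0 &&
        write(sv[0], req, sizeof(req) - 1) == (ssize_t)(sizeof(req) - 1)) {
        *first = read(sv[1], buf, cap);   /* data that has arrived */
        *second = read(sv[1], buf, cap);  /* buffer empty: -1 */
        *err = errno;                     /* meaningful when *second == -1 */
        rc = 0;
    }
    close(sv[0]);
    close(sv[1]);
    return rc;
}
```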

    Processing commands sent by a client

    // Callback; when it fires, the client's fd has become readable
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    .....
        /* Time to process the buffer. If the client is a master we need to
         * compute the difference between the applied offset before and after
         * processing the buffer, to understand how much of the replication stream
         * was actually applied to the master state: this quantity, and its
         * corresponding part of the replication stream, will be propagated to
         * the sub-slaves and to the replication backlog. */
        if (!(c->flags & CLIENT_MASTER)) {
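            processInputBuffer(c); // an ordinary client, not our master
        } else {
            // This client is our master (this instance is a replica): besides processing the commands in the buffer,
            // forward the part of the replication stream that was applied to our own replicas and the backlog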
            size_t prev_offset = c->reploff;
            processInputBuffer(c);
            size_t applied = c->reploff - prev_offset;
            if (applied) {
                replicationFeedSlavesFromMasterStream(server.slaves,
                        c->pending_querybuf, applied);
                sdsrange(c->pending_querybuf,applied,-1);
            }
        }
    }

    So when the fd becomes readable, the event loop fires the readQueryFromClient callback, which in turn calls processInputBuffer:

    /* This function is called every time, in the client structure 'c', there is
     * more query buffer to process, because we read more data from the socket
     * or because a client was blocked and later reactivated, so there could be
     * pending query buffer, already representing a full command, to process. */
    void processInputBuffer(client *c) {
    .....
    
            if (c->reqtype == PROTO_REQ_INLINE) {
                if (processInlineBuffer(c) != C_OK) break;
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                if (processMultibulkBuffer(c) != C_OK) break;
            } else {
                serverPanic("Unknown request type");
            }
    
            /* Multibulk processing could see a <= 0 length. */
            if (c->argc == 0) {
                resetClient(c);
            } else {
                /* Only reset the client when the command was executed. */
                // Finally, the command gets executed
                if (processCommand(c) == C_OK) {
                    if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                        /* Update the applied replication offset of our master. */
                        c->reploff = c->read_reploff - sdslen(c->querybuf);
                    }
    
                    /* Don't reset the client structure for clients blocked in a
                     * module blocking command, so that the reply callback will
                     * still be able to access the client argv and argc field.
                     * The client will be reset in unblockClientFromModule(). */
                    if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                        resetClient(c);
                }
                /* freeMemoryIfNeeded may flush slave output buffers. This may
                 * result into a slave, that may be the active client, to be
                 * freed. */
                if (server.current_client == NULL) break;
            }
        }
        server.current_client = NULL;
    }
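    The reqtype branch in that loop selects the parser. In Redis, reqtype is set from the first byte of the query buffer: '*' means a multibulk request, and anything else is treated as inline. The sketch below parses a complete multibulk request such as "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n". It is a simplified stand-in for processMultibulkBuffer, and parseMultibulk and readNumberLine are hypothetical names. It has no size limits and keeps no incremental state, so a partial request is simply reported as incomplete.

```c
#include <stdlib.h>
#include <string.h>

/* Parse "<number>\r\n" at *p. Returns 1 and advances *p on success,
 * 0 if the line is not complete yet, -1 on malformed input. */
static int readNumberLine(const char **p, const char *end, long *out) {
    const char *cr = memchr(*p, '\r', (size_t)(end - *p));
    if (cr == NULL || cr + 1 >= end) return 0;
    if (cr == *p || cr[1] != '\n') return -1;
    char *stop;
    *out = strtol(*p, &stop, 10);
    if (stop != cr) return -1;
    *p = cr + 2;
    return 1;
}

/* Returns 1 with *argc set and argv filled (caller frees each entry),
 * 0 if more bytes are needed, -1 on a protocol error. */
static int parseMultibulk(const char *buf, size_t len,
                          char **argv, int max_args, int *argc) {
    const char *p = buf, *end = buf + len;
    long count, blen;
    int i = 0, r;

    if (len == 0) return 0;
    if (*p != '*') return -1;                 /* inline request: not handled here */
    p++;
    if ((r = readNumberLine(&p, end, &count)) != 1) return r;
    if (count < 0 || count > max_args) return -1;

    for (i = 0; i < count; i++) {
        if (p >= end) { r = 0; goto fail; }
        if (*p != '$') { r = -1; goto fail; }
        p++;
        if ((r = readNumberLine(&p, end, &blen)) != 1) goto fail;
        if (blen < 0) { r = -1; goto fail; }
        if (end - p < blen + 2) { r = 0; goto fail; }   /* payload + CRLF */
        if (p[blen] != '\r' || p[blen + 1] != '\n') { r = -1; goto fail; }
        argv[i] = malloc((size_t)blen + 1);
        if (argv[i] == NULL) { r = -1; goto fail; }
        memcpy(argv[i], p, (size_t)blen);
        argv[i][blen] = '\0';
        p += blen + 2;
    }
    *argc = (int)count;
    return 1;

fail:
    while (i-- > 0) free(argv[i]);           /* release arguments parsed so far */
    return r;
}
```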

    It then calls processCommand, which, as the name suggests, handles the command sent by the client.

    processCommand is implemented in server.c:

    /* If this function gets called we already read a whole
     * command, arguments are in the client argv/argc fields.
     * processCommand() execute the command or prepare the
     * server for a bulk read from the client.
     *
     * If C_OK is returned the client is still alive and valid and
     * other operations can be performed by the caller. Otherwise
     * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
    int processCommand(client *c) {
    ......
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        // Look up the command implementation in the command dict
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        // Check that the command exists and that the number of arguments is correct
        if (!c->cmd) {
            flagTransaction(c);
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return C_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return C_OK;
        }
    .....
    
        // Everything above checks the arguments and handles various error conditions
        /* Exec the command */
        // Inside a transaction opened with MULTI
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            // queue the command instead of executing it
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
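            // A regular, non-transactional command: run it via call(), defined further down in server.c (around line 2200 in the version read here)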
            call(c,CMD_CALL_FULL);
            c->woff = server.master_repl_offset;
            if (listLength(server.ready_keys))
                handleClientsBlockedOnKeys();
        }
        return C_OK;
    }

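    processCommand has two key steps:

    1. Call lookupCommand to find the implementation of the command the client submitted. When the Redis server starts, it builds a dict mapping command names to their implementation functions, so this step is just a dict lookup.

    2. Execute the command. Leaving transactions aside and looking only at the simplest case, a plain command, this means calling call().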
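    Both steps can be sketched together. This is a toy model, not Redis code. A linear scan over a two-entry table stands in for the command dict, and the command bodies only count calls. Also, dispatchCommand returns an error code, whereas processCommand replies with an error and still returns C_OK. The arity rule is the one checked in processCommand: a positive arity means exactly that many arguments (command name included), and a negative arity -N means at least N. The arities used here, 2 for get and -3 for set, match Redis's command table.

```c
#include <stddef.h>
#include <strings.h>   /* strcasecmp (POSIX) */

enum { CMD_OK = 0, CMD_UNKNOWN = -1, CMD_BAD_ARITY = -2 };

typedef struct client client;
typedef void commandProc(client *c);

struct command {
    const char *name;
    commandProc *proc;   /* implementation, called through a pointer */
    int arity;
};

struct client {
    int argc;
    const char **argv;
    int calls;           /* how many commands actually ran */
};

/* Stand-in bodies; the real ones live in t_string.c. */
static void getCommand(client *c) { c->calls++; }
static void setCommand(client *c) { c->calls++; }

static struct command commandTable[] = {
    {"get", getCommand, 2},
    {"set", setCommand, -3},
};

/* Case-insensitive lookup, like the real command dict. */
static struct command *lookupCommand(const char *name) {
    for (size_t i = 0; i < sizeof(commandTable) / sizeof(commandTable[0]); i++)
        if (strcasecmp(commandTable[i].name, name) == 0) return &commandTable[i];
    return NULL;
}

static int dispatchCommand(client *c) {
    struct command *cmd = lookupCommand(c->argv[0]);
    if (cmd == NULL) return CMD_UNKNOWN;
    if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity))
        return CMD_BAD_ARITY;
    cmd->proc(c);
    return CMD_OK;
}
```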

    call() is also implemented in server.c:

    void call(client *c, int flags) {
    ......
        /* Call the command. */
        dirty = server.dirty;
        start = ustime();
        c->cmd->proc(c); // execute the command
        duration = ustime()-start; // how long the command took, in microseconds
        dirty = server.dirty-dirty;
        if (dirty < 0) dirty = 0;
    ....
    }

    The actual work is done through cmd->proc, a function pointer.

    c->cmd, and with it the proc field, was set in the previous step from the result of lookupCommand.
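    The bookkeeping around that function-pointer call can be sketched as follows. ustime() mirrors Redis's gettimeofday-based helper of the same name. server_dirty, timedCall and the two example commands are hypothetical stand-ins for server.dirty and real command procs.

```c
#include <stddef.h>
#include <sys/time.h>

static long long server_dirty = 0;   /* stand-in for server.dirty */

/* Current wall-clock time in microseconds. */
static long long ustime(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (long long)tv.tv_sec * 1000000 + tv.tv_usec;
}

typedef void commandProc(void);

typedef struct {
    long long duration_us;  /* how long proc took */
    long long dirty;        /* changes made by this call, never negative */
} callStats;

/* Snapshot dirty and the clock, run proc, then compute the deltas the
 * same way call() does (including the clamp to zero). */
static callStats timedCall(commandProc *proc) {
    callStats st;
    long long dirty = server_dirty;
    long long start = ustime();
    proc();
    st.duration_us = ustime() - start;
    st.dirty = server_dirty - dirty;
    if (st.dirty < 0) st.dirty = 0;
    return st;
}

/* Example "commands". */
static void setTwoKeys(void) { server_dirty += 2; }
static void readOnly(void) { }
```

    The dirty delta is what lets the caller tell write commands from read-only ones after the fact, without the command having to report it explicitly.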

    For example, the simplest command, get, maps to getCommand:

        {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},

    getCommand itself is implemented in t_string.c; we won't follow it any further here.
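    The numeric fields of that table entry describe the command to the rest of the server. Following the layout of struct redisCommand in Redis 4.x/5.x, the entry reads as follows: name "get", proc getCommand, arity 2, sflags "rF" (read-only, fast), flags 0 (filled in from sflags at startup), getkeys_proc NULL, firstkey 1, lastkey 1, keystep 1, and two counters (microseconds, calls) that start at 0. The sketch below shows how firstkey/lastkey/keystep locate the keys in argv. keyPositions is a hypothetical, simplified helper, and a negative lastkey counts back from the end of argv.

```c
/* Only the fields needed to find keys are modelled. */
typedef struct {
    const char *name;
    int arity;
    int firstkey;   /* index of the first key in argv, 0 if no keys */
    int lastkey;    /* index of the last key; negative counts from the end */
    int keystep;    /* distance between consecutive keys */
} commandInfo;

/* Writes key indexes into out (at most max) and returns how many. */
static int keyPositions(const commandInfo *cmd, int argc, int *out, int max) {
    int n = 0;
    if (cmd->firstkey == 0 || cmd->keystep <= 0) return 0;
    int last = cmd->lastkey < 0 ? argc + cmd->lastkey : cmd->lastkey;
    for (int j = cmd->firstkey; j <= last && j < argc && n < max; j += cmd->keystep)
        out[n++] = j;
    return n;
}
```

    For GET, argv is {"get", key}, so the only key is at index 1. An MSET-style entry with firstkey 1, lastkey -1 and keystep 2 picks out every other argument, skipping the values.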

    With that, we have a rough picture of how a command sent by a client flows through the server.

Original article: https://www.cnblogs.com/stevenczp/p/9383515.html