zoukankan      html  css  js  c++  java
  • Redis 源码分析系列1-main函数相关调用分析

    从main函数切入,方便从宏观上掌握redis的运作机制,本篇就从main函数入手,从最上层看,main调用了哪些接口,具体完成了什么功能,然后再聚焦具体的模块。

    aeEventLoop是Redis的事件核心数据结构,Redis将aeEventLoop不同平台上的多路分离器进行适配,如select/kqueue/epoll。

    为了跨平台,aeEventLoop中定义了void* apidata这一结构,用来有不同平台的分离器进行关联。

     1 typedef struct aeEventLoop {
     2     // 目前已注册的最大描述符
     3     int maxfd;   /* highest file descriptor currently registered */
     4     // 目前已追踪的最大描述符
     5     int setsize; /* max number of file descriptors tracked */
     6     // 用于生成时间事件 id
     7     long long timeEventNextId;
     8     // 最后一次执行时间事件的时间
     9     time_t lastTime;     /* Used to detect system clock skew */
    10     // 已注册的文件事件
    11     aeFileEvent *events; /* Registered events */
    12     // 已就绪的文件事件
    13     aeFiredEvent *fired; /* Fired events */
    14     // 时间事件
    15     aeTimeEvent *timeEventHead;
    16     // 事件处理器的开关
    17     int stop;
    18     // 多路复用库的私有数据
    19     void *apidata; /* This is used for polling API specific data */
    20     // 在处理事件前要执行的函数
    21     aeBeforeSleepProc *beforesleep;
    22 } aeEventLoop;

    其中select的apidata结构为:

    typedef struct aeApiState {
        fd_set rfds, wfds;
        /* We need to have a copy of the fd sets as it's not safe to reuse
         * FD sets after select(). */
        fd_set _rfds, _wfds;
    } aeApiState;

    这里提到额外引入了_rfds,_wfds,是读写句柄集合的拷贝,原因是:select调用完成后,重用fd集合不是安全的(具体原因可以看select函数解析)。

    在创建eventloop的时候,对其中eventloop->events数组中的每个event(即eventloop->events[fd])的mask都预置位AE_NONE;

    /* Events with mask == AE_NONE are not set. So let's initialize the
         * vector with it. */
        for (i = 0; i < setsize; i++)
            eventLoop->events[i].mask = AE_NONE;

    当第一个事件来临时候,

    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, j, numevents = 0;
    
        memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
        memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
    
        retval = select(eventLoop->maxfd+1,
                    &state->_rfds,&state->_wfds,NULL,tvp);
        if (retval > 0) {
            for (j = 0; j <= eventLoop->maxfd; j++) {
                int mask = 0;
                aeFileEvent *fe = &eventLoop->events[j];
    
                if (fe->mask == AE_NONE) continue;
                if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                    mask |= AE_READABLE;
                if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                    mask |= AE_WRITABLE;
                eventLoop->fired[numevents].fd = j;
                eventLoop->fired[numevents].mask = mask;
                numevents++;
            }
        }
        return numevents;
    }

    这里对fe->mask进行了判断,如果为AE_NONE,则说明此fd没有可读或者可写事件,跳过本次for循环。

    否则分别判断为可读还是可写,并构造fe的mask。最后将此事件关联的fd(即这里的j)添加至fired数组,其为所有已经就绪的fd数组。

    显然可知:aeApiPoll的作用是利用多路分离器,复用IO,判断哪些fd可读写,并加入到已就绪数组中,等待被处理

    那么aeApiPoll什么时候会被调用呢?

    在函数aeProcessEvents中调用,获取已经就绪的fd,完成读写IO。

    调用处的关键代码如下:

    numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                // 从已就绪数组中获取事件
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
    
                /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                if (fe->mask & mask & AE_READABLE) {
                    // 读事件
                    rfired = 1; // 确保读/写事件只能执行其中一个
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                if (fe->mask & mask & AE_WRITABLE) {
                    // 写事件
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
    
                processed++;

    简单分析下具体细节:

    // 计算tvp(即多路分离器的timeout,超时时间,对于select的最后一个参数来说,如果为NULL,那么select会阻塞直到有句柄可读写为止)

    调用numevents=aeApiPoll(eventloop, tvp),返回可读写的事件个数,遍历eventloop->events数组,此数组以fd作为下标索引事件,而fd则存储在eventloop->fired数组中。

    因此从numevents到fe的索引逻辑如下图:

    事件可读,则对调用fe->rfileProc;事件可写,那么调用fe->wfileProc进行IO写。而fe->rfileProc,fe->wfileProc的初始化则在initServer中;

    initServer的主要代码如下:

    // 注册新号处理函数,对于SIGHUP,SIGPIPE,忽略。
    // 初始化共享对象
    
    // 关键,初始化redis的事件循环主结构
    server.el = aeCreateEventLoop(server.maxclients+1024);
    
    // 初始化作为server的网络连接
    server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
    
    //初始化db
    
    // 初始化pubsub
    
    //关键,关联时间时间
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
    
    
    // 将前面创建的server socket即server.ipfd与redis的事件循环关联。
    if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
    
    //初始化bio

    其中关键的函数为aeCreateFileEvent,这里将server.ipfd于server.el进行关联,而acceptTcpHandler将用来初始化fe->rfileProc,以及fe->wfileProc。

    进入aeCreateFileEvent函数中,此函数比较重要,我们逐行分析:

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) return AE_ERR;
        aeFileEvent *fe = &eventLoop->events[fd];
    
        // 监听指定 fd
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
    
        // 设置文件事件类型
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
        fe->clientData = clientData;
    
        // 如果有需要,更新事件处理器的最大 fd
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
    
        return AE_OK;
    }

    上面对此调用aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL)

    其中eventloop为redis核心事件循环,fd为server.ipfd,mask为AE_READABLE,proc=acceptTcpHandler,即fe->rfileProc以及fe->wfileProc均为acceptTcpHandler,

    clientdata为NULL。

    总结为:server端建立服务连接后,用于接收client响应的socket即server.ipfd(用socket(AF_INET,SOCK_STREAM,0)创建出socket),创建的socket监听读事件,对应的事件处理函数为acceptTcpHandler。

    查看acceptTcpHandler函数代码:

    // 其中调用accept,创建cfd服务客户端
    cfd = anetTcpAccept(server.neterr, fd, cip, &cport)
    
    // 为cfd创建redisClient,获取客户端查询命令
    // 调用readQueryFromClient()获取客户端buffer。
    acceptCommandHandler(cfd,0)

    cfd为来自客户端的响应创建的socket,为此客户端服务。在acceptCommandHandler中为cfd创建redisclient。关键代码为:

    static void acceptCommonHandler(int fd, int flags) {
        redisClient *c;
    
        // 创建新客户端
        if ((c = createClient(fd)) == NULL) {
            redisLog(REDIS_WARNING,"Error allocating resources for the client");
            close(fd); /* May be already closed, just ignore errors */
            return;
        }
    
        /* If maxclient directive is set and this is one client more... close the
         * connection. Note that we create the client instead to check before
         * for this condition, since now the socket is already set in nonblocking
         * mode and we can send an error for free using the Kernel I/O */
    
        if (listLength(server.clients) > server.maxclients) {
            char *err = "-ERR max number of clients reached
    ";
    
            /* That's a best effort error message, don't check write errors */
            // 发送错误信息到客户端
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            // 释放客户端
            freeClient(c);
            return;
        }
        server.stat_numconnections++;
        c->flags |= flags;
    }

    其中调用createClient为cfd这个句柄创建redisClient,在createClient中对socket设置了O_NONBLOCK以及TCP_NODELAY属性,即非阻塞及不延迟模式。

    同时在此函数为cfd添加读事件,监听来自客户端的命令请求。具体的细节,在后面系列中再深入剖析。

    对系列1进行总结:从redis.c的main函数入口,做必要的初始化、配置读取、创建redis的核心事件循环eventloop、创建server网络连接,并将事件循环与server的网络连接进行关联,并基于不同平台的IO多路复用机制实现redis自己的多路分离机制。

    基于此,可以绘制出易于理解的思维导图:

    小旗子中标出的为重点理解部分。

  • 相关阅读:
    [翻译] M13ProgressSuite
    控制器转场动画详解
    [翻译] SIAlertView
    隐藏导航栏之后支持手势退回上一个控制器
    UIView的无损截图
    [翻译] UIColor-uiGradientsAdditions
    简化通知中心的使用
    Java Web应用的开发环境配置
    StartUML的基础的使用,用例图,序列图
    SQLyog图形化l数据库的操作和学习
  • 原文地址:https://www.cnblogs.com/crafet/p/4659802.html
Copyright © 2011-2022 走看看