zoukankan      html  css  js  c++  java
  • redis源码解析之事件驱动

    Redis 内部有个小型的事件驱动,它主要处理两项任务:

    1. 文件事件:使用I/O多路复用技术处理多个客户端请求,并返回执行结果。
    2. 时间事件:维护服务器的资源管理,状态检查。

    主要的数据结构包括文件事件结构体,时间事件结构体,触发事件结构体,事件循环结构体

    /* File event structure */
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE) */
        aeFileProc *rfileProc;
        aeFileProc *wfileProc;
        void *clientData;
    } aeFileEvent;
    
    /* Time event structure */
    typedef struct aeTimeEvent {
        long long id; /* time event identifier. */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        aeTimeProc *timeProc;
        aeEventFinalizerProc *finalizerProc;
        void *clientData;
        struct aeTimeEvent *next;
    } aeTimeEvent;
    
    /* A fired event */
    typedef struct aeFiredEvent {
        int fd;
        int mask;
    } aeFiredEvent;
    
    /* State of an event based program */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId;
        time_t lastTime;     /* Used to detect system clock skew */
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        aeTimeEvent *timeEventHead;
        int stop;
        void *apidata; /* This is used for polling API specific data */
        aeBeforeSleepProc *beforesleep;
    } aeEventLoop;

    首先通过aeCreateEventLoop()函数创建时间循环结构体

    aeEventLoop *aeCreateEventLoop(int setsize) {
        aeEventLoop *eventLoop;
        int i;
    
        if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
        eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
        eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
        if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
        eventLoop->setsize = setsize;
        eventLoop->lastTime = time(NULL);
        eventLoop->timeEventHead = NULL;
        eventLoop->timeEventNextId = 0;
        eventLoop->stop = 0;
        eventLoop->maxfd = -1;
        eventLoop->beforesleep = NULL;
        if (aeApiCreate(eventLoop) == -1) goto err;
        /* Events with mask == AE_NONE are not set. So let's initialize the
         * vector with it. */
        for (i = 0; i < setsize; i++)
            eventLoop->events[i].mask = AE_NONE;
        return eventLoop;
    
    err:
        if (eventLoop) {
            zfree(eventLoop->events);
            zfree(eventLoop->fired);
            zfree(eventLoop);
        }
        return NULL;
    }

    其中aeApiCreate()是核心处理函数,redis 根据不同系统构建了不同的多路复用实现:例如linux的epoll,OS X的kqueue,windows的select。

    initServer() 为监听套接字注册了读事件acceptTcpHandler()或者 acceptUnixHandler()。起作用是当有客户端连接进来的时候调用它,并注册读时间,回调函数为从客户端读取命令的函数readQueryFromClient()

    创建了aeEventLoop后就进入时间循环aeProcessEvents()中,调用aeApiPoll()来监听事件发生。

    获取阻塞时间后,就开始文件事件的触发获取。得到所有触发事件,然后遍历文件事件触发数组(eventLoop->fired),得到fd,然后获取对应的文件事件,这里的fired已经看出,它只是个索引。调用对应的回调函数,结束文件事件的处理。

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
        /* Nothing to do? return ASAP */
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
        /* Note that we want call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            aeTimeEvent *shortest = NULL;
            struct timeval tv, *tvp;
    
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                shortest = aeSearchNearestTimer(eventLoop);
            if (shortest) {
                long now_sec, now_ms;
    
                /* Calculate the time missing for the nearest
                 * timer to fire. */
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
                tvp->tv_sec = shortest->when_sec - now_sec;
                if (shortest->when_ms < now_ms) {
                    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                    tvp->tv_sec --;
                } else {
                    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
                }
                if (tvp->tv_sec < 0) tvp->tv_sec = 0;
                if (tvp->tv_usec < 0) tvp->tv_usec = 0;
            } else {
                /* If we have to check for events but need to return
                 * ASAP because of AE_DONT_WAIT we need to set the timeout
                 * to zero */
                if (flags & AE_DONT_WAIT) {
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    /* Otherwise we can block */
                    tvp = NULL; /* wait forever */
                }
            }
    
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
    
            /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                if (fe->mask & mask & AE_READABLE) {
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
                processed++;
            }
        }
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    
        return processed; /* return the number of processed file/time events */
    }

    当用户在客户端输入命令后,触发读时间,服务器调用readQueryFromClient()来读取命令,每条命令处理完成之后都会调用addReply(),其中之一的作用是注册写事件,回调函数sendReplyToClient(),目的是将处理结果写回客户端。

    处理完文件事件后再执行时间事件(serverCron

    TimeEvent被组织为一个单向链表,表头指针timeEventHead保存在核心数据结构aeEventLoop中。aeMain函数在每一轮循环中都会遍历该链表,针对每个TimeEvent,先调用gettimeofday获取系统当前时间,如果它比TimeEvent中的时间要小,则说明TimeEvent还没触发,应继续前进,否则说明TimeEvent已经触发了,立即调用超时处理函数,接下来根据处理函数的返回值分两种情况讨论:

    1)若处理函数返回-1,那么把这个TimeEvent删掉。

    2)否则,根据返回值修改当前的TimeEvent。比如返回5000,这个TimeEvent就会在5秒后再次被触发。

    由于情况1)我们不能由当前结点到达下一结点,于是就又从表头开始遍历。

    在目前的版本中,正常模式下的Redis 只带有serverCron 一个时间事件,而在benchmark 模
    式下,Redis 也只使用两个时间事件。
    在这种情况下,程序几乎是将无序链表退化成一个指针来使用,所以使用无序链表来保存时间
    事件,并不影响事件处理器的性能。

  • 相关阅读:
    go2基本类型
    go1
    android studio 使用
    ios34---GDC,dispatch_once
    ios33--线程通信
    ios33--线程安全
    ios32---线程的状态
    ios31--NSThread
    ios30---pthread, NSThread, GCD, NSOperation
    ios29--多线程
  • 原文地址:https://www.cnblogs.com/loujiayu/p/3813964.html
Copyright © 2011-2022 走看看