zoukankan      html  css  js  c++  java
  • Redis的事件机制

    Redis程序的运行过程是一个处理事件的过程,也称Redis是一个事件驱动的服务。Redis中的事件分两类:文件事件(File Event)、时间事件(Time Event)。文件事件处理文件的读写操作,特别是与客户端通信的Socket文件描述符的读写操作;时间事件主要用于处理一些定时处理的任务。

    本文首先介绍Redis的运行过程,阐明Redis程序是一个事件驱动的程序;接着介绍事件机制实现中涉及的数据结构以及事件的注册;最后介绍了处理客户端中涉及到的套接字文件读写事件。

    一、Redis的运行过程

    Redis的运行过程是一个事件处理的过程,可以通过下图反映出来:

    ​ 图1 Redis的事件处理过程

    从上图可以看出:Redis服务器的运行过程就是循环等待并处理事件的过程。通过时间事件将运行事件分成一个个的时间分片,如图1的右半部分所示。如果在指定的时间分片中,有文件事件发生,如:读文件描述符可读、写文件描述符可写,则调用相应的处理函数进行文件的读写处理。文件事件处理完成之后,处理期望发生时间在当前时间之前或正好是当前时刻的时间事件。然后再进入下一次循环迭代处理。

    如果在指定的事件间隔中,没有文件事件发生,则不需要处理,直接进行时间事件的处理,如下图所示。

    ​ 图2 Redis的事件处理过程(无文件事件发生)

    二、事件数据结构

    2.1 文件事件数据结构

    Redis用如下结构体来记录一个文件事件:

    /* File event structure */
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
        aeFileProc *rfileProc;
        aeFileProc *wfileProc;
        void *clientData;
    } aeFileEvent;
    

    通过mask来描述发生了什么事件:

    • AE_READABLE:文件描述符可读;
    • AE_WRITABLE:文件描述符可写;
    • AE_BARRIER:文件描述符阻塞

    rfileProc和wfileProc分别为读事件和写事件发生时的回调函数,其函数签名如下:

    typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
    
    2.2 事件事件数据结构

    Redis用如下结构体来记录一个时间事件:

    /* Time event structure */
    typedef struct aeTimeEvent {
        long long id; /* time event identifier. */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        aeTimeProc *timeProc;
        aeEventFinalizerProc *finalizerProc;
        void *clientData;
        struct aeTimeEvent *prev;
        struct aeTimeEvent *next;
    } aeTimeEvent;
    

    when_sec和when_ms指定时间事件发生的时间,timeProc为时间事件发生时的处理函数,签名如下:

    typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
    

    prev和next表明时间事件构成了一个双向链表。

    3.3 事件循环

    Redis用如下结构体来记录系统中注册的事件及其状态:

    /* State of an event based program */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId;
        time_t lastTime;     /* Used to detect system clock skew */
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        aeTimeEvent *timeEventHead;
        int stop;
        void *apidata; /* This is used for polling API specific data */
        aeBeforeSleepProc *beforesleep;
        aeBeforeSleepProc *aftersleep;
    } aeEventLoop;
    

    这一结构体中,最主要的就是文件事件指针events和时间事件头指针timeEventHead。文件事件指针event指向一个固定大小(可配置)数组,通过文件描述符作为下标,可以获取文件对应的事件对象。

    三、事件的注册过程

    事件驱动的程序实际上就是在事件发生时,调用相应的处理函数(即:回调函数)进行逻辑处理。因此关于事件,程序需要知道:①事件的发生;② 回调函数。事件的注册过程就是告诉程序这两。下面我们分别从文件事件、时间事件的注册过程进行阐述。

    3.1 文件事件的注册过程

    对于文件事件:

    • 事件的发生:应用程序需要知道哪些文件描述符发生了哪些事件。感知文件描述符上有事件发生是由操作系统的职责,应用程序需要告诉操作系统,它关心哪些文件描述符的哪些事件,这样通过相应的系统API就会返回发生了事件的文件描述符。
    • 回调函数:应用程序知道了文件描述符发生了事件之后,需要调用相应回调函数进行处理,因而需要在事件发生之前将相应的回调函数准备好。

    这就是文件事件的注册过程,函数的实现如下:

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
        aeFileEvent *fe = &eventLoop->events[fd];
    
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
        fe->clientData = clientData;
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
        return AE_OK;
    }
    

    这段代码逻辑非常清晰:首先根据文件描述符获得文件事件对象,接着在操作系统中添加自己关心的文件描述符(addApiAddEvent),最后将回调函数记录到文件事件对象中。因此,一个线程就可以同时监听多个文件事件,这就是IO多路复用。操作系统提供多种IO多路复用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支持所有这些模型,用户可以根据需要进行选择。不同的模型,向操作系统添加文件描述符方式也不同,Redis将这部分逻辑封装在aeApiAddEvent中,下面代码是EPOLL模型的实现:

    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        aeApiState *state = eventLoop->apidata;
        struct epoll_event ee = {0}; /* avoid valgrind warning */
        /* If the fd was already monitored for some event, we need a MOD
         * operation. Otherwise we need an ADD operation. */
        int op = eventLoop->events[fd].mask == AE_NONE ?
                EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
        ee.events = 0;
        mask |= eventLoop->events[fd].mask; /* Merge old events */
        if (mask & AE_READABLE) ee.events |= EPOLLIN;
        if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
        ee.data.fd = fd;
        if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
        return 0;
    }
    

    这段代码就是对操作系统调用epoll_ctl()的封装,EPOLLIN对应的是读(输入)事件,EPOLLOUT对应的是写(输出)事件。

    3.2 时间事件的注册过程

    对于时间事件:

    • 事件的发生:当前时刻正好是事件期望发生的时刻,或者是晚于事件期望发生的时刻,所以需要让程序知道事件期望发生的时刻;
    • 回调函数:此时调用回调函数进行处理,所以需要让程序知道事件的回调函数。

    对应的事件事件注册函数如下:

    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
            aeTimeProc *proc, void *clientData,
            aeEventFinalizerProc *finalizerProc)
    {
        long long id = eventLoop->timeEventNextId++;
        aeTimeEvent *te;
    
        te = zmalloc(sizeof(*te));
        if (te == NULL) return AE_ERR;
        te->id = id;
        aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
        te->timeProc = proc;
        te->finalizerProc = finalizerProc;
        te->clientData = clientData;
        te->prev = NULL;
        te->next = eventLoop->timeEventHead;
        if (te->next)
            te->next->prev = te;
        eventLoop->timeEventHead = te;
        return id;
    }
    

    这段代码逻辑也是非常简单:首先创建时间事件对象,接着设置事件,设置回调函数,最后将事件事件对象插入到时间事件链表中。设置时间事件期望发生的时间比较简单:

    static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
        long cur_sec, cur_ms, when_sec, when_ms;
    
        aeGetTime(&cur_sec, &cur_ms);
        when_sec = cur_sec + milliseconds/1000;
        when_ms = cur_ms + milliseconds%1000;
        if (when_ms >= 1000) {
            when_sec ++;
            when_ms -= 1000;
        }
        *sec = when_sec;
        *ms = when_ms;
    }
    
    static void aeGetTime(long *seconds, long *milliseconds)
    {
        struct timeval tv;
    
        gettimeofday(&tv, NULL);
        *seconds = tv.tv_sec;
        *milliseconds = tv.tv_usec/1000;
    }
    

    当前时间加上期望的时间间隔,作为事件期望发生的时刻。

    四、套接字文件事件

    Redis为客户端提供存储数据和获取数据的缓存服务,监听并处理来自请求,将结果返回给客户端,这一过程将会发生以下文件事件:

    与上图相对应,对于一个请求,Redis会注册三个文件事件:

    4.1 TCP连接建立事件

    服务器初始化时,在服务器套接字上注册TCP连接建立的事件。

    void initServer(void) {
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
    }
    

    回调函数为acceptTcpHandler,该函数最重要的职责是创建客户端结构。

    4.2 客户端套接字读事件

    创建客户端:在客户端套接字上注册客户端套接字可读事件。

    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                          readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }
    

    回调函数为readQueryFromClient,顾名思义,此函数将从客户端套接字中读取数据。

    4.3 向客户端返回数据

    Redis完成请求后,Redis并非处理完一个请求后就注册一个写文件事件,然后事件回调函数中往客户端写回结果。根据图1,检测到文件事件发生后,Redis对这些文件事件进行处理,即:调用rReadProc或writeProc回调函数。处理完成后,对于需要向客户端写回的数据,先缓存到内存中:

    typedef struct client {
        // ...其他字段
        
        list *reply;            /* List of reply objects to send to the client. */
        
        /* Response buffer */
        int bufpos;
        char buf[PROTO_REPLY_CHUNK_BYTES];
    };
    

    发送给客户端的数据会存放到两个地方:

    • reply指针存放待发送的对象;

    • buf中存放待返回的数据,bufpos指示数据中的最后一个字节所在位置。

    这里遵循一个原则:只要能存放在buf中,就尽量存入buf字节数组中,如果buf存不下了,才存放在reply对象数组中。

    写回发生在进入下一次等待文件事件之前,见图1中【等待前处理】,会调用以下函数来处理客户端数据写回逻辑:

    int writeToClient(int fd, client *c, int handler_installed) {
        while(clientHasPendingReplies(c)) {
            if (c->bufpos > 0) {
                nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
                if (nwritten <= 0) break;
                c->sentlen += nwritten;
                totwritten += nwritten;
                if ((int)c->sentlen == c->bufpos) {
                    c->bufpos = 0;
                    c->sentlen = 0;
                }
            } else {
                o = listNodeValue(listFirst(c->reply));
                objlen = o->used;
                if (objlen == 0) {
                    c->reply_bytes -= o->size;
                    listDelNode(c->reply,listFirst(c->reply));
                    continue;
                }
    
                nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
                if (nwritten <= 0) break;
                c->sentlen += nwritten;
                totwritten += nwritten;
            }
        }
    }
    

    上述函数只截取了数据发送部分,首先发送buf中的数据,然后发送reply中的数据。

    有读者可能会疑惑:write()系统调用是阻塞式的接口,上述做法会不会在write()调用的地方有等待,从而导致性能低下?这里就要介绍Redis是怎么处理这个问题的。

    首先,我们发现创建客户端的代码:

    client *createClient(int fd) {
        client *c = zmalloc(sizeof(client));
        if (fd != -1) {
            anetNonBlock(NULL,fd);
        }
    }
    

    可以看到设置fd是非阻塞(NonBlock),这就保证了在套接字fd上的read()和write()系统调用不是阻塞的。

    其次,和文件事件的处理操作一样,往客户端写数据的操作也是批量的,函数如下:

    int handleClientsWithPendingWrites(void) {
        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            /* Try to write buffers to the client socket. */
            if (writeToClient(c->fd,c,0) == C_ERR) continue;
            /* If after the synchronous writes above we still have data to
             * output to the client, we need to install the writable handler. */
            if (clientHasPendingReplies(c)) {
                int ae_flags = AE_WRITABLE;
                if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                    sendReplyToClient, c) == AE_ERR)
                {
                        freeClientAsync(c);
                }
            }
        }
    }
    

    可以看到,首先对每个客户端调用刚才介绍的writeToClient()函数进行写数据,如果还有数据没写完,那么注册写事件,当套接字文件描述符写就绪时,调用sendReplyToClient()进行剩余数据的写操作:

    void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        writeToClient(fd,privdata,1);
    }
    

    仔细想一下就明白了:处理完得到结果后,这时套接字的写缓冲区一般是空的,因此write()函数调用成功,所以就不需要注册写文件事件了。如果写缓冲区满了,还有数据没写完,此时再注册写文件事件。并且在数据写完后,将写事件删除:

    int writeToClient(int fd, client *c, int handler_installed) {
        if (!clientHasPendingReplies(c)) {
            if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        }
    }
    

    注意到,在sendReplyToClient()函数实现中,第三个参数正好是1。

  • 相关阅读:
    Django-ORM
    Django-路由系统
    Django-(Request对象和Response对象)
    Django-(CBV和FBV)
    批量设置模板中的时间格式
    Django模板语言-(母板、组件、静态文件相关、simple_tag、inclusion_tag)
    yii2csrf攻击
    centos6更改密码
    ide vscode安装
    xshell配色方案
  • 原文地址:https://www.cnblogs.com/lijihong/p/13388090.html
Copyright © 2011-2022 走看看