zoukankan      html  css  js  c++  java
  • redis 文件事件模型

    参考文献:

    1. 深入剖析 redis 事件驱动
    2. Redis 中的事件循环
    3. 深入了解epoll (转)
    4. Redis自己的事件模型 ae
    5. EPOLL(7)
    6. Linux IO模式及 select、poll、epoll详解
    7. epoll为什么这么快,epoll的实现原理

    概述

    在redis中,对于对于文件事件的处理采用了Reactor模型。总体来说,就是将io多路复用所监听到的文件去处,并放入一个队列中依次处理。接下去本文以一个io多路复用的例子开始,一步步还原redis文件事件的运行过程

    epoll (本节从Linux IO模式及 select、poll、epoll详解摘抄)

    epoll使用的过程中需要如下的三个接口:

    int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
    

    int epoll_create(int size)

    创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
    当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    函数是对指定描述符fd执行op操作。

    • epfd:是epoll_create()的返回值。
    • op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
    • fd:是需要监听的fd(文件描述符)
    • epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
    struct epoll_event {
      __uint32_t events;  /* Epoll events */
      epoll_data_t data;  /* User data variable */
    };
    
    //events可以是以下几个宏的集合:
    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    EPOLLOUT:表示对应的文件描述符可以写;
    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR:表示对应的文件描述符发生错误;
    EPOLLHUP:表示对应的文件描述符被挂断;
    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
    

    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

    等待epfd上的io事件,最多返回maxevents个事件。
    参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

    一个epoll的示例

    有了上面的论述,用一个简单的例子来说明下epoll的使用(来自http://man7.org/linux/man-pages/man7/epoll.7.html):

    #define MAX_EVENTS 10
    struct epoll_event ev, events[MAX_EVENTS];
    int listen_sock, conn_sock, nfds, epollfd;
    
    /* Code to set up listening socket, 'listen_sock',
       (socket(), bind(), listen()) omitted */
    
    epollfd = epoll_create1(0);
    if (epollfd == -1) {
    	perror("epoll_create1");
    	exit(EXIT_FAILURE);
    }
    
    ev.events = EPOLLIN;
    ev.data.fd = listen_sock;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    	perror("epoll_ctl: listen_sock");
    	exit(EXIT_FAILURE);
    }
    
    for (;;) {
    	nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    	if (nfds == -1) {
    		perror("epoll_wait");
    		exit(EXIT_FAILURE);
    	}
    
    	for (n = 0; n < nfds; ++n) {
    		if (events[n].data.fd == listen_sock) {
    			conn_sock = accept(listen_sock,
    					(struct sockaddr *) &addr, &addrlen);
    			if (conn_sock == -1) {
    				perror("accept");
    				exit(EXIT_FAILURE);
    			}
    			setnonblocking(conn_sock);
    			ev.events = EPOLLIN | EPOLLET;
    			ev.data.fd = conn_sock;
    			if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
    						&ev) == -1) {
    				perror("epoll_ctl: conn_sock");
    				exit(EXIT_FAILURE);
    			}
    		} else {
    			do_use_fd(events[n].data.fd);
    		}
    	}
    }
    

    image
    如图所示,可以看出使用epoll的过程。接下来将介绍redis事件驱动模型中主要涉及的数据结构。

    redis事件驱动模型

    数据结构

    redis事件驱动模型中主要涉及到如下的几个数据结构:

    1. aeCreateEventLoop.
    2. aeApiState.
    3. aeFileEvent.
    4. aeFiredEvent.
      redis事件处理的核心是aeCreateEventLoop结构,如图可以看出主要的结构体如下:
      image
    typedef struct aeEventLoop {
    
        // 目前已注册的最大描述符
        int maxfd;   /* highest file descriptor currently registered */
    
        // 目前已追踪的最大描述符
        int setsize; /* max number of file descriptors tracked */
    
        // 用于生成时间事件 id
        long long timeEventNextId;
    
        // 最后一次执行时间事件的时间
        time_t lastTime;     /* Used to detect system clock skew */
    
        // 已注册的文件事件
        aeFileEvent *events; /* Registered events */
    
        // 已就绪的文件事件
        aeFiredEvent *fired; /* Fired events */
    
        // 时间事件
        aeTimeEvent *timeEventHead;
    
        // 事件处理器的开关
        int stop;
    
        // 多路复用库的私有数据
        void *apidata; /* This is used for polling API specific data */
    
        // 在处理事件前要执行的函数
        aeBeforeSleepProc *beforesleep;
    
    } aeEventLoop;
    

    其中aeFileEvent 结构体为已经注册并需要监听的事件的结构体。在redis初始化的时候会创建一个 setSizesizeof(aeFileEvent) 以及一个 setSizesiezeof(aeFiredEvent) 大小的内存,用文件描述符作为其索引。那么这个大小定位多少合适呢?在Linux个中,文件描述符是个有限的资源,当打开一个文件时就会消耗一个文件描述符,当关闭该文件描述符或者程序结束时会释放该文件描述符资源,从而供其他文件打开操作使用。当文件描述符超过最大值后,打开文件就会出错。那么这个最大值是多少呢?可以通过/proc/sys/fs/file-max看到系统支持的最大的文件描述符数。通过 ulimit -n 可以看到当前用户能打开的最大的文件描述符。在我这里的一台8g内存的机器上,系统支持最大的文件描述是365146。而在这台64bit的机器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小为40byte。按系统最大支持的文件描述符来算,固定消耗内存为14.6M。这样以文件描述符作为数组的下标来索引,虽然这样的哈希在接入量不大的情况下会有大量的浪费。但是最多也就浪费14M 的内存,因此这样的设计是可取的。【4】

    typedef struct aeFileEvent {
    
        // 监听事件类型掩码,
        // 值可以是 AE_READABLE 或 AE_WRITABLE ,
        // 或者 AE_READABLE | AE_WRITABLE
        int mask; /* one of AE_(READABLE|WRITABLE) */
    
        // 读事件处理器
        aeFileProc *rfileProc;
    
        // 写事件处理器
        aeFileProc *wfileProc;
    
        // 多路复用库的私有数据
        void *clientData;
    
    } aeFileEvent;
    

    aeFiredEvent结构体是已经监听到有事件发生的描述符的集合。

    typedef struct aeFiredEvent {
    
        // 已就绪文件描述符
        int fd;
    
        // 事件类型掩码,
        // 值可以是 AE_READABLE 或 AE_WRITABLE
        // 或者是两者的或
        int mask;
    
    } aeFiredEvent;
    
    

    void *apidata;在ae创建的时候,会被赋值为aeApiState结构体,结构体的定义如下:

    typedef struct aeApiState {
    
        // epoll_event 实例描述符
        int epfd;
    
        // 事件槽
        struct epoll_event *events;
    
    } aeApiState;
    

    可以见得,这个结构体是为了epoll所准备的数据结构。redis可以选择不同的io多路复用方法。因此 apidata 是个void类型,根据不同的io多路复用库来选择。

    Reactor模型的创建与使用

    aeEventLoop 的创建

    aeEventLoop *aeCreateEventLoop(int setsize) {
        aeEventLoop *eventLoop;
        int i;
    
        // 创建事件状态结构
        if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    
        // 初始化文件事件结构和已就绪文件事件结构数组
        eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
        eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
        if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
        // 设置数组大小
        eventLoop->setsize = setsize;
        // 初始化执行最近一次执行时间
        eventLoop->lastTime = time(NULL);
    
        // 初始化时间事件结构
        eventLoop->timeEventHead = NULL;
        eventLoop->timeEventNextId = 0;
    
        eventLoop->stop = 0;
        eventLoop->maxfd = -1;
        eventLoop->beforesleep = NULL;
        if (aeApiCreate(eventLoop) == -1) goto err;
    
        /* Events with mask == AE_NONE are not set. So let's initialize the
         * vector with it. */
        // 初始化监听事件
        for (i = 0; i < setsize; i++)
            eventLoop->events[i].mask = AE_NONE;
    
        // 返回事件循环
        return eventLoop;
    
    err:
        if (eventLoop) {
            zfree(eventLoop->events);
            zfree(eventLoop->fired);
            zfree(eventLoop);
        }
        return NULL;
    }
    

    如下图所示,可以见得在初始化的时候创建结构体的流程。

    graph LR
    创建aeFileEvent-->创建aeFireEvent
    创建aeFireEvent-->调用aeApiCreate创建aeApiState 
    

    函数aeApiCreate则创建了一个epoll所需要的数据结构。

    /*
     * 创建一个新的 epoll 实例,并将它赋值给 eventLoop
     */
    static int aeApiCreate(aeEventLoop *eventLoop) {
    
        aeApiState *state = zmalloc(sizeof(aeApiState));
    
        if (!state) return -1;
    
        // 初始化事件槽空间
        state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
        if (!state->events) {
            zfree(state);
            return -1;
        }
    
        // 创建 epoll 实例
        state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
        if (state->epfd == -1) {
            zfree(state->events);
            zfree(state);
            return -1;
        }
    
        // 赋值给 eventLoop
        eventLoop->apidata = state;
        return 0;
    }
    
    

    aeFileEvent的注册

    在创建了aeEventLoop之后,对于需要监听的文件描述符需要进行注册,在aeFileEvent结构体中,可以看到如下的两个结构aeFileProc *rfileProc和aeFileProc *wfileProc,就是在注册监听事件的时候进行赋值的。
    函数aeCreateFileEvent执行创建aeFileEvent和添加文件句柄到epoll中。

    /*
     * 根据 mask 参数的值,监听 fd 文件的状态,
     * 当 fd 可用时,执行 proc 函数
     */
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
    
        if (fd >= eventLoop->setsize) return AE_ERR;
    
        // 取出文件事件结构
        aeFileEvent *fe = &eventLoop->events[fd];
    
        // 监听指定 fd 的指定事件
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
    
        // 设置文件事件类型,以及事件的处理器
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
        // 私有数据
        fe->clientData = clientData;
    
        // 如果有需要,更新事件处理器的最大 fd
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
    
        return AE_OK;
    }
    
    

    其中aeApiAddEvent函数就是在开头之中epoll例子中添加一个文件描述符到监听集合中的方法封装函数:

    /*
     * 关联给定事件到 fd
     */
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        aeApiState *state = eventLoop->apidata;
        struct epoll_event ee;
    
        /* If the fd was already monitored for some event, we need a MOD
         * operation. Otherwise we need an ADD operation.
         *
         * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。
         *
         * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。
         */
        int op = eventLoop->events[fd].mask == AE_NONE ?
                EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
        // 注册事件到 epoll
        ee.events = 0;
        mask |= eventLoop->events[fd].mask; /* Merge old events */
        if (mask & AE_READABLE) ee.events |= EPOLLIN;
        if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
        ee.data.u64 = 0; /* avoid valgrind warning */
        ee.data.fd = fd;
    
        if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    
        return 0;
    }
    
    

    事件驱动模型的运行过程

    到这里redis事件驱动的主要数据结构和初始化的方法已经介绍完毕。接下来将展示事件驱动的运行过程。在redis源码中,省略去其他部分,跟事件驱动相关的代码如下:

        server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        // 为 TCP 连接关联连接应答(accept)处理器
        // 用于接受并应答客户端的 connect() 调用
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    redisPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
        
        // 为本地套接字关联应答处理器
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
        aeMain(server.el);
    
    

    其中aeCreateEventLoop和aeCreateFileEvent函数在之前已经介绍过。接下来重点介绍下aeMain函数:

    
    void aeMain(aeEventLoop * eventLoop) {
    
        eventLoop->stop = 0;
        while (!eventLoop->stop){
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }
    

    我们可以看出,aeMain函数中主要调用了aeProcessEvents处理事件,aeProcessEvents中我们略去其他的代码,主要关注如下的部分:

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        ....
        // 处理文件事件,阻塞时间由 tvp 决定
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                // 从已就绪数组中获取事件
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
    
               /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                // 读事件
                if (fe->mask & mask & AE_READABLE) {
                    // rfired 确保读/写事件只能执行其中一个
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                // 写事件
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
    
                processed++;
            }
        ....
    }
    

    可以看出函数aeProcessEvents调用了aeApiPoll获取已经就绪的事件。在for循环中,从eventLoop->fired(已经就绪的事件)中取出事件结构体,然后根据是读时间还是写事件进行处理。在aeApiPoll中,就可以看到我们熟悉的
    epoll_wait的身影。可以见得通过调用系统的epoll_wait函数,然后将已经就绪的事件放入 eventLoop->fired中。

    /*
     * 获取可执行事件
     */
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, numevents = 0;
    
        // 等待时间
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    
        // 有至少一个事件就绪?
        if (retval > 0) {
            int j;
    
            // 为已就绪事件设置相应的模式
            // 并加入到 eventLoop 的 fired 数组中
            numevents = retval;
            for (j = 0; j < numevents; j++) {
                int mask = 0;
                struct epoll_event *e = state->events+j;
    
                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
    
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
    
        // 返回已就绪事件个数
        return numevents;
    }
    
    

    到这里还有一个疑问,在redis初始化的时候只注册了tcp和本地套接字的描述符,那么当有个新的客户端连接进来的时候,是怎么将客户端的描述符加到监听队列里面的呢?答案就在最开始的acceptTcpHandler函数中。在这个函数中依次调用了acceptCommonHandler->createClient->aeCreateFileEvent函数。可以见得当监听的一个tcp或者本地socket产生了connect 事件的时候,就会依次调用这些函数,然后将新的客户端端描述符加入监听中。

    总结

    redis的事件驱动模型分析就到这里,总体而言还是比较直观的。这中间也学习了很多,包括epoll的原理等。

  • 相关阅读:
    Android中各级目录的作用
    轻量级java开发(一)-Hibernate 安装
    Eclipse 安装插件
    Eclipse超级完美汉化教程
    JAVA中extends 与implements区别
    Java基础语法总结
    C#笔试题面试题锦集(全)总20篇
    Nginx集群
    Redis 集群方案
    MS Sql Server 中主从库的配置和使用介绍
  • 原文地址:https://www.cnblogs.com/bush2582/p/8968997.html
Copyright © 2011-2022 走看看