zoukankan      html  css  js  c++  java
  • Reactor事件模型在Redis中的应用

    1 模型简介  

    Redis没有使用第三方的libevent等网络库,而是自己开发了一个单线程的Reactor模型的事件处理模型。而Memcached内部使用的libevent库,多线程模型。

    综合对比可见:nginx,memcached,redis网络模型总结

    Redis在主循环中统一处理文件事件和时间事件,信号事件则由专门的handler来处理。

    文件事件,我理解为IO事件,Redis将产生事件套接字放入一个就绪队列中,即redisServer.aeEventLoop.fired数组,然后在aeProcessEvents会依次分派给文件事件处理器;

    Redis编写了多个文件事件处理器。

    Redis中文件事件包括:客户端的连接、命令请求、数据回复、连接断开,当上述事件发生时,会造成相应的描述符可读可写,再调用相应类型的文件事件处理器。

    文件事件处理器有:

    • 连接应答处理器 networking.c/acceptTcpHandler
    • 命令请求处理器 networking.c/readQueryFromClinet
    • 命令回复处理器 networking.c/sendReplyToClient

    时间事件包含定时事件周期性事件,Redis将其放入一个无序链表中,每当时间事件执行器运行时,就遍历链表,查找已经到达的时间事件,调用相应的处理器。

    (1) 主循环

    def ae_Main():
        #一直循环处理事件
        while(not_stop){
            aeProcessEvents()
        }

    (2)aeProcessEvents调度文件事件和时间事件的过程:

    def aeProcessEvents():
        time_event = aeSearchNearestTimer() #获取当前时间最近的时间事件
        remaind_ms = time_event.when - unix_ts_now() #获取最近的时间事件达到的毫秒时间
        if remaind_ms < 0 : #时间为负数,赋值0
            remaind_ms = 0
        timeval = create_timeval_with_ms(remainds_ms) #创建等待的时间结构
        aeApiPoll(timeval) #等待文件事件产生,时间取决于remainds_ms
        processFileEvent() #处理文件事件
        processTimeEvent() #处理时间事件

    2 Reactor事件模型在Redis中的应用

      下面主要结合文件事件的处理过程讲解Reactor事件模型在Redis中的应用。其中,Reactor事件模型框图如下所示:

       

    2.1 Initiation Dispatcher在Redis中的实现

    (1) handle_events()

    在Redis中,对于文件事件,相应的处理函数为Ae.c/aeProcessEvents,其关键处理流程如下:

    (1)底层调用接口返回,将就绪事件拷贝到eventLoop->fired数组;

    (2)遍历就绪数组,获取相关fd,进而获取fd对应的aeFileEvent : eventLoop->events[fd],从而得到相关回调函数;

    int aeProcessEvents(aeEventLoop *eventLoop, int flags){
         ....省略
            // 获取就绪文件事件,阻塞时间由最近的时间事件决定
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                // 从已就绪数组中获取包装后的文件事件aeFileEvent
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                // 获取文件事件的详细参数:fd, mask
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
    
                // 处理读事件,调用相关回调函数
                if (fe->mask & mask & AE_READABLE) {
                    // rfired 确保读/写事件只能执行其中一个
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                // 处理写事件
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
                processed++;
            }
        }
        // 处理时间事件
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    }

    (2)register_handler/remove_handler 事件处理器的注册与删除等

    在Redis中,相关的处理函数也在Ae.c文件中:

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, eFileProc *proc, void *clientData); //创建文件事件(fd:mask),相关的回掉函数为eFileProc
    void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); //将 fd 从 mask 指定的监听队列中删除
    int aeGetFileEvents(aeEventLoop *eventLoop, int fd); //获取fd被监控的事件mask
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

    2.2 Synchronous Event Demultiplexer在Redis中的实现

    针对IO复用方法,比如select,poll,epoll,kqueue等,每种方法的效率和使用方法都不相同,Redis通过统一包装各方法,来屏蔽它们的不同之处。

    (1) IO复用跨平台

    首先,Redis会根据平台,自动选择性能最好的IO复用函数库。该过程提现在Ae.c头文件包含中,如下:

    #ifdef HAVE_EVPORT
    #include "ae_evport.c" //evport优先级最高
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c" //epoll优先级较次
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c" //kqueue优先级还次
            #else
            #include "ae_select.c" //select优先级最低
            #endif
        #endif
    #endif

    (2) 统一事件接口

    ae_select.cae_epoll.cae_kqueue.cae_evport.c都提供一套统一的事件注册、删除接口,使得在ae.c中可以直接使用以下接口,其中针对epoll的包装实现如下:

    /* 事件状态*/
    typedef struct aeApiState {
        int epfd;  //epoll_event 实例描述符
        struct epoll_event *events; // 事件槽,存储返回的就绪事件,大小为eventLoop->setsize
    } aeApiState;
    
    static int aeApiCreate(aeEventLoop *eventLoop)  //创建aeApiState实例,并赋值于eventLoop->apidata
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) //增加关注的事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) //删除关注的事件 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) //等待事件就绪返回,并存储于eventLoop->fired数组 static char *aeApiName(void) //获取底层调用的IO复用接口,如epoll

    2.3 Concrete Event Handler

    文件事件相关的一些具体的事件处理器如下:

    连接请求acceptTcpHandler:在 redis.c/initServer中,程序会为redisServer.eventLoop关联一个客户连接的事件处理器。
    命令请求readQueryFromClinet : 当新连接来的时候,需要调用networking.c/createClient创建客户端,在其中为客户端套接字注册读事件,关联处理器readQueryFromClinet。
    命令回复sendReplyToClient : 当Redis调用networking.c/addReply时,会调用prepareClientToWrite来注册写事件,当套接字可写时,触发sendReplyToClient发送命令回复。

    2.4 相关数据结构

    从上面的相关接口可以发现,大多用到了结构体:aeEventLoop, aeFileEvent, aeFiredEvent。 它们之间的关系图如下:

    (1) aeFileEvent

    /* File event structure
     *
     * 文件事件结构
     */
    typedef struct aeFileEvent {
    
        // 监听事件类型掩码,
        // 值可以是 AE_READABLE 或 AE_WRITABLE ,
        // 或者 AE_READABLE | AE_WRITABLE
        int mask; /* one of AE_(READABLE|WRITABLE) */
    
        // 读事件处理器
        aeFileProc *rfileProc;
    
        // 写事件处理器
        aeFileProc *wfileProc;
    
        // 多路复用库的私有数据
        void *clientData;
    
    } aeFileEvent;

    可以发现aeFileEvent中没有fd信息,获取fd对应的aeFileEvent,需要到eventLoop->events[fd]处提取,因为在调用aeCreateFileEvent事件处理器注册函数时,将fd对应的aeFileEvent函数存储于eventLoop->events[fd]处。

    (2)aeFiredEvent

    /* A fired event
     *
     * 已就绪事件
     */
    typedef struct aeFiredEvent {
    
        // 已就绪文件描述符
        int fd;
    
        // 事件类型掩码,
        // 值可以是 AE_READABLE 或 AE_WRITABLE
        // 或者是两者的或
        int mask;
    
    } aeFiredEvent;

    aeFiredEvent刚好包含一个就绪事件的所有有用信息,在aeApiPoll调用底层IO复用函数(如epoll)返回时,会将就绪事件从底层的就绪数组aeApiState.events拷贝到eventLoop->fired就绪数组中;通过aeFiredEvent中的fd可以找到对应的aeFileEvent,进而获取相关的回调函数。

    (3) aeEventLoop

    // 事件处理器的状态
    typedef struct aeEventLoop { // 目前已注册的最大描述符 int maxfd; /* highest file descriptor currently registered */ // 目前已追踪的最大描述符 int setsize; /* max number of file descriptors tracked */ // 用于生成时间事件 id long long timeEventNextId; // 最后一次执行时间事件的时间 time_t lastTime; /* Used to detect system clock skew */ // 已注册的文件事件 aeFileEvent *events; /* Registered events,events数组下标与fd对应 */ // 已就绪的文件事件 aeFiredEvent *fired; /* Fired events */ // 时间事件 aeTimeEvent *timeEventHead; // 事件处理器的开关 int stop; // 多路复用库的私有数据 void *apidata; /* This is used for polling API specific data */ // 在处理事件前要执行的函数 aeBeforeSleepProc *beforesleep;

    该结构的初始化创建过程如下:

    /*
     * 初始化事件处理器状态
     */
    aeEventLoop *aeCreateEventLoop(int setsize) {
        aeEventLoop *eventLoop;
        int i;
        ...
    
        // 初始化文件事件结构和已就绪文件事件结构数组
        eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); //aeFileEvent中没有fd,如何获取fd信息,将fd对应的aeFileEvent存储于eventLoop->events[fd]
        eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); 
        ...
        // 设置数组大小
        eventLoop->setsize = setsize;
        // 初始化执行最近一次执行时间
    
        eventLoop->stop = 0;
        eventLoop->maxfd = -1;
        eventLoop->beforesleep = NULL;
        if (aeApiCreate(eventLoop) == -1) goto err;
    
        /* Events with mask == AE_NONE are not set. So let's initialize the
         * vector with it. */
        // 初始化监听事件
        for (i = 0; i < setsize; i++)
            eventLoop->events[i].mask = AE_NONE;
    
        // 返回事件循环
        return eventLoop;
    }

    2.5 register_handler/remove_handler 事件处理器注册与删除等的具体实现

     (1)aeCreateFileEvent

    该事件处理器注册函数主要涉及到变量eventLoop->events,eventLoop->apidata

    其中,eventLoop->events数组主要用于存储aeFileEvent,包括回调函数,感兴趣的事件掩码mask,clientData等,fd对应的aeFileEvent存储于eventLoop->events[fd]处。(通过aeFileEvent和events数组,便将fd:mask和相关回调函数proc对应起来)

    在调用aeApiAddEvent时,会将fd的指定事件加入底层的IO复用函数中;

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
    
        if (fd >= eventLoop->setsize) return AE_ERR;
    
        // 取出文件事件结构
        aeFileEvent *fe = &eventLoop->events[fd];
    
        // 监听指定 fd 的指定事件
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
    
        // 设置文件事件类型,以及事件的处理器
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
        // 私有数据
        fe->clientData = clientData;
    
        // 如果有需要,更新事件处理器的最大 fd
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
    
        return AE_OK;
    }

    (2)void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); //删除文件事件

     与aeCreateFileEvent相反,将在fd对应的aeFileEvent中,取消对事件mask的关注;并通过aeApiDelEvent在底层取消对fd相关事件mask的监听。具体代码如下:

    /*
     * 将 fd 从 mask 指定的监听队列中删除
     */
    void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
    {
        if (fd >= eventLoop->setsize) return;
    
        // 取出文件事件结构
        aeFileEvent *fe = &eventLoop->events[fd];
    
        // 未设置监听的事件类型,直接返回
        if (fe->mask == AE_NONE) return;
    
        // 计算新掩码
        fe->mask = fe->mask & (~mask);
        if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
            /* Update the max fd */
            int j;
    
            for (j = eventLoop->maxfd-1; j >= 0; j--)
                if (eventLoop->events[j].mask != AE_NONE) break;
            eventLoop->maxfd = j;
        }
    
        // 取消对给定 fd 的给定事件的监视
        aeApiDelEvent(eventLoop, fd, mask);
    }

     

    3 Redis中事件监听和处理的流程图

    Redis中事件监听和处理的流程如下

    (1) 通过aeApiPoll监听用户感兴趣的事件;

    (2) 当有文件事件发生时返回(此处不考虑时间事件),就绪事件将存储于底层的就绪数组aeApiState.events;

    (3) 将就绪数组拷贝到aeEventLoop的就绪数组aeEventLoop.fired中;

    (4)通过fd,在aeEventLoop的注册文件事件数组中找到aeFileEvent -- eventLoop->events[fd],最后调用相关回调函数,完成事件处理。

    参考:

    redis中事件模型实现分析

    事件库之Redis自己的事件模型-ae

  • 相关阅读:
    mybatis的mapper文件中的一个标签是否可以写多条SQL语句?是否存在事物?
    Filebeat快速入门
    JVM第一弹
    eclipse git 解决冲突 解决 mergetool 不能使用问题
    使用gitee作为图床 编写markdown自动上传文件
    JVM面试题 第一弹
    学习了JsonSchema,我自定义了一个校验代码
    JsonSchema用法
    Docker简介与安装
    Docker第二弹之常用命令
  • 原文地址:https://www.cnblogs.com/harvyxu/p/7499396.html
Copyright © 2011-2022 走看看