zoukankan      html  css  js  c++  java
  • EDA风格与Reactor模式

    本文将探讨如下几个问题:

    • Event-Driven架构风格的约束
    • EDA风格对架构属性的影响
    • Reactor架构模式
    • Reactor所解决的问题
    • redis中的EventDriven

    从观察者模式到EDA风格

    GOF的23种设计模式中,有一个观察者模式!个人觉得叫「监听模式」更合理!

    定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时, 所有依赖于它的对象都得到通知并被自动更新。

    它和EDA风格有些类似!不过一个应用在代码关系层面;一个应用在架构上!

    在EDA中,有三个必要组件:

    • 事件生产组件:对应到观察者模式中,就是引起Subject状态变化的对象。它引起Subject状态变化的动作就是事件。
    • 事件队列:对应到观察者模式中,就是Subject对象。事件生产者产生的事件,会到事件队列中。
    • 事件处理组件:处理事件生产组件所产生的事件。对应到观察者模式中,就是Observer对象。

    EDA风格的约束

    • 「事件生产组件」产生事件,将其添加到「事件队列」中
    • 「事件处理组件」监听「事件队列」
    • 当「事件队列」中有自己能处理的事件时,「事件处理组件」将对该事件进行处理
    • 「事件处理组件」处理完事件后,可以将处理结果返回给「事件生产组件」,也可以不返回
    • 「事件生产组件」可以接收「事件处理组件」处理结果,也可以不接收

    EDA风格可以细分为Mediator结构和Broker结构!

    • Mediator结构通过一个Mediator组件协调多个步骤间的关系和执行顺序
    • Broker结构则是通过松散的方式来组织多个步骤之间的关系和执行顺序

    Mediator结构如下:

    在Mediator结构中主要有四个组件:

    • 事件队列(event queue)
    • 事件中介(event mediator)
    • 事件通道(event channel)
    • 事件处理器(event processor)。

    执行流程如下:

    • 「事件生产组件」产生事件,将其添加到「事件队列」中
    • 「事件中介」监听「事件队列」
    • 当「事件队列」中有事件时,「事件中介」根据具体的事件,将其拆分/组合为一步步的具体步骤
    • 将具体的步骤添加到对应的「事件通道」中
    • 「事件处理器」从「事件通道」中获取到事件,进行处理

    Broker结构如下:

    Broker结构中主要包括两个组件:

    • Broker:Broker可被集中或相互关联在一起使用,此外,Broker中还可以包含所有事件流中使用的事件通道。
    • 事件处理器

    执行流程如下:

    • 「事件生产组件」产生事件,将其添加到「Broker」中
    • 「事件处理组件」监听「Broker」
    • 当「Broker」中有自己能处理的事件时,「事件处理组件」将对该事件进行处理
    • 「事件处理组件」处理完事件后,发送后续事件到「Broker」中
    • 其它「事件处理组件」接收到自己能处理的事件后,对该事件进行处理,处理完后,可能继续发送事件到「Broker」中
    • 当不再有后续事件时,所有步骤完成

    EDA风格对架构属性的影响

    • 性能:由于EDA是个异步架构,对于不需要返回值的请求,它能明显的提高用户可感知的性能。
    • 扩展性:事件生产者和事件消费者松耦合,可以独立进化,可以方便的进行功能扩展。同时由于,可以方便的添加事件消费者,故而进一步提高了扩展性。
    • 伸缩性:事件生产者和事件消费者松耦合及可以方便的添加事件消费者,使得EDA有很好的伸缩性
    • 可维护性:事件生产者和事件消费者松耦合,且和一般的调用方式相比,在理解上有一定的难度,使得开发难度增加,但单元测试较方便。而由于EDA的异步性,使得集成测试比较麻烦。可维护性一般
    • 可运维性:事件生产者和事件消费者松耦合,可独立部署,可运维性相对较容易
    • 灵活性:高度可扩展,易于伸缩使得EDA有较高的灵活性

    Reactor架构模式

    Reactor架构模式,是EDA风格的一种实现!它是为了处理:

    • 高并发IO请求
    • 其中请求所包含的数据量不大
    • 每个请求处理耗时不长

    Reactor模型,有四个组件:

    • Acceptor:Acceptor接受Client连接。属于EDA「事件队列组件」。
    • Channel:接收IO事件。属于EDA「事件队列组件」。
    • Reactor:监听Channel和Acceptor,当发生连接事件或IO事件时,将其派发给对应的Handler。属于EDA「事件队列组件」。
    • Handler:和一个Client通讯的实体,进行实际的业务处理。对应EDA的「事件处理组件」。

    Client属于EDA的「事件生产组件」!

    Reactor可以分为:单线程模型,多线程模型和主从Reactor模型

    • 单线程模型

    Reactor轮询到消息后,就直接委托给Handler去处理,处理完后,再进行后面的轮询,即一个Handler处理完之后,才能进行下一个Handler的处理!如果某个handler耗时较长,就会阻塞后续的handler的执行!

    • 多线程模型

    Reactor多线程模型就是将Handler中的IO操作和非IO操作分开,操作IO的线程称为IO线程,非IO操作的线程称为工作线程!这样的话,客户端的请求会直接被丢到线程池中,客户端发送请求就不会堵塞!

    但是当用户进一步增加的时候,Reactor会出现瓶颈!因为Reactor既要处理IO操作请求,又要响应连接请求!为了分担Reactor的负担,所以引入了主从Reactor模型!

    • 主从Reactor模型

    主Reactor用于响应连接请求,从Reactor用于处理IO操作请求!

    详细内容可参考之前的博文《高性能Server---Reactor模型》

    这里简单说下,为什么Reactor中,请求的数据量不能太大,请求处理时间为什么不能太长!

    其实很好理解,如果请求数据量太大、处理时间很长!即使是主从Reactor模型,线程池也会被耗尽!耗尽后,导致后续的请求积压。

    Reactor解决的是传统BIO模型下,Server并发处理请求的能力受限于系统所能创建的线程数的问题!

    redis中的EventDriven

    redis使用了「Event-driven programming」来处理指令,「Event-driven programming」是一种编程范式,也可以说是EDA的代码实现。redis中的实现,类似Reactor单线程模型!

    事件驱动程序设计是一种计算机程序设计模型。与传统上一次等待一个完整的指令然后再做运行的方式不同,事件驱动程序模型下的系统,基本上的架构是预先设计一个事件循环所形成的程序,这个事件循环程序不断地检查目前要处理的信息,根据要处理的信息运行一个触发函数进行必要的处理。其中这个外部信息可能来自一个目录夹中的文件,可能来自键盘或鼠标的动作,或者是一个时间事件。

    简单的看一下代码:

    // server.c
    
    int main(int argc, char **argv) {
        ......
        aeMain(server.el);  // 进入事件监听轮询,轮询注册上来的文件句柄
        ......
    }
    
    // ae.c
    
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;          // eventLoop相当于Reactor模式中的Reactor组件
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        ......
            
        numevents = aeApiPoll(eventLoop, tvp);// select,epoll,evport,kqueue底层实现,获取事件
    
        ......
        
        // 将事件委托给具体的Handler去处理
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; 
    
            int invert = fe->mask & AE_BARRIER;
    
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
    
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
    
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
    
            processed++;
        }
        ......
    }
    
    // ae.h
    // aeFileEvent的结构如下,在构建时,就将处理程序赋给了其中的aeFileProc
    // 在上面的轮询中,直接取出来执行即可
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
        aeFileProc *rfileProc;
        aeFileProc *wfileProc;
        void *clientData;
    } aeFileEvent;
    

    下面以我们启动redis-server和redis-cli这个流程,来简单的说明下redis的执行流程:

    //redis-server启动后,进入轮询状态,等待事件
    //启动时构建了一个tcpHandler,用于处理客户端连接
    
    // server.c
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    
    //redis-cli启动后,eventLoop轮询到连接事件,触发tcpHandler,根据获取到的文件句柄,构建aeFileEvent,同时设置readQueryFromClient Handler,用于处理后续的客户端的指令
    client *createClient(int fd) {
        ......
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
        ......
    }
    
    // 客户端发送指令后,触发readQueryFromClient Handler,来处理指令
    // 处理完后,等待下一个指令
    
    

    参考资料

  • 相关阅读:
    IOC.AOP
    struts的工作原理
    信息的增删查改
    java基础面试题
    用户登录的增,删,查,改,各个例子
    配置测试类
    数据库连接代码 (javaBean)
    phonegap开发环境搭建
    2014以及未来几年编程语言趋势
    6-集群环境搭建
  • 原文地址:https://www.cnblogs.com/ivaneye/p/10129896.html
Copyright © 2011-2022 走看看