zoukankan      html  css  js  c++  java
  • libevent 入门教程:Echo Server based on libevent(转)

    下面假定已经学习过基本的socket编程(socket, bind, listen, accept, connect, recv, send, close),并且对异步/callback有基本的认识。

    基本的socket编程是阻塞/同步的,每个操作除非已经完成或者出错才会返回,这样对于每一个请求,要使用一个线程或者单独的进程去处理,系统资源没法支撑大量的请求。Posix定义了可以使用异步的select系统调用,但是因为其采用了轮询的方式来判断某个fd是否变成active,效率不高o(n)。于是各系统提出了基于异步/callback的系统调用,例如linux的epoll,BSD的kqueue,windows的IOCP。由于在内核层面做了支持,所以可以再o(1)的效率查找到active的fd。基本上,libevent就是对这些高效IO的封装,提供统一个API,简化开发。

    libevent大概是这样的:

      默认情况下是单线程的(可以配置成多线程),每个线程有且只有一个event_base,对应一个struct event_base结构体(以及附于其上的事件管理器),用来schedule托管给它的一系列event,可以和操作系统的进程管理器类比,当然,更加简单些。当一个事件发生后,event_base会在合适的时间(不一定是立即)去调用绑定在这个事件上的函数(传入一些预定义的参数,以及绑定时指定的一个参数),知道这个函数执行完,再返回schedule其他事件。

    //创建一个event_base
    struct event_base *base = event_base_new();
    assert(base != NULL);

    event_base内部有一个循环,循环阻塞在epoll/kqueue等系统调用上,知道有一个/一些事件发生,然后去处理这些事件。当然,这些事件要被绑定到event_base上。每个事件对应一个struct event,可以是监听一个fd或者Posix信号量之类的。struct event使用event_new来创建和绑定,使用event_add来启用。

    //创建并绑定一个event
    struct event *listen_event;
    //参数:event_base, 监听的fd,事件类型及属性,绑定的回调函数,给回调函数的参数
    listen_event = event_new(base, listener, EV_READ|EV_PERSIST, callback_func, (void*)base);
    //参数:event,超时时间(struct timeval *类型的,NULL表示无超时设置)
    event_add(listen_event, NULL);

    注:libevent支持的事件及属性包括(使用bitfield实现,所以用|来让他们合体)

    (a) EV_TIMEOUT:超时

    (b) EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发

    (c) EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发

    (d) EV_SIGNAL: POSIX信号量,参考manual吧

    (f) EV_ET: Edge-Trigger边缘触发,参考EPOLL_ET

    然后需要启动event_base的循环,这样才能开始处理发生的事件。循环的启动使用event_base_dispatch,循环将一直持续,知道不再有需要关注的事件,或者遇到event_loopbreak()/event_loopexit()函数。

    //启动事件循环
    event_base_dispatch(base);

    接下来关注下绑定event的回调函数callback_func:传递给它的是一个socket fd, 一个event类型及属性bit_field,以及传递给event_new的最后一个参数(去上面几行回顾一下,把event_base给传进来了,实际上更多地是分配一个结构体,把相关的数据都放进去,最后丢给event_new,在这里就能取到了)。其原型是:

    typedef void(* event_callback_fn(evutil_socket_t sockfd, short event_type, void *arg))

    对于一个服务器而言,上面的流程大概是这样组合的:

    1. listener = socket(), bind(), listen(), 设置nonblocking(Posix系统可使用fcntl设置,windows不需要设置,实际上libevent提供了统一的包装evutil_make_socket_nnblocking)

    2. 创建一个event_base

    3. 创建一个event, 将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)。对于listener socket来说,只需要监听EV_READ|EV_PERSIST

    4. 启动该事件

    5. 进入事件循环

    6. (异步)当有client发起请求的时候,调用该回调函数,进行处理。

    问题:为什么不在listen完马上调用accept,获得客户端连接以后再丢给event_base呢?这个问题不知道YFS就这么干的,不过这个要另起一个线程。

    回调函数要做什么事情呢?当然是处理client的请求了。首先要accept,获得一个可以与client通信的sockfd,然后……调用recv/send吗?错!大错特错!如果直接调用recv/send的话,这个线程就阻塞在这个地方了,如果这个客户端非常的阴险(比如一直不发消息,或者网络不好,老是丢包),libevent就只能等它,没法处理其他的请求了——所以应该创建一个新的event来托管这个sockfd。

    在老版本libevent上的实现,比较罗嗦[如果不想详细了解的话,看下一部分]。

    对于服务器希望先从client获取数据的情况,大致流程是这样的:

    1. 将这个sockfd设置为nonblocking

    2. 创建2个event:

      event_read, 绑上sockfd的EV_READ|EV_PERSIST,设置回调函数和参数

      event_write, 绑上sockfd的EV_WRITE|EV_PERSIST, 设置回调函数和参数

    3. 启动event_read事件

    ----

    4. (异步)等待event_read事件的发生,调用相应的回调函数。这里麻烦来了:回调函数recv读入的数据,不能直接用send丢给sockfd了事,因为sockfd是nonblocking的,丢给他的话,不能保证正确(因为异步。。。)。所以需要一个自己管理的缓存用来保存读入的数据中(在accept以后就创建一个struct,作为第2步回调函数的arg传进来),在合适的事件(比如遇到换行符)启用event_write事件(event_add(event_write, NULL)),等待EV_WRITE事件的触发。

    5. (异步)当event_write事件的回调函数被调用时,往sockfd写入数据,然后删除event_write事件(event_del(event_write)),等待event_read事件的下一次执行。

    一个例子可以参考官方文档里面的[Example: A low-level ROT13 server with Libevent]

    由于需要自己管理缓冲区,且过程晦涩难懂,并且不兼容于Windows的IOCP,所以libevent2开始,提供了bufferevent这个神器,用来提供更加优雅、易用的API。struct bufferevent内建了两个event(read/write)和对应的缓冲区[struct evbuffer *input, *output],并提供相应的函数用来操作缓冲区(或者直接操作bufferevent)。每当有数据被读入input的时候,read_cb函数被调用;每当output被输出完的时候,write_cb被调用;在网络IO操作出现错误的情况(连接中断、超时、其他错误),error_cb被调用。于是上一部分的步骤被简化为:

    1. 设置sockfd为nonblocking

    2. 使用bufferevent_socket_new创建一个struct bufferevent *bev,关联该sockfd,托管给event_base

    3. 使用bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void *)arg)将EV_READ/EV_WRITE对应的函数

    4. 使用bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST)来启用read/write事件

    ----

    5. (异步)

    在read_cb里面从input读取数据,处理完毕后塞到output里(会被自动写入到sockfd)

    在write_cb里面(需要做什么吗?对于一个echo server来说,read_cb就足够了)

    在error_cb里面处理遇到的错误

    可以使用bufferevent_set_timeouts(bev, struct timeval *READ, struct timeval *WRITE)来设置读写超时, 在error_cb里面处理超时

    read_cb和write_cb的原型是

    void read_or_write_callback(struct bufferevent *bev, void *arg)

    error_cb的原型是

    void error_cb(struct bufferevent *bev, short error, void *arg) //这个是event的标准回调函数原型

    可以从bev中用libevent的API提取出event_base、sockfd、input/output等相关数据

    void read_cb(struct bufferevent *bev, void *arg) {
        char line[256];
        int n;
        evutil_socket_t fd = bufferevent_getfd(bev);
        while (n = bufferevent_read(bev, line, 256), n > 0)
            bufferevent_write(bev, line, n);
    }
    
    void error_cb(struct bufferevent *bev, short event, void *arg) {
        bufferevent_free(bev);
    }
    #include <stdio.h>
    #include <stdlib.h>
    #include <errno.h>
    #include <assert.h>
    
    #include <event2/event.h>
    #include <event2/bufferevent.h>
    
    #define LISTEN_PORT 9999
    #define LISTEN_BACKLOG 32
    
    void do_accept(evutil_socket_t listener, short event, void *arg);
    void read_cb(struct bufferevent *bev, void *arg);
    void error_cb(struct bufferevent *bev, short event, void *arg);
    void write_cb(struct bufferevent *bev, void *arg);
    
    int main(int argc, char *argv[])
    {
        int ret;
        evutil_socket_t listener;
        listener = socket(AF_INET, SOCK_STREAM, 0);
        assert(listener > 0);
        evutil_make_listen_socket_reuseable(listener);
    
        struct sockaddr_in sin;
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = 0;
        sin.sin_port = htons(LISTEN_PORT);
    
        if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
            perror("bind");
            return 1;
        }
    
        if (listen(listener, LISTEN_BACKLOG) < 0) {
            perror("listen");
            return 1;
        }
    
        printf ("Listening...
    ");
    
        evutil_make_socket_nonblocking(listener);
    
        struct event_base *base = event_base_new();
        assert(base != NULL);
        struct event *listen_event;
        listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
        event_add(listen_event, NULL);
        event_base_dispatch(base);
    
        printf("The End.");
        return 0;
    }
    
    void do_accept(evutil_socket_t listener, short event, void *arg)
    {
        struct event_base *base = (struct event_base *)arg;
        evutil_socket_t fd;
        struct sockaddr_in sin;
        socklen_t slen = sizeof(sin);
        fd = accept(listener, (struct sockaddr *)&sin, &slen);
        if (fd < 0) {
            perror("accept");
            return;
        }
        if (fd > FD_SETSIZE) { //这个if是参考了那个ROT13的例子,貌似是官方的疏漏,从select-based例子里抄过来忘了改
            perror("fd > FD_SETSIZE
    ");
            return;
        }
    
        printf("ACCEPT: fd = %u
    ", fd);
    
        struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
        bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
        bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);
    }
    
    void read_cb(struct bufferevent *bev, void *arg)
    {
    #define MAX_LINE    256
        char line[MAX_LINE+1];
        int n;
        evutil_socket_t fd = bufferevent_getfd(bev);
    
        while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {
            line[n] = '';
            printf("fd=%u, read line: %s
    ", fd, line);
    
            bufferevent_write(bev, line, n);
        }
    }
    
    void write_cb(struct bufferevent *bev, void *arg) {}
    
    void error_cb(struct bufferevent *bev, short event, void *arg)
    {
        evutil_socket_t fd = bufferevent_getfd(bev);
        printf("fd = %u, ", fd);
        if (event & BEV_EVENT_TIMEOUT) {
            printf("Timed out
    "); //if bufferevent_set_timeouts() called
        }
        else if (event & BEV_EVENT_EOF) {
            printf("connection closed
    ");
        }
        else if (event & BEV_EVENT_ERROR) {
            printf("some other error
    ");
        }
        bufferevent_free(bev);
    }
  • 相关阅读:
    Nodejs学习笔记
    Multiple SSH keys for different github accounts
    深入Node.js的模块机制
    Yoga S5
    Nodejs
    gulp & webpack整合
    git subtree:无缝管理通用子项目
    javascript功能插件大集合,写前端的亲们记得收藏
    Python简单的制作图片验证码
    用CSS3/JS绘制自己想要的按钮
  • 原文地址:https://www.cnblogs.com/xinsheng/p/3880567.html
Copyright © 2011-2022 走看看