zoukankan      html  css  js  c++  java
  • libevent中的bufferevent原理

           以前的文章看过缓冲区buffer了,libevent用bufferevent来负责管理缓冲区与buffer读写事件。
           今天就带大家看下evbuffer.c,使用bufferevent处理事件的数据,是buffer和event的综合。在最后用一个稍微综合的例子看下使用bufferevent的整个流程。
      首先依旧看下bufferevent的结构。结构很清晰。源码版本1.4.14。

      

     1 struct bufferevent {
     2     struct event_base *ev_base;
     3 
     4     //读事件
     5     struct event ev_read;
     6     //写事件
     7     struct event ev_write;
     8     //读缓冲区,输入缓冲
     9     struct evbuffer *input;
    10     //写缓冲区,输出缓冲
    11     struct evbuffer *output;
    12 
    13     //读水位
    14     struct event_watermark wm_read;
    15     //写水位
    16     struct event_watermark wm_write;
    17 
    18     //发生读触发用户设置的回调
    19     evbuffercb readcb;
    20     //发生写触发用户设置的回调
    21     evbuffercb writecb;
    22     //发生错误触发用户设置的回调
    23     everrorcb errorcb;
    24     //当前设置的回调函数传递的参数,和上面3个回调配合使用
    25     void *cbarg;
    26 
    27     //设置读超时时间,默认为0
    28     int timeout_read;    /* in seconds */
    29     //设置写超时时间,默认为0
    30     int timeout_write;    /* in seconds */
    31 
    32     //当前事件是否可用
    33     short enabled;    /* events that are currently enabled */
    34 };
    35 //水位
    36 struct event_watermark {
    37     //低水位
    38     size_t low;
    39     //高水位
    40     size_t high;
    41 };

    evbuffer中有2个缓冲区,一个是读缓冲区,一个写缓冲区。分别用来处理读写事件的数据。
    evbuffer中有读水位和写水位,分别对应了读缓冲区和写缓冲区。
    里面有个水位的概念。其实很好理解。水位有一个高水位,一个低水位。
    如果水位达到高水位时,不能再往里面灌水了。如果水位达到低水位,不能再从中取水了。

     

    读操作发生时:如果高于高水位,那就不能再读入数据了,等待数据被读掉然后再开始读入数据。低水位只做判断。低水位不为0,如果缓冲区低于低水位,可以继续直接读数据到缓冲区。
    写操作发生时:如果写缓冲区数据长度小于等于低水位,触发用户写事件,通知用户。写数据高水位没用。因为写数据是把缓冲区的数据读出写到对应的文件描述符中,所以水位肯定是下降的。
    我的理解:水位控制了信息的颗粒度,多少数据触发次用户事件。数据缓冲区降低了频繁申请内存带来的开销。

    接着我们来看evbuffer.c中最重要的几个函数

    1.bufferevent_new

     进行一些初始化。最重要的是指定了eventbuffer内部读写事件的回调,bufferevent_readcb与bufferevent_writecb。当前也可以通过后面的bufferevent_setcb实现。

     1 struct bufferevent *
     2 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
     3     everrorcb errorcb, void *cbarg)
     4 {
     5     struct bufferevent *bufev;
     6 
     7     //申请内存空间并且初始化,使用calloc
     8     if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
     9         return (NULL);
    10 
    11     if ((bufev->input = evbuffer_new()) == NULL) {
    12         free(bufev);
    13         return (NULL);
    14     }
    15 
    16     if ((bufev->output = evbuffer_new()) == NULL) {
    17         evbuffer_free(bufev->input);
    18         free(bufev);
    19         return (NULL);
    20     }
    21     //读事件关联回调,传递参数
    22     event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
    23 
    24     //写事件关联回调,传递参数
    25     event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
    26 
    27     //设置bufferevent的读、写和出错事件回调,并且传递cbarg参数。
    28     bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
    29 
    30     /*
    31      * Set to EV_WRITE so that using bufferevent_write is going to
    32      * trigger a callback.  Reading needs to be explicitly enabled
    33      * because otherwise no data will be available.
    34      */
    35     //开启可写,否则无法执行写入回调
    36     bufev->enabled = EV_WRITE;
    37 
    38     return (bufev);
    39 }

    2.bufferevent_readcb

    读事件,最先接触到数据,读出数据然后写入缓冲区

    首先看下bufferevent_readcb的流程图

     1 //读事件,最先接触到数据,读出数据然后写入缓冲区
     2 static void
     3 bufferevent_readcb(int fd, short event, void *arg)
     4 {
     5     struct bufferevent *bufev = arg;
     6     int res = 0;
     7     short what = EVBUFFER_READ;
     8     size_t len;
     9     int howmuch = -1;
    10     //超时事件,报错
    11     if (event == EV_TIMEOUT) {
    12         what |= EVBUFFER_TIMEOUT;
    13         goto error;
    14     }
    15 
    16     /*
    17      * If we have a high watermark configured then we don't want to
    18      * read more data than would make us reach the watermark.
    19      */
    20     //查看高水位,如果缓冲区数据已经高于高水位,不应该再写入。
    21     if (bufev->wm_read.high != 0) {
    22         howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
    23         /* we might have lowered the watermark, stop reading */
    24         if (howmuch <= 0) {
    25             struct evbuffer *buf = bufev->input;
    26             //达到高水位,删除读入事件,不再读入数据到缓冲区
    27             event_del(&bufev->ev_read);
    28             //设置bufev->input变化需要调用的回调函数和回调参数
    29             evbuffer_setcb(buf,
    30                 bufferevent_read_pressure_cb, bufev);
    31             return;
    32         }
    33     }
    34     //没达到高水位,读取数据到input缓冲区中
    35     res = evbuffer_read(bufev->input, fd, howmuch);
    36     if (res == -1) {
    37         //信号中断等一些原因,goto reschedule,可以继续。
    38         if (errno == EAGAIN || errno == EINTR)
    39             goto reschedule;
    40         /* error case */
    41         what |= EVBUFFER_ERROR;
    42     } else if (res == 0) {
    43         /* eof case */
    44         what |= EVBUFFER_EOF;
    45     }
    46 
    47     if (res <= 0)
    48         goto error;
    49     //读事件加入事件队列
    50     bufferevent_add(&bufev->ev_read, bufev->timeout_read);
    51 
    52     /* See if this callbacks meets the water marks */
    53     len = EVBUFFER_LENGTH(bufev->input);
    54     if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
    55         return;
    56     //如果高水位不为0,并且缓冲区数据长度已经不小于高水位了,触发事件。
    57     if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
    58         //缓冲区数据已经不小于高水位,不能再进数据了,删除读缓冲区的读外部数据事件
    59         struct evbuffer *buf = bufev->input;
    60         event_del(&bufev->ev_read);
    61 
    62         /* Now schedule a callback for us when the buffer changes */
    63         //缓冲区大小发生变化,触发回调
    64         //设置回调函数和回调参数
    65         evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
    66     }
    67 
    68     /* Invoke the user callback - must always be called last */
    69     //触发用户回调事件
    70     if (bufev->readcb != NULL)
    71         (*bufev->readcb)(bufev, bufev->cbarg);
    72     return;
    73 
    74 reschedule:
    75     //读事件加入事件队列,继续进行读取
    76     bufferevent_add(&bufev->ev_read, bufev->timeout_read);
    77     return;
    78 
    79  error:
    80     (*bufev->errorcb)(bufev, what, bufev->cbarg);
    81 }

     3.bufferevent_writecb

    写事件

     1 static void
     2 bufferevent_writecb(int fd, short event, void *arg)
     3 {
     4     //事件缓冲区管理
     5     struct bufferevent *bufev = arg;
     6     int res = 0;
     7     short what = EVBUFFER_WRITE;
     8 
     9     //超时事件,报错
    10     if (event == EV_TIMEOUT) {
    11         what |= EVBUFFER_TIMEOUT;
    12         goto error;
    13     }
    14 
    15     if (EVBUFFER_LENGTH(bufev->output)) {
    16         //将缓冲区数据读出,写入到fd文件描述符对应的文件中
    17         res = evbuffer_write(bufev->output, fd);
    18         if (res == -1) {
    19 #ifndef WIN32
    20 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
    21  *set errno. thus this error checking is not portable*/
    22             if (errno == EAGAIN ||
    23             errno == EINTR ||
    24             errno == EINPROGRESS)
    25                 goto reschedule;
    26             /* error case */
    27             what |= EVBUFFER_ERROR;
    28 
    29 #else
    30                 goto reschedule;
    31 #endif
    32 
    33         } else if (res == 0) {
    34             /* eof case */
    35             what |= EVBUFFER_EOF;
    36         }
    37         if (res <= 0)
    38             goto error;
    39     }
    40     //缓冲区不为0,写事件加入执行队列
    41     if (EVBUFFER_LENGTH(bufev->output) != 0)
    42         bufferevent_add(&bufev->ev_write, bufev->timeout_write);
    43 
    44     /*
    45      * Invoke the user callback if our buffer is drained or below the
    46      * low watermark.
    47      */
    48     //缓冲区数据长度低于低水位,用户写事件触发。
    49     if (bufev->writecb != NULL &&
    50         EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
    51         (*bufev->writecb)(bufev, bufev->cbarg);
    52     return;
    53 
    54  reschedule:
    55     if (EVBUFFER_LENGTH(bufev->output) != 0)
    56         bufferevent_add(&bufev->ev_write, bufev->timeout_write);
    57     return;
    58 
    59  error:
    60     (*bufev->errorcb)(bufev, what, bufev->cbarg);
    61 }

     示例

    下面看一个改造过的服务器和客户端的例子。(当前你可以直接使用test中的regress.c例子,我这边因为libevent本来就是用来解决网络问题的,所以自己就用了这个例子)

    server.c

    我的编译命令:gcc -g -Wall -I/usr/local/include -o server server.c -L/usr/local/lib -levent

    服务端监听所有socket。端口5555。这里我们为了演示:evbuffer读缓冲区对应水位设置为高水位10,低水位0。

      1 /*
      2 * libevent echo server example using buffered events.
      3 */
      4 
      5 #include <sys/types.h>
      6 #include <sys/socket.h>
      7 #include <netinet/in.h>
      8 #include <arpa/inet.h>
      9 
     10 /* Required by event.h. */
     11 #include <sys/time.h>
     12 
     13 #include <stdlib.h>
     14 #include <stdio.h>
     15 #include <string.h>
     16 #include <fcntl.h>
     17 #include <unistd.h>
     18 #include <errno.h>
     19 #include <err.h>
     20 
     21 /* Libevent. */
     22 #include <event.h>
     23 
     24 /* Port to listen on. */
     25 #define SERVER_PORT 5555
     26 
     27 /**
     28 * A struct for client specific data, also includes pointer to create
     29 * a list of clients.
     30 */
     31 struct client {
     32     /* The clients socket. */
     33     int fd;
     34 
     35     /* The bufferedevent for this client. */
     36     struct bufferevent *buf_ev;
     37 };
     38 
     39 /**
     40 * Set a socket to non-blocking mode.
     41 */
     42 //用于设置非阻塞
     43 int
     44 setnonblock(int fd)
     45 {
     46     int flags;
     47 
     48     flags = fcntl(fd, F_GETFL);
     49     if (flags < 0)
     50         return flags;
     51     flags |= O_NONBLOCK;
     52     if (fcntl(fd, F_SETFL, flags) < 0)
     53         return -1;
     54 
     55     return 0;
     56 }
     57 
     58 /**
     59 * Called by libevent when there is data to read.
     60 */
     61 void
     62 buffered_on_read(struct bufferevent *bev, void *arg)
     63 {
     64     /* Write back the read buffer. It is important to note that
     65     * bufferevent_write_buffer will drain the incoming data so it
     66     * is effectively gone after we call it. */
     67     char msg[4096];
     68 
     69     size_t len = bufferevent_read(bev, msg, sizeof(msg));
     70 
     71     msg[len] = '';
     72     printf("recv the client msg: %s
    ", msg);
     73 
     74     char reply_msg[4096] = "I have recvieced the msg: ";
     75     strcat(reply_msg + strlen(reply_msg), msg);
     76     bufferevent_write(bev, reply_msg, strlen(reply_msg));
     77 
     78 }
     79 
     80 /**
     81 * Called by libevent when the write buffer reaches 0.  We only
     82 * provide this because libevent expects it, but we don't use it.
     83 */
     84 //当写缓冲区达到低水位时触发调用,我们这边不用
     85 void
     86 buffered_on_write(struct bufferevent *bev, void *arg)
     87 {
     88     
     89 }
     90 
     91 /**
     92 * Called by libevent when there is an error on the underlying socket
     93 * descriptor.
     94 */
     95 void
     96 buffered_on_error(struct bufferevent *bev, short what, void *arg)
     97 {
     98     struct client *client = (struct client *)arg;
     99 
    100     if (what & EVBUFFER_EOF) {
    101         /* Client disconnected, remove the read event and the
    102         * free the client structure. */
    103         printf("Client disconnected.
    ");
    104     }
    105     else {
    106         warn("Client socket error, disconnecting.
    ");
    107     }
    108     bufferevent_free(client->buf_ev);
    109     close(client->fd);
    110     free(client);
    111 }
    112 
    113 /**
    114 * This function will be called by libevent when there is a connection
    115 * ready to be accepted.
    116 */
    117 void
    118 on_accept(int fd, short ev, void *arg)
    119 {
    120     int client_fd;
    121     struct sockaddr_in client_addr;
    122     socklen_t client_len = sizeof(client_addr);
    123     struct client *client;
    124 
    125     client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    126     if (client_fd < 0) {
    127         warn("accept failed");
    128         return;
    129     }
    130 
    131     /* Set the client socket to non-blocking mode. */
    132     if (setnonblock(client_fd) < 0)
    133         warn("failed to set client socket non-blocking");
    134 
    135     /* We've accepted a new client, create a client object. */
    136     client = calloc(1, sizeof(*client));
    137     if (client == NULL)
    138         err(1, "malloc failed");
    139     client->fd = client_fd;
    140 
    141     /* Create the buffered event.
    142     *
    143     * The first argument is the file descriptor that will trigger
    144     * the events, in this case the clients socket.
    145     *
    146     * The second argument is the callback that will be called
    147     * when data has been read from the socket and is available to
    148     * the application.
    149     *
    150     * The third argument is a callback to a function that will be
    151     * called when the write buffer has reached a low watermark.
    152     * That usually means that when the write buffer is 0 length,
    153     * this callback will be called.  It must be defined, but you
    154     * don't actually have to do anything in this callback.
    155     *
    156     * The fourth argument is a callback that will be called when
    157     * there is a socket error.  This is where you will detect
    158     * that the client disconnected or other socket errors.
    159     *
    160     * The fifth and final argument is to store an argument in
    161     * that will be passed to the callbacks.  We store the client
    162     * object here.
    163     */
    164     client->buf_ev = bufferevent_new(client_fd, buffered_on_read,
    165         buffered_on_write, buffered_on_error, client);
    166     client->buf_ev->wm_read.high = 10;
    167     client->buf_ev->wm_read.low = 0;
    168     /* We have to enable it before our callbacks will be
    169     * called. */
    170     bufferevent_enable(client->buf_ev, EV_READ);
    171 
    172     printf("Accepted connection from %s
    ",
    173         inet_ntoa(client_addr.sin_addr));
    174 }
    175 
    176 int
    177 main(int argc, char **argv)
    178 {
    179     int listen_fd;
    180     struct sockaddr_in listen_addr;
    181     struct event ev_accept;
    182     int reuseaddr_on;
    183 
    184     /* Initialize libevent. */
    185     event_init();
    186 
    187     /* Create our listening socket. */
    188     listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    189     if (listen_fd < 0)
    190         err(1, "listen failed");
    191     memset(&listen_addr, 0, sizeof(listen_addr));
    192     listen_addr.sin_family = AF_INET;
    193     listen_addr.sin_addr.s_addr = INADDR_ANY;
    194     listen_addr.sin_port = htons(SERVER_PORT);
    195     if (bind(listen_fd, (struct sockaddr *)&listen_addr,
    196         sizeof(listen_addr)) < 0)
    197         err(1, "bind failed");
    198     if (listen(listen_fd, 5) < 0)
    199         err(1, "listen failed");
    200     reuseaddr_on = 1;
    201     setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
    202         sizeof(reuseaddr_on));
    203 
    204     /* Set the socket to non-blocking, this is essential in event
    205     * based programming with libevent. */
    206     if (setnonblock(listen_fd) < 0)
    207         err(1, "failed to set server socket to non-blocking");
    208 
    209     /* We now have a listening socket, we create a read event to
    210     * be notified when a client connects. */
    211     event_set(&ev_accept, listen_fd, EV_READ | EV_PERSIST, on_accept, NULL);
    212     event_add(&ev_accept, NULL);
    213 
    214     /* Start the event loop. */
    215     event_dispatch();
    216 
    217     return 0;
    218 }

    client.c

    读键盘输入,发送到服务端,服务端再返回,客户端回显。

    gcc -g -Wall -I/usr/local/include -o client client.c -L/usr/local/lib -levent

      1 #include<sys/types.h>
      2 #include<sys/socket.h>
      3 #include<netinet/in.h>
      4 #include<arpa/inet.h>
      5 #include<errno.h>
      6 #include<unistd.h>
      7 
      8 #include <stdlib.h>
      9 #include <stdio.h>
     10 #include <string.h>
     11 #include <fcntl.h>
     12 #include <err.h>
     13 
     14 #include<event.h>
     15 
     16 #define SERVER_PORT 5555
     17 
     18 
     19 //服务端信息
     20 struct server {
     21     /* The server socket. */
     22     int fd;
     23 
     24     /* The bufferedevent for this server. */
     25     struct bufferevent *buf_ev;
     26 };
     27 
     28 //全局server数据
     29 struct server *serv;
     30 
     31 //设置文件状态标记
     32 int setnonblock(int fd)
     33 {
     34     int flags;
     35     flags = fcntl(fd, F_GETFL);
     36     if (flags < 0)
     37         return flags;
     38     flags |= O_NONBLOCK;
     39     if (fcntl(fd, F_SETFL, flags) < 0)
     40         return -1;
     41     return 0;
     42 }
     43 
     44 //键盘事件
     45 void cmd_msg_cb(int fd, short events, void* arg)
     46 {
     47     printf("cmd_msg_cb
    ");
     48     char msg[1024];
     49 
     50     int ret = read(fd, msg, sizeof(msg));
     51     if (ret < 0)
     52     {
     53         perror("read fail ");
     54         exit(1);
     55     }
     56     struct bufferevent* bev = (struct bufferevent*)arg;
     57     //把终端的消息发送给服务器端
     58     bufferevent_write(bev, msg, ret);
     59 }
     60 
     61 //读服务端发来的数据
     62 void read_msg_cb(struct bufferevent* bev, void* arg)
     63 {
     64     printf("read_msg_cb
    ");
     65     char msg[1024];
     66 
     67     size_t len = bufferevent_read(bev, msg, sizeof(msg));
     68     msg[len] = '';
     69     printf("recv %s from server", msg);
     70 }
     71 
     72 //连接断开或者出错回调
     73 void event_error(struct bufferevent *bev, short event, void *arg)
     74 {
     75     printf("event_error
    ");
     76     if (event & EVBUFFER_EOF)
     77         printf("connection closed
    ");
     78     else if (event & EVBUFFER_ERROR)
     79         printf("some other error
    ");
     80     struct event *ev = (struct event*)arg;
     81     //因为socket已经没有,所以这个event也没有存在的必要了  
     82     free(ev);
     83     //当发生错误退出事件循环
     84     event_loopexit(0);
     85     bufferevent_free(bev);
     86 }
     87 
     88 //连接到server
     89 typedef struct sockaddr SA;
     90 int tcp_connect_server(const char* server_ip, int port)
     91 {
     92     int sockfd, status, save_errno;
     93     struct sockaddr_in server_addr;
     94 
     95     memset(&server_addr, 0, sizeof(server_addr));
     96 
     97     server_addr.sin_family = AF_INET;
     98     server_addr.sin_port = htons(port);
     99     status = inet_aton(server_ip, &server_addr.sin_addr);
    100 
    101     if (status == 0) //the server_ip is not valid value
    102     {
    103         errno = EINVAL;
    104         return -1;
    105     }
    106 
    107     sockfd = socket(AF_INET, SOCK_STREAM, 0);
    108     if (sockfd == -1)
    109         return sockfd;
    110     status = connect(sockfd, (SA*)&server_addr, sizeof(server_addr));
    111 
    112     if (status == -1)
    113     {
    114         save_errno = errno;
    115         close(sockfd);
    116         errno = save_errno; //the close may be error
    117         return -1;
    118     }
    119 
    120     setnonblock(sockfd);
    121 
    122     return sockfd;
    123 }
    124 
    125 
    126 int main(int argc, char** argv)
    127 {
    128 
    129     event_init();
    130     //测试用直接连接本地server
    131     int sockfd = tcp_connect_server("127.0.0.1", SERVER_PORT);
    132     if (sockfd == -1)
    133     {
    134         perror("tcp_connect error ");
    135         return -1;
    136     }
    137     
    138     printf("connect to server successful
    ");
    139     serv = calloc(1, sizeof(*serv));
    140     if (serv == NULL)
    141         err(1, "malloc failed");
    142     serv->fd = sockfd;
    143     serv->buf_ev = bufferevent_new(sockfd, read_msg_cb,
    144         NULL, NULL, (void *)serv);
    145 
    146     //监听终端输入事件
    147     struct event *ev_cmd = calloc(1,sizeof(*ev_cmd));
    148     event_set(ev_cmd, STDIN_FILENO,
    149         EV_READ | EV_PERSIST, cmd_msg_cb,
    150         (void*)serv->buf_ev);
    151     event_add(ev_cmd, NULL);
    152     //设置下read和发生错误的回调函数。(当socket关闭时会用到回调参数,删除键盘事件)
    153     bufferevent_setcb(serv->buf_ev, read_msg_cb, NULL, event_error, (void*)ev_cmd);
    154     bufferevent_enable(serv->buf_ev, EV_READ| EV_PERSIST);
    155     event_dispatch();
    156     printf("finished 
    ");
    157     return 0;
    158 }

    过程

    1.运行 ./server

    2.运行./client

    3.服务端显示连接成功

    4.键入abcdefghijklmn回车

    5.服务器接收到数据

    由于读缓冲区高水位为10,低水位为0。所以接到abcdefghij后出发用户事件读掉缓冲区数据,然后再读klmn回车。多空一行是键盘输入的回车也读到了。

    6.客户端回显

    7.在服务端终端中按下ctrl+c

    8.客户端如下

    测试了client.c中加入的event_error。event_error执行退出事件循环。

  • 相关阅读:
    Python 参数设置
    python日志模块logging
    推荐一种快速提高英语口语的方法
    Windows 远程桌面文件传输的方法
    敏捷术语
    Timeout watchdog using a standby thread
    ZFS -世界上最高级的文件系统之一
    Linux下创建和删除用户
    linux 学习之路
    Difference between a Hard Link and Soft (Symbolic) Link
  • 原文地址:https://www.cnblogs.com/nengm1988/p/8203784.html
Copyright © 2011-2022 走看看