  • libevent学习

    Because generating and reading the select() bit arrays takes time proportional to the largest fd that you provided for select(), the select() call scales terribly when the number of sockets is high. 

    Different operating systems have provided different replacement functions for select. These include poll(), epoll(), kqueue(), evports, and /dev/poll. All of these give better performance than select(), and all but poll() give O(1) performance for adding a socket, removing a socket, and for noticing that a socket is ready for IO.

    Unfortunately, none of the efficient interfaces is a ubiquitous standard.

    Linux has epoll(), the BSDs (including Darwin) have kqueue(), Solaris has evports and /dev/poll… and none of these operating systems has any of the others.

    So if you want to write a portable high-performance asynchronous application, you’ll need an abstraction that wraps all of these interfaces, and provides whichever one of them is the most efficient.

    And that’s what the lowest level of the Libevent API does for you. It provides a consistent interface to various select() replacements, using the most efficient version available on the computer where it’s running.

    Here’s yet another version of our asynchronous ROT13 server. This time, it uses Libevent 2 instead of select(). Note that the fd_sets are gone now: instead, we associate and disassociate events with a struct event_base, which might be implemented in terms of select(), poll(), epoll(), kqueue(), etc.


    Example: A low-level ROT13 server with Libevent
      1 /* For sockaddr_in */
      2 #include <netinet/in.h>
      3 /* For socket functions */
      4 #include <sys/socket.h>
      5 /* For fcntl */
      6 #include <fcntl.h>
      8 #include <event2/event.h>
     10 #include <assert.h>
     11 #include <unistd.h>
     12 #include <string.h>
     13 #include <stdlib.h>
     14 #include <stdio.h>
     15 #include <errno.h>
     17 #define MAX_LINE 16384
     19 void do_read(evutil_socket_t fd, short events, void *arg);
     20 void do_write(evutil_socket_t fd, short events, void *arg);
     /*
      * Apply the ROT13 substitution to a single character.
      *
      * ASCII letters are rotated 13 places within their own case; every other
      * character is returned unchanged.
      */
     char
     rot13_char(char c)
     {
         /* We don't want to use isalpha here; setting the locale would change
          * which characters are considered alphabetical. */
         if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
             return c + 13;
         else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
             return c - 13;
         else
             return c;
     }
     35 struct fd_state {
     36     char buffer[MAX_LINE];
     37     size_t buffer_used;
     39     size_t n_written;
     40     size_t write_upto;
     42     struct event *read_event;
     43     struct event *write_event;
     44 };
     46 struct fd_state *
     47 alloc_fd_state(struct event_base *base, evutil_socket_t fd)
     48 {
     49     struct fd_state *state = malloc(sizeof(struct fd_state));
     50     if (!state)
     51         return NULL;
     52     state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
     53     if (!state->read_event) {
     54         free(state);
     55         return NULL;
     56     }
     57     state->write_event =
     58         event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);
     60     if (!state->write_event) {
     61         event_free(state->read_event);
     62         free(state);
     63         return NULL;
     64     }
     66     state->buffer_used = state->n_written = state->write_upto = 0;
     68     assert(state->write_event);
     69     return state;
     70 }
     72 void
     73 free_fd_state(struct fd_state *state)
     74 {
     75     event_free(state->read_event);
     76     event_free(state->write_event);
     77     free(state);
     78 }
     80 void
     81 do_read(evutil_socket_t fd, short events, void *arg)
     82 {
     83     struct fd_state *state = arg;
     84     char buf[1024];
     85     int i;
     86     ssize_t result;
     87     while (1) {
     88         assert(state->write_event);
     89         result = recv(fd, buf, sizeof(buf), 0);
     90         if (result <= 0)
     91             break;
     93         for (i=0; i < result; ++i)  {
     94             if (state->buffer_used < sizeof(state->buffer))
     95                 state->buffer[state->buffer_used++] = rot13_char(buf[i]);
     96             if (buf[i] == '
    ') {
     97                 assert(state->write_event);
     98                 event_add(state->write_event, NULL);
     99                 state->write_upto = state->buffer_used;
    100             }
    101         }
    102     }
    104     if (result == 0) {
    105         free_fd_state(state);
    106     } else if (result < 0) {
    107         if (errno == EAGAIN) // XXXX use evutil macro
    108             return;
    109         perror("recv");
    110         free_fd_state(state);
    111     }
    112 }
    114 void
    115 do_write(evutil_socket_t fd, short events, void *arg)
    116 {
    117     struct fd_state *state = arg;
    119     while (state->n_written < state->write_upto) {
    120         ssize_t result = send(fd, state->buffer + state->n_written,
    121                               state->write_upto - state->n_written, 0);
    122         if (result < 0) {
    123             if (errno == EAGAIN) // XXX use evutil macro
    124                 return;
    125             free_fd_state(state);
    126             return;
    127         }
    128         assert(result != 0);
    130         state->n_written += result;
    131     }
    133     if (state->n_written == state->buffer_used)
    134         state->n_written = state->write_upto = state->buffer_used = 1;
    136     event_del(state->write_event);
    137 }
    139 void
    140 do_accept(evutil_socket_t listener, short event, void *arg)
    141 {
    142     struct event_base *base = arg;
    143     struct sockaddr_storage ss;
    144     socklen_t slen = sizeof(ss);
    145     int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    146     if (fd < 0) { // XXXX eagain??
    147         perror("accept");
    148     } else if (fd > FD_SETSIZE) {
    149         close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
    150     } else {
    151         struct fd_state *state;
    152         evutil_make_socket_nonblocking(fd);
    153         state = alloc_fd_state(base, fd);
    154         assert(state); /*XXX err*/
    155         assert(state->write_event);
    156         event_add(state->read_event, NULL);
    157     }
    158 }
    160 void
    161 run(void)
    162 {
    163     evutil_socket_t listener;
    164     struct sockaddr_in sin;
    165     struct event_base *base;
    166     struct event *listener_event;
    168     base = event_base_new();
    169     if (!base)
    170         return; /*XXXerr*/
    172     sin.sin_family = AF_INET;
    173     sin.sin_addr.s_addr = 0;
    174     sin.sin_port = htons(40713);
    176     listener = socket(AF_INET, SOCK_STREAM, 0);
    177     evutil_make_socket_nonblocking(listener);
    179 #ifndef WIN32
    180     {
    181         int one = 1;
    182         setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    183     }
    184 #endif
    186     if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
    187         perror("bind");
    188         return;
    189     }
    191     if (listen(listener, 16)<0) {
    192         perror("listen");
    193         return;
    194     }
    196     listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    197     /*XXX check it */
    198     event_add(listener_event, NULL);
    200     event_base_dispatch(base);
    201 }
    /*
     * Program entry point: switch stdout to unbuffered mode, then run the
     * server.  Command-line arguments are ignored.
     */
    int
    main(int c, char **v)
    {
        (void)c;
        (void)v;

        setvbuf(stdout, NULL, _IONBF, 0);

        run();
        return 0;
    }

    (Other things to note in the code: instead of typing the sockets as "int", we’re using the type evutil_socket_t. Instead of calling fcntl(O_NONBLOCK) to make the sockets nonblocking, we’re calling evutil_make_socket_nonblocking. These changes make our code compatible with the divergent parts of the Win32 networking API.)

    About "bufferevents"

    If you’re deeply experienced with networking on Windows, you’ll realize that Libevent probably isn’t getting optimal performance when it’s used as in the example above. On Windows, the way you do fast asynchronous IO is not with a select()-like interface: it’s by using the IOCP (IO Completion Ports) API. Unlike all the fast networking APIs, IOCP does not alert your program when a socket is ready for an operation that your program then has to perform. Instead, the program tells the Windows networking stack to start a network operation, and IOCP tells the program when the operation has finished.

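    Fortunately, the Libevent 2 "bufferevents" interface solves both of these issues — the amount of buffer-management code we had to write by hand in the example above, and the mismatch between a select()-like interface and IOCP on Windows: it makes programs much simpler to write, and provides an interface that Libevent can implement efficiently on Windows and on Unix.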

    Here’s our ROT13 server one last time, using the bufferevents API.

      1 /* For sockaddr_in */
      2 #include <netinet/in.h>
      3 /* For socket functions */
      4 #include <sys/socket.h>
      5 /* For fcntl */
      6 #include <fcntl.h>
      8 #include <event2/event.h>
      9 #include <event2/buffer.h>
     10 #include <event2/bufferevent.h>
     12 #include <assert.h>
     13 #include <unistd.h>
     14 #include <string.h>
     15 #include <stdlib.h>
     16 #include <stdio.h>
     17 #include <errno.h>
     19 #define MAX_LINE 16384
     21 void do_read(evutil_socket_t fd, short events, void *arg);
     22 void do_write(evutil_socket_t fd, short events, void *arg);
     /*
      * Apply the ROT13 substitution to a single character.
      *
      * ASCII letters are rotated 13 places within their own case; every other
      * character is returned unchanged.
      */
     char
     rot13_char(char c)
     {
         /* We don't want to use isalpha here; setting the locale would change
          * which characters are considered alphabetical. */
         if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
             return c + 13;
         else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
             return c - 13;
         else
             return c;
     }
     37 void
     38 readcb(struct bufferevent *bev, void *ctx)
     39 {
     40     struct evbuffer *input, *output;
     41     char *line;
     42     size_t n;
     43     int i;
     44     input = bufferevent_get_input(bev);
     45     output = bufferevent_get_output(bev);
     47     while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
     48         for (i = 0; i < n; ++i)
     49             line[i] = rot13_char(line[i]);
     50         evbuffer_add(output, line, n);
     51         evbuffer_add(output, "
    ", 1);
     52         free(line);
     53     }
     55     if (evbuffer_get_length(input) >= MAX_LINE) {
     56         /* Too long; just process what there is and go on so that the buffer
     57          * doesn't grow infinitely long. */
     58         char buf[1024];
     59         while (evbuffer_get_length(input)) {
     60             int n = evbuffer_remove(input, buf, sizeof(buf));
     61             for (i = 0; i < n; ++i)
     62                 buf[i] = rot13_char(buf[i]);
     63             evbuffer_add(output, buf, n);
     64         }
     65         evbuffer_add(output, "
    ", 1);
     66     }
     67 }
     69 void
     70 errorcb(struct bufferevent *bev, short error, void *ctx)
     71 {
     72     if (error & BEV_EVENT_EOF) {
     73         /* connection has been closed, do any clean up here */
     74         /* ... */
     75     } else if (error & BEV_EVENT_ERROR) {
     76         /* check errno to see what error occurred */
     77         /* ... */
     78     } else if (error & BEV_EVENT_TIMEOUT) {
     79         /* must be a timeout event handle, handle it */
     80         /* ... */
     81     }
     82     bufferevent_free(bev);
     83 }
     85 void
     86 do_accept(evutil_socket_t listener, short event, void *arg)
     87 {
     88     struct event_base *base = arg;
     89     struct sockaddr_storage ss;
     90     socklen_t slen = sizeof(ss);
     91     int fd = accept(listener, (struct sockaddr*)&ss, &slen);
     92     if (fd < 0) {
     93         perror("accept");
     94     } else if (fd > FD_SETSIZE) {
     95         close(fd);
     96     } else {
     97         struct bufferevent *bev;
     98         evutil_make_socket_nonblocking(fd);
     99         bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    100         bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
    101         bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
    102         bufferevent_enable(bev, EV_READ|EV_WRITE);
    103     }
    104 }
    106 void
    107 run(void)
    108 {
    109     evutil_socket_t listener;
    110     struct sockaddr_in sin;
    111     struct event_base *base;
    112     struct event *listener_event;
    114     base = event_base_new();
    115     if (!base)
    116         return; /*XXXerr*/
    118     sin.sin_family = AF_INET;
    119     sin.sin_addr.s_addr = 0;
    120     sin.sin_port = htons(40713);
    122     listener = socket(AF_INET, SOCK_STREAM, 0);
    123     evutil_make_socket_nonblocking(listener);
    125 #ifndef WIN32
    126     {
    127         int one = 1;
    128         setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    129     }
    130 #endif
    132     if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
    133         perror("bind");
    134         return;
    135     }
    137     if (listen(listener, 16)<0) {
    138         perror("listen");
    139         return;
    140     }
    142     listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    143     /*XXX check it */
    144     event_add(listener_event, NULL);
    146     event_base_dispatch(base);
    147 }
    /*
     * Program entry point: switch stdout to unbuffered mode, then run the
     * server.  Command-line arguments are ignored.
     */
    int
    main(int c, char **v)
    {
        (void)c;
        (void)v;

        setvbuf(stdout, NULL, _IONBF, 0);

        run();
        return 0;
    }
