zoukankan      html  css  js  c++  java
  • libevent学习

    Because generating and reading the select() bit arrays takes time proportional to the largest fd that you provided for select(), the select() call scales terribly when the number of sockets is high. 

    Different operating systems have provided different replacement functions for select. These include poll(), epoll(), kqueue(), evports, and /dev/poll. All of these give better performance than select(), and all but poll() give O(1) performance for adding a socket, removing a socket, and for noticing that a socket is ready for IO.

    Unfortunately, none of the efficient interfaces is a ubiquitous standard.

    Linux has epoll(), the BSDs (including Darwin) have kqueue(), Solaris has evports and /dev/poll… and none of these operating systems has any of the others.

    So if you want to write a portable high-performance asynchronous application, you’ll need an abstraction that wraps all of these interfaces, and provides whichever one of them is the most efficient.

    And that’s what the lowest level of the Libevent API does for you. It provides a consistent interface to various select() replacements, using the most efficient version available on the computer where it’s running.

    Here’s yet another version of our asynchronous ROT13 server. This time, it uses Libevent 2 instead of select(). Note that the fd_sets are gone now: instead, we associate and disassociate events with a struct event_base, which might be implemented in terms of select(), poll(), epoll(), kqueue(), etc.

    贴一例代码吧

    Example: A low-level ROT13 server with Libevent
      1 /* For sockaddr_in */
      2 #include <netinet/in.h>
      3 /* For socket functions */
      4 #include <sys/socket.h>
      5 /* For fcntl */
      6 #include <fcntl.h>
      7 
      8 #include <event2/event.h>
      9 
     10 #include <assert.h>
     11 #include <unistd.h>
     12 #include <string.h>
     13 #include <stdlib.h>
     14 #include <stdio.h>
     15 #include <errno.h>
     16 
     17 #define MAX_LINE 16384
     18 
     19 void do_read(evutil_socket_t fd, short events, void *arg);
     20 void do_write(evutil_socket_t fd, short events, void *arg);
     21 
     /*
      * Apply the ROT13 substitution to one character.
      *
      * Letters are rotated 13 places inside their own case; any other
      * character is returned unchanged.  Explicit range checks are used
      * rather than isalpha(), so the locale cannot change which characters
      * count as letters.  The arithmetic relies on 'a'..'z' and 'A'..'Z'
      * being contiguous, as they are in ASCII.
      */
     char
     rot13_char(char c)
     {
         if (c >= 'a' && c <= 'z')
             return 'a' + (c - 'a' + 13) % 26;
         if (c >= 'A' && c <= 'Z')
             return 'A' + (c - 'A' + 13) % 26;
         return c;
     }
     34 
     35 struct fd_state {
     36     char buffer[MAX_LINE];
     37     size_t buffer_used;
     38 
     39     size_t n_written;
     40     size_t write_upto;
     41 
     42     struct event *read_event;
     43     struct event *write_event;
     44 };
     45 
     46 struct fd_state *
     47 alloc_fd_state(struct event_base *base, evutil_socket_t fd)
     48 {
     49     struct fd_state *state = malloc(sizeof(struct fd_state));
     50     if (!state)
     51         return NULL;
     52     state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
     53     if (!state->read_event) {
     54         free(state);
     55         return NULL;
     56     }
     57     state->write_event =
     58         event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);
     59 
     60     if (!state->write_event) {
     61         event_free(state->read_event);
     62         free(state);
     63         return NULL;
     64     }
     65 
     66     state->buffer_used = state->n_written = state->write_upto = 0;
     67 
     68     assert(state->write_event);
     69     return state;
     70 }
     71 
     72 void
     73 free_fd_state(struct fd_state *state)
     74 {
     75     event_free(state->read_event);
     76     event_free(state->write_event);
     77     free(state);
     78 }
     79 
     80 void
     81 do_read(evutil_socket_t fd, short events, void *arg)
     82 {
     83     struct fd_state *state = arg;
     84     char buf[1024];
     85     int i;
     86     ssize_t result;
     87     while (1) {
     88         assert(state->write_event);
     89         result = recv(fd, buf, sizeof(buf), 0);
     90         if (result <= 0)
     91             break;
     92 
     93         for (i=0; i < result; ++i)  {
     94             if (state->buffer_used < sizeof(state->buffer))
     95                 state->buffer[state->buffer_used++] = rot13_char(buf[i]);
     96             if (buf[i] == '
    ') {
     97                 assert(state->write_event);
     98                 event_add(state->write_event, NULL);
     99                 state->write_upto = state->buffer_used;
    100             }
    101         }
    102     }
    103 
    104     if (result == 0) {
    105         free_fd_state(state);
    106     } else if (result < 0) {
    107         if (errno == EAGAIN) // XXXX use evutil macro
    108             return;
    109         perror("recv");
    110         free_fd_state(state);
    111     }
    112 }
    113 
    114 void
    115 do_write(evutil_socket_t fd, short events, void *arg)
    116 {
    117     struct fd_state *state = arg;
    118 
    119     while (state->n_written < state->write_upto) {
    120         ssize_t result = send(fd, state->buffer + state->n_written,
    121                               state->write_upto - state->n_written, 0);
    122         if (result < 0) {
    123             if (errno == EAGAIN) // XXX use evutil macro
    124                 return;
    125             free_fd_state(state);
    126             return;
    127         }
    128         assert(result != 0);
    129 
    130         state->n_written += result;
    131     }
    132 
    133     if (state->n_written == state->buffer_used)
    134         state->n_written = state->write_upto = state->buffer_used = 1;
    135 
    136     event_del(state->write_event);
    137 }
    138 
    139 void
    140 do_accept(evutil_socket_t listener, short event, void *arg)
    141 {
    142     struct event_base *base = arg;
    143     struct sockaddr_storage ss;
    144     socklen_t slen = sizeof(ss);
    145     int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    146     if (fd < 0) { // XXXX eagain??
    147         perror("accept");
    148     } else if (fd > FD_SETSIZE) {
    149         close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
    150     } else {
    151         struct fd_state *state;
    152         evutil_make_socket_nonblocking(fd);
    153         state = alloc_fd_state(base, fd);
    154         assert(state); /*XXX err*/
    155         assert(state->write_event);
    156         event_add(state->read_event, NULL);
    157     }
    158 }
    159 
    160 void
    161 run(void)
    162 {
    163     evutil_socket_t listener;
    164     struct sockaddr_in sin;
    165     struct event_base *base;
    166     struct event *listener_event;
    167 
    168     base = event_base_new();
    169     if (!base)
    170         return; /*XXXerr*/
    171 
    172     sin.sin_family = AF_INET;
    173     sin.sin_addr.s_addr = 0;
    174     sin.sin_port = htons(40713);
    175 
    176     listener = socket(AF_INET, SOCK_STREAM, 0);
    177     evutil_make_socket_nonblocking(listener);
    178 
    179 #ifndef WIN32
    180     {
    181         int one = 1;
    182         setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    183     }
    184 #endif
    185 
    186     if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
    187         perror("bind");
    188         return;
    189     }
    190 
    191     if (listen(listener, 16)<0) {
    192         perror("listen");
    193         return;
    194     }
    195 
    196     listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    197     /*XXX check it */
    198     event_add(listener_event, NULL);
    199 
    200     event_base_dispatch(base);
    201 }
    202 
    /*
     * Program entry point: switch stdout to unbuffered mode, then run the
     * server.  The command-line arguments are not used.  Always returns 0.
     */
    int
    main(int c, char **v)
    {
        (void)c;
        (void)v;

        setvbuf(stdout, NULL, _IONBF, 0);
        run();

        return 0;
    }

    (Other things to note in the code: instead of typing the sockets as "int", we’re using the type evutil_socket_t. Instead of calling fcntl(O_NONBLOCK) to make the sockets nonblocking, we’re calling evutil_make_socket_nonblocking. These changes make our code compatible with the divergent parts of the Win32 networking API.)

    About "bufferevents"

    If you’re deeply experienced with networking on Windows, you’ll realize that Libevent probably isn’t getting optimal performance when it’s used as in the example above. On Windows, the way you do fast asynchronous IO is not with a select()-like interface: it’s by using the IOCP (IO Completion Ports) API. Unlike all the fast networking APIs, IOCP does not alert your program when a socket is ready for an operation that your program then has to perform. Instead, the program tells the Windows networking stack to start a network operation, and IOCP tells the program when the operation has finished.

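    Fortunately, the Libevent 2 "bufferevents" interface solves both of these issues — the bookkeeping burden of managing every connection's buffers and events by hand, as in the example above, and the poor fit between a readiness-based API and IOCP: it makes programs much simpler to write, and provides an interface that Libevent can implement efficiently on Windows and on Unix.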

    Here’s our ROT13 server one last time, using the bufferevents API.

      1 /* For sockaddr_in */
      2 #include <netinet/in.h>
      3 /* For socket functions */
      4 #include <sys/socket.h>
      5 /* For fcntl */
      6 #include <fcntl.h>
      7 
      8 #include <event2/event.h>
      9 #include <event2/buffer.h>
     10 #include <event2/bufferevent.h>
     11 
     12 #include <assert.h>
     13 #include <unistd.h>
     14 #include <string.h>
     15 #include <stdlib.h>
     16 #include <stdio.h>
     17 #include <errno.h>
     18 
     19 #define MAX_LINE 16384
     20 
     21 void do_read(evutil_socket_t fd, short events, void *arg);
     22 void do_write(evutil_socket_t fd, short events, void *arg);
     23 
     /*
      * ROT13-translate a single character.
      *
      * Letters in the first half of the alphabet move forward 13 places and
      * letters in the second half move back 13, keeping their case; all other
      * characters are returned as they are.  isalpha() is deliberately not
      * used, because its answer depends on the current locale.
      */
     char
     rot13_char(char c)
     {
         const int in_first_half =
             (c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M');
         const int in_second_half =
             (c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z');
         int shift = 0;

         if (in_first_half)
             shift = 13;
         else if (in_second_half)
             shift = -13;

         return c + shift;
     }
     36 
     37 void
     38 readcb(struct bufferevent *bev, void *ctx)
     39 {
     40     struct evbuffer *input, *output;
     41     char *line;
     42     size_t n;
     43     int i;
     44     input = bufferevent_get_input(bev);
     45     output = bufferevent_get_output(bev);
     46 
     47     while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
     48         for (i = 0; i < n; ++i)
     49             line[i] = rot13_char(line[i]);
     50         evbuffer_add(output, line, n);
     51         evbuffer_add(output, "
    ", 1);
     52         free(line);
     53     }
     54 
     55     if (evbuffer_get_length(input) >= MAX_LINE) {
     56         /* Too long; just process what there is and go on so that the buffer
     57          * doesn't grow infinitely long. */
     58         char buf[1024];
     59         while (evbuffer_get_length(input)) {
     60             int n = evbuffer_remove(input, buf, sizeof(buf));
     61             for (i = 0; i < n; ++i)
     62                 buf[i] = rot13_char(buf[i]);
     63             evbuffer_add(output, buf, n);
     64         }
     65         evbuffer_add(output, "
    ", 1);
     66     }
     67 }
     68 
     69 void
     70 errorcb(struct bufferevent *bev, short error, void *ctx)
     71 {
     72     if (error & BEV_EVENT_EOF) {
     73         /* connection has been closed, do any clean up here */
     74         /* ... */
     75     } else if (error & BEV_EVENT_ERROR) {
     76         /* check errno to see what error occurred */
     77         /* ... */
     78     } else if (error & BEV_EVENT_TIMEOUT) {
     79         /* must be a timeout event handle, handle it */
     80         /* ... */
     81     }
     82     bufferevent_free(bev);
     83 }
     84 
     85 void
     86 do_accept(evutil_socket_t listener, short event, void *arg)
     87 {
     88     struct event_base *base = arg;
     89     struct sockaddr_storage ss;
     90     socklen_t slen = sizeof(ss);
     91     int fd = accept(listener, (struct sockaddr*)&ss, &slen);
     92     if (fd < 0) {
     93         perror("accept");
     94     } else if (fd > FD_SETSIZE) {
     95         close(fd);
     96     } else {
     97         struct bufferevent *bev;
     98         evutil_make_socket_nonblocking(fd);
     99         bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    100         bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
    101         bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
    102         bufferevent_enable(bev, EV_READ|EV_WRITE);
    103     }
    104 }
    105 
    106 void
    107 run(void)
    108 {
    109     evutil_socket_t listener;
    110     struct sockaddr_in sin;
    111     struct event_base *base;
    112     struct event *listener_event;
    113 
    114     base = event_base_new();
    115     if (!base)
    116         return; /*XXXerr*/
    117 
    118     sin.sin_family = AF_INET;
    119     sin.sin_addr.s_addr = 0;
    120     sin.sin_port = htons(40713);
    121 
    122     listener = socket(AF_INET, SOCK_STREAM, 0);
    123     evutil_make_socket_nonblocking(listener);
    124 
    125 #ifndef WIN32
    126     {
    127         int one = 1;
    128         setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    129     }
    130 #endif
    131 
    132     if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
    133         perror("bind");
    134         return;
    135     }
    136 
    137     if (listen(listener, 16)<0) {
    138         perror("listen");
    139         return;
    140     }
    141 
    142     listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    143     /*XXX check it */
    144     event_add(listener_event, NULL);
    145 
    146     event_base_dispatch(base);
    147 }
    148 
    /*
     * Program entry point.  Makes stdout unbuffered and hands control to
     * run(); the command-line arguments are ignored.  Always returns 0.
     */
    int
    main(int c, char **v)
    {
        (void)c;
        (void)v;

        setvbuf(stdout, NULL, _IONBF, 0);
        run();

        return 0;
    }
  • 相关阅读:
    weibo4j中的 jar解释
    Android使用系统Intent实现分享功能及将应用加入分享列表++分享邮箱实现
    Android ListView 技巧 (一) Android ListView Header
    [Android]使用AChartEngine画柱状图 .
    【TCPIP】握手挥手详解
    python : import this. 如此幽默,让人不得不爱
    Forbidden (403) CSRF verification failed. Request aborted.
    Django修改model如何同步数据库
    程序员的7种武器【转】
    Django Admin 录入中文错误解决办法
  • 原文地址:https://www.cnblogs.com/xiaokuang/p/4550585.html
Copyright © 2011-2022 走看看