zoukankan      html  css  js  c++  java
  • 并发服务器--02(基于I/O复用——运用epoll技术)

      本文承接自上一博文I/O复用——运用Select函数

    epoll介绍

      epoll是在2.6内核中提出的。和select类似,它也是一种I/O复用技术,是之前的select和poll的增强版本。

      Linux下设计并发网络程序,向来不缺少方法,比如典型的Apache模型(Process Per Connection,简称PPC),TPC(Thread PerConnection)模型,以及select模型和poll模型,那为何还要再引入epoll呢?我们先来看一下常用模型的缺点:

      PPC/TPC模型

      这两种模型思想类似,就是让每一个到来的连接一边自己做事去,别再来烦我。只是PPC是为它开了一个进程,而TPC开了一个线程。可是别烦我是有代价的,它要时间和空间啊,连接多了之后,那么多的进程/线程切换,这开销就上来了;因此这类模型能接受的最大连接数都不会高,一般在几百个左右。

      select模型

      1)最大并发数限制,因为一个进程所打开的FD(文件描述符)是有限制的,由FD_SETSIZE设置,默认值是1024/2048,因此Select模型的最大并发数就被相应限制了。自己改改这个FD_SETSIZE?想法虽好,可是先看看下面吧…

      2)效率问题,select每次调用都会线性扫描全部的FD集合,这样效率就会呈现线性下降,把FD_SETSIZE改大的后果就是,大家都慢慢来,什么?都超时了??!!

      3)内核/用户空间内存拷贝问题,如何让内核把FD消息通知给用户空间呢?在这个问题上select采取了内存拷贝方法。

      poll模型

      基本上效率和select是相同的,select缺点的2和3它都没有改掉。

      epoll的提升

      其实把select的缺点反过来那就是Epoll的优点了:

      1)epoll没有最大并发连接的限制,上限是最大可以打开文件的数目(一般远大于2048),一般跟系统内存关系很大。具体数目可以cat /proc/sys/fs/file-max察看。

      2)效率提升,epoll最大的优点就在于它只管“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,epoll的效率就会远远高于select和poll。

      3)内存拷贝,epoll在这点上使用了“共享内存”,所以这个内存拷贝也省略了。

    epoll接口

      epoll操作过程用到的三个接口如下:

    #include <sys/epoll.h>
    int epoll_create(int size);    
    // 返回:若成功返回非负epoll描述符,否则返回-1,并设置errno的值
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *events);    
    // 返回:若成功返回0,否则返回-1,并设置errno的值
    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);    
    // 返回:若成功返回在timeout时间内就绪的文件描述符数,否则返回-1,并设置errno的值

    epoll_create 

      该函数返回一个epoll的描述符(一个整数)。size用来告诉内核这个监听的数目一共有多大,不同于select()中的第一个参数给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id号/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

      如下图是刚打开服务端程序(本文ET模式服务器程序)的截图:

      

      上图中,前三个/dev/pts/1应该表示系统输入、输出及异常,socket表示监听套接字,eventepoll表示进程创建的epoll。

      另外,当有新的客户连接到服务端时,截图如下:

      

    epoll_ctl

      该函数是epoll的事件注册函数。它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里注册要监听的事件类型。

      该函数的参数说明如下(fd是file descriptor的缩写,表示文件描述符):

      1)第一个参数是epoll_create函数的返回值。

      2)第二个参数表示动作:

      EPOLL_CTL_ADD:注册新的fd到epfd中;
      EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
      EPOLL_CTL_DEL:从epfd中删除一个fd。

      3)第三个参数表示需要监听的fd。

      4)第四个参数告诉内核需要监听什么事。struct epoll_event的结构如下:

    struct epoll_event {
        uint32_t events;    /* Epoll events */
        epoll_data_t data;  /* User data variable */
    };

      events可以是以下几个宏的集合:
      EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
      EPOLLOUT:表示对应的文件描述符可以写;
      EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
      EPOLLERR:表示对应的文件描述符发生错误;
      EPOLLHUP:表示对应的文件描述符被挂断;
      EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
      EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

    epoll_wait

      该函数等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合;maxevents告诉内核返回的events的最大大小,这个maxevents的值不能大于创建epoll_create()时的size,也必须大于0;参数timeout是超时时间(毫秒,0会立即返回,-1将永久阻塞)。该函数返回需要处理的数目,如返回0表示已超时。

    epoll工作模式

      这部分要详细参考博文Epoll在LT和ET模式下的读写方式(已附于文章最后)。

      在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)

      从字面上看,意思是:
      * EAGAIN:再试一次
      * EWOULDBLOCK:如果这是一个阻塞socket,操作将被block
      * perror输出:Resource temporarily unavailable

      总结:
      这个错误表示资源暂时不够,可能read时,读缓冲区没有数据,或者,write时,写缓冲区满了。
      遇到这种情况,如果是阻塞socket,read/write就要阻塞掉。而如果是非阻塞socket,read/write立即返回-1,同时errno设置为EAGAIN。所以,对于阻塞socket, read/write返回-1代表网络出错了。但对于非阻塞socket, read/write返回-1不一定网络真的出错了,可能是Resource temporarily unavailable。 这时我们应该再试,直到Resource available。
      
      综上,对于non-blocking的socket,正确的读写操作为:
      读:忽略掉errno = EAGAIN的错误,下次继续读。
      写:忽略掉errno = EAGAIN的错误,下次继续写。
      对于select和epoll的LT模式,这种读写方式是没有问题的。 但对于epoll的ET模式,这种方式还有漏洞。

      epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

      LT模式:只要某个监听中的文件描述符处于readable/writable状态,无论什么时候进行epoll_wait都会返回该描述符;

      ET模式:只有某个监听中的文件描述符从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该描述符。

      ET模式下套接字要设定为non-blocking

      我们先来看一下Linux官方给出的手册关于LT和ET的一个例子

      “

      The epoll event distribution interface is able to behave both as
           edge-triggered (ET) and as level-triggered (LT).  The difference
           between the two mechanisms can be described as follows.  Suppose that
           this scenario happens:
    
           1. The file descriptor that represents the read side of a pipe (rfd)
              is registered on the epoll instance.
           2. A pipe writer writes 2 kB of data on the write side of the pipe.
           3. A call to epoll_wait(2) is done that will return rfd as a ready
              file descriptor.
           4. The pipe reader reads 1 kB of data from rfd.
           5. A call to epoll_wait(2) is done.
    
           If the rfd file descriptor has been added to the epoll interface
           using the EPOLLET (edge-triggered) flag, the call to epoll_wait(2)
           done in step 5 will probably hang despite the available data still
           present in the file input buffer; meanwhile the remote peer might be
           expecting a response based on the data it already sent.  The reason
           for this is that edge-triggered mode delivers events only when
           changes occur on the monitored file descriptor.  So, in step 5 the
           caller might end up waiting for some data that is already present
           inside the input buffer.  In the above example, an event on rfd will
           be generated because of the write done in 2 and the event is consumed
           in 3.  Since the read operation done in 4 does not consume the whole
           buffer data, the call to epoll_wait(2) done in step 5 might block
           indefinitely.
    
           An application that employs the EPOLLET flag should use nonblocking
           file descriptors to avoid having a blocking read or write starve a
           task that is handling multiple file descriptors.  The suggested way
           to use epoll as an edge-triggered (EPOLLET) interface is as follows:
    
                  i   with nonblocking file descriptors; and
                  ii  by waiting for an event only after read(2) or write(2)
                      return EAGAIN.
    
           By contrast, when used as a level-triggered interface (the default,
           when EPOLLET is not specified), epoll is simply a faster poll(2), and
           can be used wherever the latter is used since it shares the same
           semantics.

      ”

      从上述例子,我们可以看出:

      1. 在LT模式下,只要某个监听中的文件描述符处于readable/writable状态,无论什么时候进行epoll_wait都会返回该描述符。所以,只要读缓冲区有数据或者写缓冲区仍有空间,那就可读或可写, 都不会因套接字设定为blocking或者non-blocking而导致epoll_wait进入无限期等待;

      2. 在ET模式下,只有某个监听中的文件描述符从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该描述符。所以当我们没有把读缓冲区的数据全部读完或者没有把写缓冲区的空间写满就返回,套接字就会一直处于不可读或者不可写的状态,这样read/write会阻塞直到套接字可读或可写,但在这种情况下,套接字不可能变为可读或可写,所以read/write会一直阻塞下去,从而导致epoll_wait无限期阻塞于该套接字而无法返回。所以,要将套接字设定为non-blocking,让epoll_waite可以及时返回。

      下边两图可形象展示LT和ET的区别:

      从socket读数据:
      

      往socket写数据:

      

      所以,在epoll的ET模式下,正确的读写方式为:
      读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
      写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

    示例

    示例1:回射程序(LT模式)

      这里服务器端程序主要摘自博文IO多路复用之epoll总结,不过修复了部分Bug,具体如下:

      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <string.h>
      4 #include <errno.h>
      5 
      6 #include <netinet/in.h>
      7 #include <sys/socket.h>
      8 #include <arpa/inet.h>
      9 #include <sys/epoll.h>
     10 #include <unistd.h>
     11 #include <sys/types.h>
     12 
     13 #define PORT        9877
     14 #define BUFFERSIZ    1024
     15 #define LISTENQ     1024
     16 #define FDSIZE      1024
     17 #define EPOLLEVENTS 100
     18 
     19 //函数声明
     20 //创建套接字并进行绑定
     21 static int socket_bind(int port);
     22 //IO多路复用epoll
     23 static void do_epoll(int listenfd);
     24 //事件处理函数
     25 static void
     26 handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
     27 //处理接收到的连接
     28 static void handle_accpet(int epollfd, int listenfd);
     29 //读处理
     30 static void do_read(int epollfd, int fd, char *buf);
     31 //写处理
     32 static void do_write(int epollfd, int fd, char *buf);
     33 //添加事件
     34 static void add_event(int epollfd, int fd, int state);
     35 //修改事件
     36 static void modify_event(int epollfd, int fd, int state);
     37 //删除事件
     38 static void delete_event(int epollfd, int fd, int state);
     39 //close socket
     40 void Close(int fd);
     41 
     42 int main(int argc, char *argv[])
     43 {
     44     int  listenfd;
     45     listenfd = socket_bind(PORT);
     46     listen(listenfd, LISTENQ);
     47     do_epoll(listenfd);
     48     exit(0);
     49 }
     50 
     51 static int socket_bind(int port)
     52 {
     53     int  listenfd;
     54     struct sockaddr_in servaddr;
     55     listenfd = socket(AF_INET, SOCK_STREAM, 0);
     56     if (listenfd == -1)
     57     {
     58         perror("socket error:");
     59         exit(1);
     60     }
     61     bzero(&servaddr, sizeof(servaddr));
     62     servaddr.sin_family = AF_INET;
     63     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
     64     servaddr.sin_port = htons(port);
     65     if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1)
     66     {
     67         perror("bind error: ");
     68         exit(1);
     69     }
     70     return listenfd;
     71 }
     72 
     73 static void do_epoll(int listenfd)
     74 {
     75     int epollfd;
     76     struct epoll_event events[EPOLLEVENTS];
     77     int num;
     78     char buf[BUFFERSIZ];
     79     memset(buf, 0, BUFFERSIZ);
     80     //创建一个描述符
     81     epollfd = epoll_create(FDSIZE);
     82     //添加监听描述符事件
     83     add_event(epollfd, listenfd, EPOLLIN);
     84     for ( ; ; )
     85     {
     86         //获取已经准备好的描述符事件
     87         if ((num = epoll_wait(epollfd, events, EPOLLEVENTS, -1)) == -1) 
     88         {
     89             perror("epoll_pwait");
     90             exit(1);
     91         }
     92         handle_events(epollfd, events, num, listenfd, buf);
     93     }
     94     Close(epollfd);
     95 }
     96 
     97 static void
     98 handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf)
     99 {
    100     int i;
    101     int fd;
    102     // 遍历
    103     for (i = 0; i < num; i++)
    104     {
    105         fd = events[i].data.fd;
    106         //根据描述符的类型和事件类型进行处理
    107         if ((fd == listenfd) && (events[i].events & EPOLLIN))
    108             handle_accpet(epollfd, listenfd);
    109         else if (events[i].events & EPOLLIN)
    110             do_read(epollfd, fd, buf);
    111         else if (events[i].events & EPOLLOUT)
    112             do_write(epollfd, fd, buf);
    113     }
    114 }
    115 static void handle_accpet(int epollfd, int listenfd)
    116 {
    117     int clifd;
    118     struct sockaddr_in cliaddr;
    119     socklen_t  cliaddrlen;
    120     clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddrlen);
    121     if (clifd == -1)
    122         perror("accpet error:");
    123     else
    124     {
    125         printf("accept a new client: %s:%d
    ", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
    126         //添加一个客户描述符和事件
    127         add_event(epollfd, clifd, EPOLLIN);
    128     }
    129 }
    130 
    131 static void do_read(int epollfd, int fd, char *buf)
    132 {
    133     int nread;
    134     nread = read(fd, buf, BUFFERSIZ);
    135     if (nread == -1)
    136     {
    137         perror("read error:");
    138         //Close(fd);
    139         delete_event(epollfd, fd, EPOLLIN);
    140     }
    141     else if (nread == 0)
    142     {
    143         fprintf(stderr, "client close.
    ");
    144         //Close(fd);
    145         delete_event(epollfd, fd, EPOLLIN);
    146     }
    147     else
    148     {
    149         printf("read message is : %s", buf);
    150         //修改描述符对应的事件,由读改为写
    151         modify_event(epollfd, fd, EPOLLOUT);
    152     }
    153 }
    154 
    155 static void do_write(int epollfd, int fd, char *buf)
    156 {
    157     int nwrite;
    158     nwrite = write(fd, buf, strlen(buf));
    159     if (nwrite == -1)
    160     {
    161         perror("write error:");
    162         //Close(fd);
    163         delete_event(epollfd, fd, EPOLLOUT);
    164     }
    165     else
    166         modify_event(epollfd, fd, EPOLLIN);
    167     memset(buf, 0, BUFFERSIZ);
    168 }
    169 
    170 static void add_event(int epollfd, int fd, int state)
    171 {
    172     struct epoll_event ev;
    173     ev.events = state;
    174     ev.data.fd = fd;
    175     if((epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) == -1)
    176     {
    177         perror("epoll_ctl: add");
    178     }
    179 }
    180 
    181 static void delete_event(int epollfd, int fd, int state)
    182 {
    183     struct epoll_event ev;
    184     ev.events = state;
    185     ev.data.fd = fd;
    186     if((epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev)) == -1)
    187     {
    188         perror("epoll_ctl: del");
    189     }
    190     Close(fd);    
    191     // 如果描述符fd已关闭,再从epoll中删除fd,则会出现epoll failed: Bad file descriptor问题
    192     // 所以要先从epoll中删除fd,在关闭fd. 具体可参考博文http://www.cnblogs.com/scw2901/p/3907657.html
    193 }
    194 
    195 static void modify_event(int epollfd, int fd, int state)
    196 {
    197     struct epoll_event ev;
    198     ev.events = state;
    199     ev.data.fd = fd;
    200     if((epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev)) == -1)
    201     {
    202         perror("epoll_ctl: mod");
    203     }
    204 }
    205 
    206 void Close(int fd)
    207 {
    208     if((close(fd)) < 0)
    209     {
    210         perror("close socket error");
    211         exit(1);
    212     }
    213 }
    servepoll

      客户端程序:

      1 #include <netinet/in.h>
      2 #include <sys/socket.h>
      3 #include <stdio.h>
      4 #include <string.h>
      5 #include <stdlib.h>
      6 #include <sys/epoll.h>
      7 #include <time.h>
      8 #include <unistd.h>
      9 #include <sys/types.h>
     10 #include <arpa/inet.h>
     11 #include <errno.h>
     12 
     13 #define BUFFERSIZ    1024
     14 #define SERV_PORT   9877
     15 #define FDSIZE        1024
     16 #define EPOLLEVENTS 100
     17 
     18 void handle_connection(int sockfd);
     19 void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf);
     20 void add_event(int epollfd, int fd, int state);
     21 void delete_event(int epollfd, int fd, int state);
     22 void modify_event(int epollfd, int fd, int state);
     23 
     24 ssize_t Read(int fd, void *ptr, size_t nbytes);
     25 void Write(int fd, void *ptr, size_t nbytes);
     26 ssize_t writen(int fd, const void *vptr, size_t n);
     27 void Writen(int fd, void *ptr, size_t nbytes);
     28 
     29 void Close(int fd);
     30 
     31 int main(int argc, char **argv)
     32 {
     33     if(argc != 2)
     34         perror("usage: tcpcli <IPaddress>");
     35     
     36     int sockfd;
     37     struct sockaddr_in  servaddr;
     38     if( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
     39     {
     40         perror("socket error
    ");
     41         exit(1);
     42     }
     43     
     44     bzero(&servaddr, sizeof(servaddr));
     45     servaddr.sin_family = AF_INET;
     46     servaddr.sin_port = htons(SERV_PORT);
     47     if((inet_pton(AF_INET, argv[1], &servaddr.sin_addr)) <= 0)
     48     {
     49         perror("inet_pton error");
     50         exit(1);
     51     }
     52 
     53     if((connect(sockfd, (struct sockaddr*) &servaddr, sizeof(servaddr))) < 0)
     54     {
     55         perror("connect error
    ");
     56         exit(1);
     57     }
     58     
     59     handle_connection(sockfd);
     60     
     61     Close(sockfd);
     62     exit(0);
     63 }
     64 
     65 void handle_connection(int sockfd)
     66 {
     67     int epollfd;
     68     struct epoll_event events[EPOLLEVENTS];
     69     char buf[BUFFERSIZ];
     70     int num;
     71     epollfd = epoll_create(FDSIZE);
     72     add_event(epollfd, sockfd, EPOLLIN);
     73     add_event(epollfd, STDIN_FILENO, EPOLLIN);
     74     for ( ; ; )
     75     {
     76         num = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
     77         handle_events(epollfd, events, num, sockfd, buf);
     78     }
     79     Close(epollfd);
     80 }
     81 
     82 void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf)
     83 {
     84     int fd;
     85     int i;
     86     int stdineof = 0;
     87     for (i = 0; i < num; i++)
     88     {
     89         fd = events[i].data.fd;
     90         if(fd == sockfd)
     91         {
     92             int nread;
     93             nread = Read(sockfd, buf, BUFFERSIZ);
     94             if (nread == 0)
     95             {
     96                 if(stdineof == 1)
     97                     return;
     98                 fprintf(stderr, "server close
    ");
     99                 Close(sockfd);
    100                 Close(epollfd);
    101                 exit(1);
    102             }
    103             Write(fileno(stdout), buf, nread);
    104         }
    105         else
    106         {
    107             int nread;
    108             nread = Read(fd, buf, BUFFERSIZ);
    109             if (nread == 0)
    110             {
    111                 stdineof = 1;
    112                 fprintf(stderr, "no inputs
    ");
    113                 Close(fd);
    114                 continue;
    115             }
    116             modify_event(epollfd, sockfd, EPOLLOUT);
    117             Writen(sockfd, buf, nread);
    118             modify_event(epollfd, sockfd, EPOLLIN);
    119             memset(buf, 0, BUFFERSIZ);
    120         }
    121     }
    122 }
    123 
    124 void add_event(int epollfd, int fd, int state)
    125 {
    126     struct epoll_event ev;
    127     ev.events = state;
    128     ev.data.fd = fd;
    129     if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == -1)
    130     {
    131         perror("epoll_ctl: add");
    132     }
    133 }
    134 
    135 void delete_event(int epollfd, int fd, int state)
    136 {
    137     struct epoll_event ev;
    138     ev.events = state;
    139     ev.data.fd = fd;
    140     if(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1)
    141     {
    142         perror("epoll_ctl: del");
    143     }
    144 }
    145 
    146 void modify_event(int epollfd, int fd, int state)
    147 {
    148     struct epoll_event ev;
    149     ev.events = state;
    150     ev.data.fd = fd;
    151     if(epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1)
    152     {
    153         perror("epoll_ctl: mod");
    154     }
    155 }
    156 
    157 ssize_t Read(int fd, void *ptr, size_t nbytes)
    158 {
    159     ssize_t n;
    160 
    161     if ( (n = read(fd, ptr, nbytes)) == -1)
    162         perror("read error");
    163     return(n);
    164 }
    165 
    166 void Write(int fd, void *ptr, size_t nbytes)
    167 {
    168     if (write(fd, ptr, nbytes) != nbytes)
    169         perror("write error");
    170 }
    171 
    172 /* Write "n" bytes to a descriptor. */
    173 ssize_t writen(int fd, const void *vptr, size_t n)
    174 {
    175     size_t        nleft;
    176     ssize_t        nwritten;
    177     const char    *ptr;
    178 
    179     ptr = vptr;
    180     nleft = n;
    181     while (nleft > 0) {
    182         if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
    183             if (nwritten < 0 && errno == EINTR)
    184                 nwritten = 0;        /* and call write() again */
    185             else
    186                 return(-1);            /* error */
    187         }
    188 
    189         nleft -= nwritten;
    190         ptr   += nwritten;
    191     }
    192     return(n);
    193 }
    194 
    195 void Writen(int fd, void *ptr, size_t nbytes)
    196 {
    197     if (writen(fd, ptr, nbytes) != nbytes)
    198         perror("writen error");
    199 }
    200 
    201 void Close(int fd)
    202 {
    203     if((close(fd)) < 0)
    204     {
    205         perror("close error");
    206         exit(1);
    207     }
    208 }
    cliepoll

      程序运行截图如下:

      1)客户端主动连接主动关闭

      客户端:

      

      服务器端:

      

      2)客户端主动连接被动关闭

      客户端:

      

      服务器端:

      

    示例2:回射程序(ET模式)

      这里我们只给出服务器端的程序。这部分程序部分参考自博文Epoll在LT和ET模式下的读写方式。具体程序如下:

      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <string.h>
      4 #include <errno.h>
      5 #include <fcntl.h>
      6 
      7 #include <netinet/in.h>
      8 #include <sys/socket.h>
      9 #include <arpa/inet.h>
     10 #include <sys/epoll.h>
     11 #include <unistd.h>
     12 #include <sys/types.h>
     13 
     14 #define PORT        9877
     15 #define BUFFERSIZ    1024
     16 #define LISTENQ     1024
     17 #define FDSIZE      1024
     18 #define EPOLLEVENTS 100
     19 
     20 //函数声明
     21 //创建套接字并进行绑定
     22 int socket_bind(int port);
     23 //IO多路复用epoll
     24 void do_epoll(int listenfd);
     25 //事件处理函数
     26 void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
     27 //处理接收到的连接
     28 void handle_accpet(int epollfd, int listenfd);
     29 //读处理
     30 void do_read(int epollfd, int fd, char *buf);
     31 //写处理
     32 void do_write(int epollfd, int fd, char *buf);
     33 
     34 //设置socket连接为非阻塞模式
     35 void setnonblocking(int sockfd);
     36 //close socket
     37 void Close(int fd);
     38 
     39 int main(int argc, char *argv[])
     40 {
     41     int  listenfd;
     42     listenfd = socket_bind(PORT);
     43     listen(listenfd, LISTENQ);
     44     do_epoll(listenfd);
     45     exit(0);
     46 }
     47 
     48 int socket_bind(int port)
     49 {
     50     int  listenfd;
     51     struct sockaddr_in servaddr;
     52     listenfd = socket(AF_INET, SOCK_STREAM, 0);
     53     if (listenfd == -1)
     54     {
     55         perror("socket error:");
     56         exit(1);
     57     }
     58     setnonblocking(listenfd);
     59     bzero(&servaddr, sizeof(servaddr));
     60     servaddr.sin_family = AF_INET;
     61     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
     62     servaddr.sin_port = htons(port);
     63     if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1)
     64     {
     65         perror("bind error: ");
     66         exit(1);
     67     }
     68     return listenfd;
     69 }
     70 
     71 void do_epoll(int listenfd)
     72 {
     73     int epollfd;
     74     struct epoll_event ev;
     75     struct epoll_event events[EPOLLEVENTS];
     76     int num;
     77     char buf[BUFFERSIZ];
     78     memset(buf, 0, BUFFERSIZ);
     79     //创建一个描述符
     80     epollfd = epoll_create(FDSIZE);
     81     //添加监听描述符事件
     82     ev.events = EPOLLIN;
     83     ev.data.fd = listenfd;
     84     if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) 
     85     {
     86         perror("epoll_ctl: add");
     87         exit(1);
     88     }
     89 
     90     for ( ; ; )
     91     {
     92         //获取已经准备好的描述符事件
     93         if ((num = epoll_wait(epollfd, events, EPOLLEVENTS, -1)) == -1) 
     94         {
     95             perror("epoll_wait");
     96             exit(1);
     97         }
     98         handle_events(epollfd, events, num, listenfd, buf);
     99     }
    100     Close(epollfd);
    101 }
    102 
    103 void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf)
    104 {
    105     int i;
    106     int fd;
    107     // 遍历
    108     for (i = 0; i < num; i++)
    109     {
    110         fd = events[i].data.fd;
    111         //根据描述符的类型和事件类型进行处理
    112         if ((fd == listenfd) && (events[i].events & EPOLLIN))
    113             handle_accpet(epollfd, listenfd);
    114         else if (events[i].events & EPOLLIN)
    115             do_read(epollfd, fd, buf);
    116         else if (events[i].events & EPOLLOUT)
    117             do_write(epollfd, fd, buf);
    118     }
    119 }
    120 
    121 void handle_accpet(int epollfd, int listenfd)
    122 {
    123     int connfd;
    124     struct sockaddr_in cliaddr;
    125     socklen_t  cliaddrlen;
    126     struct epoll_event ev;
    127     while ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &cliaddrlen)) > 0) 
    128     {
    129         printf("accept a new client: %s:%d
    ", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
    130         setnonblocking(connfd);
    131         ev.events = EPOLLIN | EPOLLET;
    132         ev.data.fd = connfd;
    133         if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev) == -1) 
    134         {
    135             perror("epoll_ctl: add");
    136             exit(1);
    137         }
    138     }
    139     if (connfd == -1) 
    140     {
    141         if (errno != EAGAIN && errno != ECONNABORTED 
    142                 && errno != EPROTO && errno != EINTR) 
    143             perror("accept");
    144     }
    145 }
    146 
    147 void do_read(int epollfd, int fd, char *buf)
    148 {
    149     int nread;
    150     int n = 0;
    151     struct epoll_event ev;
    152     while ((nread = read(fd, buf + n, BUFFERSIZ-1)) > 0) 
    153     {
    154         n += nread;
    155     }
    156     if (nread == -1 && errno != EAGAIN) 
    157     {
    158         perror("read error");
    159         
    160         ev.data.fd = fd;
    161         ev.events = EPOLLIN | EPOLLET;
    162         if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1) 
    163         {
    164             perror("epoll_ctl: mod");
    165         }
    166         Close(fd);
    167     }
    168     else if(nread == 0)
    169     {
    170         perror("client close");
    171         
    172         ev.data.fd = fd;
    173         ev.events = EPOLLIN | EPOLLET;
    174         if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1) 
    175         {
    176             perror("epoll_ctl: mod");
    177         }
    178         Close(fd);
    179     }
    180     else
    181     {
    182         printf("read message is : %s", buf); 
    183         ev.data.fd = fd;
    184         ev.events = EPOLLOUT | EPOLLET;
    185         if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) 
    186         {
    187             perror("epoll_ctl: mod");
    188         }
    189     }
    190 }
    191 
    192 void do_write(int epollfd, int fd, char *buf)
    193 {
    194     int nwrite, data_size = strlen(buf);
    195     int n = data_size;
    196     struct epoll_event ev;
    197     int flag = 0;
    198     while (n > 0) 
    199     {
    200         nwrite = write(fd, buf + data_size - n, n);
    201         if (nwrite < n) 
    202         {
    203             if (nwrite == -1 && errno != EAGAIN) 
    204             {
    205                 flag = 1;
    206                 perror("write error");
    207     
    208                 ev.data.fd = fd;
    209                 ev.events = EPOLLOUT | EPOLLET;
    210                 if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1) 
    211                 {
    212                     perror("epoll_ctl: mod");
    213                 }
    214                 Close(fd);
    215             }
    216             break;
    217         }
    218         n -= nwrite;
    219     }
    220     
    221     if(flag != 1)
    222     {
    223         ev.data.fd = fd;
    224         ev.events =  EPOLLIN | EPOLLET;
    225         if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) 
    226         {
    227             perror("epoll_ctl: mod");
    228         }
    229     }
    230     // 这句很重要
    231     memset(buf, 0, BUFFERSIZ);
    232 }
    233 
    234 void setnonblocking(int sockfd) 
    235 {
    236     int opts;
    237     opts = fcntl(sockfd, F_GETFL);
    238     if(opts < 0) 
    239     {
    240         perror("Error: fcntl(F_GETFL)
    ");
    241         exit(1);
    242     }
    243     opts = (opts | O_NONBLOCK);
    244     if(fcntl(sockfd, F_SETFL, opts) < 0) 
    245     {
    246         perror("Error: fcntl(F_SETFL)
    ");
    247         exit(1);
    248     }
    249 }
    250 
    251 void Close(int fd)
    252 {
    253     if((close(fd)) < 0)
    254     {
    255         perror("close socket error");
    256         exit(1);
    257     }
    258 }
    servepoll2

    参考资料

      IO多路复用之epoll总结

      Epoll在LT和ET模式下的读写方式

      epoll精髓

      Linux Epoll介绍和程序实例

    特附博文Epoll在LT和ET模式下的读写方式

      在一个非阻塞的socket上调用read/write函数,返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)。从字面上看,EAGAIN,再试一次;EWOULDBLOCK,如果这是一个阻塞socket,操作将被block,perror输出: Resource temporarily unavailable。

      总结:
      这个错误表示资源暂时不够,能read时,读缓冲区没有数据,或者write时,写缓冲区满了。遇到这种情况,如果是阻塞socket,read/write就要阻塞掉;而如果是非阻塞socket,read/write则立即返回-1, 同时errno设置为EAGAIN。
      所以,对于阻塞socket,read/write返回-1代表网络出错了。但对于非阻塞socket,read/write返回-1不一定网络真的出错了。可能是Resource temporarily unavailable。这时你应该再试,直到Resource available。

      综上,对于non-blocking的socket,正确的读写操作为:
      读:忽略掉errno = EAGAIN的错误,下次继续读
      写:忽略掉errno = EAGAIN的错误,下次继续写

      对于select和epoll的LT模式,这种读写方式是没有问题的。但对于epoll的ET模式,这种方式还有漏洞。

      epoll的两种模式LT和ET
      二者的差异在于level-trigger模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。

      所以,在epoll的ET模式下,正确的读写方式为:

      读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
      写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

      正确的读

    1 n = 0;
    2 while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
    3     n += nread;
    4 }
    5 if (nread == -1 && errno != EAGAIN) {
    6     perror("read error");
    7 }

      正确的写

     1 int nwrite, data_size = strlen(buf);
     2 n = data_size;
     3 while (n > 0) {
     4     nwrite = write(fd, buf + data_size - n, n);
     5     if (nwrite < n) {
     6         if (nwrite == -1 && errno != EAGAIN) {
     7             perror("write error");
     8         }
     9         break;
    10     }
    11     n -= nwrite;
    12 }

      正确的accept,accept 要考虑 2 个问题
      1)阻塞模式 accept 存在的问题
      考虑这种情况:TCP连接被客户端夭折,即在服务器调用accept之前,客户端主动发送RST终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在accept调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept调用上,就绪队列中的其他描述符都得不到处理。

      解决办法是把监听套接口设置为非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1,这时源自Berkeley的实现会在内核中处理该事件,并不会将该事件通知给epool,而其他实现把errno设置为ECONNABORTED或者EPROTO错误,我们应该忽略这两个错误。

      2)ET模式下accept存在的问题
      考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。

      解决办法是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。

      综合以上两种情况,服务器应该使用非阻塞地accept,accept在ET模式下的正确使用方式为:

    1 while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) {
    2     handle_client(conn_sock);
    3 }
    4 if (conn_sock == -1) {
    5     if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR)
    6     perror("accept");
    7 }

      一道腾讯后台开发的面试题
      使用Linux epoll模型,水平触发模式;当socket可写时,会不停的触发socket可写的事件,如何处理?

      第一种最普遍的方式:
      需要向socket写数据的时候才把socket加入epoll,等待可写事件。
      接受到可写事件后,调用write或者send发送数据。
      当所有数据都写完后,把socket移出epoll。

      这种方式的缺点是,即使发送很少的数据,也要把socket加入epoll,写完后在移出epoll,有一定操作代价。

      一种改进的方式:
      开始不把socket加入epoll,需要向socket写数据的时候,直接调用write或者send发送数据。如果返回EAGAIN,把socket加入epoll,在epoll的驱动下写数据,全部数据发送完毕后,再移出epoll。

      这种方式的优点是:数据不多的时候可以避免epoll的事件处理,提高效率。

      最后贴一个使用epoll、ET模式的简单HTTP服务器代码:

      1 #include <sys/socket.h>
      2 #include <sys/wait.h>
      3 #include <netinet/in.h>
      4 #include <netinet/tcp.h>
      5 #include <sys/epoll.h>
      6 #include <sys/sendfile.h>
      7 #include <sys/stat.h>
      8 #include <unistd.h>
      9 #include <stdio.h>
     10 #include <stdlib.h>
     11 #include <string.h>
     12 #include <strings.h>
     13 #include <fcntl.h>
     14 #include <errno.h> 
     15 
     16 #define MAX_EVENTS 10
     17 #define PORT 8080
     18 
     19 //设置socket连接为非阻塞模式
     20 void setnonblocking(int sockfd) {
     21     int opts;
     22 
     23     opts = fcntl(sockfd, F_GETFL);
     24     if(opts < 0) {
     25         perror("fcntl(F_GETFL)
    ");
     26         exit(1);
     27     }
     28     opts = (opts | O_NONBLOCK);
     29     if(fcntl(sockfd, F_SETFL, opts) < 0) {
     30         perror("fcntl(F_SETFL)
    ");
     31         exit(1);
     32     }
     33 }
     34 
     35 int main(){
     36     struct epoll_event ev, events[MAX_EVENTS];
     37     int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;
     38     struct sockaddr_in local, remote;
     39     char buf[BUFSIZ];
     40 
     41     //创建listen socket
     42     if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
     43         perror("sockfd
    ");
     44         exit(1);
     45     }
     46     setnonblocking(listenfd);
     47     bzero(&local, sizeof(local));
     48     local.sin_family = AF_INET;
     49     local.sin_addr.s_addr = htonl(INADDR_ANY);;
     50     local.sin_port = htons(PORT);
     51     if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) {
     52         perror("bind
    ");
     53         exit(1);
     54     }
     55     listen(listenfd, 20);
     56 
     57     epfd = epoll_create(MAX_EVENTS);
     58     if (epfd == -1) {
     59         perror("epoll_create");
     60         exit(EXIT_FAILURE);
     61     }
     62 
     63     ev.events = EPOLLIN;
     64     ev.data.fd = listenfd;
     65     if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {
     66         perror("epoll_ctl: listen_sock");
     67         exit(EXIT_FAILURE);
     68     }
     69 
     70     for (;;) {
     71         nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
     72         if (nfds == -1) {
     73             perror("epoll_pwait");
     74             exit(EXIT_FAILURE);
     75         }
     76 
     77         for (i = 0; i < nfds; ++i) {
     78             fd = events[i].data.fd;
     79             if (fd == listenfd) {
     80                 while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, 
     81                                 (size_t *)&addrlen)) > 0) {
     82                     setnonblocking(conn_sock);
     83                     ev.events = EPOLLIN | EPOLLET;
     84                     ev.data.fd = conn_sock;
     85                     if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock,
     86                                 &ev) == -1) {
     87                         perror("epoll_ctl: add");
     88                         exit(EXIT_FAILURE);
     89                     }
     90                 }
     91                 if (conn_sock == -1) {
     92                     if (errno != EAGAIN && errno != ECONNABORTED 
     93                             && errno != EPROTO && errno != EINTR) 
     94                         perror("accept");
     95                 }
     96                 continue;
     97             }  
     98             if (events[i].events & EPOLLIN) {
     99                 n = 0;
    100                 while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
    101                     n += nread;
    102                 }
    103                 if (nread == -1 && errno != EAGAIN) {
    104                     perror("read error");
    105                 }
    106                 ev.data.fd = fd;
    107                 ev.events = events[i].events | EPOLLOUT;
    108                 if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
    109                     perror("epoll_ctl: mod");
    110                 }
    111             }
    112             if (events[i].events & EPOLLOUT) {
    113                 sprintf(buf, "HTTP/1.1 200 OK
    Content-Length: %d
    
    Hello World", 11);
    114                 int nwrite, data_size = strlen(buf);
    115                 n = data_size;
    116                 while (n > 0) {
    117                     nwrite = write(fd, buf + data_size - n, n);
    118                     if (nwrite < n) {
    119                         if (nwrite == -1 && errno != EAGAIN) {
    120                             perror("write error");
    121                         }
    122                         break;
    123                     }
    124                     n -= nwrite;
    125                 }
    126                 close(fd);
    127             }
    128         }
    129     }
    130 
    131     return 0;
    132 }
    View Code
  • 相关阅读:
    SecureRandom
    《Head First 设计模式》[02] 观察者模式
    《MySQL必知必会》[07] 管理事务处理
    《MySQL必知必会》[06] 触发器
    《MySQL必知必会》[05] 存储过程和游标
    Centos7安装Nginx
    IDEA配置Tomcat
    Java小功能大杂烩
    Java处理中文乱码问题
    Java邮件发送
  • 原文地址:https://www.cnblogs.com/xiehongfeng100/p/4636118.html
Copyright © 2011-2022 走看看