zoukankan      html  css  js  c++  java
  • 基本套接字编程(5) -- epoll篇

    1. epoll技术

    epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
    相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。并且,在linux/posix_types.h头文件有这样的声明:
    #define __FD_SETSIZE    1024
    表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。

    2. epoll系统调用

    epoll技术的接口实现非常简单,相关系统调用:
    • epoll_create()  or  epoll_create1
    • epoll_ctl()
    • epoll_wait()
    下面详细阐述其系统调用函数。

    2.1 句柄创建 -- epoll_create()

    #include <sys/epoll.h>
    
    int epoll_create(int size);
    int epoll_create1(int flags);
    <span style="white-space:pre">				</span>返回值:epoll句柄
    首先:epoll_create()创建一个epoll的事例,通知内核需要监听size个fd。size指的并不是最大的后备存储设备,而是衡量内核内部结构大小的一个提示。当创建成功后,会占用一个fd,所以记得在使用完之后调用close(),否则fd可能会被耗尽。
    Note:自从Linux2.6.8版本以后,size值其实是没什么用的,不过要大于0,因为内核可以动态的分配大小,所以不需要size这个提示了。

    其次:epoll_create1()函数,其实它和epoll_create差不多,不同的是epoll_create1函数的参数是flag;
    • 当flag是0时,表示和epoll_create函数完全一样,不需要size的提示了;
    • 当flag = EPOLL_CLOEXEC,创建的epfd会设置FD_CLOEXEC;
    • 当flag = EPOLL_NONBLOCK,创建的epfd会设置为非阻塞。
    一般用法都是使用EPOLL_CLOEXEC。
    Note:关于FD_CLOEXEC,它是fd的一个标识说明,用来设置文件close-on-exec状态的。当close-on-exec状态为0时,调用exec时,fd不会被关闭;状态非零时则会被关闭,这样做可以防止fd泄露给执行exec后的进程。

    2.2 事件注册 -- epoll_ctl()

     #include <sys/epoll.h>
    
     int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
                                        返回值:成功返回0 失败返回-1,errno设置为错误码
    

      epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

    • 第一个参数是epoll_create()的返回值;
    • 第二个参数表示动作,用三个宏来表示:
    EPOLL_CTL_ADD:注册新的fd到epfd中;
    EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
    EPOLL_CTL_DEL:从epfd中删除一个fd;
    • 第三个参数是需要监听的fd。
    • 第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
     typedef union epoll_data {
       void        *ptr;
       int          fd;
       uint32_t     u32;
       uint64_t     u64;
     } epoll_data_t;
    
     struct epoll_event {
      uint32_t     events;      /* Epoll events */
      epoll_data_t data;        /* User data variable */
     };
    

      

    events可以是以下几个宏的集合:
    • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    • EPOLLOUT:表示对应的文件描述符可以写;
    • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    • EPOLLERR:表示对应的文件描述符发生错误;
    • EPOLLHUP:表示对应的文件描述符被挂断;
    • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
    • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
    2.3 等待事件 -- epoll_wait()
     #include <sys/epoll.h>
    
     int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
     int epoll_pwait(int epfd, struct epoll_event *events,int maxevents, int timeout,const sigset_t *sigmask);
                        返回值:返回值>0代表链接描述符个数;=0代表无链接;-1代表失败,且errno设置为错误码
    

      收集在epoll监控的事件中已经发送的事件。

    • 第一个参数:表示epoll_wait等待epfd上的事件;
    • 第二个参数:参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存);
    • 第三个参数:maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size;
    • 第四个参数:timeout表示超时时间(单位:毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)
    epoll_pwait(since linux 2.6.19)允许一个应用程序安全的等待,直到fd设备准备就绪,或者捕获到一个信号量。其中sigmask表示要捕获的信号量。

    3. epoll工作原理

    epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
    另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

    3.1 epoll两种工作模式

    LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
    ET (edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。
    ET和LT的区别就在这里体现,LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读,则不断的通知你。而ET则只在事件发生之时通知。可以简单理解为LT是水平触发,而ET则为边缘触发。LT模式只要有事件未处理就会触发,而ET则只在高低电平变换时(即状态从1到0或者0到1)触发。
    下图阐释LT与ET的工作方式:

    3.2 epoll优势

    1. 支持一个进程打开大数目的socket描述符(FD)
        select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
    2. IO效率不随FD数目增加而线性下降
        传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
    3. 使用mmap加速内核与用户空间的消息传递
        这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。
    4. 内核微调
    这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。
     
    linux下epoll如何实现高效处理百万句柄的
    开发高性能网络程序时,windows开发者们言必称iocp,linux开发者们则言必称epoll。大家都明白epoll是一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄,比起以前的select和poll效率高大发了。我们用起epoll来都感觉挺爽,确实快,那么,它到底为什么可以高速处理这么多并发连接呢?
    使用起来很清晰,首先要调用epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。
    epoll_ctl可以操作上面建立的epoll,例如,将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等。
    epoll_wait在调用时,在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。
    从上面的调用方式就可以看到epoll比select/poll的优越之处:因为后者每次调用时都要传递你所要监控的所有socket给select/poll系统调用,这意味着需要将用户态的socket列表copy到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。而我们调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表。
    所以,实际上在你调用epoll_create后,内核就已经在内核态开始准备帮你存储要监控的句柄了,每次调用epoll_ctl只是在往内核的数据结构里塞入新的socket句柄。
     当一个进程调用epoll_creaqte方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:


    4. TCP回射程序实例

    4.1 server.c

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    #include <unistd.h>
    #include <time.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <sys/types.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <netdb.h>
    #include <fcntl.h>
    
    
    #ifndef CONNECT_SIZE
    #define CONNECT_SIZE 256
    #endif
    
    #define PORT 7777
    #define MAX_LINE 2048
    #define LISTENQ 20
    
    void setNonblocking(int sockfd)
    {
        int opts;
        opts=fcntl(sockfd,F_GETFL);
        if(opts<0)
        {
            perror("fcntl(sock,GETFL)");
            return;
        }//if
    
        opts = opts|O_NONBLOCK;
        if(fcntl(sockfd,F_SETFL,opts)<0)
        {
             perror("fcntl(sock,SETFL,opts)");
            return;
        }//if
    }
    
    int main(int argc , char **argv)
    {
        int i, listenfd, connfd, sockfd, epfd, nfds;
    
        ssize_t n, ret;
            
        char buf[MAX_LINE];
    
        socklen_t clilen;
    
        struct sockaddr_in servaddr , cliaddr;
    
        /*声明epoll_event结构体变量,ev用于注册事件,数组用于回传要处理的事件*/
        struct epoll_event ev, events[20];
    
        /*(1) 得到监听描述符*/
        listenfd = socket(AF_INET , SOCK_STREAM , 0);
        setNonblocking(listenfd);
    
        /*生成用于处理accept的epoll专用文件描述符*/    
        epfd = epoll_create(CONNECT_SIZE);
        /*设置监听描述符*/
        ev.data.fd = listenfd;
        /*设置处理事件类型*/
        ev.events = EPOLLIN | EPOLLET;
        /*注册事件*/
        epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);        
    
        /*(2) 绑定套接字*/    
        bzero(&servaddr , sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        servaddr.sin_port = htons(PORT);
    
        bind(listenfd , (struct sockaddr *)&servaddr , sizeof(servaddr));
    
        /*(3) 监听*/
        listen(listenfd , LISTENQ);
    
        /*(4) 进入服务器接收请求死循环*/
        while(1)
        {
            /*等待事件发生*/
            nfds = epoll_wait(epfd , events , CONNECT_SIZE , -1);
            if(nfds <= 0)
                continue;
        
            printf("nfds = %d
    " , nfds);
            /*处理发生的事件*/
            for(i=0 ; i<nfds ; ++i)
            {
                /*检测到用户链接*/
                if(events[i].data.fd == listenfd)
                {    
                    /*接收客户端的请求*/
                    clilen = sizeof(cliaddr);
    
                    if((connfd = accept(listenfd , (struct sockaddr *)&cliaddr , &clilen)) < 0)
                    {
                        perror("accept error.
    ");
                        exit(1);
                    }//if        
    
                    printf("accpet a new client: %s:%d
    ", inet_ntoa(cliaddr.sin_addr) , cliaddr.sin_port);
                
                    /*设置为非阻塞*/
                    setNonblocking(connfd);
                    ev.data.fd = connfd;
                    ev.events = EPOLLIN | EPOLLET;
                    epoll_ctl(epfd , EPOLL_CTL_ADD , connfd , &ev);
                }//if
                /*如果是已链接用户,并且收到数据,进行读入*/
                else if(events[i].events & EPOLLIN){
    
                    if((sockfd = events[i].data.fd) < 0)
                        continue;
                    bzero(buf , MAX_LINE);
                    printf("reading the socket~~~
    ");
                    if((n = read(sockfd , buf , MAX_LINE)) <= 0)
                    {
                        close(sockfd);
                        events[i].data.fd = -1;
                    }//if
                    else{
                        buf[n] = '';
                        printf("clint[%d] send message: %s
    ", i , buf);
                    
                        /*设置用于注册写操作文件描述符和事件*/
                        ev.data.fd = sockfd;
                        ev.events = EPOLLOUT| EPOLLET;    
                        epoll_ctl(epfd , EPOLL_CTL_MOD , sockfd , &ev);            
                    }//else                                            
                }//else
                else if(events[i].events & EPOLLOUT)
                {
                    if((sockfd = events[i].data.fd) < 0)
                    continue;
                    if((ret = write(sockfd , buf , n)) != n)    
                    {
                        printf("error writing to the sockfd!
    ");
                        break;
                    }//if
                    /*设置用于读的文件描述符和事件*/
                    ev.data.fd = sockfd;
                    ev.events = EPOLLIN | EPOLLET;
                    /*修改*/
                    epoll_ctl(epfd , EPOLL_CTL_MOD , sockfd , &ev);                
                }//else
            }//for
        }//while
        free(events);
        close(epfd);
        exit(0);
    }

    4.2 client.c

      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <string.h>
      4 #include <errno.h>
      5 #include <unistd.h>
      6 #include <time.h>
      7 #include <sys/socket.h>
      8 #include <sys/select.h>
      9 #include <sys/types.h>
     10 #include <netinet/in.h>
     11 #include <arpa/inet.h>
     12 #include <netdb.h>
     13 #include <fcntl.h>
     14 
     15 #define PORT 7777
     16 #define MAX_LINE 2048
     17 
     18 int max(int a , int b)
     19 {
     20     return a > b ? a : b;
     21 }
     22 
     23 /*readline函数实现*/
     24 ssize_t readline(int fd, char *vptr, size_t maxlen)
     25 {
     26     ssize_t    n, rc;
     27     char    c, *ptr;
     28 
     29     ptr = vptr;
     30     for (n = 1; n < maxlen; n++) {
     31         if ( (rc = read(fd, &c,1)) == 1) {
     32             *ptr++ = c;
     33             if (c == '
    ')
     34                 break;    /* newline is stored, like fgets() */
     35         } else if (rc == 0) {
     36             *ptr = 0;
     37             return(n - 1);    /* EOF, n - 1 bytes were read */
     38         } else
     39             return(-1);        /* error, errno set by read() */
     40     }
     41 
     42     *ptr = 0;    /* null terminate like fgets() */
     43     return(n);
     44 }
     45 
     46 /*普通客户端消息处理函数*/
     47 void str_cli(int sockfd)
     48 {
     49     /*发送和接收缓冲区*/
     50     char sendline[MAX_LINE] , recvline[MAX_LINE];
     51     while(fgets(sendline , MAX_LINE , stdin) != NULL)    
     52     {
     53         write(sockfd , sendline , strlen(sendline));
     54 
     55         bzero(recvline , MAX_LINE);
     56         if(readline(sockfd , recvline , MAX_LINE) == 0)
     57         {
     58             perror("server terminated prematurely");
     59             exit(1);
     60         }//if
     61 
     62         if(fputs(recvline , stdout) == EOF)
     63         {
     64             perror("fputs error");
     65             exit(1);
     66         }//if
     67 
     68         bzero(sendline , MAX_LINE);
     69     }//while
     70 }
     71 
     72 int main(int argc , char **argv)
     73 {
     74     /*声明套接字和链接服务器地址*/
     75     int sockfd;
     76     struct sockaddr_in servaddr;
     77 
     78     /*判断是否为合法输入*/
     79     if(argc != 2)
     80     {
     81         perror("usage:tcpcli <IPaddress>");
     82         exit(1);
     83     }//if
     84 
     85     /*(1) 创建套接字*/
     86     if((sockfd = socket(AF_INET , SOCK_STREAM , 0)) == -1)
     87     {
     88         perror("socket error");
     89         exit(1);
     90     }//if
     91 
     92     /*(2) 设置链接服务器地址结构*/
     93     bzero(&servaddr , sizeof(servaddr));
     94     servaddr.sin_family = AF_INET;
     95     servaddr.sin_port = htons(PORT);
     96     if(inet_pton(AF_INET , argv[1] , &servaddr.sin_addr) < 0)
     97     {
     98         printf("inet_pton error for %s
    ",argv[1]);
     99         exit(1);
    100     }//if
    101 
    102     /*(3) 发送链接服务器请求*/
    103     if(connect(sockfd , (struct sockaddr *)&servaddr , sizeof(servaddr)) < 0)
    104     {
    105         perror("connect error");
    106         exit(1);
    107     }//if
    108 
    109     /*调用消息处理函数*/
    110     str_cli(sockfd);    
    111     exit(0);
    112 }

    4.3 运行结果

    server端:
    client端:





  • 相关阅读:
    原创js脚本实现百度网盘任意文件强制下载
    百度网盘大文件直接下载与下载提速的简单方法
    MVP架构的一个小例子
    PCQQ
    WPF
    百度网盘的限速破解
    何德何能?
    Maven多模块+SpringBoot 编译失败:程序包xxx不存在
    将Spring Boot项目部署到阿里云服务器上(二、安装MySQL服务器)
    将Spring Boot项目部署到阿里云服务器上(一、安装JDK)
  • 原文地址:https://www.cnblogs.com/shine-yr/p/5214705.html
Copyright © 2011-2022 走看看