zoukankan      html  css  js  c++  java
  • linuxepoll研究

       做linux网络编程的同学都清楚,2.6版本以前的linux内核大多都是用select作为非阻塞的事件触发模型,但是效率低,使用受限已经很明显的暴露了select(包括poll)的缺陷了,为了解决这些缺陷,epoll作为linux新的事件触发模型被创造出来。

    一、epoll相对于select的优点:

      1.支持一个进程socket描述符(FD)的最大数目

        select支持的单进程socket描述符最大数目只有几千,而epoll支持的数目很大,等于系统最大打开的文件描述符数,这个文件描述符数跟内存有一定关系

      2.IO效率不随FD数目增加而线性下降

        select对事件的扫描是针对于所有创建的socket描述符进行的,也就是说,有多少个socket描述符,就需要遍历多少个句柄,所以IO效率是随描述符增加线性下降的;而epoll只遍历活跃的socket描述符,这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会。比如一个高速LAN环境,epoll并不比select/poll有什么效率,相 反,如果过多使用epoll_ctl,效率相比还有稍微的下降但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了

        3.使用mmap加速内核与用户空间的消息传递

        select事件触发后会将信息从内核拷贝到用户空间,这种拷贝就影响了效率。而mmap将内核与用户空间的内存映射到一块内存上,内核将消息捕获后放入该内存空间,用户无需拷贝直接可以访问,减少了拷贝次数,提高了效率。

     二、epoll工作模型

    epoll事件有两种模型:
    Edge Triggered (ET),边缘触发是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。效率非常高,在并发,大流量的情况下,会比LT少很多epoll的系统调用,因此效率高。但是对编程要求高,需要细致的处理每个请求,否则容易发生丢失事件的情况。
    Level Triggered (LT),水平触发是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。效率会低于ET触发,尤其在大并发,大流量的情况下。但是LT对代码编写要求比较低,不容易出现问题。LT模式服务编写上的表现是:只要有数据没有被获取,内核就不断通知你,因此不用担心事件丢失的情况。

     

    三、值得注意的情况:

    1.当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,读数据的时候需要考虑的是当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取:

    while(rs)
    {
        buflen = recv(events[i].data.fd, buf, sizeof(buf), 0);
        if(buflen < 0)
        {
            // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
            // 在这里就当作是该次事件已处理处.
            if(errno == EAGAIN)
                break;
            else
                return;
        }
        else if(buflen == 0)
        {
            // 这里表示对端的socket已正常关闭.
        }
        if(buflen == sizeof(buf)
            rs = 1;   // 需要再次读取
        else
            rs = 0;
    }

    2.如果发送端流量大于接收端的流量,也就是说,epoll所在的程序读比转发的socket要慢,由于是非阻塞的socket,那么send()函数虽然返回,但实际缓冲区的数据并未真正发给接收端,这样不断的读和发,当缓冲区满后会产生EAGAIN错误(参考mansend),同时,不理会这次请求发送的数据。所以,需要封装socket_send()的函数用来处理这种情况,该函数会尽量将数据写完再返回,返回-1表示出错。在socket_send()内部,当写缓冲已满(send()返回-1,且errno为EAGAIN),那么会等待后再重试。这种方式并不很完美,在理论上可能会长时间的阻塞在socket_send()内部,但暂没有更好的办法。

    ssize_t socket_send(int sockfd, const char* buffer, size_t buflen)
    {
        ssize_t tmp;
        size_t total = buflen;
        const char *p = buffer;
        
        while(1)
        {
            tmp = send(sockfd, p, total, 0);
            if(tmp < 0)
            {
                // 当send收到信号时,可以继续写,但这里返回-1.
                if(errno == EINTR)
                return -1;
            
                // 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满,
                // 在这里做延时后再重试.
                if(errno == EAGAIN)
                {
                    usleep(1000);
                    continue;
                }
            
                return -1;
            }
        
            if((size_t)tmp == total)
                return buflen;
            
            total -= tmp;
            p += tmp;
        }
        
        return tmp;
    }

    四、实例

    #include <iostream>
    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <errno.h>
    
    using namespace std;
    
    #define MAXLINE 5
    #define OPEN_MAX 100
    #define LISTENQ 20
    #define SERV_PORT 5000
    #define INFTIM 1000 
    
    //设置非阻塞 
    void setnonblocking(int sock)
    {
        int opts;
        opts = fcntl(sock, F_GETFL);
    
        if(opts<0)
        {
           perror("fcntl(sock,GETFL)");
           exit(1);
        }
    
       opts = opts|O_NONBLOCK;
    
        if(fcntl(sock,F_SETFL,opts)<0)
        {
           perror("fcntl(sock,SETFL,opts)");
           exit(1);
        }  
    }
    
    int main()
    {
        int i, maxi, listenfd,connfd, sockfd,epfd,nfds;
        ssize_t n;
        char line[MAXLINE];
           socklen_t clilen;
    
        //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
        struct epoll_event ev, events[20];
    
        //生成用于处理accept的epoll专用的文件描述符
        epfd = epoll_create(256);
    
        struct sockaddr_in clientaddr;
        struct sockaddr_in serveraddr;
    
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
        //把socket设置为非阻塞方式
        //setnonblocking(listenfd);
    
        //设置与要处理的事件相关的文件描述符
        ev.data.fd = listenfd;
    
        //设置要处理的事件类型ET
        ev.events = EPOLLIN|EPOLLET;
    
        //ev.events=EPOLLIN;
        //注册epoll事件
        epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
        bzero(&serveraddr, sizeof(serveraddr));
        serveraddr.sin_family = AF_INET;
    
        char *local_addr="127.0.0.1";
    
        inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT);
        serveraddr.sin_port=htons(SERV_PORT);
    
        bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
        listen(listenfd, LISTENQ);
    
        maxi = 0;
        for ( ; ; ) {
           //等待epoll事件的发生
           nfds = epoll_wait(epfd, events, 20, 500);
    
           //处理所发生的所有事件    
           for(i = 0; i < nfds;++i)
           {
               if(events[i].data.fd == listenfd)
               {
                    connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
                    if(connfd < 0){
                        perror("connfd<0");
                        exit(1);
                    }
                    //setnonblocking(connfd);
                    char *str = inet_ntoa(clientaddr.sin_addr);
                    cout << "accapt a connection from " << str << endl;
    
                    //设置用于读操作的文件描述符
                    ev.data.fd = connfd;
    
                    //设置用于注测的读操作事件
                    ev.events = EPOLLIN|EPOLLET;
    
                    //ev.events=EPOLLIN;
                    //注册ev
                    epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
               }
               else if(events[i].events&EPOLLIN)
               {
                    cout << "EPOLLIN" << endl;
                    if ( (sockfd = events[i].data.fd) < 0)
                        continue;
                    if ( (n = read(sockfd, line, MAXLINE)) < 0){
                        if (errno == ECONNRESET) {
                            close(sockfd);
                            events[i].data.fd = -1;
                        } else
                            std::cout<<"readline error"<<std::endl;
    
                    } else if (n == 0) {
                        close(sockfd);
                        events[i].data.fd = -1;
                    }
                    line[n] = '\0';
                    cout << "read " << line << endl;
                    
                    //设置用于写操作的文件描述符
                    ev.data.fd = sockfd;
    
                    //设置用于注册的写操作事件
                    ev.events = EPOLLOUT|EPOLLET;
    
                    //修改sockfd上要处理的事件为EPOLLOUT
                    //epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
                }
                else if(events[i].events&EPOLLOUT)
                {  
                    sockfd = events[i].data.fd;
                    write(sockfd, line, n);
                    
                    //设置用于读操作的文件描述符
                    ev.data.fd = sockfd;
                    //设置用于注测的读操作事件
                    ev.events = EPOLLIN|EPOLLET;
                    //修改sockfd上要处理的事件为EPOLIN
                    epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
                }
            }
        }
        return 0;
    }

    上面的代码是ET模式

    测试脚本1:

    #!/usr/bin/python
    import socket
    import time
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('127.0.0.1', 5000))
    
    sock.send('1234567890')
    time.sleep(5)while(1):
        time.sleep(1)

    输出1:

    accapt a connection from 0.0.0.0
    EPOLLIN
    read 12345

    说明1:

    运行server和client发现,server仅仅读取了5字节的数据,而client其实发送了10字节的数据,也就是说,server仅当第一次监听到了EPOLLIN事件,由于没有读取完数据,而且采用的是ET模式,状态在此之后不发生变化,因此server再也接收不到EPOLLIN事件了。当关闭客户端时,会另外触发一个事件,这个事件又触发了一次读操作,也就将后面的5个字节读取出来。

    测试脚本2:

    #!/usr/bin/python
    import socket
    import time
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('127.0.0.1', 5000))
    
    sock.send('1234567890')
    time.sleep(5)
    sock.send('1234567890')
    
    while(1):
        time.sleep(1)

    输出2:

    accapt a connection from 0.0.0.0
    EPOLLIN
    read 12345
    (5 sec...)
    EPOLLIN
    read 67890

    说明2:

    可以发现,在server接收完5字节的数据之后一直监听不到client的事件,而当client休眠5秒之后重新发送数据,server再次监听到了变化,只不过因为只是读取了5个字节,仍然有10个字节的数据(client第二次发送的数据)没有接收完。

    如果上面的实验中,对accept的socket都采用的是LT模式,那么只要还有数据留在buffer中,server就会继续得到通知,可以将上面标黄的选项去掉则变为LT模式。

    五、总结

        ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多是这个原因造成的;而LT模式是只要有数据没有处理就会一直通知下去的。

    补充说明一下这里一直强调的"状态变化"是什么:

    1)对于监听可读事件时,如果是socket是监听socket,那么当有新的主动连接到来为状态发生变化;对一般的socket而言,协议栈中相应的缓冲区有新的数据为状态发生变化。但是,如果在一个时间同时接收了N个连接(N>1),但是监听socket只accept了一个连接,那么其它未 accept的连接将不会在ET模式下给监听socket发出通知,此时状态不发生变化;对于一般的socket,就如例子中而言,如果对应的缓冲区本身已经有了N字节的数据,而只取出了小于N字节的数据,那么残存的数据不会造成状态发生变化。

    2)对于监听可写事件时,同理可推,不再详述。

         不论是监听可读还是可写,对方关闭socket连接都将造成状态发生变化,比如在例子中,如果强行中断client脚本,也就是主动中断了socket连接,那么都将造成server端发生状态的变化,从而server得到通知,将已经在本方缓冲区中的数据读出。

        把前面的描述可以总结如下:仅当对方的动作(发出数据,关闭连接等)造成的事件才能导致状态发生变化,而本方协议栈中已经处理的事件(包括接收了对方的数据,接收了对方的主动连接请求)并不是造成状态发生变化的必要条件,状态变化一定是对方造成的。所以在ET模式下的,必须一直处理到出错或者完全处理完毕,才能进行下一个动作,否则可能会发生错误。

    部分转自他处-没有找到最终来源

  • 相关阅读:
    DHCP DHCPv6
    DHCPv6协议
    IPv6邻居发现协议
    CentOS下禁止防火墙
    centOS下更新yum源
    centOS下yum报错
    Flink+Kafka整合的实例
    Flink基本概念
    Ubuntu16.04下配置ssh免密登录
    Zookeeper+Kafka的单节点配置
  • 原文地址:https://www.cnblogs.com/geekma/p/2695534.html
Copyright © 2011-2022 走看看