zoukankan      html  css  js  c++  java
  • epoll

    1. epoll基础

    epoll - I/O event notification facility

    epoll是一种I/O事件通知机制,这句话基本上包含了所有需要理解的要点:

    I/O事件

    基于file descriptor,支持file, socket, pipe等各种I/O方式。

    当文件描述符关联的内核读缓冲区可读,则触发可读事件,什么是可读呢?就是内核缓冲区非空,有数据可以读取。

    当文件描述符关联的内核写缓冲区可写,则触发可写事件,什么是可写呢?就是内核缓冲区不满,有空闲空间可以写入。

    通知机制

    通知机制,就是当事件发生的时候,去通知他。通知机制的反面,就是轮询机制

    epoll是一种当文件描述符的内核缓冲区非空的时候,发出可读信号进行通知,当写缓冲区不满的时候,发出可写信号通知的机制。

    注:epoll不能操作普通文件:epoll仅对通常在读取/写入时表现出阻塞行为的文件描述符(如管道和套接字)有意义。

    普通文件描述符总是会立即或多或少地立即返回结果或文件结束,epoll_wait()会一直上报直到文件结束(读)或一直写,因此epoll操作无意义

    • epoll_create

    #include <sys/epoll.h>

    int epoll_create(int size);

    size不用,0即可。

    int epoll_create1(int flags);

    • epoll_ctl

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    op: EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD;

    typedef union epoll_data{
        void *ptr;
        int fd;
        uint32_t u32;
        uint64_t u64;
    }epoll_data_t;
    
    typedef epoll_event {
        uint32_t events; /*Epoll events*/
        epoll_data_t data; 
    };
    events可以是以下几个宏的集合:
    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    EPOLLOUT:表示对应的文件描述符可以写;
    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR:表示对应的文件描述符发生错误;
    EPOLLHUP:表示对应的文件描述符被挂断;
    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
    • epoll_wait

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

    int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);

    timeout: -1死等;0,不等;>0,有限等待时间(ms)。

    成功:事件个数,失败-1。

    2. 水平触发和边缘触发

    epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

    水平触发只要缓冲区有数据就会一直触发,与select和poll相同。如同电路上只要高电平(1)或低电平(0)时就触发通知。

    只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知;当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知。

    LT模式支持阻塞和非阻塞两种方式。epoll默认的模式是LT。

    边沿触发:只有在缓冲区增加数据的那一刻才会触发。如同电路上只有电平发生变化(高电平到低电平,或者低电平到高电平)的时候才触发通知。

    当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知;当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知。

    水平触发和边缘触发模式区别

    1)读缓冲区刚开始是空的

    2)读缓冲区写入2KB数据

    3)水平触发和边缘触发模式此时都会发出可读信号。收到信号通知后,读取了1kb的数据,读缓冲区还剩余1KB数据,水平触发会再次进行通知,而边缘触发不会再进行通知。

    所以,边缘触发需要一次性的把缓冲区的数据读完为止,也就是一直读,直到读到EGAIN为止,EGAIN说明缓冲区已经空了,因为这一点,边缘触发需要设置文件句柄为非阻塞。

    //水平触发
    ret = read(fd, buf, sizeof(buf));
    
    //边缘触发
    while(true) {
        ret = read(fd, buf, sizeof(buf);
        if (ret == EAGAIN) break;
    }
    使用ET的例子:nginx。
    使用LT的例子:redis。

    为什么边沿触发必须使用非阻塞IO?

    阻塞IO:当你去读一个阻塞的文件描述符时,如果在该文件描述符上没有数据可读,那么它会一直阻塞(通俗一点就是一直卡在调用函数那里),直到有数据可读。当你去写一个阻塞的文件描述符时,如果在该文件描述符上没有空间(通常是缓冲区)可写,那么它会一直阻塞,直到有空间可写。以上的读和写我们统一指在某个文件描述符进行的操作,不单单指真正的读数据,写数据,还包括接收连接accept(),发起连接connect()等操作...

    非阻塞IO:当你去读写一个非阻塞的文件描述符时,不管可不可以读写,它都会立即返回,返回成功说明读写操作完成了,返回失败会设置相应errno状态码,根据这个errno可以进一步执行其他处理。它不会像阻塞IO那样,卡在那里不动!!!

    通常来说,et方式是比较危险的方式,如果要使用et方式,那么,应用程序应该

    1、将socket设置为non-blocking方式

    2、epoll_wait收到event后,read或write需要读到没有数据为止,write需要写到没有数据为止(对于non-blocking socket来说,EAGAIN通常是无数据可读,无数据可写的返回状态);

    测试代码(参考:浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <errno.h>
    #include <fcntl.h>
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>
    
    #define MAX_BUFFER_SIZE        5
    #define MAX_EPOLL_EVENTS    20    
    #define EPOLL_LT    0
    #define EPOLL_ET    1
    #define FD_BLOCK    0
    #define FD_NONBLOCK    1
    
    int set_nonblock(int fd){
        int old_flags = fcntl(fd, F_GETFL);
        fcntl(fd, F_SETFL, old_flags | O_NONBLOCK);
        return old_flags;
    }
    
    // 注册文件描述符到epoll,并设置其事件为EPOLLIN
    void addfd_to_epoll(int epfd, int fd, int epoll_type, int block_type){
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = EPOLLIN;
    
        if (epoll_type == EPOLL_ET){
            ev.events |= EPOLLET;
        }
    
        if (block_type == FD_NONBLOCK){
            set_nonblock(fd);
        }
    
        epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
    }
    
    void epoll_lt(int sockfd){
        char buffer[MAX_BUFFER_SIZE];
        int ret;
    
        printf("-------------------LT recv...
    ");
        memset(buffer, 0, MAX_BUFFER_SIZE);
        ret = recv(sockfd, buffer, MAX_BUFFER_SIZE-1, 0);
        printf("recv bytes: %d
    ", ret);
        if (ret > 0){
    /*        printf("recv bytes:");
            for(i = 0; i < MAX_BUFFER_SIZE; i++){
                printf("%2x ", buffer[i]);
            }
            printf("
    ");
    */
            printf("recv bytes: %s
    ", buffer);
    
        } else {
            if (ret == 0){
                printf("client close
    ");
            }
            close(sockfd);
        }
        printf("LT deal with over
    ");
    }
    
    void epoll_et_loop(int sockfd){
        char buffer[MAX_BUFFER_SIZE];
        int ret ;
    
        printf("-------------------ET loop recv...
    ");
        while(1){
            memset(buffer, 0, MAX_BUFFER_SIZE);
            ret = recv(sockfd, buffer, MAX_BUFFER_SIZE-1, 0);
            printf("ET loop recv bytes: %d
    ", ret);
            if (ret > 0){
                printf("ET loop recv %d bytes: %s
    ", ret, buffer);
            } else if (ret < 0){
                if ((errno == EAGAIN) || errno == EWOULDBLOCK){
                    printf("ET loop recv all data...
    ");
                    break;
                }
                close(sockfd);
                break;
            } else { //if (ret == 0){
                printf("client close
    ");
                close(sockfd);
                break;
            }
        }
        printf("ET loop deal with over
    ");
    }
    
    void epoll_et_nonloop(int sockfd){
        char buffer[MAX_BUFFER_SIZE];
        int ret ;
    
        printf("--------------------ET nonloop recv...
    ");
        memset(buffer, 0, MAX_BUFFER_SIZE);
        ret = recv(sockfd, buffer, MAX_BUFFER_SIZE-1, 0);
    
        printf("ET nonloop recv bytes: %d
    ", ret);
        if (ret > 0){
            printf("ET loop recv %d bytes: %s
    ", ret, buffer);
        } else if (ret < 0){
            close(sockfd);
        } else { //if (ret == 0){
            printf("client close
    ");
            close(sockfd);
        }
    
        printf("ET nonloop deal with over
    ");
    }
    
    void epoll_process(int epfd, struct epoll_event *events, int number, 
            int sockfd, int epoll_type, int block_type)
    {
        struct sockaddr_in client_addr;
        socklen_t client_addrlen;
        int newfd, confd;
        int i;
    
        for (i = 0; i <number; i++){
            newfd = events[i].data.fd;
            if (newfd == sockfd){
                printf("---------------accept()-------------
    ");
           // 模拟服务器繁忙,无法立即accept() printf(
    "sleep 3s... "); sleep(3); printf("sleep 3s over "); client_addrlen = sizeof(client_addr); confd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addrlen); printf("confd= %d ", confd); addfd_to_epoll(epfd, confd, epoll_type, block_type); printf("accept() over!!! "); } else if (events[i].events & EPOLLIN){ if (epoll_type == EPOLL_LT){ epoll_lt(newfd); } else if (epoll_type == EPOLL_ET){ epoll_et_loop(newfd); //epoll_et_nonloop(newfd); } } else { printf("other events... "); } } } void err_exit(char *msg){ perror(msg); exit(1); } int create_socket(const char *ip, const int portnumber){ struct sockaddr_in server_addr; int sockfd , reuse = 1; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(portnumber); if(inet_pton(PF_INET, ip, &server_addr.sin_addr)== -1){ err_exit("inet_pton() error"); } if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1){ err_exit("socket() error"); } if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1){ err_exit("setsockopt() error"); } if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1){ err_exit("bind() error"); } if (listen(sockfd, 5) == -1){ err_exit("listen() error"); } return sockfd; } int main(int argc, char *argv[]) { if (argc < 3){ fprintf(stderr, "usage: %s ip_address port_number ", argv[0]); exit(1); } int sockfd, epfd, number; sockfd = create_socket(argv[1], atoi(argv[2])); struct epoll_event events[MAX_EPOLL_EVENTS]; if ((epfd = epoll_create1(0)) == -1){ err_exit("epoll_create() error"); } // 以下设置针对监听的sockfd,当epoll_wait返回时,必定有事件发生 // 所以这里忽略罕见的情况外,设置阻塞IO没有意义,我们设置非阻塞IO // // 水平触发 非阻塞 addfd_to_epoll(epfd, sockfd, EPOLL_LT, FD_NONBLOCK); // 边缘触发 非阻塞 //addfd_to_epoll(epfd, sockfd, EPOLL_ET, FD_NONBLOCK); while(1){ number = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, -1); if (number == -1){ err_exit("epoll_wait() error"); } else{ // 水平触发 阻塞 //epoll_process(epfd, events, number, sockfd, EPOLL_LT, FD_BLOCK); // 水平触发 非阻塞 //epoll_process(epfd, events, number, sockfd, EPOLL_LT, FD_NONBLOCK); // 边缘触发 阻塞 //epoll_process(epfd, events, number, sockfd, EPOLL_ET, FD_BLOCK); // 边缘触发 非阻塞 epoll_process(epfd, events, number, sockfd, EPOLL_ET, FD_NONBLOCK); } } close(sockfd); return 0; }

    水平触发、边缘触发与阻塞和非阻塞的组合如下:

    水平触发阻塞sockfd,边缘触发阻塞sockfd:针对sockfd,当epoll_wait()返回时总有事件发生,除特殊情况外,设置阻塞IO没有意义,一般设置sockfd为非阻塞IO。

    水平触发非阻塞sockfd:不会丢连接,因为只要有连接未读取,就会触发epoll_wait()。

    边缘触发非阻塞sockfd:会丢失连接,因为可能不会一次处理完或读取所有客户端连接。

    水平触发阻塞confd:可以正确读取完数据。

    水平触发非阻塞confd:可以正确读取完数据,与阻塞效果相同。

    边缘触发阻塞confd:每次数据改变均触发一次epoll_wait(),若本次未读取完,下次触发时继续接着读取原来的数据,所以必须一次性读取完数据;一次读取完(循环读取)数据时会阻塞,导致程序不能继续运行(不能accept或epoll)。

    边缘触发非阻塞confd:一次性读写完数据,不丢失数据,不阻塞。

    结论:

    1.对于监听的sockfd,最好使用水平触发模式(非阻塞),边缘触发模式会导致高并发情况下,有的客户端会连接不上。如果非要使用边缘触发,网上有的方案是用while来循环accept()。

    2.对于读写的connfd,水平触发模式下,阻塞和非阻塞效果都一样,不过为了防止特殊情况,还是建议设置非阻塞。

    3.对于读写的connfd,边缘触发模式下,必须使用非阻塞IO,并要一次性全部读写完数据。

    经典示例

    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    
    #define MAXLINE 5
    #define OPEN_MAX 100
    #define LISTENQ 20
    #define SERV_PORT 5000
    #define INFTIM 1000
    
    void setnonblocking(int sock)
    {
        int opts;
        opts=fcntl(sock,F_GETFL);
        if(opts<0)
        {
            perror("fcntl(sock,GETFL)");
            exit(1);
        }
        opts = opts|O_NONBLOCK;
        if(fcntl(sock,F_SETFL,opts)<0)
        {
            perror("fcntl(sock,SETFL,opts)");
            exit(1);
        }
    }
    
    int main(int argc, char* argv[])
    {
        int i, listenfd, connfd, sockfd,epfd,nfds, portnumber;
        ssize_t n;
        char line[MAXLINE];
        socklen_t clilen;
    
    
        if ( 2 == argc )
        {
            if( (portnumber = atoi(argv[1])) < 0 )
            {
                fprintf(stderr,"Usage:%s portnumber
    ",argv[0]);
                return 1;
            }
        }
        else
        {
            fprintf(stderr,"Usage:%s portnumber
    ",argv[0]);
            return 1;
        }
    
        //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
        struct epoll_event ev,events[20];
    
        //生成用于处理accept的epoll专用的文件描述符
        epfd=epoll_create(1);
        struct sockaddr_in clientaddr;
        struct sockaddr_in serveraddr;
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
        //把socket设置为非阻塞方式
        setnonblocking(listenfd);
    
        //设置与要处理的事件相关的文件描述符
        ev.data.fd=listenfd;
        //设置要处理的事件类型  水平触发非阻塞
        ev.events=EPOLLIN;
    
        //注册epoll事件
        epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
    
        bzero(&serveraddr, sizeof(serveraddr));
        serveraddr.sin_family = AF_INET;
        char *local_addr="127.0.0.1";
        inet_aton(local_addr,&(serveraddr.sin_addr));
        serveraddr.sin_port=htons(portnumber);
        bind(listenfd,(struct sockaddr *)&serveraddr, sizeof(serveraddr));
        listen(listenfd, LISTENQ);
        for ( ; ; ) {
            //等待epoll事件的发生
            nfds=epoll_wait(epfd, events, 20, 500);
    
            //处理所发生的所有事件
            for(i=0;i<nfds;++i)
            {
                if(events[i].data.fd==listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
                {
                    connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clilen);
                    if(connfd<0){
                        perror("connfd<0");
                        exit(1);
                    }
    
                    setnonblocking(connfd);
                    char *str = inet_ntoa(clientaddr.sin_addr);
                    printf("accapt a connection from %s
    ", str);
                    //设置用于读操作的文件描述符
                    ev.data.fd=connfd;
                    //设置用于注测的读操作事件, 边缘触发非阻塞
                    ev.events=EPOLLIN|EPOLLET;
    
                    //注册ev
                    epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
                }
                else if(events[i].events&EPOLLIN)//如果是已经连接的用户,并且收到数据,那么进行读入。
                {
                    printf("EPOLLIN
    ");
                    if ( (sockfd = events[i].data.fd) < 0)
                        continue;
                    while(1){
                        memset(line, 0, MAXLINE);
                        n = read(sockfd, line, MAXLINE-1);
                        if (n > 0){
                            printf("read(%ld): %s
    ", n, line);
                        } else if( n < 0){
                            if ((errno == EAGAIN) || errno == EWOULDBLOCK){
                                printf("read over
    ");
    
                                //设置用于写操作的文件描述符
                                ev.data.fd=sockfd;
                                //设置用于注测的写操作事件
                                ev.events=EPOLLOUT|EPOLLET;
                                //修改sockfd上要处理的事件为EPOLLOUT
                                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
                                break;
                            }
                            //} else if (errno == ECONNRESET) {
                            close(sockfd);
                            events[i].data.fd = -1;
                            printf("readline error
    ");
                            break;
                        } else { //if (n == 0) {
                            close(sockfd);
                            events[i].data.fd = -1;
                            break;
                        }
                    }
                } 
                else if(events[i].events&EPOLLOUT) // 如果有数据发送
                {
                    sockfd = events[i].data.fd; // 应该用struct管理fd和数据line
                    write(sockfd, line, n);
                    //设置用于读操作的文件描述符
                    ev.data.fd=sockfd;
                    //设置用于注册读操作事件
                    ev.events=EPOLLIN|EPOLLET;
                    //修改sockfd上要处理的事件为EPOLIN
                    epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
                }
            }
        }
    
        return 0;
    }

    附:EPOLLRDHUP处理 

    在对系统问题进行排查时,我发现了一个奇怪的现象:明明是对方断开请求,系统却报告一个查询失败的错误,但从用户角度来看请求的结果正常返回,没有任何问题。

    对这个现象深入分析后发现,这是一个基于 epoll 的连接池实现上的问题,或者说是特性 。

    首先解释一下导致这个现象的原因。

    在使用 epoll 时,对端正常断开连接(调用 close()),在服务器端会触发一个 epoll 事件。在低于 2.6.17 版本的内核中,这个 epoll 事件一般是 EPOLLIN,即 0x1,代表连接可读。

    连接池检测到某个连接发生 EPOLLIN 事件且没有错误后,会认为有请求到来,将连接交给上层进行处理。这样一来,上层尝试在对端已经 close() 的连接上读取请求,只能读到 EOF,会认为发生异常,报告一个错误。

    因此在使用 2.6.17 之前版本内核的系统中,我们无法依赖封装 epoll 的底层连接库来实现对对端关闭连接事件的检测,只能通过上层读取数据时进行区分处理。

    不过,2.6.17 版本内核中增加了 EPOLLRDHUP 事件,代表对端断开连接,关于添加这个事件的理由可以参见 “[RFC] epoll and half closed TCP connections”。

    在使用 2.6.17 之后版本内核的服务器系统中,对端连接断开触发的 epoll 事件会包含 EPOLLIN | EPOLLRDHUP,即 0x2001。有了这个事件,对端断开连接的异常就可以在底层进行处理了,不用再移交到上层。

    重现这个现象的方法很简单,首先 telnet 到 server,然后什么都不做直接退出,查看在不同系统中触发的事件码。

    注意,在使用 2.6.17 之前版本内核的系统中,sys/epoll.h 的 EPOLL_EVENTS 枚举类型中是没有 EPOLLRDHUP 事件的,所以带 EPOLLRDHUP 的程序无法编译通过。

    参考:http://www.linuxidc.com/Linux/2016-04/129819.htm

    参考:

    1. epoll使用详解 简书

    2. 处理大并发之二 对epoll的理解,epoll客户端服务端代码

    3. http://www.linuxidc.com/Linux/2016-04/129819.htm

    4.浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

    5.Linux Epoll介绍和程序实例

    6. linux 网络编程 epoll libevent 高并发epoll示例

  • 相关阅读:
    python学习之第二课时--运行程序和字符编码
    python学习之前言
    一天一道算法题--6.14--思维题
    TOJ--2119--最小生成树和map
    NOJ--1046--dfs
    TOJ--1343--dfs
    一天一道算法题--6.13---计算几何
    一天一道算法题---6.12---链表结点的删除
    TOJ--1114--rmq/线段树
    TOJ--1278--最小生成树
  • 原文地址:https://www.cnblogs.com/embedded-linux/p/5023862.html
Copyright © 2011-2022 走看看