zoukankan      html  css  js  c++  java
  • epoll使用介绍

    一、epoll原理

    一个socket对应一个数据流,通过I/O操作中的read从流中读入数据,write向流中写入数据。当read时,socket流中没有数据的话,read阻塞,线程睡眠,CPU开始做其他的任务,流中有数据可读时,read返回。

    在阻塞IO模式下,一个线程只能处理一个IO事件。如果处理多个事件,需要多线程或多进程,但是效率比较低。

    1、如果采用非阻塞方式,需要不断轮训所有的流,假设共有N个socket流streams[N], 如下:

    // busy poll
    while True:
        for stream in streams[N]:
            if stream has data
                read all data

    这种模式最大的缺点是,如果没有socket可读,也将一直占用CPU,浪费CPU时间。

    2、为了解决这个问题,引入select,当streams[N]中有k(0 < k <= N)个socket流可操作时才返回,否则阻塞,释放CPU。

    // select
    while True:
        select(streams[N])            
        for stream in streams[N]:     
             if stream has data
                 read all data

    当streams[N]中没有数据时, 线程阻塞在select处,CPU处理其他任务。select返回时表示有k个流可操作,可是select并没有通知我们是那些流,因此我们需要轮询所有的N个流,

    时间复杂度为O(N). 在N比较小时,这样处理ok,但是当连接数达到数万甚至几十万时(C10K问题),select的轮询机制会导致效率低下。

    3、epoll则解决了selec后的轮询。epoll会返回每个可操作的socket流以及这些流产生了那些IO事件。

    // epoll
    while True:
        active_streams[k] = epoll(streams[N])
        for stream in active_streams[k]
            deal all data

    这样epoll将复杂度降低为O(1), 提高了效率[1], 解决了C10K问题。

    二、epoll API介绍

    1、int epoll_create(int size);

    创建epoll描述符fd,size用来告诉内核这个监听的数目一共有多大,但是Linux 2.6.8以后,这个参数已经弃用。

    2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

    epfd: epoll_create 创建的描述符

    op:

      EPOLL_CTL_ADD:注册新的fd到epfd中;

      EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

      EPOLL_CTL_DEL:从epfd中删除一个fd;

    通过epoll_add、epoll_del、epoll_mod可以实现对三个操作的封装:

     1 int event_add(int epollfd, int fd, int event)
     2 {
     3     struct epoll_event ev;
     4     ev.events = event;
     5     ev.data.fd = fd;
     6 
     7     return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
     8 }
     9 
    10 int event_del(int epollfd, int fd, int event)
    11 {
    12     struct epoll_event ev;
    13     ev.events = event;
    14     ev.data.fd = fd;
    15 
    16     return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
    17 }
    18 
    19 int event_mod(int epollfd, int fd, int event)
    20 {
    21     struct epoll_event ev;
    22     ev.events = event;
    23     ev.data.fd = fd;
    24 
    25     return epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
    26 }

    fd: 需要监听的fd

    ev:

    //保存触发事件的某个文件描述符相关的数据(与具体使用方式有关)
    
    typedef union epoll_data {
        void *ptr;
        int fd;
        __uint32_t u32;
        __uint64_t u64;
    } epoll_data_t;
     //感兴趣的事件和被触发的事件
    struct epoll_event {
        __uint32_t events; /* Epoll events */
        epoll_data_t data; /* User data variable */
    };

    events可以是以下几个宏的集合:

    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

    EPOLLOUT:表示对应的文件描述符可以写;

    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

    EPOLLERR:表示对应的文件描述符发生错误;

    EPOLLHUP:表示对应的文件描述符被挂断;

    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

    3、int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

    epoll_wait与select类似,等待事件的发生。events既是有IO发生的流对应的epoll event。maxevents表示监听的fd的最大个数。

    timeout:超时事件,0表示立即返回,-1表示永久阻塞。

    三、epoll使用

    下面以简单的echo服务器,介绍epoll的使用。

    echo服务器,接收client发送的字符串,然后原路返回。

      1 int main(void)
      2 {
      3     int listenfd = -1;
      4     int epollfd = -1;
      5     struct sockaddr_in svraddr;
      6     struct epoll_event events[EVENT_SIZE];
      7     int nready = 0;
      8     char buf[BUF_SIZE];
      9     int i = 0;
     10     int fd = 0;
     11 
     12     if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
     13     {
     14         perror("socket error");
     15         return -1;
     16     }
     17 
     18     memset(&svraddr, 0, sizeof(svraddr));
     19     svraddr.sin_family = AF_INET;
     20     svraddr.sin_port = htons(44888);
     21     svraddr.sin_addr.s_addr = htonl(INADDR_ANY);
     22 
     23     if (bind(listenfd, (struct sockaddr* )&svraddr, sizeof(svraddr)) < 0)
     24     {
     25         perror("bind error");
     26         return -1;
     27     }
     28 
     29     if (listen(listenfd, 5) < 0)
     30     {
     31         perror("listen error");
     32         return -1;
     33     }
     34         
     35     if ((epollfd = epoll_create(EVENT_SIZE)) < 0)
     36     {
     37         perror("bind error");
     38         return -1;
     39     }
     40     event_add(epollfd, listenfd, EPOLLIN);
     41 
     42     printf("listen for sockets ...
    ");
     43     while (1)
     44     {
     45         nready = epoll_wait(epollfd, events, EVENT_SIZE, -1);
     46         for (i = 0; i < nready; i++)
     47         {
     48             fd = events[i].data.fd;
     49             if ((fd == listenfd) && (events[i].events & EPOLLIN))
     50             {
     51                 do_accept(epollfd, listenfd);    
     52             }
     53             else if (events[i].events & EPOLLIN)
     54             {
     55                 do_read(epollfd, fd, buf, BUF_SIZE - 1);
     56             }
     57             else if (events[i].events & EPOLLOUT)
     58             {
     59                 do_write(epollfd, fd, buf, BUF_SIZE - 1);
     60             }
     61             else
     62             {
     63                 printf("unused fd %d, event %d", fd, events[i].events);
     64             }
     65         }
     66     }
     67 
     68     return 0;
     69 }
     70 
     71 int do_accept(int epollfd, int listenfd)
     72 {
     73     struct sockaddr_in cliaddr;
     74     int clifd = -1;
     75     socklen_t len = sizeof(cliaddr);
     76     if ((clifd = accept(listenfd, (struct sockaddr* )&cliaddr, &len)) < 0)
     77     {
     78         perror("accept error");
     79         return -1;
     80     }
     81 
     82     printf("accept client: <%s:%d>
    ", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
     83     // add client in read list
     84     event_add(epollfd, clifd, EPOLLIN);
     85 }
     86 
     87 int do_read(int epollfd, int fd, char* buf, int maxsize)
     88 {
     89     memset(buf, 0, maxsize);
     90     int nread = read(fd, buf, maxsize);
     91     if (nread <= 0)
     92     {
     93         if (nread == 0)
     94         {
     95             printf("client %d close socket.
    ", fd);
     96         }
     97         else
     98         {
     99             printf("client %d read error
    ", fd);
    100         }
    101         close(fd);
    102         event_del(epollfd, fd, EPOLLIN);
    103         return nread;
    104     }
    105 
    106     printf("recv from client %d :%s", fd, buf);
    107     // set read to write
    108     event_mod(epollfd, fd, EPOLLOUT);
    109 }
    110 
    111 int do_write(int epollfd, int fd, char* buf, int maxsize)
    112 {
    113     int nread = write(fd, buf, maxsize);
    114     if (nread < 0)
    115     {
    116         printf("client %d write error
    ", fd);
    117         close(fd);
    118         event_del(epollfd, fd, EPOLLIN);
    119         return nread;
    120     }
    121 
    122     printf("send to client %d :%s", fd, buf);
    123     // set write to read
    124     event_mod(epollfd, fd, EPOLLIN);
    125 }

    echo client,从终端输入字符串,然后发送给server,并接收server返回的字符串,答应到终端。

      1 int main(void)
      2 {
      3     int sockfd = -1;
      4     int epollfd = -1;
      5     struct sockaddr_in svraddr;
      6     struct epoll_event events[EVENT_SIZE];
      7     int nready = 0;
      8     char buf[BUF_SIZE];
      9     int i = 0;
     10     int fd = 0;
     11 
     12     if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
     13     {
     14         perror("socket error");
     15         return -1;
     16     }
     17 
     18     memset(&svraddr, 0, sizeof(svraddr));
     19     svraddr.sin_family = AF_INET;
     20     svraddr.sin_port = htons(44888);
     21     inet_pton(AF_INET, "127.0.0.1", &svraddr.sin_addr);
     22 
     23     if (connect(sockfd, (struct sockaddr* )&svraddr, sizeof(svraddr)) < 0)
     24     {
     25         perror("connect error");
     26         return -1;
     27     }
     28     printf("connect to server: <127.0.0.1:44888>
    ");
     29 
     30     if ((epollfd = epoll_create(EVENT_SIZE)) < 0)
     31     {
     32         perror("bind error");
     33         return -1;
     34     }
     35     event_add(epollfd, STDIN_FILENO, EPOLLIN);
     36 
     37     while (1)
     38     {
     39         nready = epoll_wait(epollfd, events, EVENT_SIZE, -1);
     40         //printf("EVENT SIZE %d
    ", nready);
     41         for (i = 0; i < nready; i++)
     42         {
     43             fd = events[i].data.fd;
     44             if (events[i].events & EPOLLIN)
     45             {
     46                 //printf("IN EVENT %d:%d
    ", fd, EPOLLIN);
     47                 do_read(epollfd, fd, sockfd, buf, BUF_SIZE - 1);
     48             }
     49             else if (events[i].events & EPOLLOUT)
     50             {
     51                 //printf("OUT EVENT %d:%d
    ", fd, EPOLLOUT);
     52                 do_write(epollfd, fd, sockfd, buf, BUF_SIZE - 1);
     53             }
     54             else
     55             {
     56                 printf("unused fd %d, event %d", fd, events[i].events);
     57             }
     58         }
     59     }
     60 
     61     return 0;
     62 }
     63 
     64 int do_read(int epollfd, int fd, int sockfd, char* buf, int maxsize)
     65 {
     66     memset(buf, 0, maxsize);
     67     int nread = read(fd, buf, maxsize);
     68     if (nread <= 0)
     69     {
     70         if (nread == 0)
     71         {
     72             printf("client %d close socket.
    ", fd);
     73         }
     74         else
     75         {
     76             printf("client %d read error
    ", fd);
     77         }
     78         close(fd);
     79         event_del(epollfd, fd, EPOLLIN);
     80         return nread;
     81     }
     82 
     83     if (fd == STDIN_FILENO) // std input
     84     {
     85         printf("cli input %d: %s", fd, buf);
     86         event_add(epollfd, sockfd, EPOLLOUT);
     87     }
     88     else
     89     {
     90         printf("read from sock %d: %s", fd, buf);
     91         event_del(epollfd, sockfd, EPOLLIN);
     92         event_add(epollfd, STDOUT_FILENO, EPOLLOUT);
     93     }
     94 }
     95 
     96 int do_write(int epollfd, int fd, int sockfd, char* buf, int maxsize)
     97 {
     98     int nread = write(fd, buf, maxsize);
     99     if (nread < 0)
    100     {
    101         printf("client %d write error
    ", fd);
    102         close(fd);
    103         event_del(epollfd, fd, EPOLLIN);
    104         return nread;
    105     }
    106 
    107     if (fd == STDOUT_FILENO)
    108     {
    109         printf("write to stdin %d: %s", fd, buf);
    110         event_del(epollfd, fd, EPOLLOUT);
    111     }
    112     else
    113     {
    114         printf("write to sock %d: %s", fd, buf);
    115         event_mod(epollfd, fd, EPOLLIN);
    116     }
    117 }
  • 相关阅读:
    SPSS-Friedman 秩和检验-非参数检验-K个相关样本检验 案例解析
    SPSS-多重响应-频率和交叉表案例分析(问卷调查分析)
    SPSS--回归-多元线性回归模型案例解析
    深入理解RunLoop
    杂七杂八集合
    单元测试
    笔记
    http断点续传
    iOS性能优化
    群聊协议
  • 原文地址:https://www.cnblogs.com/ym65536/p/4854869.html
Copyright © 2011-2022 走看看