zoukankan      html  css  js  c++  java
  • epoll使用介绍

    一、epoll原理

    一个socket对应一个数据流,通过I/O操作中的read从流中读入数据,write向流中写入数据。当read时,socket流中没有数据的话,read阻塞,线程睡眠,CPU开始做其他的任务,流中有数据可读时,read返回。

    在阻塞IO模式下,一个线程只能处理一个IO事件。如果处理多个事件,需要多线程或多进程,但是效率比较低。

    1、如果采用非阻塞方式,需要不断轮训所有的流,假设共有N个socket流streams[N], 如下:

    // busy poll
    while True:
        for stream in streams[N]:
            if stream has data
                read all data

    这种模式最大的缺点是,如果没有socket可读,也将一直占用CPU,浪费CPU时间。

    2、为了解决这个问题,引入select,当streams[N]中有k(0 < k <= N)个socket流可操作时才返回,否则阻塞,释放CPU。

    // select
    while True:
        select(streams[N])            
        for stream in streams[N]:     
             if stream has data
                 read all data

    当streams[N]中没有数据时, 线程阻塞在select处,CPU处理其他任务。select返回时表示有k个流可操作,可是select并没有通知我们是那些流,因此我们需要轮询所有的N个流,

    时间复杂度为O(N). 在N比较小时,这样处理ok,但是当连接数达到数万甚至几十万时(C10K问题),select的轮询机制会导致效率低下。

    3、epoll则解决了selec后的轮询。epoll会返回每个可操作的socket流以及这些流产生了那些IO事件。

    // epoll
    while True:
        active_streams[k] = epoll(streams[N])
        for stream in active_streams[k]
            deal all data

    这样epoll将复杂度降低为O(1), 提高了效率[1], 解决了C10K问题。

    二、epoll API介绍

    1、int epoll_create(int size);

    创建epoll描述符fd,size用来告诉内核这个监听的数目一共有多大,但是Linux 2.6.8以后,这个参数已经弃用。

    2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

    epfd: epoll_create 创建的描述符

    op:

      EPOLL_CTL_ADD:注册新的fd到epfd中;

      EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

      EPOLL_CTL_DEL:从epfd中删除一个fd;

    通过epoll_add、epoll_del、epoll_mod可以实现对三个操作的封装:

     1 int event_add(int epollfd, int fd, int event)
     2 {
     3     struct epoll_event ev;
     4     ev.events = event;
     5     ev.data.fd = fd;
     6 
     7     return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
     8 }
     9 
    10 int event_del(int epollfd, int fd, int event)
    11 {
    12     struct epoll_event ev;
    13     ev.events = event;
    14     ev.data.fd = fd;
    15 
    16     return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
    17 }
    18 
    19 int event_mod(int epollfd, int fd, int event)
    20 {
    21     struct epoll_event ev;
    22     ev.events = event;
    23     ev.data.fd = fd;
    24 
    25     return epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
    26 }

    fd: 需要监听的fd

    ev:

    //保存触发事件的某个文件描述符相关的数据(与具体使用方式有关)
    
    typedef union epoll_data {
        void *ptr;
        int fd;
        __uint32_t u32;
        __uint64_t u64;
    } epoll_data_t;
     //感兴趣的事件和被触发的事件
    struct epoll_event {
        __uint32_t events; /* Epoll events */
        epoll_data_t data; /* User data variable */
    };

    events可以是以下几个宏的集合:

    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

    EPOLLOUT:表示对应的文件描述符可以写;

    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

    EPOLLERR:表示对应的文件描述符发生错误;

    EPOLLHUP:表示对应的文件描述符被挂断;

    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

    3、int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

    epoll_wait与select类似,等待事件的发生。events既是有IO发生的流对应的epoll event。maxevents表示监听的fd的最大个数。

    timeout:超时事件,0表示立即返回,-1表示永久阻塞。

    三、epoll使用

    下面以简单的echo服务器,介绍epoll的使用。

    echo服务器,接收client发送的字符串,然后原路返回。

      1 int main(void)
      2 {
      3     int listenfd = -1;
      4     int epollfd = -1;
      5     struct sockaddr_in svraddr;
      6     struct epoll_event events[EVENT_SIZE];
      7     int nready = 0;
      8     char buf[BUF_SIZE];
      9     int i = 0;
     10     int fd = 0;
     11 
     12     if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
     13     {
     14         perror("socket error");
     15         return -1;
     16     }
     17 
     18     memset(&svraddr, 0, sizeof(svraddr));
     19     svraddr.sin_family = AF_INET;
     20     svraddr.sin_port = htons(44888);
     21     svraddr.sin_addr.s_addr = htonl(INADDR_ANY);
     22 
     23     if (bind(listenfd, (struct sockaddr* )&svraddr, sizeof(svraddr)) < 0)
     24     {
     25         perror("bind error");
     26         return -1;
     27     }
     28 
     29     if (listen(listenfd, 5) < 0)
     30     {
     31         perror("listen error");
     32         return -1;
     33     }
     34         
     35     if ((epollfd = epoll_create(EVENT_SIZE)) < 0)
     36     {
     37         perror("bind error");
     38         return -1;
     39     }
     40     event_add(epollfd, listenfd, EPOLLIN);
     41 
     42     printf("listen for sockets ...
    ");
     43     while (1)
     44     {
     45         nready = epoll_wait(epollfd, events, EVENT_SIZE, -1);
     46         for (i = 0; i < nready; i++)
     47         {
     48             fd = events[i].data.fd;
     49             if ((fd == listenfd) && (events[i].events & EPOLLIN))
     50             {
     51                 do_accept(epollfd, listenfd);    
     52             }
     53             else if (events[i].events & EPOLLIN)
     54             {
     55                 do_read(epollfd, fd, buf, BUF_SIZE - 1);
     56             }
     57             else if (events[i].events & EPOLLOUT)
     58             {
     59                 do_write(epollfd, fd, buf, BUF_SIZE - 1);
     60             }
     61             else
     62             {
     63                 printf("unused fd %d, event %d", fd, events[i].events);
     64             }
     65         }
     66     }
     67 
     68     return 0;
     69 }
     70 
     71 int do_accept(int epollfd, int listenfd)
     72 {
     73     struct sockaddr_in cliaddr;
     74     int clifd = -1;
     75     socklen_t len = sizeof(cliaddr);
     76     if ((clifd = accept(listenfd, (struct sockaddr* )&cliaddr, &len)) < 0)
     77     {
     78         perror("accept error");
     79         return -1;
     80     }
     81 
     82     printf("accept client: <%s:%d>
    ", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
     83     // add client in read list
     84     event_add(epollfd, clifd, EPOLLIN);
     85 }
     86 
     87 int do_read(int epollfd, int fd, char* buf, int maxsize)
     88 {
     89     memset(buf, 0, maxsize);
     90     int nread = read(fd, buf, maxsize);
     91     if (nread <= 0)
     92     {
     93         if (nread == 0)
     94         {
     95             printf("client %d close socket.
    ", fd);
     96         }
     97         else
     98         {
     99             printf("client %d read error
    ", fd);
    100         }
    101         close(fd);
    102         event_del(epollfd, fd, EPOLLIN);
    103         return nread;
    104     }
    105 
    106     printf("recv from client %d :%s", fd, buf);
    107     // set read to write
    108     event_mod(epollfd, fd, EPOLLOUT);
    109 }
    110 
    111 int do_write(int epollfd, int fd, char* buf, int maxsize)
    112 {
    113     int nread = write(fd, buf, maxsize);
    114     if (nread < 0)
    115     {
    116         printf("client %d write error
    ", fd);
    117         close(fd);
    118         event_del(epollfd, fd, EPOLLIN);
    119         return nread;
    120     }
    121 
    122     printf("send to client %d :%s", fd, buf);
    123     // set write to read
    124     event_mod(epollfd, fd, EPOLLIN);
    125 }

    echo client,从终端输入字符串,然后发送给server,并接收server返回的字符串,答应到终端。

      1 int main(void)
      2 {
      3     int sockfd = -1;
      4     int epollfd = -1;
      5     struct sockaddr_in svraddr;
      6     struct epoll_event events[EVENT_SIZE];
      7     int nready = 0;
      8     char buf[BUF_SIZE];
      9     int i = 0;
     10     int fd = 0;
     11 
     12     if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
     13     {
     14         perror("socket error");
     15         return -1;
     16     }
     17 
     18     memset(&svraddr, 0, sizeof(svraddr));
     19     svraddr.sin_family = AF_INET;
     20     svraddr.sin_port = htons(44888);
     21     inet_pton(AF_INET, "127.0.0.1", &svraddr.sin_addr);
     22 
     23     if (connect(sockfd, (struct sockaddr* )&svraddr, sizeof(svraddr)) < 0)
     24     {
     25         perror("connect error");
     26         return -1;
     27     }
     28     printf("connect to server: <127.0.0.1:44888>
    ");
     29 
     30     if ((epollfd = epoll_create(EVENT_SIZE)) < 0)
     31     {
     32         perror("bind error");
     33         return -1;
     34     }
     35     event_add(epollfd, STDIN_FILENO, EPOLLIN);
     36 
     37     while (1)
     38     {
     39         nready = epoll_wait(epollfd, events, EVENT_SIZE, -1);
     40         //printf("EVENT SIZE %d
    ", nready);
     41         for (i = 0; i < nready; i++)
     42         {
     43             fd = events[i].data.fd;
     44             if (events[i].events & EPOLLIN)
     45             {
     46                 //printf("IN EVENT %d:%d
    ", fd, EPOLLIN);
     47                 do_read(epollfd, fd, sockfd, buf, BUF_SIZE - 1);
     48             }
     49             else if (events[i].events & EPOLLOUT)
     50             {
     51                 //printf("OUT EVENT %d:%d
    ", fd, EPOLLOUT);
     52                 do_write(epollfd, fd, sockfd, buf, BUF_SIZE - 1);
     53             }
     54             else
     55             {
     56                 printf("unused fd %d, event %d", fd, events[i].events);
     57             }
     58         }
     59     }
     60 
     61     return 0;
     62 }
     63 
     64 int do_read(int epollfd, int fd, int sockfd, char* buf, int maxsize)
     65 {
     66     memset(buf, 0, maxsize);
     67     int nread = read(fd, buf, maxsize);
     68     if (nread <= 0)
     69     {
     70         if (nread == 0)
     71         {
     72             printf("client %d close socket.
    ", fd);
     73         }
     74         else
     75         {
     76             printf("client %d read error
    ", fd);
     77         }
     78         close(fd);
     79         event_del(epollfd, fd, EPOLLIN);
     80         return nread;
     81     }
     82 
     83     if (fd == STDIN_FILENO) // std input
     84     {
     85         printf("cli input %d: %s", fd, buf);
     86         event_add(epollfd, sockfd, EPOLLOUT);
     87     }
     88     else
     89     {
     90         printf("read from sock %d: %s", fd, buf);
     91         event_del(epollfd, sockfd, EPOLLIN);
     92         event_add(epollfd, STDOUT_FILENO, EPOLLOUT);
     93     }
     94 }
     95 
     96 int do_write(int epollfd, int fd, int sockfd, char* buf, int maxsize)
     97 {
     98     int nread = write(fd, buf, maxsize);
     99     if (nread < 0)
    100     {
    101         printf("client %d write error
    ", fd);
    102         close(fd);
    103         event_del(epollfd, fd, EPOLLIN);
    104         return nread;
    105     }
    106 
    107     if (fd == STDOUT_FILENO)
    108     {
    109         printf("write to stdin %d: %s", fd, buf);
    110         event_del(epollfd, fd, EPOLLOUT);
    111     }
    112     else
    113     {
    114         printf("write to sock %d: %s", fd, buf);
    115         event_mod(epollfd, fd, EPOLLIN);
    116     }
    117 }
  • 相关阅读:
    hihoCoder #1176 : 欧拉路·一 (简单)
    228 Summary Ranges 汇总区间
    227 Basic Calculator II 基本计算器II
    226 Invert Binary Tree 翻转二叉树
    225 Implement Stack using Queues 队列实现栈
    224 Basic Calculator 基本计算器
    223 Rectangle Area 矩形面积
    222 Count Complete Tree Nodes 完全二叉树的节点个数
    221 Maximal Square 最大正方形
    220 Contains Duplicate III 存在重复 III
  • 原文地址:https://www.cnblogs.com/ym65536/p/4854869.html
Copyright © 2011-2022 走看看