zoukankan      html  css  js  c++  java
  • select poll epoll Linux高并发网络编程模型

    0 发展历程

      同步阻塞迭代模型-->多进程并发模型-->多线程并发模型-->select-->poll-->epoll-->...

    1 同步阻塞迭代模型

    bind(srvfd);
    listen(srvfd);
    for(;;)
    {
        clifd = accept(srvfd,...); //开始接受客户端来的连接
        read(clifd,buf,...);       //从客户端读取数据
        dosomthingonbuf(buf);
        write(clifd,buf);          //发送数据到客户端
    }  

    缺点:

      1.如果没有客户端的连接请求,进程会阻塞在accept系统调用处,程序不能执行其他任何操作。系统调用使得程序从用户态陷入内核态 -- 程序员的自我修养 

      2.在与客户端建立好一条链路后,通过read系统调用从客户端接受数据,而客户端合适发送数据过来是不可控的。如果客户端迟迟不发生数据过来,则程序同样会阻塞在read调用,此时,如果另外的客户端来尝试连接时,都会失败。

      3.同样,write系统调用也会使得程序出现阻塞(例如:客户端接受数据异常缓慢,导致写缓冲区满,数据迟迟发送不出)。

    2 多进程并发模型

    bind(srvfd);
    listen(srvfd);
    for(;;){
    clifd = accept(srvfd,...);      //开始接受客户端来的连接
    ret = fork();            //创建子进程
    switch( ret )
    {
        case -1 :
            do_err_handler();
            break;
        case 0:             // 子进程
            client_handler(clifd);
            break ;
        default :            // 父进程
            close(clifd);
            continue ;
    }
    }
    void client_handler(clifd)
    {
        read(clifd,buf,...);     //从客户端读取数据
        dosomthingonbuf(buf);
        write(clifd,buf);      //发送数据到客户端
    }

    优点:通过多进程,解决了同步阻塞问题。

    缺点:每一个客户端连接开启fork一个进程,即使linux中引入了写实拷贝机制,降低了fork一个子进程的消耗,但若客户端连接较大,则系统依然将不堪负重。  

    3 多线程并发模型

    void *thread_callback( void *args )        //线程回调函数
    {
        int clifd = *(int *)args ;
        client_handler(clifd);
    }
    
    void client_handler(clifd)
    {
        read(clifd,buf,...);              //从客户端读取数据
        dosomthingonbuf(buf);
        write(clifd,buf);                //发送数据到客户端
    }
    bind(srvfd); listen(srvfd); for(;;) { clifd = accept(); pthread_create(...,thread_callback,&clifd);//创建新线程并绑定回调函数、文件描述符 }

    多线程的实现方式:

    (1)按需生成(来一个连接生成一个线程) 
    (2)线程池(预先生成很多线程) 
    (3)Leader follower(LF)

    服务端分为主线程和工作线程,主线程负责accept()连接,而工作线程负责处理业务逻辑和流的读取等。 
    因此,即使在工作线程阻塞的情况下,也只是阻塞在线程范围内,对继续接受新的客户端连接不会有影响。 
    通过线程池的引入可以避免频繁的创建、销毁线程,能在很大程序上提升性能。 
    但不管如何实现,多线程模型先天具有缺点。

    缺点: 
      1.稳定性相对较差。一个线程的崩溃会导致整个程序崩溃。 
      2.临界资源的访问控制,在加大程序复杂性的同时,锁机制的引入会是严重降低程序的性能、死锁等情况。  

    4 select

    bind(listenfd);
    listen(listenfd);
    FD_ZERO(&allset);          /*初始清空、并添加绑定文件描述符*/
    FD_SET(listenfd, &allset);
    for(;;)
    {
        select(...);
        if (FD_ISSET(listenfd, &rset)) 
        {                 /*有新的客户端连接到来*/
            clifd = accept();
            cliarray[] = clifd;    /*保存新的连接套接字*/
            FD_SET(clifd, &allset);  /*将新的描述符加入监听数组中*/
        }
    
        for(;;)
        {                 /*这个循环检查所有已经连接的客户端是否有数据可读写*/
    
            fd = cliarray[i];
            if (FD_ISSET(fd , &rset))
                dosomething();
        }
    }
    

    对于多进程模型和多线程模型,每个进程/线程只能处理一路IO,在服务器并发数较高的情况下,过多的进程/线程会使得服务器性能下降。

    通过多路IO复用,能使得一个进程同时处理多路IO,提升服务器吞吐量。

    在Linux支持epoll模型之前,都使用select/poll模型来实现IO多路复用

    select IO多路复用的缺点:

      1.单进程能够监视的文件描述符数量存在最大限制( __FD_SETSIZE 1024),可以更改数量,因select采用轮询的方式扫描文件描述符,文件描述符数量越多性能越差
      2.内核态 / 用户态内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销; 
      3.select返回整个句柄数组应用程序需遍历才能发现哪些句柄发生了事件; 
      4.select水平触发方式是,应用程序如果没有完成对一个已经就绪文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程,即重复

    假设我们的服务器需要支持100万的并发连接,则在__FD_SETSIZE 为1024的情况下,则我们至少需要开辟1k个进程才能实现100万的并发连接。除了进程间上下文切换的时间消耗外,从内核/用户空间大量的无脑内存拷贝、数组轮询等,是系统难以承受的。因此,基于select模型的服务器程序,要达到10万级别的并发访问,是一个很难完成的任务。

    5 poll

    相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其它selec的t缺点依然存在。

    6 epoll

    epoll,select/poll调用分成3个部分去实现:

      1.调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)

      2.调用epoll_ctl向epoll对象中添加连接的套接字

      3.调用epoll_wait收集发生的所监听事件的连接

    因此,在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制全部连接的句柄数据,内核也不需要去遍历全部的连接。

    Linux内核针对3部分具体的epoll机制实现思路:

      首先,进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。eventpoll结构体如下:

    struct eventpoll{
        ....
        /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
        struct rb_root  rbr;
        /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
        struct list_head rdlist;
        ....
    };
    

      每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。

    而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。

      在epoll中,对于每一个事件,都会建立一个epitem结构体,如下:

    struct epitem{
        struct rb_node  rbn;      //红黑树节点
        struct list_head  rdllink;  //双向链表节点
        struct epoll_filefd  ffd;    //事件句柄信息
        struct eventpoll *ep;       //指向其所属的eventpoll对象
        struct epoll_event event;    //期待发生的事件类型
    }
    

      当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。

      通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。

      总结来说:epoll三步曲。

        第一步:epoll_create()系统调用。此调用返回一个句柄,之后所有的使用都依靠这个句柄来标识。

        第二步:epoll_ctl()系统调用。通过此调用向epoll对象中添加、删除、修改感兴趣的事件,返回0标识成功,返回-1表示失败。

        第三部:epoll_wait()系统调用。通过此调用收集收集在epoll监控中已经发生的事件。

    epoll编程实例

    //   
    // a simple echo server using epoll in linux  
    //   
    // 2009-11-05  
    // 2013-03-22:修改了几个问题,1是/n格式问题,2是去掉了原代码不小心加上的ET模式;
    // 本来只是简单的示意程序,决定还是加上 recv/send时的buffer偏移
    // by sparkling  
    //   
    #include <sys/socket.h>  
    #include <sys/epoll.h>  
    #include <netinet/in.h>  
    #include <arpa/inet.h>  
    #include <fcntl.h>  
    #include <unistd.h>  
    #include <stdio.h>  
    #include <errno.h>  
    #include <iostream>  
    using namespace std;  
    #define MAX_EVENTS 500  
    struct myevent_s  
    {  
        int fd;  
        void (*call_back)(int fd, int events, void *arg);  
        int events;  
        void *arg;  
        int status; // 1: in epoll wait list, 0 not in  
        char buff[128]; // recv data buffer  
        int len, s_offset;  
        long last_active; // last active time  
    };  
    // set event  
    void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)  
    {  
        ev->fd = fd;  
        ev->call_back = call_back;  
        ev->events = 0;  
        ev->arg = arg;  
        ev->status = 0;
        bzero(ev->buff, sizeof(ev->buff));
        ev->s_offset = 0;  
        ev->len = 0;
        ev->last_active = time(NULL);  
    }  
    // add/mod an event to epoll  
    void EventAdd(int epollFd, int events, myevent_s *ev)  
    {  
        struct epoll_event epv = {0, {0}};  
        int op;  
        epv.data.ptr = ev;  
        epv.events = ev->events = events;  
        if(ev->status == 1){  
            op = EPOLL_CTL_MOD;  
        }  
        else{  
            op = EPOLL_CTL_ADD;  
            ev->status = 1;  
        }  
        if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)  
            printf("Event Add failed[fd=%d], evnets[%d]
    ", ev->fd, events);  
        else  
            printf("Event Add OK[fd=%d], op=%d, evnets[%0X]
    ", ev->fd, op, events);  
    }  
    // delete an event from epoll  
    void EventDel(int epollFd, myevent_s *ev)  
    {  
        struct epoll_event epv = {0, {0}};  
        if(ev->status != 1) return;  
        epv.data.ptr = ev;  
        ev->status = 0;
        epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);  
    }  
    int g_epollFd;  
    myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd  
    void RecvData(int fd, int events, void *arg);  
    void SendData(int fd, int events, void *arg);  
    // accept new connections from clients  
    void AcceptConn(int fd, int events, void *arg)  
    {  
        struct sockaddr_in sin;  
        socklen_t len = sizeof(struct sockaddr_in);  
        int nfd, i;  
        // accept  
        if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)  
        {  
            if(errno != EAGAIN && errno != EINTR)  
            {  
            }
            printf("%s: accept, %d", __func__, errno);  
            return;  
        }  
        do  
        {  
            for(i = 0; i < MAX_EVENTS; i++)  
            {  
                if(g_Events[i].status == 0)  
                {  
                    break;  
                }  
            }  
            if(i == MAX_EVENTS)  
            {  
                printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);  
                break;  
            }  
            // set nonblocking
            int iret = 0;
            if((iret = fcntl(nfd, F_SETFL, O_NONBLOCK)) < 0)
            {
                printf("%s: fcntl nonblocking failed:%d", __func__, iret);
                break;
            }
            // add a read event for receive data  
            EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);  
            EventAdd(g_epollFd, EPOLLIN, &g_Events[i]);  
        }while(0);  
        printf("new conn[%s:%d][time:%d], pos[%d]
    ", inet_ntoa(sin.sin_addr),
                ntohs(sin.sin_port), g_Events[i].last_active, i);  
    }  
    // receive data  
    void RecvData(int fd, int events, void *arg)  
    {  
        struct myevent_s *ev = (struct myevent_s*)arg;  
        int len;  
        // receive data
        len = recv(fd, ev->buff+ev->len, sizeof(ev->buff)-1-ev->len, 0);    
        EventDel(g_epollFd, ev);
        if(len > 0)
        {
            ev->len += len;
            ev->buff[len] = '';  
            printf("C[%d]:%s
    ", fd, ev->buff);  
            // change to send event  
            EventSet(ev, fd, SendData, ev);  
            EventAdd(g_epollFd, EPOLLOUT, ev);  
        }  
        else if(len == 0)  
        {  
            close(ev->fd);  
            printf("[fd=%d] pos[%d], closed gracefully.
    ", fd, ev-g_Events);  
        }  
        else  
        {  
            close(ev->fd);  
            printf("recv[fd=%d] error[%d]:%s
    ", fd, errno, strerror(errno));  
        }  
    }  
    // send data  
    void SendData(int fd, int events, void *arg)  
    {  
        struct myevent_s *ev = (struct myevent_s*)arg;  
        int len;  
        // send data  
        len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0);
        if(len > 0)  
        {
            printf("send[fd=%d], [%d<->%d]%s
    ", fd, len, ev->len, ev->buff);
            ev->s_offset += len;
            if(ev->s_offset == ev->len)
            {
                // change to receive event
                EventDel(g_epollFd, ev);  
                EventSet(ev, fd, RecvData, ev);  
                EventAdd(g_epollFd, EPOLLIN, ev);  
            }
        }  
        else  
        {  
            close(ev->fd);  
            EventDel(g_epollFd, ev);  
            printf("send[fd=%d] error[%d]
    ", fd, errno);  
        }  
    }  
    void InitListenSocket(int epollFd, short port)  
    {  
        int listenFd = socket(AF_INET, SOCK_STREAM, 0);  
        fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking  
        printf("server listen fd=%d
    ", listenFd);  
        EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);  
        // add listen socket  
        EventAdd(epollFd, EPOLLIN, &g_Events[MAX_EVENTS]);  
        // bind & listen  
        sockaddr_in sin;  
        bzero(&sin, sizeof(sin));  
        sin.sin_family = AF_INET;  
        sin.sin_addr.s_addr = INADDR_ANY;  
        sin.sin_port = htons(port);  
        bind(listenFd, (const sockaddr*)&sin, sizeof(sin));  
        listen(listenFd, 5);  
    }  
    int main(int argc, char **argv)  
    {  
        unsigned short port = 12345; // default port  
        if(argc == 2){  
            port = atoi(argv[1]);  
        }  
        // create epoll  
        g_epollFd = epoll_create(MAX_EVENTS);  
        if(g_epollFd <= 0) printf("create epoll failed.%d
    ", g_epollFd);  
        // create & bind listen socket, and add to epoll, set non-blocking  
        InitListenSocket(g_epollFd, port);  
        // event loop  
        struct epoll_event events[MAX_EVENTS];  
        printf("server running:port[%d]
    ", port);  
        int checkPos = 0;  
        while(1){  
            // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event  
            long now = time(NULL);  
            for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd  
            {  
                if(checkPos == MAX_EVENTS) checkPos = 0; // recycle  
                if(g_Events[checkPos].status != 1) continue;  
                long duration = now - g_Events[checkPos].last_active;  
                if(duration >= 60) // 60s timeout  
                {  
                    close(g_Events[checkPos].fd);  
                    printf("[fd=%d] timeout[%d--%d].
    ", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);  
                    EventDel(g_epollFd, &g_Events[checkPos]);  
                }  
            }  
            // wait for events to happen  
            int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);  
            if(fds < 0){  
                printf("epoll_wait error, exit
    ");  
                break;  
            }  
            for(int i = 0; i < fds; i++){  
                myevent_s *ev = (struct myevent_s*)events[i].data.ptr;  
                if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event  
                {  
                    ev->call_back(ev->fd, events[i].events, ev->arg);  
                }  
                if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event  
                {  
                    ev->call_back(ev->fd, events[i].events, ev->arg);  
                }  
            }  
        }  
        // free resource  
        return 0;  
    }  
    

      

    触发方式

    LT(Level_triggered 水平触发 ):是epoll缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,直至变为未就绪状态,也就是epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上次没读写完的文件描述符上继续读写所以,这种模式编程出错误可能性要小一点。传统的select/poll都是只有这种触发方式。 
    ET (Edge_triggered边缘触发 ):是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll_wait()通知处理程序去读写,如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。 

      

     

  • 相关阅读:
    PAT A1094 The Largest Generation (25 分)——树的bfs遍历
    PAT A1055 The World's Richest (25 分)——排序
    PAT A1052 Linked List Sorting (25 分)——链表,排序
    PAT A1076 Forwards on Weibo (30 分)——图的bfs
    辅导员
    辅导员面试
    C程序设计
    Excel VBA 基本概念
    Excel函数
    导入excel表的数据到数据库ssh
  • 原文地址:https://www.cnblogs.com/iTlijun/p/9399579.html
Copyright © 2011-2022 走看看