zoukankan      html  css  js  c++  java
  • IO多路复用:select、poll、epoll示例

    一、IO多路复用

    所谓IO多路复用,就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

    Linux支持IO多路复用的系统调用有select、poll、epoll,这些调用都是内核级别的。但select、poll、epoll本质上都是同步I/O,先是block住等待就绪的socket,再是block住将数据从内核拷贝到用户内存。

    当然select、poll、epoll之间也是有区别的,如下表:

    select poll epoll
    操作方式 遍历 遍历 回调
    底层实现 数组 链表 哈希表
    IO效率 每次调用都进行线性遍历,时间复杂度为O(n) 每次调用都进行线性遍历,时间复杂度为O(n) 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1)
    最大连接数 1024(x86)或 2048(x64) 无上限 无上限
    fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝


    二、select示例

    2.1 流程图

    注:摘自IBM iSeries 信息中心

    2.2 相关函数

    #include <sys/select.h>
    #include <sys/time.h>
    
    int select(int max_fd, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout)
    1. 该select()函数返回就绪描述符的数目,超时返回0,出错返回-1

    2. 第一个参数max_fd指待测试的fd个数,它的值是待测试的最大文件描述符加1,文件描述符从0开始到max_fd-1都将被测试。

    3. 中间三个参数readset、writeset和exceptset指定要让内核测试读、写和异常条件的fd集合,如果不需要测试可以设置为NULL。操作fd_set有四个宏:

      • void FD_ZERO(fd_set *fdset):清空集合
      • void FD_SET(int fd, fd_set *fdset):将一个给定的文件描述符加入集合之中
      • void FD_CLR(int fd, fd_set *fdset):将一个给定的文件描述符从集合中删除
      • int FD_ISSET(int fd, fd_set *fdset):判断指定描述符是否在集合中
    4. timeout是指 select 的等待时长,如果这段时间内所监听的 socket 没有事件就绪,超时返回。

    2.3 示例程序

    这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。

    /*************************************************************************
        > File Name: server.cpp
        > Author: SongLee
        > E-mail: lisong.shine@qq.com
        > Created Time: 2016年04月28日 星期四 22时02分43秒
        > Personal Blog: http://songlee24.github.io/
     ************************************************************************/
    #include<netinet/in.h>   // sockaddr_in
    #include<sys/types.h>    // socket
    #include<sys/socket.h>   // socket
    #include<arpa/inet.h>
    #include<unistd.h>
    #include<sys/select.h>   // select
    #include<sys/ioctl.h>
    #include<sys/time.h>
    #include<iostream>
    #include<vector>
    #include<string>
    #include<cstdlib>
    #include<cstdio>
    #include<cstring>
    using namespace std;
    #define BUFFER_SIZE 1024
    
    struct PACKET_HEAD
    {
        int length;
    };
    
    class Server
    {
    private:
        struct sockaddr_in server_addr;
        socklen_t server_addr_len;
        int listen_fd;    // 监听的fd
        int max_fd;       // 最大的fd
        fd_set master_set;   // 所有fd集合,包括监听fd和客户端fd   
        fd_set working_set;  // 工作集合
        struct timeval timeout; 
    public:
        Server(int port);
        ~Server();
        void Bind();
        void Listen(int queue_len = 20);
        void Accept();
        void Run();
        void Recv(int nums);
    };
    
    Server::Server(int port)
    {
        bzero(&server_addr, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = htons(INADDR_ANY);
        server_addr.sin_port = htons(port);
        // create socket to listen
        listen_fd = socket(PF_INET, SOCK_STREAM, 0);
        if(listen_fd < 0)
        {
            cout << "Create Socket Failed!";
            exit(1);
        }
        int opt = 1;
        setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    }
    
    Server::~Server()
    {
        for(int fd=0; fd<=max_fd; ++fd)
        {
            if(FD_ISSET(fd, &master_set))
            {
                close(fd);
            }
        }
    }
    
    void Server::Bind()
    {
        if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
        {
            cout << "Server Bind Failed!";
            exit(1);
        }
        cout << "Bind Successfully.
    "; 
    }
    
    void Server::Listen(int queue_len)
    {
        if(-1 == listen(listen_fd, queue_len))
        {
            cout << "Server Listen Failed!";
            exit(1);
        }
        cout << "Listen Successfully.
    ";
    }
    
    void Server::Accept()
    {
        struct sockaddr_in client_addr;
        socklen_t client_addr_len = sizeof(client_addr);
    
        int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
        if(new_fd < 0)
        {
            cout << "Server Accept Failed!";
            exit(1);
        }
    
        cout << "new connection was accepted.
    ";
        // 将新建立的连接的fd加入master_set
        FD_SET(new_fd, &master_set);
        if(new_fd > max_fd)
        {
            max_fd = new_fd;
        }
    }   
    
    void Server::Run()
    {
        max_fd = listen_fd;   // 初始化max_fd
        FD_ZERO(&master_set);
        FD_SET(listen_fd, &master_set);  // 添加监听fd
    
        while(1)
        {
            FD_ZERO(&working_set);
            memcpy(&working_set, &master_set, sizeof(master_set));
    
            timeout.tv_sec = 30;
            timeout.tv_usec = 0;
    
            int nums = select(max_fd+1, &working_set, NULL, NULL, &timeout);
            if(nums < 0)
            {
                cout << "select() error!";
                exit(1);
            }
    
            if(nums == 0)
            {
                //cout << "select() is timeout!";
                continue;
            }
    
            if(FD_ISSET(listen_fd, &working_set))
                Accept();   // 有新的客户端请求
            else
                Recv(nums); // 接收客户端的消息
        }
    }
    
    void Server::Recv(int nums)
    {
        for(int fd=0; fd<=max_fd; ++fd)
        {
            if(FD_ISSET(fd, &working_set))
            {
                bool close_conn = false;  // 标记当前连接是否断开了
    
                PACKET_HEAD head;
                recv(fd, &head, sizeof(head), 0);   // 先接受包头,即数据总长度
    
                char* buffer = new char[head.length];
                bzero(buffer, head.length);
                int total = 0;
                while(total < head.length)
                {
                    int len = recv(fd, buffer + total, head.length - total, 0);
                    if(len < 0)
                    {
                        cout << "recv() error!";
                        close_conn = true;
                        break;
                    }
                    total = total + len;
                }
    
                if(total == head.length)  // 将收到的消息原样发回给客户端
                {
                    int ret1 = send(fd, &head, sizeof(head), 0);
                    int ret2 = send(fd, buffer, head.length, 0);
                    if(ret1 < 0 || ret2 < 0)
                    {
                        cout << "send() error!";
                        close_conn = true;
                    }
                }
    
                delete buffer;
    
                if(close_conn)  // 当前这个连接有问题,关闭它
                {
                    close(fd);
                    FD_CLR(fd, &master_set);
                    if(fd == max_fd)  // 需要更新max_fd;
                    {
                        while(FD_ISSET(max_fd, &master_set) == false)
                            --max_fd;
                    }
                }
            }
        }   
    }
    
    int main()
    {
        Server server(15000);
        server.Bind();
        server.Listen();
        server.Run();
        return 0;
    }


    /*************************************************************************
        > File Name: client.cpp
        > Author: SongLee
        > E-mail: lisong.shine@qq.com
        > Created Time: 2016年04月28日 星期四 23时10分15秒
        > Personal Blog: http://songlee24.github.io/
     ************************************************************************/
    #include<netinet/in.h>   // sockaddr_in
    #include<sys/types.h>    // socket
    #include<sys/socket.h>   // socket
    #include<arpa/inet.h>
    #include<sys/ioctl.h>
    #include<unistd.h>
    #include<iostream>
    #include<string>
    #include<cstdlib>
    #include<cstdio>
    #include<cstring>
    using namespace std;
    #define BUFFER_SIZE 1024
    
    struct PACKET_HEAD
    {
        int length;
    };
    
    class Client 
    {
    private:
        struct sockaddr_in server_addr;
        socklen_t server_addr_len;
        int fd;
    public:
        Client(string ip, int port);
        ~Client();
        void Connect();
        void Send(string str);
        string Recv();
    };
    
    Client::Client(string ip, int port)
    {
        bzero(&server_addr, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        if(inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr) == 0)
        {
            cout << "Server IP Address Error!";
            exit(1);
        }
        server_addr.sin_port = htons(port);
        server_addr_len = sizeof(server_addr);
        // create socket
        fd = socket(AF_INET, SOCK_STREAM, 0);
        if(fd < 0)
        {
            cout << "Create Socket Failed!";
            exit(1);
        }
    }
    
    Client::~Client()
    {
        close(fd);
    }
    
    void Client::Connect()
    {
        cout << "Connecting......" << endl;
        if(connect(fd, (struct sockaddr*)&server_addr, server_addr_len) < 0)
        {
            cout << "Can not Connect to Server IP!";
            exit(1);
        }
        cout << "Connect to Server successfully." << endl;
    }
    
    void Client::Send(string str)
    {
        PACKET_HEAD head;
        head.length = str.size()+1;   // 注意这里需要+1
        int ret1 = send(fd, &head, sizeof(head), 0);
        int ret2 = send(fd, str.c_str(), head.length, 0);
        if(ret1 < 0 || ret2 < 0)
        {
            cout << "Send Message Failed!";
            exit(1);
        }
    }
    
    string Client::Recv()
    {
        PACKET_HEAD head;
        recv(fd, &head, sizeof(head), 0);
    
        char* buffer = new char[head.length];
        bzero(buffer, head.length);
        int total = 0;
        while(total < head.length)
        {
            int len = recv(fd, buffer + total, head.length - total, 0);
            if(len < 0)
            {
                cout << "recv() error!";
                break;
            }
            total = total + len;
        }
        string result(buffer);
        delete buffer;
        return result;
    }
    
    int main()
    {
        Client client("127.0.0.1", 15000);
        client.Connect();
        while(1)
        {
            string msg;
            getline(cin, msg);
            if(msg == "exit")
                break;
            client.Send(msg);
            cout << client.Recv() << endl;  
        }
        return 0;
    }

    对上述程序的一些说明:

    • 监听socket也由select来轮询,不需要单独的线程;
    • working_set每次都要重新设置,因为select调用后它所检测的集合working_set会被修改;
    • 接收很长一段数据时,需要循环多次recv。但是recv函数会阻塞,可以通过自定义包头(保存数据长度)。


    三、poll示例

    3.1 基本原理

    poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

    它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:

    • 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
    • poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

    从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

    3.2 相关函数

    原型:

    #include <poll.h>
    int poll(struct pollfd fds[], nfds_t nfds, int timeout);

    参数描述:

    1. 该poll()函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错;
    2. fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;
    3. nfds:记录数组fds中描述符的总数量;
    4. timeout:调用poll函数阻塞的超时时间,单位毫秒;

    其中pollfd结构体定义如下:

    typedef struct pollfd {
            int fd;                         /* 需要被检测或选择的文件描述符*/
            short events;                   /* 对文件描述符fd上感兴趣的事件 */
            short revents;                  /* 文件描述符fd上当前实际发生的事件*/
    } pollfd_t;

    一个pollfd结构体表示一个被监视的文件描述符,通过传递fds[]指示 poll() 监视多个文件描述符,其中:

    • 结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。
    • 结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。

    events域中请求的任何事件都可能在revents域中返回。合法的事件如下:

    常量 说明
    POLLIN 普通或优先级带数据可读
    POLLRDNORM 普通数据可读
    POLLRDBAND 优先级带数据可读
    POLLPRI 高优先级数据可读
    POLLOUT 普通数据可写
    POLLWRNORM 普通数据可写
    POLLWRBAND 优先级带数据可写
    POLLERR 发生错误
    POLLHUP 发生挂起
    POLLNVAL 描述字不是一个打开的文件

    当需要监听多个事件时,使用POLLIN | POLLRDNORM设置 events 域;当poll调用之后检测某事件是否就绪时,fds[i].revents & POLLIN进行判断。

    3.3 示例程序

    这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。

    #include<netinet/in.h>   // sockaddr_in
    #include<sys/types.h>    // socket
    #include<sys/socket.h>   // socket
    #include<arpa/inet.h>
    #include<unistd.h>
    #include<poll.h>     // poll 
    #include<sys/ioctl.h>
    #include<sys/time.h>
    #include<iostream>
    #include<vector>
    #include<string>
    #include<cstdlib>
    #include<cstdio>
    #include<cstring>
    using namespace std;
    #define BUFFER_SIZE 1024
    #define MAX_FD 1000
    
    struct PACKET_HEAD
    {
        int length;
    };
    
    class Server
    {
    private:
        struct sockaddr_in server_addr;
        socklen_t server_addr_len;
        int listen_fd;    // 监听的fd
        struct pollfd fds[MAX_FD];    // fd数组,大小为1000
        int nfds;
    public:
        Server(int port);
        ~Server();
        void Bind();
        void Listen(int queue_len = 20);
        void Accept();
        void Run();
        void Recv();
    };
    
    Server::Server(int port)
    {
        bzero(&server_addr, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = htons(INADDR_ANY);
        server_addr.sin_port = htons(port);
        // create socket to listen
        listen_fd = socket(PF_INET, SOCK_STREAM, 0);
        if(listen_fd < 0)
        {
            cout << "Create Socket Failed!";
            exit(1);
        }
        int opt = 1;
        setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    }
    
    Server::~Server()
    {
        for(int i=0; i<MAX_FD; ++i)
        {
            if(fds[i].fd >=0)
            {
                close(fds[i].fd);
            }
        }
    }
    
    void Server::Bind()
    {
        if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
        {
            cout << "Server Bind Failed!";
            exit(1);
        }
        cout << "Bind Successfully.
    "; 
    }
    
    void Server::Listen(int queue_len)
    {
        if(-1 == listen(listen_fd, queue_len))
        {
            cout << "Server Listen Failed!";
            exit(1);
        }
        cout << "Listen Successfully.
    ";
    }
    
    void Server::Accept()
    {
        struct sockaddr_in client_addr;
        socklen_t client_addr_len = sizeof(client_addr);
    
        int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
        if(new_fd < 0)
        {
            cout << "Server Accept Failed!";
            exit(1);
        }
    
        cout << "new connection was accepted.
    ";
        // 将新建立的连接的fd加入fds[]
        int i;
        for(i=1; i<MAX_FD; ++i)
        {
            if(fds[i].fd < 0)
            {
                fds[i].fd = new_fd;
                break;
            }
        }
        // 超过最大连接数
        if(i == MAX_FD)
        {
            cout << "Too many clients.
    ";
            exit(1);
        }
    
        fds[i].events = POLLIN;      // 设置新描述符的读事件
        nfds = i > nfds ? i : nfds;  // 更新连接数
    }   
    
    void Server::Run()
    {
        fds[0].fd = listen_fd;        // 添加监听描述符
        fds[0].events = POLLIN;
        nfds = 0;
    
        for(int i=1; i<MAX_FD; ++i)
            fds[i].fd = -1;
    
        while(1)
        {
            int nums = poll(fds, nfds+1, -1);
            if(nums < 0)
            {
                cout << "poll() error!";
                exit(1);
            }
    
            if(nums == 0)
            {
                continue;
            }
    
            if(fds[0].revents & POLLIN)
                Accept();   // 有新的客户端请求
            else
                Recv();
        }
    }
    
    void Server::Recv()
    {
        for(int i=1; i<MAX_FD; ++i)
        {
            if(fds[i].fd < 0)
                continue;
            if(fds[i].revents & POLLIN)       // 读就绪
            {
                int fd = fds[i].fd;
                bool close_conn = false;  // 标记当前连接是否断开了
    
                PACKET_HEAD head;
                recv(fd, &head, sizeof(head), 0);   // 先接受包头,即数据总长度
    
                char* buffer = new char[head.length];
                bzero(buffer, head.length);
                int total = 0;
                while(total < head.length)
                {
                    int len = recv(fd, buffer + total, head.length - total, 0);
                    if(len < 0)
                    {
                        cout << "recv() error!";
                        close_conn = true;
                        break;
                    }
                    total = total + len;
                }
    
                if(total == head.length)  // 将收到的消息原样发回给客户端
                {
                    int ret1 = send(fd, &head, sizeof(head), 0);
                    int ret2 = send(fd, buffer, head.length, 0);
                    if(ret1 < 0 || ret2 < 0)
                    {
                        cout << "send() error!";
                        close_conn = true;
                    }
                }
    
                delete buffer;
    
                if(close_conn)  // 当前这个连接有问题,关闭它
                {
                    close(fd);
                    fds[i].fd = -1;
                }
            }
        }   
    }
    
    int main()
    {
        Server server(15000);
        server.Bind();
        server.Listen();
        server.Run();
        return 0;
    }

    客户端程序同上。


    四、epoll示例

    4.1 基本原理

    epoll是在2.6内核中提出的,相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

    epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。epoll的优点在于:

    1. 没有最大并发连接的限制,能打开的fd上限远大于1024(1G的内存上能监听约10万个端口)
    2. 采用回调的方式,效率提升。只有活跃可用的fd才会调用callback函数,也就是说 epoll 只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,epoll的效率就会远远高于select和poll。
    3. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

    epoll对文件描述符的操作有两种模式:LT(level trigger,水平触发)和ET(edge trigger,边缘触发)。二者的区别如下:

    • 水平触发:默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件。

    • 边缘触发:当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时通知一次)

    边缘触发(ET模式)在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞socket,以避免由于一个文件描述符的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

    4.2 相关函数

    epoll操作过程涉及三个函数:

    #include <sys/epoll.h>
    int epoll_create(int size);
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

    1) epoll_create函数用来创建一个epoll句柄,参数size用来告诉内核要监听的数目一共有多少个。

    • 成功时返回一个文件描述符,表示epoll句柄(最后也需要close关闭)
    • 失败时返回-1

    2)epoll_ctl函数用于注册要监听的事件类型,它有四个参数:

    • 第一个参数 epfd 表示epoll句柄,即epoll_create()的返回值;
    • 第二个参数表示对fd的操作类型,
      • EPOLL_CTL_ADD(注册新的fd到epfd中)
      • EPOLL_CTL_MOD(修改已注册的fd的监听事件)
      • EPOLL_CTL_DEL(从epfd中删除一个fd)
    • 第三个参数是需要监听的fd
    • 第四个参数是告诉内核需要监听什么事件

    其中struct epoll_event结构体定义如下:

    struct epoll_event {
        __uint32_t events;  /* Epoll events */
        epoll_data_t data;  /* User data variable */
    };
    
    typedef union epoll_data {
        void *ptr;
        int fd;
        __uint32_t u32;
        __uint64_t u64;
    } epoll_data_t;

    域 events 可以是以下几个宏的集合:

    • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    • EPOLLOUT:表示对应的文件描述符可以写;
    • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    • EPOLLERR:表示对应的文件描述符发生错误;
    • EPOLLHUP:表示对应的文件描述符被挂断;
    • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

    3)epoll_wait函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0。它也有四个参数:

    • 第一个参数 epfd 即epoll句柄;
    • 第二个参数 events 用来从内核得到就绪事件的集合;
    • 第三个参数 maxevents 告诉内核这个 events 有多大;
    • 第四个参数 timeout 表示等待时的超时时间,以毫秒为单位。

    4.3 示例程序

    这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。

    #include<netinet/in.h>   // sockaddr_in
    #include<sys/types.h>    // socket
    #include<sys/socket.h>   // socket
    #include<arpa/inet.h>
    #include<unistd.h>
    #include<sys/epoll.h>    // epoll 
    #include<sys/ioctl.h>
    #include<sys/time.h>
    #include<iostream>
    #include<vector>
    #include<string>
    #include<cstdlib>
    #include<cstdio>
    #include<cstring>
    using namespace std;
    #define BUFFER_SIZE 1024
    #define SIZE 1000
    #define EPOLLSIZE 100
    
    struct PACKET_HEAD
    {
        int length;
    };
    
    class Server
    {
    private:
        struct sockaddr_in server_addr;
        socklen_t server_addr_len;
        int listen_fd;    // 监听的fd
        int epfd;         // epoll fd
        struct epoll_event events[EPOLLSIZE];   // epoll_wait返回的就绪事件
    public:
        Server(int port);
        ~Server();
        void Bind();
        void Listen(int queue_len = 20);
        void Accept();
        void Run();
        void Recv(int fd);
    };
    
    Server::Server(int port)
    {
        bzero(&server_addr, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = htons(INADDR_ANY);
        server_addr.sin_port = htons(port);
        // create socket to listen
        listen_fd = socket(PF_INET, SOCK_STREAM, 0);
        if(listen_fd < 0)
        {
            cout << "Create Socket Failed!";
            exit(1);
        }
        int opt = 1;
        setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    }
    
    Server::~Server()
    {
        close(epfd);
    }
    
    void Server::Bind()
    {
        if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
        {
            cout << "Server Bind Failed!";
            exit(1);
        }
        cout << "Bind Successfully.
    "; 
    }
    
    void Server::Listen(int queue_len)
    {
        if(-1 == listen(listen_fd, queue_len))
        {
            cout << "Server Listen Failed!";
            exit(1);
        }
        cout << "Listen Successfully.
    ";
    }
    
    void Server::Accept()
    {
        struct sockaddr_in client_addr;
        socklen_t client_addr_len = sizeof(client_addr);
    
        int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
        if(new_fd < 0)
        {
            cout << "Server Accept Failed!";
            exit(1);
        }
    
        cout << "new connection was accepted.
    ";
    
    
        // 在epfd中注册新建立的连接   
        struct epoll_event event;
        event.data.fd = new_fd;
        event.events = EPOLLIN;
    
        epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &event);
    }   
    
    void Server::Run()
    {
        epfd = epoll_create(SIZE);   // 创建epoll句柄
    
        struct epoll_event event;
        event.data.fd = listen_fd;
        event.events = EPOLLIN;
        epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event);   // 注册listen_fd 
    
        while(1)
        {
            int nums = epoll_wait(epfd, events, EPOLLSIZE, -1);
            if(nums < 0)
            {
                cout << "poll() error!";
                exit(1);
            }
    
            if(nums == 0)
            {
                continue;
            }
    
            for(int i=0; i<nums; ++i)  // 遍历所有就绪事件
            {
                int fd = events[i].data.fd;
                if((fd == listen_fd) && (events[i].events & EPOLLIN))
                    Accept();    // 有新的客户端请求
                else if(events[i].events & EPOLLIN)
                    Recv(fd);    // 读数据 
                else
                    ;
            }       
        }
    }
    
    void Server::Recv(int fd)
    {
        bool close_conn = false;  // 标记当前连接是否断开了
    
        PACKET_HEAD head;
        recv(fd, &head, sizeof(head), 0);   // 先接受包头,即数据总长度
    
        char* buffer = new char[head.length];
        bzero(buffer, head.length);
        int total = 0;
        while(total < head.length)
        {
            int len = recv(fd, buffer + total, head.length - total, 0);
            if(len < 0)
            {
                cout << "recv() error!";
                close_conn = true;
                break;
            }
            total = total + len;
        }
    
        if(total == head.length)  // 将收到的消息原样发回给客户端
        {
            int ret1 = send(fd, &head, sizeof(head), 0);
            int ret2 = send(fd, buffer, head.length, 0);
            if(ret1 < 0 || ret2 < 0)
            {
                cout << "send() error!";
                close_conn = true;
            }
        }
    
        delete buffer;
    
        if(close_conn)  // 当前这个连接有问题,关闭它
        {
            close(fd);
            struct epoll_event event;
            event.data.fd = fd;
            event.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event);  // Delete一个fd
        }
    }
    
    int main()
    {
        Server server(15000);
        server.Bind();
        server.Listen();
        server.Run();
        return 0;
    }

    注意:

    1. 默认情况下,epoll采用 LT 模式;若要采用 ET 模式,调用epoll_ctl的时候在 events 中添加EPOLLET

    2. 对于监听的sockfd,最好使用水平触发模式,边缘触发模式会导致高并发情况下,有的客户端会连接不上。

    3. 对于读写的connfd,水平触发模式下,阻塞和非阻塞效果都一样,不过为了防止特殊情况,还是建议设置非阻塞。

    4. 对于读写的connfd,边缘触发模式下,必须使用非阻塞fd,并要一次性全部读写完数据(否则会干扰其他事件)。



  • 相关阅读:
    html调用js提示方法名 is not defined处理方法
    Amazon Redshift 基于 PostgreSQL 8.0.2
    Data Nodes
    AWS X-Ray
    API Gateway 中控制和管理对 REST API 的访问
    CodeBuild 与 Amazon Virtual Private Cloud 结合使用
    ElastiCache for Redis 缓存策略
    在 AWS X-Ray 控制台中配置采样规则
    什么是 Amazon Kinesis Data Analytics for SQL 应用程序?
    AWS Secrets Manager
  • 原文地址:https://www.cnblogs.com/songlee/p/5738015.html
Copyright © 2011-2022 走看看