zoukankan      html  css  js  c++  java
  • IO多路复用技术总结

    来源:微信公众号「编程学习基地」

    IO 多路复用概述

    I/O 多路复用技术是为了解决进程或线程阻塞到某个 I/O 系统调用而出现的技术,使进程不阻塞于某个特定的 I/O 系统调用。

    在IO多路复用技术描述前,先讲解下同步,异步,阻塞,非阻塞的概念。

    网络IO模型

    linux网络IO中涉及到的模型如下:

    (1)阻塞式IO

    (2)非阻塞式IO

    (3)IO多路复用

    (4)信号驱动IO

    (5)异步IO

    今天不谈信号驱动IO,略过..

    同步/异步

    在学习IO模型的时候,我们必须明确一个概念,处理 IO 的时候,阻塞和非阻塞都是同步 IO。

    只有使用了特殊的 API 才是异步 IO,例如Linux网络中的AIO。

    再看下POSIX对同步和异步这两个术语的定义:

    • 同步IO操作:导致请求进程阻塞,直到I/O操作完成;
    • 异步IO操作:不导致请求进程阻塞;

    通俗的理解下同步和异步

    • 同步:当执行系统调用read时,需要用户等待内核完成从内核缓冲区到用户缓冲区的数据拷贝。

    • 异步:当执行异步IO操作例如aio_read时,用户不需要等待,只需要接收内核完成操作的通知,由内核来完成数据的读取。

    阻塞/非阻塞

    在知晓阻塞和非阻塞都是同步 IO后,阻塞和非阻塞就很好理解了

    阻塞IO:由系统调用read,导致线程一直等待数据返回。

    阻塞等待模型

    非阻塞IO:系统调用read后立即返回一个状态,当数据达到内核缓冲区之前都是非阻塞的,即返回一个系统调用状态。

    非阻塞等地模型

    ps:闪客的动图做的非常的形象,上述gif动图来源「低并发编程」

    IO多路复用

    IO多路复用是一种同步IO模型,实现一个线程可以监视多个文件句柄;

    select

    select 是操作系统提供的系统调用函数,select()用来等待文件描述词(普通文件、终端、伪终端、管道、FIFO、套接字及其他类型的字符型)状态的改变。是一个轮循函数,循环询问文件节点,可设置超时时间,超时时间到了就跳过代码继续往下执行。

    通过select,我们可以把一个文件描述符的数组发给操作系统, 让操作系统去遍历,确定哪个文件描述符可以读写, 然后告诉我们去处理:

    请添加图片描述

    头文件

    #include <sys/select.h>
    #include <sys/time.h>
    #include <sys/types.h>
    #include <unistd.h>
    

    select调用

    拥塞函数,拥塞等待文件描述符事件的到来

    int select(int maxfdp
    	, fd_set *readset
    	, fd_set *writeset
    	, fd_set *exceptset
    	,struct timeval *timeout);
    

    参数说明:

    maxfdp:被监听的文件描述符的最大值,它比所有文件描述符集合中的文件描述符的最大值大1,因为文件描述符是从0开始计数的;

    readfds、writefds、exceptset:分别指向可读、可写和异常等事件对应的描述符集合。

    timeout:用于设置select函数的超时时间,即告诉内核select等待多长时间之后就放弃等待。timeout == NULL 表示等待无限长的时间,timeout == 0,select立即返回

    timeval结构体

    struct timeval
    {      
        long tv_sec;   /*秒 */
        long tv_usec;  /*微秒 */   
    };
    

    select置位

    int FD_ZERO(int fd, fd_set *fdset);   //一个 fd_set类型变量的所有位都设为 0
    int FD_CLR(int fd, fd_set *fdset);  //清除某个位时可以使用
    int FD_SET(int fd, fd_set *fd_set);   //设置变量的某个位置位
    int FD_ISSET(int fd, fd_set *fdset); //测试某个位是否被置位
    

    当声明了一个文件描述符集后,必须用FD_ZERO将所有位置零

    调用 select函数,拥塞等待文件描述符事件的到来 ;如果超过设定的时间,则不再等待,继续往下执行

    select返回后,用FD_ISSET测试给定位是否置位:

    if(FD_ISSET(fd, &rset)   
    { 
        ... 
        //do something  
    }
    

    fd_set结构体

    fd_set其实这是一个数组的宏定义,实际上是一long类型的数组,每一个数组元素都能与一打开的文件句柄(socket、文件、管道、设备等)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪个句柄可读。

    select使用

    整个 select 的流程图如下:
    请添加图片描述

    Demo1:select示例

    Server

    #include <stdio.h>
    #include <stdlib.h>
    #include <errno.h>
    #include <string.h>
    #include <sys/types.h>
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <sys/wait.h>
    #include <unistd.h>
    #include <arpa/inet.h>
    #include <sys/time.h>
    #include <sys/types.h>
    
    #define MAXBUF 1024
    #define LISTEN_NUM 2
    
    int main(int argc, char **argv)
    {
        int default_port = 8000;
        int optch = 0;
        while ((optch = getopt(argc, argv, "s:p:")) != -1)
        {
            switch (optch)
            {
            case 'p':
                default_port = atoi(optarg);
                printf("port: %s\n", optarg);
                break;
            case '?':
                printf("Unknown option: %c\n", (char)optopt);
                break;
            default:
                break;
            }
        }
    
        int sockfd, new_fd;
        socklen_t len;
        struct sockaddr_in my_addr, their_addr;
        char buf[MAXBUF + 1];
        fd_set rfds;            // select
        struct timeval tv;      //超时时间
        int retval, maxfd = -1; // select返回值 select监听句柄的最大数量
        
        if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        {
            perror("socket");
            exit(EXIT_FAILURE);
        }
    
        bzero(&my_addr, sizeof(my_addr));
        my_addr.sin_family = PF_INET;
        my_addr.sin_port = htons(default_port);
        my_addr.sin_addr.s_addr = INADDR_ANY;
        if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1)
        {
            perror("bind");
            exit(EXIT_FAILURE);
        }
        if (listen(sockfd, LISTEN_NUM) == -1)
        {
            perror("listen");
            exit(EXIT_FAILURE);
        }
        /*				数据处理				*/
        while (1)
        {
            printf("\n----wait for new connect port:%d\n",default_port);
            len = sizeof(struct sockaddr);
            if ((new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &len)) == -1)
            {
                perror("accept");
                exit(errno);
            }
            else
                printf("server: got connection from %s, port %d, socket %d\n",
                       inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd);
            while (1)
            {
                FD_ZERO(&rfds);
                FD_SET(0, &rfds);
                FD_SET(new_fd, &rfds);
                maxfd = new_fd;
                tv.tv_sec = 1;
                tv.tv_usec = 0;
                retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);
                if (retval == -1)
                {
                    perror("select");
                    exit(EXIT_FAILURE);
                }
                else if (retval == 0)
                {
                    continue;
                }
                else
                {
                    /*标准输入*/
                    if (FD_ISSET(0, &rfds))
                    {
                        bzero(buf, MAXBUF + 1);
                        fgets(buf, MAXBUF, stdin);
                        if (!strncasecmp(buf, "quit", 4))
                        {
                            printf("i will quit!\n");
                            break;
                        }
                        len = send(new_fd, buf, strlen(buf) - 1, 0);
                        if (len > 0)
                            printf("send successful,%d byte send..\n", len);
                        else
                        {
                            printf("send failure!");
                            break;
                        }
                    }
                    if (FD_ISSET(new_fd, &rfds))
                    {
                        bzero(buf, MAXBUF + 1);
                        len = recv(new_fd, buf, MAXBUF, 0);
                        if (len > 0)
                            printf("recv success :'%s', %d byte recv..\n", buf, len);
                        else
                        {
                            if (len < 0)
                                printf("recv failure\n");
                            else
                            {
                                printf("the client close ,quit\n");
                                break;
                            }
                        }
                    }
                }
            }
            close(new_fd);
            printf("need othe connecdt (no->quit)");
            fflush(stdout);
            bzero(buf, MAXBUF + 1);
            fgets(buf, MAXBUF, stdin);
            if (!strncasecmp(buf, "no", 2))
            {
                printf("quit!\n");
                break;
            }
        }
        close(sockfd);
        return 0;
    }
    

    makefile:

    TARGET=server
    SRC = $(wildcard *.cpp *.c)
    OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
    DEFS =
    CFLAGS = -g
    CC =g++
    LIBS =  -lpthread
    $(TARGET):$(OBJ)
    	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
    .PHONY:
    clean:
    	rm -rf *.o $(TARGET)
    
    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ make
    g++ -g  -o server select.c -lpthread
    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ ./server
    ----wait for new connect port:8000
    

    client

    #include <stdio.h>
    #include <string.h>
    #include <errno.h>
    #include <sys/socket.h>
    #include <resolv.h>
    #include <stdlib.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <unistd.h>
    #include <sys/time.h>
    #include <sys/types.h>
    
    #define MAXBUF 1024
    int main(int argc, char **argv)
    {
        int sockfd, len;
        struct sockaddr_in dest;
        char buffer[MAXBUF + 1];
        fd_set rfds;
        struct timeval tv;
        int retval, maxfd = -1;
    
        int optch,ret = -1;
        const char*server_addr;
        int default_port = 8000;
    
        /*判断是否为合法输入 必须传入一个参数:服务器Ip*/
        if(argc<3)
        {
            printf("usage:tcpcli <IPaddress>");
            return 0;
        }
        while((optch = getopt(argc, argv, "s:p:")) != -1)
    	{
    		switch (optch)
    		{
            case 's':
                server_addr = optarg;
                break;
            case 'p':
                default_port = atoi(optarg);
                printf("port: %s\n", optarg);
                break;
            case '?':
                printf("Unknown option: %c\n",(char)optopt);    
                break;
            default:
                break;
    		}
    	}
        
        if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
        {
            perror("Socket");
            exit(EXIT_FAILURE);
        }
    
        bzero(&dest, sizeof(dest));
        dest.sin_family = AF_INET;
        dest.sin_port = htons(default_port);
        if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0) 
        {
            perror(server_addr);
            exit(EXIT_FAILURE);
        }
    
        if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0) 
        {
            perror("Connect ");
            exit(EXIT_FAILURE);
        }
    
        printf("\nget ready message chat:\n");
        while (1) 
    	{
            FD_ZERO(&rfds);
            FD_SET(0, &rfds);
            FD_SET(sockfd, &rfds);
            maxfd = sockfd;
            tv.tv_sec = 1;
            tv.tv_usec = 0;
            retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);
            if (retval == -1) 
    		{
                printf("select %s", strerror(errno));
                break;
            } 
    		else if (retval == 0)
                continue;
    		else
    		{
                if (FD_ISSET(sockfd, &rfds)) 
    			{
                    bzero(buffer, MAXBUF + 1);
                    len = recv(sockfd, buffer, MAXBUF, 0);
                    if (len > 0)
                        printf ("recv message:'%s', %d byte recv..\n",buffer, len);
                    else 
    				{
                        if (len < 0)
                            printf ("message recv failure\n");
                        else
    					{
                            printf("server close ,quit\n");
                        	break;
    					}
                    }
                }
                if (FD_ISSET(0, &rfds))
    			{
                    bzero(buffer, MAXBUF + 1);
                    fgets(buffer, MAXBUF, stdin);
                    if (!strncasecmp(buffer, "quit", 4)) {
                        printf("i will quit\n");
                        break;
                    }
                    len = send(sockfd, buffer, strlen(buffer) - 1, 0);
                    if (len < 0) {
                        printf ("message send failure");
                        break;
                    } else
                        printf
                            ("send success,%d byte send..\n",len);
                }
            }
        }
        close(sockfd);
        return 0;
    } 
    
    TARGET=server
    SRC = $(wildcard *.cpp *.c)
    OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
    DEFS =
    CFLAGS = -g
    CC =g++
    LIBS =  -lpthread
    $(TARGET):$(OBJ)
    	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
    .PHONY:
    clean:
    	rm -rf *.o $(TARGET)
    
    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ make
    g++ -g  -o client client.c -lpthread
    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ ./client -s 0.0.0.0
    
    get ready message chat:
    

    简易聊天室select版本

    server

    #include<stdio.h>
    #include<sys/types.h>
    #include<stdlib.h>
    #include<sys/socket.h>
    #include<sys/select.h>
    #include<netinet/in.h>
    #include<arpa/inet.h>
    #include<unistd.h>
    #include<string.h>
    
    #define _BACKLOG_ 5 //监听队列里允许等待的最大值
    #define MAX_CONNECT 20
    int fds[MAX_CONNECT];        //用来存放需要处理的IO事件
    int listen_sock = -1;
    int creat_sock(int port)
    {
        int sock = socket(AF_INET,SOCK_STREAM,0);
        if(sock < 0){
            perror("creat_sock error");
            exit(1);
        }
    
        struct sockaddr_in local;
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY; //inet_addr(0.0.0.0)
        
        // 设置允许socket立即重用
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&sock, sizeof(sock));  
    
        if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0){
            perror("bind");
            exit(2);
         }
    
        if(listen(sock,_BACKLOG_) < 0 ){
            perror("listen");
            exit(4);
         }
    
        return sock;
    }
    
    int accept_sock(){
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int accept_sock = accept(listen_sock, (struct sockaddr *)&client, &len);
        if (accept_sock < 0)
        {
            perror("accept");
            exit(5);
        }
    
        printf("connect by a client, ip:%s port:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
    
        size_t i = 0;
        for (; i < MAX_CONNECT; ++i) //将新接受的描述符存入集合中
        {
            if (fds[i] == -1)
            {
                fds[i] = accept_sock;
                break;
            }
        }
        if (i == MAX_CONNECT)
        {
            printf("accept is upper limit..\n");
            close(accept_sock);
        }
    }
    
    int groupChat(int sockFd,void* pBuf,int iSize){
        for(int index=0;index<MAX_CONNECT;index++){
            if(fds[index] == sockFd || fds[index] == listen_sock)
            {
                continue;
            }
            if(fds[index]!=-1){
                printf("write fd:%d..socketFd:%d\n",fds[index],sockFd);
                write(fds[index],pBuf,iSize);
            }
        }
    }
    
    int handle_read(int* socketFd)
    {
        int socket = *socketFd;
        char buf[1024];
        memset(buf, '\0', sizeof(buf));
        ssize_t size = read(socket, buf, sizeof(buf) - 1);
        if (size < 0)
        {
            perror("read");
            exit(6);
        }
        else if (size == 0)
        {
            printf("client close..\n");
            close(socket);
            *socketFd = -1;
        }
        else
        {
            printf("client say: %s\n", buf);
            groupChat(socket, buf, size);
        }
    }
    
    int main(int argc,char* argv[])
    {
        int default_port = 8000;
        int optch = 0;
        while ((optch = getopt(argc, argv, "s:p:")) != -1)
        {
            switch (optch)
            {
            case 'p':
                default_port = atoi(optarg);
                printf("port: %s\n", optarg);
                break;
            case '?':
                printf("Unknown option: %c\n", (char)optopt);
                break;
            default:
                break;
            }
        }
    
        listen_sock = creat_sock(default_port);
    
        size_t fds_num = sizeof(fds)/sizeof(fds[0]);
        size_t i = 0;
        for(;i < fds_num;++i)
        {
            fds[i] = -1;
        }
        
        int max_fd = listen_sock;
    	fds[0] = listen_sock;
        fd_set rset;
        while(1)
    	{
            FD_ZERO(&rset);
            FD_SET(listen_sock,&rset);
            struct timeval timeout = {10 , 0};
    
            size_t i = 0;
            for(;i < fds_num;++i)
            {
                if(fds[i] > 0 ){
                    FD_SET(fds[i] ,&rset);
                    if(max_fd < fds[i])
    				{
                        max_fd = fds[i];
                    }
                }
            }
    
            switch(select(max_fd+1,&rset,NULL,NULL,&timeout))
            {
                case -1:
                    perror("select");
                    break;
                case 0:
                    printf("time out..\n");
    				break;
                default:
                {
                    size_t i = 0;
                    for(;i < fds_num;++i)
                    {
    					//连接请求
    					//当为 listen_socket 事件就绪的时候,就表明有新的连接请求
                        if(FD_ISSET(fds[i],&rset) && fds[i] == listen_sock)
                        {
                            accept_sock();
                        }
                        //普通请求
                        else if(FD_ISSET(fds[i],&rset) && (fds[i] > 0))
                        {
                            handle_read(&fds[i]);
                        }
                        else{}
                    }
                }
                break;
            }
        }
        return 0;
    }
    

    makfeile

    TARGET=server
    SRC = $(wildcard *.cpp *.c)
    OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
    DEFS =
    CFLAGS = -g
    CC =g++
    LIBS =  -lpthread
    $(TARGET):$(OBJ)
    	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
    .PHONY:
    clean:
    	rm -rf *.o $(TARGET)
    
    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ make
    g++ -g  -o server select.c -lpthread
    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ ./server
    connect by a client, ip:127.0.0.1 port:42964
    

    client

    #include <stdio.h>
    #include <string.h>
    #include <errno.h>
    #include <sys/socket.h>
    #include <resolv.h>
    #include <stdlib.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <unistd.h>
    #include <sys/time.h>
    #include <sys/types.h>
    
    #define MAXBUF 1024
    #define MAXNAME 64
    int main(int argc, char **argv)
    {
        int sockfd, len;
        struct sockaddr_in dest;
        char buffer[MAXBUF + 1];
        fd_set rfds;
        struct timeval tv;
        int retval, maxfd = -1;
    
        int optch,ret = -1;
        const char*server_addr;
        int default_port = 8000;
        char* clientName = "佚名";
    
        /*判断是否为合法输入 必须传入一个参数:服务器Ip*/
        if(argc<3)
        {
            printf("usage:tcpcli <IPaddress>");
            return 0;
        }
        while((optch = getopt(argc, argv, "s:p:n:")) != -1)
    	{
    		switch (optch)
    		{
            case 's':
                server_addr = optarg;
                break;
            case 'p':
                default_port = atoi(optarg);
                printf("port: %s\n", optarg);
                break;
            case 'n':
                clientName = optarg;
                printf("client Name: %s\n", optarg);
                break;
            case '?':
                printf("Unknown option: %c\n",(char)optopt);    
                break;
            default:
                break;
    		}
    	}
        
        if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
        {
            perror("Socket");
            exit(EXIT_FAILURE);
        }
    
        bzero(&dest, sizeof(dest));
        dest.sin_family = AF_INET;
        dest.sin_port = htons(default_port);
        if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0) 
        {
            perror(server_addr);
            exit(EXIT_FAILURE);
        }
    
        if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0) 
        {
            perror("Connect ");
            exit(EXIT_FAILURE);
        }
    
        printf("get ready message chat:\n");
        while (1) 
    	{
            FD_ZERO(&rfds);
            FD_SET(0, &rfds);
            FD_SET(sockfd, &rfds);
            maxfd = sockfd;
            tv.tv_sec = 1;
            tv.tv_usec = 0;
            retval = select(maxfd + 1, &rfds, NULL, NULL, &tv);
            if (retval == -1) 
    		{
                printf("select %s", strerror(errno));
                break;
            } 
    		else if (retval == 0)
                continue;
    		else
    		{
                if (FD_ISSET(sockfd, &rfds)) 
    			{
                    bzero(buffer, MAXBUF + 1);
                    len = recv(sockfd, buffer, MAXBUF, 0);
                    if (len > 0)
                        printf ("recv byte %d, %s\n",len, buffer);
                    else 
    				{
                        if (len < 0)
                            printf ("message recv failure\n");
                        else
    					{
                            printf("server close ,quit\n");
                        	break;
    					}
                    }
                }
                if (FD_ISSET(0, &rfds))
    			{
                    char name_msg[MAXNAME + MAXBUF];
                    bzero(buffer, MAXBUF + 1);
                    fgets(buffer, MAXBUF, stdin);
                    if (!strncasecmp(buffer, "quit", 4)) {
                        printf("i will quit\n");
                        break;
                    }
                    sprintf(name_msg, "[%s]: %s", clientName, buffer);
                    len = send(sockfd, name_msg, strlen(name_msg) - 1, 0);
                    if (len < 0) {
                        printf ("message send failure");
                        break;
                    } else
                        printf("send success,%d byte send..\n",len);
                }
            }
        }
        close(sockfd);
        return 0;
    } 
    

    makefile

    TARGET=server
    SRC = $(wildcard *.cpp *.c)
    OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
    DEFS =
    CFLAGS = -g
    CC =g++
    LIBS =  -lpthread
    $(TARGET):$(OBJ)
    	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
    .PHONY:
    clean:
    	rm -rf *.o $(TARGET)
    
    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ ./client -s 0.0.0.0 -n 梦凡
    client Name: 梦凡
    get ready message chat:
    

    poll调用

    Poll就是监控文件是否可读的一种机制,作用与select一样。

    #include <poll.h>
    int poll(struct pollfd fds[], nfds_t nfds, int timeout);
    

    参数说明

    struct pollfd

    fds:是一个struct pollfd结构类型的数组,列出了我们需要poll()检查的文件描述符

    typedef struct pollfd {
            int fd;           /* 需要被检测或选择的文件描述符*/
            short events;     /* 对文件描述符fd上感兴趣的事件 */
            short revents;    /* 文件描述符fd上当前实际发生的事件*/
    } pollfd_t;
    

    ​ events:想要监听的事件

    ​ revents:实际上发生的事件

    POLLIN
    POLLOUT
    POLLPRI
    POLLRDHUB
    POLLHUP
    POLLERR
    

    nfds

    指定了fds中元素的个数,nfds_t为无符号整形

    timeout

    决定阻塞行为,一般如下:

    • -1:一直阻塞到fds数组中有一个达到就绪态或者捕获到一个信号

    • 0:不会阻塞,立即返回

    • >0:阻塞时间

    返回值

    • >0:数组fds中准备好读、写或出错状态的那些socket描述符的总数量;

    • ==0:数组fds中没有任何socket描述符准备好读、写,或出错;此时poll超时

    • -1: poll函数调用失败

    poll使用

    #include <stdio.h>
    #include <poll.h>
    #include <string.h>
    
    int main()
    {
    	int timeout = 0;			   
    	char buf[1024];
    	struct pollfd fd_poll[1];	   //设置只有一个事件
    
    	while(1){
    		fd_poll[0].fd = 0;      
    		fd_poll[0].events = POLLIN;
    		fd_poll[0].revents = 0;	   
    
    		memset(buf, '\0', sizeof(buf));
    		switch( poll(fd_poll, 1, -1) ){
    			case 0:
    				perror("timeout!");
    				break;
    			case -1:
    				perror("poll");
    				break;
    			default:
    				{
    					if( fd_poll[0].revents & POLLIN )
    					{
    
    						gets(buf);
    						printf("buf : %s\n",buf);
    					}
    				}
    				break;
    		}
    	}
    	return 0;
    }
    

    makefile

    tcp_poll:tcp_poll.c
        gcc -o $@ $^
    .PHONY:clean
    clean:
        rm -f tcp_poll
    

    epoll调用

    epoll没有对描述符数目的限制,它所支持的文件描述符上限是整个系统最大可以打开的文件数目,例如,在1GB内存的机器上,这个限制大概为10万左右。

    epoll只有 epoll_createepoll_ctlepoll_wait 这三个系统调用。

    第一步,创建一个 epoll 句柄

    第二步,向内核添加、修改或删除要监控的文件描述符。

    第三步,发起了 select() 调用

    请添加图片描述

    其定义如下:

    #include <sys/epoll.h>
    
    int epoll_create(int size);
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    

    epoll_create

    #include <sys/epoll.h>
    
    int epoll_create(int size);
    

    调用epoll_create方法创建一个epoll的句柄,使用完epoll后使用close函数进行关闭

    epoll_ctl

    #include <sys/epoll.h>
    
    int epoll_ctl(int epfd	//第一个参数epfd:epoll_create函数的返回值。
    	, int op			//第二个参数events:表示动作类型。有三个宏来表示
    	, int fd			//第三个参数fd:需要监听的fd。
    	, struct epoll_event *event);//第四个参数event:告诉内核需要监听什么事件。
    

    op:

    • EPOLL_CTL_ADD:注册新的fd到epfd中;

    • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

    • EPOLL_CTL_DEL:从 epfd 中删除一个 fd。

    fd:需要注册监视对象文件描述符

    struct epoll_event

    // 感兴趣的事件和被触发的事件
    struct epoll_event {
        __uint32_t events; // Epoll events
        epoll_data_t data; // User data variable
    };
    // 保存触发事件的某个文件描述符相关的数据
    typedef union epoll_data {
        void *ptr;
        int fd;
        __uint32_t u32;
        __uint64_t u64;
    } epoll_data_t;
    
    Epoll Events:

    EPOLLIN:表示对应的文件描述符可读(包括对端Socket);
    EPOLLOUT:表示对应的文件描述符可写;
    EPOLLPRI:表示对应的文件描述符有紧急数据可读(带外数据);
    EPOLLERR:表示对应的文件描述符发生错误;
    EPOLLHUP:表示对应的文件描述符被挂断;
    EPOLLET:将EPOLL设为边缘触发(Edge Triggered),这是相对于水平触发(Level Triggered)而言的。
    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket,需要再次添加

    例如:

    struct epoll_event ep_ev;
    int accept_sock = accept(listen_sock,(struct sockaddr*)&remote,&len);
    ep_ev.events = EPOLLIN | EPOLLET;
    ep_ev.data.fd = accept_sock;
    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,accept_sock,&ep_ev)
    

    epoll_wait

    收集在epoll监控的事件中已经发生的事件

    #include <sys/epoll.h>
    
    int epoll_wait(int epfd		//第一个参数epfd:epoll_create函数的返回值。
    	, struct epoll_event *events	
    	, int maxevents
    	, int timeout);			//超时时间(毫秒)
    

    第一个参数epfd:epoll_create函数的返回值。

    第二个参数events:是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据赋值到这个event数组中,不会去帮助我们在用户态分配内存)

    第三个参数maxevents:maxevents告诉内核这个events数组有多大,这个maxevents的值不能大于创建epoll_create时的size。

    第四个参数:是超时时间(毫秒),如果函数调用成功,则返回对应IO上已准备好的文件描述符数目,如果返回0则表示已经超时。

    基于epoll的简易http服务器

    基于epoll的简单回显服务器

    #include<stdio.h>
    #include<unistd.h>
    #include<sys/types.h>
    #include<sys/socket.h>
    #include<netinet/in.h>
    #include<arpa/inet.h>
    #include<sys/epoll.h>
    #include<fcntl.h>
    #include<stdlib.h>
    #include<string.h>
    
    int listen_sock = -1;
    int epoll_fd = -1;
    
    //设置非阻塞
    int set_noblock(int sock)
    {
        int opts = fcntl(sock,F_GETFL);
        return fcntl(sock,F_SETFL,opts | O_NONBLOCK);
    }
    
    int creat_socket(int port)
    {
        int sock = socket(AF_INET,SOCK_STREAM,0);
        if(sock < 0){
            perror("socket");
            exit(2);
        }
        //调用setsockopt使当server先断开时避免进入 TIME_WAIT 状态,\
            将其属性设定为SO_REUSEADDR,使其地址信息可被重用
        int opt = 1;
        if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)) < 0){
            perror("setsockopt");
            exit(3);
        }
        struct sockaddr_in local;
    
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY; //inet_addr("0.0.0.0");
    
        if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0 ){
            perror("bind");
            exit(4);
        }
        if(listen(sock,5) < 0){
            perror("listen");
            exit(5);
        }
        printf("listen port %d..\n",port);
        return sock;
    }
    
    int accept_socket(){
        struct sockaddr_in remote;
        socklen_t len = sizeof(remote);
    
        int accept_sock = accept(listen_sock, (struct sockaddr *)&remote, &len);
        if (accept_sock < 0)
        {
            perror("accept");
            return -1;
        }
        printf("accept a client..[ip]: %s,[port]: %d\n", inet_ntoa(remote.sin_addr), ntohs(remote.sin_port));
        //将新的事件添加到epoll集合中
        struct epoll_event ep_ev;
        ep_ev.events = EPOLLIN | EPOLLET; // edge边沿触发,只触发一次
        ep_ev.data.fd = accept_sock;
    
        set_noblock(accept_sock);
    
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_sock, &ep_ev) < 0)
        {
            perror("epoll_ctl");
            close(accept_sock);
            return -1;
        }
        return 0;
    }
    
    int handle_request(int socketFd){
        //申请空间同时存文件描述符和缓冲区地址
        char buf[102400];
        memset(buf, '\0', sizeof(buf));
    
        ssize_t _s = recv(socketFd, buf, sizeof(buf) - 1, 0);
        if (_s < 0)
        {
            perror("recv");
            return -1;
        }
        else if (_s == 0)
        {
            printf("remote close..\n");
            //远端关闭了,进行善后
            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL);
            close(socketFd);
        }
        else
        {
            //读取成功,输出数据
            printf("client# %s", buf);
            fflush(stdout);
    
            //将事件改写为关心事件,进行回写
            struct epoll_event ep_ev;
            ep_ev.data.fd = socketFd;
            ep_ev.events = EPOLLOUT | EPOLLET;
    
            //在epoll实例中更改同一个事件,触发socket可写事件
            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socketFd, &ep_ev);
        }
        return 0;
    }
    
    int handle_response(int socketFd){
        const char *msg = "HTTP/1.1 200 OK \r\n\r\n<h1> hi girl </h1>\r\n";
        send(socketFd, msg, strlen(msg), 0);
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL);
        close(socketFd);
    }
    int main(int argc,char *argv[])
    {
        int default_port = 8000;
        int optch = 0;
        while ((optch = getopt(argc, argv, "s:p:")) != -1)
        {
            switch (optch)
            {
            case 'p':
                default_port = atoi(optarg);
                printf("port: %s\n", optarg);
                break;
            case '?':
                printf("Unknown option: %c\n", (char)optopt);
                break;
            default:
                break;
            }
        }
    
        listen_sock = creat_socket(default_port);
    
        epoll_fd = epoll_create(256);
        if(epoll_fd < 0){
            perror("epoll creat");
            exit(6);
        }
    
        struct epoll_event ep_ev;
        ep_ev.events = EPOLLIN;         //数据的读取
        ep_ev.data.fd = listen_sock;
    
        //添加关心的事件
        if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ep_ev) < 0){
            perror("epoll_ctl");
            exit(7);
        }
    
        struct epoll_event ready_ev[128];   //申请空间来放就绪的事件。
        int maxnum = 128;
        int timeout = -1;                   //设置超时时间,若为-1,则永久阻塞等待。
        int ret = 0;
        
        int done = 0;
        while(!done){
            switch(ret = epoll_wait(epoll_fd,ready_ev,maxnum,timeout)){
                case -1:
                    perror("epoll_wait");
                    break;
                case 0:
                    printf("time out...\n");
                    break;
                default://至少有一个事件就绪
                {
                    int i = 0;
                    for(;i < ret;++i)
    				{
                        //判断是否为监听套接字,是的话 accept
                        int fd = ready_ev[i].data.fd; 
                        if((fd == listen_sock) && (ready_ev[i].events & EPOLLIN)){
                            accept_socket();
                        }
                        else{//普通IO
                            if(ready_ev[i].events & EPOLLIN){
                                handle_request(fd);
                            }else if(ready_ev[i].events & EPOLLOUT){
                                handle_response(fd);
                            }
                        }
                    }
                }
                    break;
            }
        }
        close(listen_sock);
        return 0;
    }
    

    makefile

    TARGET=server
    SRC = $(wildcard *.cpp *.c)
    OBJ = $(patsubst %.cpp *.c,%.o,$(SRC))
    DEFS =
    CFLAGS = -g
    CC =g++
    LIBS =  -lpthread
    $(TARGET):$(OBJ)
    	$(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS)
    .PHONY:
    clean:
    	rm -rf *.o $(TARGET)
    

    Build

    make
    

    Usage

    ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/epoll$ ./server
    listen port 8000
    

    浏览器输入:http://服务器ip:8000/例如,http://49.234.35.128:8000/

    请添加图片描述

  • 相关阅读:
    RESTful API 设计指南
    理解RESTful架构
    django-mysqlclient_1193错误
    获取当前脚本所在的目录和路径
    20191007
    20191005
    20191001
    20190927
    20190922
    莫比乌斯反演证明
  • 原文地址:https://www.cnblogs.com/deroy/p/15700636.html
Copyright © 2011-2022 走看看