zoukankan      html  css  js  c++  java
  • IO multiplexing 与 非阻塞网络编程

      使用I/O multipexing 的网络编程中,一般需要采用非阻塞网络编程的风格,防止服务端在处理高连接量大时候阻塞在某个文件描述符上面,比如某个socket 有大量的数据需要写,但是内核发送缓冲区已经填满,无法在一次write中将需要发送到数据发送出去,程序就会阻塞在该处,导致select/poll/epoll_wait() 此时不能处理其它到来的请求,同样read或者accept也可能出现阻塞的情况,比如当客户端发起connect,之后立刻关闭该链接,在服务端尚未调用accept之前就关闭了该连接,当后来服务端accept得以调用此时完成队列中又没有完成的三次握手的连接,accept就会导致进程睡眠(详细情况可以参见UNPv1非阻塞accept的描述)。因此I/O multiplexing 一般采用非阻塞网络编程的风格。

      对于read/wirte 操作来说,如果采用了非阻塞编程则需要为每个connection配备应用层缓冲区,read端主要防止一次来到数据太多,write主要防止出现阻塞,可以把没有发送完成的数据写入缓冲区,等到socket 可写之后继续发送。如果在新一次write请求到来的时候,应用层写缓冲区中还有之前未发送完的数据,则应该先将上次未写入内核的数据写入内核缓冲区,保证发送到顺序性。此处给一个简单的例子。

    #include <stdio.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <unistd.h>
    #include <arpa/inet.h>
    #include <sys/types.h>
    #include <vector>
    #include <string.h>
    #include <stdlib.h>
    #include <map>
    #include <fcntl.h>
    #include <errno.h>
    #include <string>
    #include <iostream>
    #include <sys/select.h>
    
    #define SEVER_PORT 1314
    #define MAX_LINE_LEN 1024
    
    using namespace std;
    
    int Accept(int fd, struct sockaddr_in *addr)
    {
        socklen_t addr_len = static_cast<socklen_t>( sizeof *addr);
        int connfd,flags;
    
        connfd = accept(fd,reinterpret_cast<struct sockaddr *>(addr),&addr_len);
    
        flags = fcntl(connfd,F_GETFL,0);
        fcntl(connfd,F_SETFL,flags | O_NONBLOCK);
    
        if(connfd < 0)
        {
            int ErrorCode = errno;
            switch(ErrorCode)
            {
                case 0:
                case EWOULDBLOCK:
                case ECONNABORTED:
                case EPROTO:
                case EINTR:
                case EMFILE:
                        errno = ErrorCode;
                        printf("Accept Error: %s
    ",strerror(ErrorCode));
                    break;
                default:
                    break;
            }
        }
        return connfd;
    }
    
    int Read(int fd, map<int, string> &bufMap)
    {
        struct iovec iov[2];
        char buf[MAX_LINE_LEN+1];
        char exbuf[65535]; // 如果一次read很多数据,则动用该缓冲区
        int nrcv;
        
        iov[0].iov_base = buf;
        iov[0].iov_len = MAX_LINE_LEN;
        
        iov[1].iov_base = exbuf;
        iov[1].iov_len = sizeof exbuf;
        
        nrcv = readv(fd, iov, 2);// 使用readv保证能将数据读取完
        
        if(nrcv > MAX_LINE_LEN)
        {
            bufMap[fd] += string(buf) + string(exbuf); // test !
            printf("extrabuf in use! 
    ");
        }
        else if( nrcv > 0)
        {
            bufMap[fd] += string(buf);
        }
        else
        {
            return nrcv;
        }
    
        return nrcv;
    }
    
    int getSocketError(int fd)
    {
        int optval;
        
        socklen_t optlen = static_cast<socklen_t>(sizeof optval);
        
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
        {
            return errno;
        }
        else
        {
            return optval;
        }
    }
    
    int main()
    {
        struct sockaddr_in cli_addr, server_addr;
        vector<int> client(FD_SETSIZE,-1);
        map<int ,string> bufMap;// 简易应用层缓冲区
    
        fd_set rset,wrset,allset;
        int listenfd, connfd, sockfd, maxfd, nready, ix,maxid, nrcv,flags,nwrt,one;
        char addr_str[INET_ADDRSTRLEN];
    
        int accepted = 0;
    
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        server_addr.sin_port = htons(SEVER_PORT);
    
        listenfd = socket(AF_INET,SOCK_STREAM,0);
    
        flags = fcntl(listenfd,F_GETFL,0);
        fcntl(listenfd,F_SETFL,flags | O_NONBLOCK);
    
        one = 1;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,&one, sizeof(one));
    
        if(bind(listenfd,(struct sockaddr *)&server_addr,sizeof server_addr) < 0)
        {
            printf("socket bind error: %s
    ",strerror(errno));
            return 0;
        }
    
        listen(listenfd,10);
    
        FD_ZERO(&rset);
        FD_ZERO(&wrset);
        FD_ZERO(&allset);
        FD_SET(listenfd,&allset);
        maxfd = listenfd;
        maxid = -1;
    
    
        while(1)
        {
            rset = allset;
            nready = select(maxfd + 1, &rset,&wrset,NULL,NULL);
    
    
            if(nready < 0)
            {
                printf("select error: %s
    ",strerror(errno));
                exit(1);
            }
    
            if(FD_ISSET(listenfd, &rset))
            {
    
                connfd = Accept(listenfd,&cli_addr);
    
                printf("recieve from : %s at port %d
    ", inet_ntop(AF_INET,&cli_addr.sin_addr,addr_str,INET_ADDRSTRLEN),cli_addr.sin_port);
    
                for(ix = 0; ix < static_cast<int>(client.size()); ix++)
                {
                    if(client[ix] < 0)
                    {
                        client[ix] = connfd;
                        break;
                    }
                }
    
                printf("client[%d] = %d
    ",ix,connfd);
    
                if( FD_SETSIZE == ix)
                {
                    printf("too many client! 
    ");
                    exit(1);
                }
    
                if( connfd > maxfd)
                {
                    maxfd = connfd;
                }
    
                FD_SET(connfd, &allset);
    
                accepted++;
                printf("accepted: %d
    ",accepted);
    
                if(ix > maxid)
                {
                    maxid = ix;
                }
                
                if(--nready == 0)
                {
                    continue;
                }
    
            }
    
            for(ix = 0; ix <= maxid; ix++)
            {
                if((sockfd = client[ix]) < 0)
                {
                    continue;
                }
    
                if(FD_ISSET(sockfd,&rset))
                {
                
                    int left_len = bufMap[sockfd].length();
                    
                    if( 0 == (nrcv = Read(sockfd,bufMap)))
                    {
                        client[ix] = -1;
                        printf("close! 
    ");
                        FD_CLR(sockfd,&allset);
                        bufMap.erase(sockfd);
                        close(sockfd);
                    }
                    else if ( nrcv > 0)
                    {
                        printf("nrcv = %d 
    ",nrcv);
    
                        nrcv += left_len;//next time when client write to
    
                        //nwrt = write(sockfd,bufMap[sockfd].c_str(),200);// 模拟还有剩余
                        nwrt = write(sockfd,bufMap[sockfd].c_str(),nrcv);
    
                        if(nwrt < 0)
                        {
                            if( errno != EWOULDBLOCK)
                            {
                                printf("Write error: %s
    ", strerror(errno));
                            }
                        }
    
                        printf("nwrt = %d 
    ",nwrt);
    
                        if(nwrt == nrcv) // 全部写到了内核缓冲区
                        {
                            bufMap[sockfd].clear();
                            //bufMap[sockfd].erase(0,nrcv);
                            if(FD_ISSET(sockfd,&wrset))
                            {
                                FD_CLR(sockfd,&wrset);
                            }
                        }
                        else // 还有剩余
                        {
                            printf("write left 
    ");
                            bufMap[sockfd].erase(0,nwrt);
                            std::cout << " after erase: "<<bufMap[sockfd] <<std::endl;
                            FD_SET(sockfd,&wrset);//开始关注写事件
                        }
    
                    }
                    else
                    {
                        int err = getSocketError(sockfd);
                        
                        printf("SocketError: %s
    ",strerror(err));
                    }
                }
    
                if(FD_ISSET(sockfd,&wrset))
                {
                    nrcv = bufMap[sockfd].size();
                    printf("write again: nrcv left = %d 
    ",nrcv);
                    nwrt = write(sockfd,bufMap[sockfd].c_str(),nrcv);
    
                    if(nwrt == nrcv)
                    {
                        bufMap[sockfd].clear();
                        if(FD_ISSET(sockfd,&wrset))
                        {
                            FD_CLR(sockfd,&wrset);
                        }
                        printf("Write complete! 
    ");
                    }
                    else
                    {
                        bufMap[sockfd].erase(0,nwrt);
                    }
                }
    
                if(--nready == 0)
                {
                    break;
                }
            }
        }
    
        return 0;
    }
  • 相关阅读:
    安卓读取SD卡的容量
    Eclipse常用快捷键
    安卓实现记住密码登陆功能
    eclipse DDMS导出文件失败--android Failed to push the item
    LinearLayout线性布局搭配权重属性的使用
    unity 天空盒有缝隙的解决方案
    unity gitignore
    unity 5.6.1 Oculus手柄输入问题
    两个长整形相除等于零
    Unity 获得视频的某一帧,生成缩略图
  • 原文地址:https://www.cnblogs.com/zhuyp1015/p/3560095.html
Copyright © 2011-2022 走看看