zoukankan      html  css  js  c++  java
  • 并发服务器--02(基于I/O复用——运用Select函数)

    I/O模型

      Unix/Linux下有5中可用的I/O模型:

    • 阻塞式I/O
    • 非阻塞式I/O
    • I/O复用(select、poll、epoll和pselect)
    • 信号驱动式I/O(SIGIO)
    • 异步I/O(POSIX的aio_系列的函数)

      关于这五种详细介绍可参考《UNIX网络编程 卷1》或网上博文

    I/O复用

    概念

      I/O多路复用(I/O Multiplexing)是指内核一旦发现进程指定的一个或者多个IO条件就绪(也就是说输入已准备好被读取,或者描述符已能承接更多的输出),它就通知该进程的能力。

      I/O多路复用适用如下场合:

      (1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。

      (2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。

      (3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。

      (4)如果一个服务器既要处理TCP,又要处理UDP,一般要使用I/O复用。

      (5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

      与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

    描述符就绪条件

      1. 满足下列四个条件中的任何一个时,一个套接字准备好

      1)该套接字接收缓冲区中的数据字节数大于等于套接字接收缓存区低水位标记的当前大小。对于TCP和UDP套接字而言,缓冲区低水位的值默认为1。那就意味着,默认情况下,只要缓冲区中有数据,那就是可读的。我们可以通过使用SO_RCVLOWAT套接字选项(参见setsockopt函数)来设置该套接字的低水位大小。此种描述符就绪(可读)的情况下,当我们使用read/recv等对该套接字执行读操作的时候,套接字不会阻塞,而是成功返回一个大于0的值(即可读数据的大小)。

      2)该连接的读半部关闭(也就是接收了FIN的TCP连接)。对这样的套接字的读操作,将不会阻塞,而是返回0(也就是EOF)。

      3)该套接字是一个listen的监听套接字,并且目前已经完成的连接数不为0。对这样的套接字进行accept操作通常不会阻塞。

      4)有一个错误套接字待处理。对这样的套接字的读操作将不阻塞并返回-1(也就是返回了一个错误),同时把errno设置成确切的错误条件。这些待处理错误(pending error)也可以通过指定SO_ERROR套接字选项调用getsockopt获取并清除。

      2. 满足下列四个条件中的任何一个时,一个套接字准备好

      1)该套接字发送缓冲区中的可用空间字节数大于等于套接字发送缓存区低水位标记时,并且该套接字已经成功连接(UDP套接字不需要连接)。对于TCP和UDP而言,这个低水位的值默认为2048,而套接字默认的发送缓冲区大小是8K,这就意味着一般一个套接字连接成功后,就是处于可写状态的。我们可以通过SO_SNDLOWAT套接字选项(参见setsockopt函数)来设置这个低水位。此种情况下,我们设置该套接字为非阻塞,对该套接字进行写操作(如write、send等),将不阻塞,并返回一个正值(例如由传输层接受的字节数,即发送的数据大小)。

      2)该连接的写半部关闭。对这样的套接字的写操作将会产生SIGPIPE信号。所以我们的网络程序基本都要自定义处理SIGPIPE信号。因为SIGPIPE信号的默认处理方式是程序退出。

      3)使用非阻塞的connect套接字已建立连接,或者connect已经以失败告终。

      4)有一个错误的套接字待处理。对这样的套接字的写操作将不阻塞并返回-1(也就是返回了一个错误),同时把errno设置成确切的错误条件。这些待处理的错误也可以通过指定SO_ERROR套接字选项调用getsockopt获取并清除。

      

      3. 如果一个套接字存在带外数据(out-of-band data)或者仍处于带外标记,那么它有异常条件待处理。 

      带外数据有时也称为经加速数据(expedited data)。其想法是一个连接的某端发生了重要的事情,而且该端希望迅速通告其对端。这里的“迅速”意味着这种通知应该在已经排队等待发送的任何“普通”(有时称为带内)数据之前发送。也就是说,带外数据被认为具有比普通数据更高的优先级。带外数据并不要求在客户和服务器之间再使用一个连接,而是被映射到已有的连接中。

      UDP套接字不存在带外数据。TCP并没有真正的带外数据,不过提供了紧急模式(urgent mode)(TCP数据包头部的紧急指针)。

      注意:当某个套接字上发生错误时,它将由select标记为既可写又可读。

      关于带外数据更详细的请参考《UNIX网络编程 卷1》第24章。

      接收低水位标记和发送低水位标记的目的在于:允许应用进程控制在select返回可读或可写条件之前有多少数据可读或有多大空间可用于写。举例来说,如果我们知道除非至少存在64个字节的数据,否则我们的应用进程没有任何有效工作可做,那么可以把接收低水位标记设置为64,以防少于64个字节的数据准备好读时select唤醒我们。

      任何UDP套接字只要发送低水位标记小于等于发送缓冲区大小(默认应该总是这种关系)就总是可写的,这是因为UDP套接字不需要连接。

      

      下图汇总了导致select返回某个套接字就绪的条件

      

    I/O复用——运用Select函数

    函数说明

      select函数可以在一段指定的时间内,监听用户感兴趣的文件描述符的可读、可写及异常事件。

      Select函数定义

    #include <sys/select.h>
    #include <sys/time.h>
    
    int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)
    返回值:就绪描述符的数目,超时返回0,出错返回-1

      Select函数说明

      应用程序调用select函数时,通过readset、writeset、exceptset传入感兴趣的文件描述符,内核将修改它们来通知应用程序哪些文件描述符已经就绪。

      Select函数参数说明 

      1)maxfdp1指定待测试的描述字个数,它的值是待测试的最大描述字加1(因此把该参数命名为maxfdp1),描述字0、1、2...maxfdp1-1均将被测试。

      2)readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述字。如果对某一个的条件不感兴趣,就可以把它设为空指针。详细如下:

      readset : 可读的文件描述符集合;
      writeset : 可写的文件描述符集合; 
      exceptset : 异常的文件描述符集合。
      3)timeout告知内核等待所指定描述字中的任何一个就绪可花多少时间。

      Select函数返回值

      select函数成功时,返回就绪(可读、可写、异常)文件描述符的总数;

      如果在超时时间timeout内没有任何文件描述符就绪,则select函数返回0;

      select函数失败时,返回-1,并设置errno;

      如果在select函数等待期间,程序接收到信号,则select函数立即返回-1,并设置errno为EINTR。

      fd_set说明

      struct fd_set可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:

    void FD_ZERO(fd_set *fdset);             //清除set的所有位  
    void FD_SET(int fd, fd_set *fdset);      //设置set的第fd位  
    void FD_CLR(int fd, fd_set *fdset);      //清除set的第fd位  
    void FD_ISSET(int fd, fd_set *fdset);    //测试set的第fd为是否被设置    

      timeval说明

      timeout告知内核等待所指定描述字中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。

    struct timeval{
       long tv_sec;   //seconds
       long tv_usec;  //microseconds
    };

      这个参数有三种可能:

      1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。

      2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。

      3)根本不等待:检查描述字后立即返回,这称为轮询(polling)。为此,该参数必须指向一个timeval结构,而且其中的定时器值(由该结构指定的秒数和微秒数)必须为0。

    修订的str_cli函数

      我们这里修订的是以前博文并发服务器--01(基于进程派生)的str_cli函数。在那篇博文,它的定义如下:

     1 void
     2 str_cli(FILE *fp, int sockfd)
     3 {
     4     char    sendline[MAXLINE], recvline[MAXLINE];
     5 
     6     while (Fgets(sendline, MAXLINE, fp) != NULL) {
     7 
     8         Writen(sockfd, sendline, strlen(sendline));
     9 
    10         if (Readline(sockfd, recvline, MAXLINE) == 0)
    11             err_quit("str_cli: server terminated prematurely");
    12 
    13         Fputs(recvline, stdout);
    14     }
    15 }
    原str_cli

      早先str_cli版本阻塞于fgets调用,这个版本的将改为阻塞于select调用,或是等待标准输入可读,或是等待套接字可读。下图展示了调用select所处理的各种条件:

      

      客户的套接字上的三个条件处理如下:

      1)如果对端TCP发送数据,那么该套接字变为可读,并且read返回一个大于0的值(即读入数据的字节数);

      2)如果对端TCP发送一个FIN(对端进程终止),那么套接字变为可读,并且read返回0(EOF);

      3)如果对端TCP发送一个RST(对端主机崩溃并重新启动),那么该套接字变为可读,并且read返回-1,而errno中含有确切的错误码。

      新版本的str_cli函数如下:

     1 void
     2 str_cli_select(FILE *fp, int sockfd)
     3 {
     4     int maxfdp, stdineof;
     5     fd_set rset;
     6     char buf[MAXLINE];
     7     int n;
     8 
     9     stdineof = 0;
    10     FD_ZERO(&rset);
    11     for(;;)
    12     {
    13         if(stdineof == 0)
    14             FD_SET(fileno(fp), &rset);
    15         FD_SET(sockfd, &rset);
    16         maxfdp = max(fileno(fp), sockfd) + 1;
    17         Select(maxfdp, &rset, NULL, NULL, NULL);
    18 
    19         if(FD_ISSET(sockfd, &rset))    /* socket is readable */
    20         {
    21             if((n == Read(sockfd, buf, MAXLINE)) == 0)
    22             {
    23                 if(stdineof == 1)
    24                     return;    /* normal termination */
    25                 else
    26                     err_quit("str_cli_select: server terminated prematurely");
    27             }
    28             Write(fileno(stdout), buf, n);
    29         }
    30 
    31         if(FD_ISSET(fileno(fp), &rset))    /* input is readable */
    32         {
    33             if((n = Read(fileno(fp), buf, MAXLINE)) == 0)
    34             {
    35                 stdineof = 1;
    36                 Shutdown(sockfd, SHUT_WR);    /* send FIN */
    37                 FD_CLR(fileno(fp), &rset);
    38                 continue;
    39             }
    40             Writen(sockfd, buf, n);
    41         }
    42     }
    43 }

      上述程序中的stdineof是一个初始化为0的新标志。只要该标识为0,每次主循环中我们总是select标准输入的可读性。

      19~29:当我们在套接字上读到EOF时,如果我们已在标准输入上遇到EOF,那就是正常的终止,于是函数返回;但是如果我们在标准输入上没有遇到EOF,那么服务器进程已过早终止。

      31~41:当我们在标准输入上碰到EOF时,我们把新标志stdineof置为1,并把第二个参数指定为SHUT_WR来调用shutdown以发送FIN。

    shutdown函数

      终止网络连接的通常方法是调用close函数。不过close有两个限制,却可以用shutdown来避免:

      1)close把描述符的引用计数减1,仅在该计数变为0是才关闭套接字。使用shutdown可以不管引用计数就激发TCP的正常连接终止序列。

      2)close终止读和写两个方向的数据传送。既然TCP连接是全双工的,有时候我们需要告知对端我们已经完成了数据发送,即使对端仍有数据要发送给我们。

      shutdown函数定义如下:

    #include <sys/socket.h>
    int shutdown(int sockfd, int howto);
    // 返回:若成功则返回0,若出错则返回-1

      该函数的行为依赖于howto参数的值:

      1)SHUT_RD:关闭连接的读这一半——套接字中不再有数据可接收,而且套接字接收缓冲区中的现有数据都被丢弃。进程不能再对这样的套接字调用任何读函数。对一个TCP套接字这样调用shutdown函数后,由该套接字接收来的来自对端的任何数据都被确认,然后悄然丢弃。

      2)SHUT_WR:关闭连接的写这一半——对于TCP套接字,这称为半关闭(half close)。当前留在套接字发送缓冲区的数据将被发送掉,后跟TCP的正常连接终止序列。

      3)SHUT_RDWR:连接的读半部和写半部都关闭——这与调用shutdown两次等效:第一次调用SHUT_RD,第二次调用指定SHUT_WR。

    修订TCP回射服务器程序

      我们这里修订的是以前博文并发服务器--01(基于进程派生)的str_cli函数。在那篇博文,它的内容如下:

     1 #include "unp.h"
     2 
     3 void
     4 sig_chld(int signo)
     5 {
     6     pid_t pid;
     7     int stat;
     8     while((pid = waitpid(-1, &stat, WNOHANG)) > 0)
     9         printf("child %d terminated
    ", pid);
    10     return;
    11 }
    12 
    13 int
    14 main()
    15 {
    16     int listenfd, connfd;
    17     pid_t childpid;
    18     socklen_t clilen;
    19     struct sockaddr_in cliaddr, servaddr;
    20 
    21     void sig_chld(int);
    22 
    23     listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    24     
    25     bzero(&servaddr, sizeof(servaddr));
    26     servaddr.sin_family = AF_INET;
    27     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    28     servaddr.sin_port = htons(SERV_PORT);
    29 
    30     Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
    31     Listen(listenfd, LISTENQ);
    32     Signal(SIGCHLD, sig_chld);    /* must call waitpid */
    33     for( ; ; )
    34     {
    35         clilen = sizeof(cliaddr);
    36         if((connfd = accept(listenfd, (SA *)&cliaddr, &clilen)) < 0)
    37         {
    38             if(errno == EINTR)
    39                 continue;
    40             else
    41                 err_sys("accept error");
    42         }
    43         if((childpid = Fork()) == 0)
    44         {
    45             Close(listenfd);
    46             str_echo(connfd);
    47             Close(connfd);
    48             exit(0);
    49         }
    50         Close(connfd);
    51     }
    52 
    53     exit(0);
    54 }
    原tcpserv

      在这里,我们将把上边程序改写成使用select来处理任意个客户的单进程程序。现在我们来跟踪一下服务器端的状态:

      下图给出了第一个客户建立连接前服务器的状态:

      

      服务器有单个监听描述符,我们用一个圆点来表示。

      服务器只维护一个读描述符集,如图6-15所示。假设服务器是在前台启动的,那么描述符0、1和2将分别被设置为标准输入 、标准输出和标准错误输出。可见监听套接字的第一个可用描述符是3。图6-15还展示了一个名为client的整形数组,它含有每个客户的已连接套接字描述符。该数组的所有元素都被初始化为-1。

      

      描述符中的唯一的非0项是表示监听套接字的项,因此select的第一个参数将为4。

      当地一个客户与服务器建立连接时,监听描述符变为可读,我们的服务器于是调用accept。在本例的假设下,有accept返回的新的已连接描述符将是4。图6-16展示了从用户到服务器的连接:

      

      从现在起,我们的服务器必须在其client数组中记住每个新的已连接描述符,并把它加到描述符集中去。图6-17展示了这样更新后的数据结构:

      

      稍后,第二个客户与服务器建立连接,图6-18展示了这种情形:

      

      新的已连接描述符(假设是5)必须被记住,从而给出如图6-19所示的数据结构:

      

      我们接着假设第一个客户终止它的连接。该客户的TCP发送一个FIN,使得服务器中的描述符4变为可读。当服务器读这个已连接套接字时,read将返回0。我们于是关闭该套接字并相应地更新数据结构:把client[0]的值置为-1,把描述符集中描述符4的为设置为0,如图6-20所示。注意,maxfd的值没有改变。

      

      总之,当有客户到达时,我们在client数组中的第一个可用项(即值为-1的第一个项)中记录其已连接套接字的描述符。我们还必须把这个已连接描述符加到读描述符集中。变量maxi是client数组当前使用项的最大下标,而变量maxfd(加1之后)是select函数第一个参数的当前值。对于本服务器所能处理的最大客户数目的限制是以下两个之中的较小者:FD_SETSIZE和内核允许本进程打开的最大描述符数。 

      完整的服务器程序如下:

     1 #include "unp.h"
     2 
     3 int
     4 main()
     5 {
     6     int i, maxi, maxfd, listenfd, connfd, sockfd;
     7     int nready, client[FD_SETSIZE];
     8     ssize_t n;
     9     fd_set rset, allset;
    10     char buf[MAXLINE];
    11     socklen_t clilen;
    12     struct sockaddr_in cliaddr, servaddr;
    13 
    14     listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    15     
    16     bzero(&servaddr, sizeof(servaddr));
    17     servaddr.sin_family = AF_INET;
    18     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    19     servaddr.sin_port = htons(SERV_PORT);
    20 
    21     Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
    22     Listen(listenfd, LISTENQ);
    23     
    24     maxfd = listenfd;    /* initialize */
    25     maxi = -1;            /* index into client[] array */
    26     for(i = 0; i <FD_SETSIZE; i++)
    27         client[i] = -1;    /* -1 indicates available entry */
    28     FD_ZERO(&allset);
    29     FD_SET(listenfd, &allset);
    30     
    31     for( ; ; )
    32     {
    33         rset = allset;        /* structure assignment */
    34         nready = Select(maxfd + 1, &rset, NULL, NULL, NULL);
    35 
    36         if(FD_ISSET(listenfd, &rset))    /* new client connection */
    37         {
    38             clilen = sizeof(cliaddr);
    39             connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
    40 
    41             for(i = 0; i < FD_SETSIZE; i++)
    42             {
    43                 if(client[i] < 0)
    44                 {
    45                     client[i] = connfd;    /* save descriptor */
    46                     break;
    47                 }
    48             }
    49             if(i == FD_SETSIZE)
    50                 err_quit("too many clients");
    51         
    52             FD_SET(connfd, &allset);    /* add new descriptor to set */
    53             if(connfd > maxfd)
    54                 maxfd = connfd;    /* for select */
    55             if(i > maxi)
    56                 maxi = i;        /* max index in client[] array */
    57 
    58             if(--nready <= 0)
    59                 continue;        /* no more readable descriptor */
    60         }
    61 
    62         for(i = 0; i <= maxi; i++)    /* check all clients for data */
    63         {
    64             if((sockfd = client[i]) < 0)
    65                 continue;
    66             if(FD_ISSET(sockfd, &rset))
    67             {
    68                 if((n = Read(sockfd, buf, MAXLINE)) == 0)
    69                 {
    70                     /* connection closed by client */
    71                     Close(sockfd);
    72                     FD_CLR(sockfd, &allset);
    73                     client[i] = -1;
    74                 }
    75                 else
    76                     Writen(sockfd, buf, n);
    77 
    78                 if(--nready <= 0)
    79                     break;    /* no more available descriptors */
    80             }
    81         }
    82     }
    83 
    84     exit(0);
    85 }
    View Code

      相应的客户端程序如下:

     1 #include "unp.h"
     2 
     3 void
     4 str_cli_select(FILE *fp, int sockfd);
     5 
     6 int
     7 main(int argc, char **argv)
     8 {
     9     int sockfd;
    10     struct sockaddr_in servaddr;
    11 
    12     if(argc != 2)
    13         err_quit("usage: tcpcli <IPaddress>");
    14 
    15     sockfd = Socket(AF_INET, SOCK_STREAM, 0);
    16     
    17     bzero(&servaddr, sizeof(servaddr));
    18     servaddr.sin_family = AF_INET;
    19     servaddr.sin_port = htons(SERV_PORT);
    20     Inet_pton(AF_INET, argv[1], &servaddr.sin_addr.s_addr);
    21 
    22     Connect(sockfd, (SA *) &servaddr, sizeof(servaddr));
    23 
    24     str_cli_select(stdin, sockfd);
    25 
    26     exit(0);
    27 }
    28 
    29 void
    30 str_cli_select(FILE *fp, int sockfd)
    31 {
    32     int maxfdp, stdineof;
    33     fd_set rset;
    34     char buf[MAXLINE];
    35     int n;
    36 
    37     stdineof = 0;
    38     FD_ZERO(&rset);
    39     for(;;)
    40     {
    41         if(stdineof == 0)
    42             FD_SET(fileno(fp), &rset);
    43         FD_SET(sockfd, &rset);
    44         maxfdp = max(fileno(fp), sockfd) + 1;
    45         Select(maxfdp, &rset, NULL, NULL, NULL);
    46 
    47         if(FD_ISSET(sockfd, &rset))    /* socket is readable */
    48         {
    49             if((n == Read(sockfd, buf, MAXLINE)) == 0)
    50             {
    51                 if(stdineof == 1)
    52                     return;    /* normal termination */
    53                 else
    54                     err_quit("str_cli_select: server terminated prematurely");
    55             }
    56             Write(fileno(stdout), buf, n);
    57         }
    58 
    59         if(FD_ISSET(fileno(fp), &rset))    /* input is readable */
    60         {
    61             if((n = Read(fileno(fp), buf, MAXLINE)) == 0)
    62             {
    63                 stdineof = 1;
    64                 Shutdown(sockfd, SHUT_WR);    /* send FIN */
    65                 FD_CLR(fileno(fp), &rset);
    66                 continue;
    67             }
    68             Writen(sockfd, buf, n);
    69         }
    70     }
    71 }
    View Code

    运用Select函数存在的问题

      这段摘自博文select、poll、epoll之间的区别总结[整理]

      select的调用过程如下所示:

      

      (图片链接

      1)使用copy_from_user从用户空间拷贝fd_set到内核空间

      2)注册回调函数__pollwait

      3)遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)

      4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

      5)__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

      6)poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

      7)如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。

      8)把fd_set从内核空间拷贝到用户空间。

     

      select的几大缺点:

      1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

      2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

      3)select支持的文件描述符数量太小了,默认是1024

    参考资料

      《UNIX网络编程 卷1》

      Linux下的I/O复用

      select、poll、epoll之间的区别总结[整理]

  • 相关阅读:
    LeetCode具体分析 :: Recover Binary Search Tree [Tree]
    [leetcode] Path Sum
    System、应用程序进程的Binder线程池和Handler消息循环
    R(二): http与R脚本通讯环境安装
    R(一): R基础知识
    Hive(五):hive与hbase整合
    Hive(六):HQL DDL
    Hive(四):c#通过odbc访问hive
    Hive(三):SQuirrel连接hive配置
    Hive(二):windows hive ODBC 安装
  • 原文地址:https://www.cnblogs.com/xiehongfeng100/p/4634361.html
Copyright © 2011-2022 走看看