1 概述
之间的学习中发现,传统的阻塞式系统调用不仅浪费进程运行时间,而且会带来狠毒问题。因此进程需要有一种预先告知内核的能力,使得内核一旦发现进程指定的一个或者多个I/O条件就绪,它就通知进程。这个能力称为I/O复用,是由select和poll函数支持的。
I/O复用的典型使用场景:
- 当客户处理多个描述符(通常是交互输入和网路套接字)时,必须使用I/O复用
- 一个客户同时处理多个套接字
- 如果一个TCP服务器既要处理监听套接字,也要处理已连接套接字,一般就要使用I/O复用
- 如果一个服务器既要处理TCP也要处理UDP
- 一个服务器要处理多个服务或者协议
2 I/O模型
在学习select和poll函数之前,先了解Unix系统的5种I/O模型:
2.1 阻塞式模型
在图6-1中,进程调用recvfrom,其系统调用直到数据报到达且被复制到应用进程的缓冲区或者发生错误才返回。最常见的错误是系统调用被信号中断。因此进程在调用recvfrom时都是阻塞的,只有函数返回之后才开始处理数据报。
2.2 非阻塞式模型
进程把一个套接字设置成非阻塞模式:通知内核,在请求的I/O操作需要等待时,内核不要把进程投入睡眠,而是返回一个错误。
当一个应用进程像上图一样对一个非阻塞描述符重复调用recvfrom时,我们称之为轮询(polling)。这样做往往会浪费大量CPU时间。
2.3 I/O复用模型
有了I/O复用,我们就可以调用select或poll函数阻塞在两个系统调用的某一个上,而不是阻塞在真正的I/O系统调用上。下图概括了I/O复用模型。
我们阻塞于select调用,等待数据报套接字变为可读。当select返回套接字可读这一条件时,我们调用recvfrom把所读取的数据报复制到应用进程缓冲区。
比较图6-3和图6-1,I/O复用似乎不显得有什么优势,事实上由于使用select需要两个系统调用,I/O复用还稍显劣势。不过使用select的优势在于可以等待多个描述符就绪。
2.4 信号驱动式
我们也可以用信号,让内核在描述符就绪的时候发送SIGIO信号通知我们。我们称这种信号模型为信号驱动式I/O。
首先开启套接字的信号驱动式I/O功能,并通过sigaction系统调用匹配一个信号处理函数。该系统调用立即返回,进程继续工作不被阻塞。当数据报准备好读取时,内核就为该进程产生一个SIGIO辛哈,我们就可以在信号处理函数中调用recvfrom读取数据报,也可以立即通知主循环让它读取数据报。
无论如何处理SIGIO信号,信号驱动式模型的优势在于等待数据报到达期间进程不会被阻塞。主循环可以继续执行,只要等待来自信号处理函数的通知:既可以是数据已被准备好处理,也可以是数据报已准备好被读取。
2.5 异步I/O模型
异步I/O由POSIX规范定义。它的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核搬运到自己进程的缓冲区)完成后通知我们。这种模型与信号驱动式模型的区别在于:信号驱动I/O是内核通知我们何时启动一个I/O操作,而异步I/O是由内核通知我们I/O操作何时完成。如下图所示:
我们调用aio_read函数,给内核传递描述符、缓冲区指针、缓冲区大小和文件偏移,并告诉内核整个操作完成的时候通知我们。该系统调用立即返回,并且在等待I/O完成期间,进程不被阻塞。
2.6 各种模型比较
3 select函数
select函数指示内核等待多个事件中的任何一个事件发生,并且立即或者等待一段时间之后通知进程。
#include <sys/select.h> #include <sys/time.h> int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout); /*返回:若有就绪的描述符则返回它的数目,若超时则返回0,出错返回-1*/
time参数告知内核等待指定描述符中的任何一个最大花费时间。timeval结构用于指定这段时间的秒数和微秒数。
struct timeval { long tv_sec; long tv_usec; };
这个参数有三种可能:
(1)空指针:永远等待下去,直到任意一个描述符准备好才返回;
(2)某个固定时间:不超过指定的时间内,等待任意一个描述符准备好才返回;
(3)0:根本不等待,检查描述符之后立即返回,这称为轮询(polling)。该参数必须指向一个timeval结构体,并且其中的值均为0。
中间的三个参数readset、writeset、exceptset指定内核测试读、写和异常的描述符。目前支持异常的条件有两个:
(1)某个套接字的带外数据的到达;
(2)某个已置为分组模式的伪终端存在可从其主端读取的控制状态信息。
select函数使用描述符集(通常是一个整数数组)的每一位来对应一个描述符。它的具体操作隐藏在fd_set的数据类型和以下四个宏之中:
void FD_ZERO(fd_set *fdset); void FD_SET(int fd, fd_set *fdset); void FD_CLR(int fd, fd_set *fdset); int FD_ISSET(int fd, fd_set *fdset);
描述符集的初始化非常重要,如果作为自动变量分配的描述符集没有被初始化,将产生意想不到的后果。
select的三个参数readset、writeset、exceptset中,如果对某一个不感兴趣,可以把它设为空指针。
maxfdp1参数指定了待测试的描述符个数,它的值是待测试的所有描述符最大值加1,描述符0,1,2,……一直到maxfdp1-1都将被测试。
select函数修改由指针readset、writeset、exceptset所指向的描述符集,因此这三个参数都是值-结果参数。调用select函数时,指定关心的描述符值,函数返回时,结果将指示哪些描述符已就绪,我们就可以用FD_ISSET宏来测试fd_set数据类型中的描述符。描述符集中任何未就绪的描述符相对应的位都被置为0。因此,每次重新调用select函数时,都得把描述符集相应的位置为1。
select函数的返回值表示描述符集中已就绪的总位数。如果在任何描述符就绪就绪之前定时器超时,则返回0。返回-1表示出错。
3.2 描述符就绪条件
满足下列四个条件时,一个套接字准备好读:
(1)该套接字接收缓冲区的数据字节数大于等于套接字接收缓冲区低水位标记的当前大小
(2)该连接的读半部关闭(也就是接收了FIN的TCP连接)
(3)该套接字是一个监听套接字并且已完成的连接数不为0
(4)其上有一个套接字错误待处理
满足下列四个条件时,一个套接字准备好写:
(1)该套接字发送缓冲区的数据字节数大于等于套接字发送缓冲区低水位标记的当前大小
(2)该连接的写半部关闭
(3)使用非阻塞式的connect的套接字已建立连接,或者connect已经以失败告终
(4)其上有一个套接字错误待处理
4 客户端中str_cli函数修订版
使用select函数重写了TCP echo客户端程序。早先的版本问题在于:当套接字上发生某些事件时,客户可能阻塞于fgets调用。新版本该为阻塞于select调用,或是等待标准输入可读,或是等待套接字可读。
客户的套接字上三个条件处理如下:
(1)如果对端TCP发送数据,那么该套接字变为可读,并且read返回一个大于0的值(即读入的字节数)
(2)如果对端TCP发送一个FIN,那么该套接字变为可读,并且read返回0(EOF)
(3)如果对端TCP发送一个RST(对端主机崩溃并重新启动),那么该套接字变为可读,并且read返回-1,而errno中含有确切的错误码
新版本代码如下:
void str_cli(FILE *fp, int sockfd) { int maxfdp1; fd_set rset; char sendline[MAXLINE], recvline[MAXLINE]; FD_ZERO(&rset); while(1) { FD_SET(fileno(fp), &rset); FD_SET(sockfd, &rset); maxfdp1 = max(fileno(fp), sockfd) + 1; select(maxfdp1, &rset, NULL, NULL, NULL); if(FD_ISSET(sockfd, &rset) { if(read(sockfd, recvline, MAXLINE) == 0) perror("str_cli:server terminated prematurely"); fputs(recvline, stdout); } if(FD_ISSET(fileno(fp), &rset) { if(fgets(sendline, MAXLINE, fp) == NULL) return; write(sockfd, sendline, strlen(sendline)); } }
}
5 TCP服务器函数修订版
回顾之前想的TCP echo服务器程序,把它重写成使用select来处理任意个客户的单进程程序,而不是为每一个客户派生一个子进程。
先通过一个例子描述服务器的工作流程:
当前是服务器未建立连接的状态。服务器有单个监听描述符,维护一个读描述符集合一个名为client的整形数组,数组的大小为FD_SETSIZE-1即内核允许的本进程能打开的最大描述符数目,该数组的所有元素都被初始化为-1。
可见监听套接字第一个可用的描述符是3。描述符集的前三个是标准输入输出和标准错误的描述符,唯一的非0项是监听套接字的项,因此select的第一个参数为4。
当第一个客户与服务器建立连接时,监听描述符变为可读,服务器于是调用accept,假设accept返回的描述符是4。服务器在client数组中记住每个新的已连接描述符,并把它加到描述符集中去。
同样的,第二个客户与服务器相连接,新的描述符(5)被记录,并添加到描述符集中。
假设第一个客户终止它的连接。该客户的TCP发送一个FIN,使得服务器中的描述符4变为可读。但服务器读取这个套接字时,read返回0.于是服务器关闭这个套接字并相应的更新数据结构:把client[0]的值置为-1,把描述符集中的描述符4的位设置为0。注意,maxfd的值没有改变。
总之,当可到达时,我们在client数组中的第一个可用项(值为-1的第一个项)中记录已连接的套接字描述符,同时把该描述符添加到读描述符集之中。变量maxi是client数组当前使用项的最大下标,而变量maxfd+1是select函数第一个参数值。对于本服务器所能处理的最大客户数目是FD_SETSIZE和内核允许本进程打开的最大描述符数目的较小值。
下面给出修改后的服务器程序echo_server.c
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <sys/socket.h> #include <sys/select.h> #include <sys/wait.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> int main(int argc, char **argv) { // client数组当前使用的最大下标,描述符集的当前最大长度 int i, maxi, maxfd; // 已就绪信号,client数组 int nready, client[FD_SETSIZE]; ssize_t n; fd_set rset, allset; char buf[1024]; // 套接字描述符 int listenfd, connfd, sockfd; socklen_t clilen; struct sockaddr_in server_addr, client_addr; listenfd=socket(AF_INET, SOCK_STREAM, 0); memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family=AF_INET; server_addr.sin_addr.s_addr=htonl(INADDR_ANY); server_addr.sin_port=htons(12345); if(bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) perror("bind error"); if(listen(listenfd, 10) < 0) perror("listen error"); // select初始化 maxfd = listenfd; maxi = -1; for(i=0; i<FD_SETSIZE; i++) client[i] = -1; FD_ZERO(&allset); FD_SET(listenfd, &allset); while(1) { // 阻塞于select rset = allset; nready = select(maxfd+1, &rset, NULL, NULL, NULL); if(FD_ISSET(listenfd, &rset)) // 如果是新客户连接 { clilen = sizeof(client_addr); connfd = accept(listenfd, (struct sockaddr *)&client_addr, &clilen); for(i=0; i<FD_SETSIZE; i++) { if(client[i]<0) { client[i] = connfd; // 记录描述符 break; } } if(i == FD_SETSIZE) perror("too many clients."); // 向描述符集添加一个新的描述符 FD_SET(connfd, &allset); // 更新描述符集的maxfd if(connfd > maxfd) maxfd = connfd; // 更新client数组的maxi if(i > maxi) maxi = i; // 没有可读的描述符了 if(--nready <= 0) continue; } // 如果没有新客户连接,检查现有的连接来确认是谁 for(i=0; i<=maxi; i++) { if((sockfd = client[i]) < 0) continue; if(FD_ISSET(sockfd, &rset)) { if((n = read(sockfd, buf, 1024)) == 0) // 客户端关闭连接 { close(sockfd); FD_CLR(sockfd, &allset); client[i] = -1; }else{ write(sockfd, buf, n); } // 没有可读的描述符了 if(--nready <= 0) break; } } } return 0; }
6 shutdown函数
终止网络连接的通常方法是调用close函数。不过close有两个限制,可以使用shutdown函数来避免。
(1)close把描述符的引用计数减1,仅在计数变为0时才关闭套接字。使用shutdown不管引用计数就可以激发TCP的正常连接终止序列
(2)close终止读和写两个方向上的传送。既然TCP连接是全双工的,有时候我们需要通知对端我们已经完成了数据发送任务,但是可能对方还有信息要发送给我们。
#include <sys/socket.h>
int shutdown(int sockfd, int howto); /*返回:若成功返回0,出错返回-1*/
该函数的行为依赖于howto参数的值:
SHUT_RD:关闭连接的读这一半——套接字不再有数据接收,且接收缓冲区的数据将被丢弃。
SHUT_WR:关闭连接的写这一半——对于TCP套接字,则称为半关闭。当前留在发送缓冲区的数据将被发送掉,接着发送终止序列。
SHUT_REDWR:等效于分别调用上述两个shutdown函数。
7 poll函数
poll的功能和select类似,不过在处理流设备时,它能够提供额外的信息。
#include <poll.h> int poll(struct pollfd *fdarray, unsigned long nfds, int timeout); /*返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1*/
第一个参数是指向一个结构体数组的第一个元素的指针。每个数组元素都是一个pollfd的结构,用于测试指定某个给定描述符fd的条件。
struct pollfd { int fd; short events; short revents; }
要测试的条件由events成员指定,函数在相应的revents返回该描述符的状态(避免了使用值-结果参数)。这两个成员中的每一个都由指定的某个条件的一位或多位构成。
我们将图分成三个部分:第一部分是处理输入的四个常值,第二部分是处理输出的三个常值,第三部分是处理错误的三个常值。第三部分不能在events中设置,而是由revents返回。
nfds参数指定结构数组中元素的个数。
timeout参数指定poll函数返回前等待多长时间,它是一个指示毫秒数的整数值。
用poll函数修改之前的服务器程序:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <poll.h> #include <sys/socket.h> #include <sys/select.h> #include <sys/wait.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #define OPEN_MAX 1024 #define INFTIM -1 int main(int argc, char **argv) { // client数组当前使用的最大下标 int i, maxi; // 已就绪信号 int nready; // 分配poll的结构体数组 struct pollfd client[OPEN_MAX]; ssize_t n; char buf[1024]; // 套接字描述符 int listenfd, connfd, sockfd; socklen_t clilen; struct sockaddr_in server_addr, client_addr; listenfd=socket(AF_INET, SOCK_STREAM, 0); memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family=AF_INET; server_addr.sin_addr.s_addr=htonl(INADDR_ANY); server_addr.sin_port=htons(12345); if(bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) perror("bind error"); if(listen(listenfd, 10) < 0) perror("listen error"); // poll初始化 client[0].fd = listenfd; client[0].events = POLLRDNORM; for(i=1; i<OPEN_MAX; i++) client[i].fd = -1; maxi = 0; while(1) { // 阻塞于poll nready = poll(client, maxi+1, INFTIM); if(client[0].revents & POLLRDNORM) // 如果是新客户连接 { clilen = sizeof(client_addr); connfd = accept(listenfd, (struct sockaddr *)&client_addr, &clilen); for(i=1; i<OPEN_MAX; i++) { if(client[i].fd < 0) { client[i].fd = connfd; // 记录描述符 break; } } if(i == OPEN_MAX) perror("too many clients."); // 向client[]数组添加一个新的pollfd结构体 client[i].events = POLLRDNORM; // 更新client数组的maxi if(i > maxi) maxi = i; // 没有可读的描述符了 if(--nready <= 0) continue; } // 如果没有新客户连接,检查现有的连接来确认是谁 for(i=1; i<=maxi; i++) { if((sockfd = client[i].fd) < 0) continue; if(client[i].revents & (POLLRDNORM | POLLERR)) { if((n = read(sockfd, buf, 1024)) < 0) // 客户端重新连接 { if(errno == ECONNRESET) { close(sockfd); client[i].fd = -1; }else perror("read error."); }else if(n == 0) // 客户端关闭连接 { close(sockfd); client[i].fd = -1; }else write(sockfd, buf, n); // 没有可读的描述符了 if(--nready <= 0) break; } } } return 0; }