服务器设计技术有很多,按使用的协议来分有TCP服务器和UDP服务器,按处理方式来分有循环服务器和并发服务器。
在网络程序里面,一般来说都是许多客户对应一个服务器,为了处理客户的请求,对服务端的程序就提出了特殊的要求。
目前最常用的服务器模型有:
- 循环服务器:服务器在同一时刻只能响应一个客户端的请求
- 并发服务器:服务器在同一时刻可以响应多个客户端的请求
1、循环服务器模型
1.1 UDP循环服务器的实现方法
UDP循环服务器每次从套接字上读取一个客户端的请求->处理->然后将结果返回给客户机。
因为UDP是非面向连接的,没有一个客户端可以老是占住服务端。只要处理过程不是死循环,服务器对于每一个客户机的请求总是能够满足。
UDP循环服务器模型为:
socket(); bind(); while(1) { recvfrom(); process(); sendto(); }
1.2 TCP循环服务器的实现方法
TCP循环服务器接受一个客户端的连接,然后处理,完成了这个客户的所有请求后,断开连接。TCP循环服务器一次只能处理一个客户端的请求,只有在这个客户的所有请求满足后,服务器才可以继续后面的请求。如果有一个客户端占住服务器不放时,其它的客户机都不能工作了,因此,TCP服务器一般很少用循环服务器模型的。
TCP循环服务器模型为:
socket(); bind(); listen(); while(1){ accept(); process(); close(); }
2、三种并发服务器实现方法
一个好的服务器,一般都是并发服务器。并发服务器设计技术一般有:多进程服务器、多线程服务器、I/O复用服务器等。
2.1 多进程并发服务器
在Linux环境下多进程的应用很多,其中最主要的就是网络/客户服务器。多进程服务器是当客户有请求时,服务器用一个子进程来处理客户请求,父进程继续等待其它客户的请求。这种方法的优点是当客户有请求时,服务器能及时处理客户,特别是在客户服务器交互系统中。对于一个 TCP服务器,客户与服务器的连接可能并不马上关闭,可能会等到客户提交某些数据后再关闭,这段时间服务器端的进程会阻塞。所以这时操作系统可能调度其它客户服务进程,比起循环服务器大大提高了服务性能。
TCP多进程并发服务器
TCP并发服务器的思想是每一个客户机的请求并不由服务器直接处理,而是由服务器创建一个子进程来处理。
socket(); bind(); listen(); while(1){ accept(); if(fork() == 0) { process(); close(); exit(); } close(); }
使用示例:
#include <unistd.h> #include <sys/types.h> /* basic system data types */ #include <sys/socket.h> /* basic socket definitions */ #include <netinet/in.h> /* sockaddr_in{} and other Internet defns */ #include <arpa/inet.h> /* inet(3) functions */ #include <stdlib.h> #include <errno.h> #include <stdio.h> #include <string.h> #define MAXLINE 1024 //typedef struct sockaddr SA; void handle(int connfd); int main(int argc, char **argv) { int listenfd, connfd; int serverPort = 6888; int listenq = 1024; pid_t childpid; char buf[MAXLINE]; socklen_t socklen; struct sockaddr_in cliaddr, servaddr; socklen = sizeof(cliaddr); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(serverPort); listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { perror("socket error"); return -1; } if (bind(listenfd, (struct sockaddr *) &servaddr, socklen) < 0) { perror("bind error"); return -1; } if (listen(listenfd, listenq) < 0) { perror("listen error"); return -1; } printf("echo server startup,listen on port:%d ", serverPort); for ( ; ; ) { connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &socklen); if (connfd < 0) { perror("accept error"); continue; } sprintf(buf, "accept form %s:%d ", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port); printf(buf,""); childpid = fork(); if (childpid == 0) { /* child process */ close(listenfd); /* close listening socket */ handle(connfd); /* process the request */ exit (0); } else if (childpid > 0) { close(connfd); /* parent closes connected socket */ } else { perror("fork error"); } } } void handle(int connfd) { size_t n; char buf[MAXLINE]; for(;;) { n = read(connfd, buf, MAXLINE); if (n < 0) { if(errno != EINTR) { perror("read error"); break; } } if (n == 0) { //connfd is closed by client close(connfd); printf("client exit "); break; } //client exit if (strncmp("exit", buf, 4) == 0) { close(connfd); printf("client exit "); break; } write(connfd, buf, n); //write maybe fail,here don't process failed error } }
2.2 多线程服务器
多线程服务器是对多进程的服务器的改进,由于多进程服务器在创建进程时要消耗较大的系统资源,所以用线程来取代进程,这样服务处理程序可以较快的创建。据统计创建线程与创建进程要快10100 倍,所以又把线程称为“轻量级”进程。线程与进程不同的是:一个进程内的所有线程共享相同的全局内存、全局变量等信息。
这种机制又带来了同步问题。以下是多线程服务器模板:
socket(); bind(); listen(); while(1){ accept(); if((pthread_creat()) != -1) { process(); close(); exit(); } close(); }
2.3 I/O复用服务器
除了上面所述的Apache模型(Process Per Connection,简称PPC,多进程服务器),TPC(Thread Per Connection,多线程服务器)模型,还有一种常用的技术即I/O多路复用技术。
I/O复用技术是为了解决进程或线程阻塞到某个I/O系统调用而出现的技术,使进程不阻塞于某个特定的I/O系统调用。它也可用于并发服务器的设计,常用函数select 、poll和epoll来实现。
下面介绍一下三种常用的I/O复用服务器模型。
2.3.1 select模型
int select(int nfds,fd_set *readfds,fd_set *writefds, fd_set *exceptfds,struct timeval *timeout); void FD_SET(int fd,fd_set *fdset); void FD_CLR(int fd,fd_set *fdset); void FD_ZERO(fd_set *fdset); int FD_ISSET(int fd,fd_set *fdset);
一般的来说当我们在向文件读写时,进程有可能在读写出阻塞,直到一定的条件满足。比如我们从一个套接字读数据时,可能缓冲区里面没有数据可读 (通信的对方还没有发送数据过来),这个时候我们的读调用就会等待(阻塞)直到有数据可读。如果我们不希望阻塞,我们的一个选择是用select系统调用。只要我们设置好select的各个参数,那么当文件可以读写的时候,select回"通知"我们说可以进行读写了。
readfds 所有要读的文件文件描述符的集合。
writefds 所有要的写文件文件描述符的集合。
exceptfds 其他的服要向我们通知的文件描述符。
timeout 超时设置。
nfds 所有监控的文件描述符中最大的那一个加1。
在调用select时进程会一直阻塞直到以下的一种情况发生: 1)有文件可以读;2)有文件可以写;3)超时所设置的时间到。
为了设置文件描述符我们要使用几个宏:
FD_SET 将fd加入到fdset。
FD_CLR 将fd从fdset里面清除。
FD_ZERO 从fdset中清除所有的文件描述符。
FD_ISSET 判断fd是否在fdset集合中。
调用select()将阻塞,直到指定的文件描述符准备好执行I/O,或者可选参数timeout指定的时间已经过去。
监视的文件描述符分为三类set,每一种对应等待不同的事件。readfds中列出的文件描述符被监视是否有数据可供读取(如果读取操作完成则不会阻塞)。writefds中列出的文件描述符则被监视是否写入操作完成而不阻塞。最后,exceptfds中列出的文件描述符则被监视是否发生异常,或者无法控制的数据是否可用(这些状态仅仅应用于套接字)。
这三类set可以是NULL,这种情况下select()不监视这一类事件。
select()成功返回时,每组set都被修改以使它只包含准备好I/O的文件描述符。例如,假设有两个文件描述符,值分别是7和9,被放在readfds中。当select()返回时,如果7仍然在set中,则这个文件描述符已经准备好被读取而不会阻塞。如果9已经不在set中,则读取它将可能会阻塞(我说可能是因为数据可能正好在select返回后就可用,这种情况下,下一次调用select()将返回文件描述符准备好读取)。
第一个参数n,等于所有set中最大的那个文件描述符的值加1。因此,select()的调用者负责检查哪个文件描述符拥有最大值,并且把这个值加1再传递给第一个参数。
timeout参数是一个指向timeval结构体的指针,timeval定义如下:
#include <sys/time.h> struct timeval { long tv_sec; /* seconds */ long tv_usec; /* 10E-6 second */ };
如果这个参数不是NULL,则即使没有文件描述符准备好I/O,select()也会在经过tv_sec秒和tv_usec微秒后返回。当select()返回时,timeout参数的状态在不同的系统中是未定义的,因此每次调用select()之前必须重新初始化timeout和文件描述符set。实际上,当前版本的Linux会自动修改timeout参数,设置它的值为剩余时间。因此,如果timeout被设置为5秒,然后在文件描述符准备好之前经过了3秒,则这一次调用select()返回时tv_sec将变为2。
如果timeout中的两个值都设置为0,则调用select()将立即返回,报告调用时所有未决的事件,但不等待任何随后的事件。
返回值和错误代码
select()成功时返回准备好I/O的文件描述符数目,包括所有三个set。如果提供了timeout,返回值可能是0;错误时返回-1,并且设置errno为下面几个值之一:
EBADF,给某个set提供了无效文件描述符。
EINTR,等待时捕获到信号,可以重新发起调用。
EINVAL,参数n为负数,或者指定的timeout非法。
ENOMEM,不够可用内存来完成请求。
使用select的示例:
#include<iostream> #include<stdio.h> #include<stdlib.h> #include<unistd.h> #include<errno.h> #include<netdb.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<netdb.h> #include<sys/time.h> #include<string.h> #include<sys/select.h> #include<pthread.h> using namespace std; int max_fd(int a[], int n) { int max = 0; for(int i = 0; i < n; i++) { if(max < a[i]) { max = a[i]; } } return max; } int main(int argc, char*argv[]) { int port = 0; int N = 0; if (argc != 3) { cout<<"command error"<<endl; exit(-1); } port = atoi(argv[1]); N = atoi(argv[2]); if(N > FD_SETSIZE) { N = FD_SETSIZE; } int server_sock = 0; struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); if((server_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { cout<<"create socket error"<<endl; exit(-1); } server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr =htonl(INADDR_ANY); server_addr.sin_port = htons(port); if(bind(server_sock, (struct sockaddr*)&server_addr,sizeof(sockaddr)) == -1) { cout<<"bind error"<<endl; exit(-1); } if(listen(server_sock, 5) == -1) { cout<<"listent error"<<endl; exit(-1); } fd_set fd[2]; FD_ZERO(&fd[0]); FD_SET(server_sock, &fd[0]); int *sock = new int[N]; memset(sock, 0, sizeof(int)*N); sock[0] = server_sock; int count = 0; while(1) { struct timeval tv = {5, 0}; FD_ZERO(&fd[1]); fd[1] = fd[0]; int ret = select(max_fd(sock, N)+1, &fd[1], NULL, NULL, &tv); if(ret < 0) { cout<<"select error"<<endl; } else if(ret == 0) { cout<<"time out"<<endl; } else { if(FD_ISSET(sock[0], &fd[1]) && count < N-1) { struct sockaddr_in client_addr; memset(&client_addr, 0, sizeof(client_addr)); unsigned int len = sizeof(client_addr); int new_sock=accept(sock[0],(struct sockaddr*)&client_addr, &len); if(new_sock == -1) { cout<<"accept error"<<endl; } else { for(int i = 1; i < N; i++) { if(sock[i] == 0) { sock[i] = new_sock; FD_SET(new_sock, &fd[0]); count++; break; } } } } char recvbuf[1024] = {0}; char sendbuf[1024] = {0}; for(int i = 1; i < N; i++) { if(FD_ISSET(sock[i], &fd[1])) { if(recv(sock[i], recvbuf, sizeof(recvbuf), 0) <= 0) { cout<<"recv error"<<endl; FD_CLR(sock[i], &fd[0]); close(sock[i]); sock[i] = 0; count--; continue; } strcpy(sendbuf, recvbuf); if(send(sock[i], sendbuf, sizeof(sendbuf), 0) <= 0) { cout<<"send error"<<endl; FD_CLR(sock[i], &fd[0]); close(sock[i]); sock[i] = 0; count--; continue; } } }//end for } }//end while return 0; }
使用select后我们的服务器程序就变成:
socket(); bind(); listen(); while(1) { 设置监听读写文件描述符(FD_*); 调用select; 如果是监听套接字就绪,说明一个新的连接请求建立 { 建立连接(accept); 加入到监听文件描述符中去; } 否则说明是一个已经连接过的描述符 { 进行操作(read或者write); } }
select模型的缺点:
- 最大并发数限制。因为一个进程所打开的fd(文件描述符)是有限的,有FD_SETSIZE设置,默认值是1024/2048,因此select模型的最大并发数就被相应限制了。
- 效率问题。select每次调用对线性地扫描全部fd集合,这样效率就会呈线性下降,把FD_SETSIZE放大的后果,就是会导致效率线性下降更快。
- 内核/用户控件内存拷贝问题。如何让内核把fd消息通知给用户控件,select采取了内存拷贝的方法。
2.3.2 poll模型
poll模型就是监控文件是否可读的一种机制,作用同select。
poll函数原型:
int poll(struct pollfd *fds,nfds_t nfds,int timeout);
Poll机制会判断fds中的文件是否可读,如果可读则会立即返回,返回的值就是可读fd的数量;如果不可读,那么就进程就会休眠timeout单位时间,然后再来判断是否有文件可读。如果有,返回fd的数量;如果没有,则返回0。
和select()不一样,poll()没有使用低效的三个基于位的文件描述符set,而是采用了一个单独的结构体pollfd数组,由fds指针指向这个组。
pollfd结构体定义如下:
#include <sys/poll.h> struct pollfd { int fd; /* file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ };
每一个pollfd结构体指定了一个被监视的文件描述符,可以传递多个结构体,指示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。revents域是文件描述符的操作结果事件掩码。内核在调用返回时设置这个域。events域中请求的任何事件都可能在revents域中返回。合法的事件如下:
POLLIN,有数据可读。
POLLRDNORM,有普通数据可读。
POLLRDBAND,有优先数据可读。
POLLPRI,有紧迫数据可读。
POLLOUT,写数据不会导致阻塞。
POLLWRNORM,写普通数据不会导致阻塞。
POLLWRBAND,写优先数据不会导致阻塞。
POLLMSG,SIGPOLL消息可用。
此外,revents域中还可能返回下列事件:
POLLER,指定的文件描述符发生错误。
POLLHUP,指定的文件描述符挂起事件。
POLLNVAL,指定的文件描述符非法。
这些事件在events域中无意义,因为它们在合适的时候总是会从revents中返回。使用poll()和select()不一样,你不需要显式地请求异常情况报告。POLLIN | POLLPRI等价于select()的读事件,POLLOUT | POLLWRBAND等价于select()的写事件。POLLIN等价于POLLRDNORM | POLLRDBAND,而POLLOUT则等价于POLLWRNORM。
例如,要同时监视一个文件描述符是否可读和可写,我们可以设置events为POLLIN | POLLOUT。在poll返回时,我们可以检查revents中的标志,对应于文件描述符请求的events结构体。如果POLLIN事件被设置,则文件描述符可以被读取而不阻塞。如果POLLOUT被设置,则文件描述符可以写入而不导致阻塞。这些标志并不是互斥的:它们可能被同时设置,表示这个文件描述符的读取和写入操作都会正常返回而不阻塞。timeout参数指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。这种情况下,poll()就像它的名字那样,一旦选举出来,立即返回。
返回值和错误代码
成功时,poll()返回结构体中revents域不为0的文件描述符个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:
EBADF,一个或多个结构体中指定的文件描述符无效。
EFAULT,fds指针指向的地址超出进程的地址空间。
EINTR,请求的事件之前产生一个信号,调用可以重新发起。
EINVAL,nfds参数超出PLIMIT_NOFILE值。
ENOMEM,可用内存不足,无法完成请求。
内核实现流程:
当应用程序调用poll函数的时候,会调用到系统调用sys_poll函数,该函数最终调用do_poll函数,do_poll函数中有一个死循环,在里面又会利用do_pollfd函数去调用驱动中的poll函数(fds中每个成员的字符驱动程序都会被扫描到),驱动程序中的Poll函数的工作有两个,一个就是调用poll_wait 函数,把进程挂到等待队列中去,另一个是确定相关的fd是否有内容可读,如果可读,就返回1,否则返回0,如果返回1 ,do_poll函数中的count++,然后do_poll函数然后判断三个条件if (count ||!timeout || signal_pending(current))如果成立就直接跳出,如果不成立,就睡眠timeout个单位长的时间(调用schedule_timeout实现睡眠),如果在这段时间内没有其他进程去唤醒它,那么第二次执行判断的时候就会跳出死循环。如果在这段时间内有其他进程唤醒它,那么也可以跳出死循环返回(例如我们可以利用中断处理函数去唤醒它,这样的话一有数据可读,就可以让它立即返回)。
poll模型使用示例
//分发进程 //listen #include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/poll.h> #include <errno.h> #include <netinet/in.h> #include <unistd.h> #include <arpa/inet.h> #include <stdlib.h> #define LISTENQ 5 #define OPEN_MAX 1024 #define SERV_PORT 10088 #define MAX_LINE 1024 #define INFTIM -1 int main(int argc, char** argv) { int err, maxpoll, nreadly; struct pollfd client[OPEN_MAX]; struct sockaddr_in cliaddr, servaddr; int listenFd = socket(AF_INET, SOCK_STREAM, 0); if( listenFd < 0) { printf("socket函数执行失败"); return 1; } servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //inet_aton('10.132.10.64', &(servaddr.sin_addr)); //servaddr.sin_addr.s_addr = inet_addr("10.132.10.64"); servaddr.sin_port = htons(SERV_PORT); if(bind(listenFd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) { printf("bind函数执行失败"); return 1; } if(listen(listenFd, LISTENQ) < 0) { printf("listen函数执行失败"); return 1; } client[0].fd = listenFd; client[0].events = POLLRDNORM; for(int i =1; i < OPEN_MAX; i++) { client[i].fd = -1; } maxpoll = 0; printf("listen函数执行成功 "); while(true) { socklen_t clilen = sizeof(cliaddr); nreadly = poll(client, maxpoll + 1, 1000); if(client[0].revents & POLLRDNORM) { int idx = -1; for(int i =1; i < OPEN_MAX; i++) { if(client[i].fd < 0) { idx = i; break; } } if(idx == -1) { //丢弃连接 continue; } int connfd = accept(listenFd, (struct sockaddr*)&cliaddr, &clilen); if(connfd < 0) { printf("accept函数执行失败"); break; } printf("Ip: %s 到此一游 ", inet_ntoa(cliaddr.sin_addr)); client[idx].fd = connfd; client[idx].events = POLLRDNORM; maxpoll = (idx > maxpoll ? idx : maxpoll); if(--nreadly <= 0) { continue; } } for(int i =1; i < OPEN_MAX; i++) { int sockfd = client[i].fd; if(sockfd < 0) { continue; } if(client[i].revents & (POLLRDNORM | POLLERR)) { char line[MAX_LINE]; int n = read(sockfd, line, sizeof(line)); if(n < 0) { if(errno == ECONNRESET) { close(sockfd); client[i].fd = -1; printf("异常退出 "); } else { printf("网络异常"); exit(-1); } } else if(n == 0) { close(sockfd); client[i].fd = -1; printf("正常退出 "); } else { line[n] = 0; printf("接收到数据:%s ", line); write(sockfd, line, n); } if(--nreadly <= 0) { continue; } } } } }
2.3.3 epoll模型
为什么要引入epoll模型,这就要先说一下上面两种I/O复用模型和PPC/TPC模型的缺点了。
常用模型的缺点
PPC/TPC模型:两种模型实现思想类似,通俗地说就是让每个到来的连接用进程/线程去处理,主进程继续做自己的工作。可是开辟进程和线程依然需要时间和空间,连接大量的进程和线程效率会降低。
select/poll模型:两者的效率基本相同,poll模型依然具有select模型上的缺点。关于select模型的缺点,在select模型一节已经讲过,在此不再重复。
epoll模型的优点:
1、支持一个进程打开大数目的socket描述符(FD)
select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
2、IO效率不随fd数目增加而线性下降
传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
3、使用mmap加速内核与用户空间的消息传递。
这点实际上涉及到epoll的具体实现了。无论是select、poll还是epoll都需要内核把fd消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。
4、内核微调
这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。
epoll模型的工作模式
1)LT模式
LT:level triggered,缺省的工作方式,同时支持block和no-block socket。在这种模式中,内核会通知用户文件描述符是否就绪,然后就可以对这个就绪的fd进行I/O操作了。如果不作任何操作,内核还是会继续发出通知消息。所以,这种模式下编程出现错误的可能性要小一点。传统的select/poll模型都是这种工作模式。
2)ET模式
ET:edge triggered,高速工作模式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核就通过epoll告诉你,然后就假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,知道你做了某些操作而导致那个描述符不再是就绪状态(比如在发送、接收或者接受请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK错误),内核就不会发送更多的通知(only once)。不过在tcp协议中,ET模式的加速效果仍需要更多的benchmark确认。
epoll模型的API
1)int epoll_create(int size);
创建一个epoll句柄,size用来告诉内核这个监听的数目一共有多少。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就会占用一个fd值,在Linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2)int epoll_ctl(int epfd,int op,int fd,struct epoll *event);
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd。
第三个参数就是需要监听的fd,第四个参数告诉内核需要监听什么事件。
struct epoll_event的结构如下:
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
3)int epoll_wait(int epfd,struct epoll_event *event,int maxevents,int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告诉内核这个events有多大,但是maxevents不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定)。该函数返回需要处理的事件的数目,如果返回0表示已超时。
epoll模型的使用示例
// // a simple echo server using epoll in linux //
#include <sys/socket.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <stdio.h> #include <errno.h> #include <iostream> using namespace std; #define MAX_EVENTS 500 struct myevent_s { int fd; void (*call_back)(int fd, int events, void *arg); int events; void *arg; int status; // 1: in epoll wait list, 0 not in char buff[128]; // recv data buffer int len; long last_active; // last active time }; // set event void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg) { ev->fd = fd; ev->call_back = call_back; ev->events = 0; ev->arg = arg; ev->status = 0; ev->last_active = time(NULL); } // add/mod an event to epoll void EventAdd(int epollFd, int events, myevent_s *ev) { struct epoll_event epv = {0, {0}}; int op; epv.data.ptr = ev; epv.events = ev->events = events; if(ev->status == 1){ op = EPOLL_CTL_MOD; } else{ op = EPOLL_CTL_ADD; ev->status = 1; } if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0) printf("Event Add failed[fd=%d]/n", ev->fd); else printf("Event Add OK[fd=%d]/n", ev->fd); } // delete an event from epoll void EventDel(int epollFd, myevent_s *ev) { struct epoll_event epv = {0, {0}}; if(ev->status != 1) return; epv.data.ptr = ev; ev->status = 0; epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv); } int g_epollFd; myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd void RecvData(int fd, int events, void *arg); void SendData(int fd, int events, void *arg); // accept new connections from clients void AcceptConn(int fd, int events, void *arg) { struct sockaddr_in sin; socklen_t len = sizeof(struct sockaddr_in); int nfd, i; // accept if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1) { if(errno != EAGAIN && errno != EINTR) { printf("%s: bad accept", __func__); } return; } do { for(i = 0; i < MAX_EVENTS; i++) { if(g_Events[i].status == 0) { break; } } if(i == MAX_EVENTS) { printf("%s:max connection limit[%d].", __func__, MAX_EVENTS); break; } // set nonblocking if(fcntl(nfd, F_SETFL, O_NONBLOCK) < 0) break; // add a read event for receive data EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]); EventAdd(g_epollFd, EPOLLIN|EPOLLET, &g_Events[i]); printf("new conn[%s:%d][time:%d]/n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), g_Events[i].last_active); }while(0); } // receive data void RecvData(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s*)arg; int len; // receive data len = recv(fd, ev->buff, sizeof(ev->buff)-1, 0); EventDel(g_epollFd, ev); if(len > 0) { ev->len = len; ev->buff[len] = '/0'; printf("C[%d]:%s/n", fd, ev->buff); // change to send event EventSet(ev, fd, SendData, ev); EventAdd(g_epollFd, EPOLLOUT|EPOLLET, ev); } else if(len == 0) { close(ev->fd); printf("[fd=%d] closed gracefully./n", fd); } else { close(ev->fd); printf("recv[fd=%d] error[%d]:%s/n", fd, errno, strerror(errno)); } } // send data void SendData(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s*)arg; int len; // send data len = send(fd, ev->buff, ev->len, 0); ev->len = 0; EventDel(g_epollFd, ev); if(len > 0) { // change to receive event EventSet(ev, fd, RecvData, ev); EventAdd(g_epollFd, EPOLLIN|EPOLLET, ev); } else { close(ev->fd); printf("recv[fd=%d] error[%d]/n", fd, errno); } } void InitListenSocket(int epollFd, short port) { int listenFd = socket(AF_INET, SOCK_STREAM, 0); fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking printf("server listen fd=%d/n", listenFd); EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]); // add listen socket EventAdd(epollFd, EPOLLIN|EPOLLET, &g_Events[MAX_EVENTS]); // bind & listen sockaddr_in sin; bzero(&sin, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(port); bind(listenFd, (const sockaddr*)&sin, sizeof(sin)); listen(listenFd, 5); } int main(int argc, char **argv) { short port = 12345; // default port if(argc == 2){ port = atoi(argv[1]); } // create epoll g_epollFd = epoll_create(MAX_EVENTS); if(g_epollFd <= 0) printf("create epoll failed.%d/n", g_epollFd); // create & bind listen socket, and add to epoll, set non-blocking InitListenSocket(g_epollFd, port); // event loop struct epoll_event events[MAX_EVENTS]; printf("server running:port[%d]/n", port); int checkPos = 0; while(1){ // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event long now = time(NULL); for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd { if(checkPos == MAX_EVENTS) checkPos = 0; // recycle if(g_Events[checkPos].status != 1) continue; long duration = now - g_Events[checkPos].last_active; if(duration >= 60) // 60s timeout { close(g_Events[checkPos].fd); printf("[fd=%d] timeout[%d--%d]./n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now); EventDel(g_epollFd, &g_Events[checkPos]); } } // wait for events to happen int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000); if(fds < 0){ printf("epoll_wait error, exit/n"); break; } for(int i = 0; i < fds; i++){ myevent_s *ev = (struct myevent_s*)events[i].data.ptr; if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event { ev->call_back(ev->fd, events[i].events, ev->arg); } if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event { ev->call_back(ev->fd, events[i].events, ev->arg); } } } // free resource return 0; }