一、epoll原理
一个socket对应一个数据流,通过I/O操作中的read从流中读入数据,write向流中写入数据。当read时,socket流中没有数据的话,read阻塞,线程睡眠,CPU开始做其他的任务,流中有数据可读时,read返回。
在阻塞IO模式下,一个线程只能处理一个IO事件。如果处理多个事件,需要多线程或多进程,但是效率比较低。
1、如果采用非阻塞方式,需要不断轮训所有的流,假设共有N个socket流streams[N], 如下:
// busy poll while True: for stream in streams[N]: if stream has data read all data
这种模式最大的缺点是,如果没有socket可读,也将一直占用CPU,浪费CPU时间。
2、为了解决这个问题,引入select,当streams[N]中有k(0 < k <= N)个socket流可操作时才返回,否则阻塞,释放CPU。
// select while True: select(streams[N]) for stream in streams[N]: if stream has data read all data
当streams[N]中没有数据时, 线程阻塞在select处,CPU处理其他任务。select返回时表示有k个流可操作,可是select并没有通知我们是那些流,因此我们需要轮询所有的N个流,
时间复杂度为O(N). 在N比较小时,这样处理ok,但是当连接数达到数万甚至几十万时(C10K问题),select的轮询机制会导致效率低下。
3、epoll则解决了selec后的轮询。epoll会返回每个可操作的socket流以及这些流产生了那些IO事件。
// epoll while True: active_streams[k] = epoll(streams[N]) for stream in active_streams[k] deal all data
这样epoll将复杂度降低为O(1), 提高了效率[1], 解决了C10K问题。
二、epoll API介绍
1、int epoll_create(int size);
创建epoll描述符fd,size用来告诉内核这个监听的数目一共有多大,但是Linux 2.6.8以后,这个参数已经弃用。
2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
epfd: epoll_create 创建的描述符
op:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
通过epoll_add、epoll_del、epoll_mod可以实现对三个操作的封装:
1 int event_add(int epollfd, int fd, int event) 2 { 3 struct epoll_event ev; 4 ev.events = event; 5 ev.data.fd = fd; 6 7 return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev); 8 } 9 10 int event_del(int epollfd, int fd, int event) 11 { 12 struct epoll_event ev; 13 ev.events = event; 14 ev.data.fd = fd; 15 16 return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev); 17 } 18 19 int event_mod(int epollfd, int fd, int event) 20 { 21 struct epoll_event ev; 22 ev.events = event; 23 ev.data.fd = fd; 24 25 return epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev); 26 }
fd: 需要监听的fd
ev:
//保存触发事件的某个文件描述符相关的数据(与具体使用方式有关) typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; //感兴趣的事件和被触发的事件 struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3、int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
epoll_wait与select类似,等待事件的发生。events既是有IO发生的流对应的epoll event。maxevents表示监听的fd的最大个数。
timeout:超时事件,0表示立即返回,-1表示永久阻塞。
三、epoll使用
下面以简单的echo服务器,介绍epoll的使用。
echo服务器,接收client发送的字符串,然后原路返回。
1 int main(void) 2 { 3 int listenfd = -1; 4 int epollfd = -1; 5 struct sockaddr_in svraddr; 6 struct epoll_event events[EVENT_SIZE]; 7 int nready = 0; 8 char buf[BUF_SIZE]; 9 int i = 0; 10 int fd = 0; 11 12 if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 13 { 14 perror("socket error"); 15 return -1; 16 } 17 18 memset(&svraddr, 0, sizeof(svraddr)); 19 svraddr.sin_family = AF_INET; 20 svraddr.sin_port = htons(44888); 21 svraddr.sin_addr.s_addr = htonl(INADDR_ANY); 22 23 if (bind(listenfd, (struct sockaddr* )&svraddr, sizeof(svraddr)) < 0) 24 { 25 perror("bind error"); 26 return -1; 27 } 28 29 if (listen(listenfd, 5) < 0) 30 { 31 perror("listen error"); 32 return -1; 33 } 34 35 if ((epollfd = epoll_create(EVENT_SIZE)) < 0) 36 { 37 perror("bind error"); 38 return -1; 39 } 40 event_add(epollfd, listenfd, EPOLLIN); 41 42 printf("listen for sockets ... "); 43 while (1) 44 { 45 nready = epoll_wait(epollfd, events, EVENT_SIZE, -1); 46 for (i = 0; i < nready; i++) 47 { 48 fd = events[i].data.fd; 49 if ((fd == listenfd) && (events[i].events & EPOLLIN)) 50 { 51 do_accept(epollfd, listenfd); 52 } 53 else if (events[i].events & EPOLLIN) 54 { 55 do_read(epollfd, fd, buf, BUF_SIZE - 1); 56 } 57 else if (events[i].events & EPOLLOUT) 58 { 59 do_write(epollfd, fd, buf, BUF_SIZE - 1); 60 } 61 else 62 { 63 printf("unused fd %d, event %d", fd, events[i].events); 64 } 65 } 66 } 67 68 return 0; 69 } 70 71 int do_accept(int epollfd, int listenfd) 72 { 73 struct sockaddr_in cliaddr; 74 int clifd = -1; 75 socklen_t len = sizeof(cliaddr); 76 if ((clifd = accept(listenfd, (struct sockaddr* )&cliaddr, &len)) < 0) 77 { 78 perror("accept error"); 79 return -1; 80 } 81 82 printf("accept client: <%s:%d> ", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port); 83 // add client in read list 84 event_add(epollfd, clifd, EPOLLIN); 85 } 86 87 int do_read(int epollfd, int fd, char* buf, int maxsize) 88 { 89 memset(buf, 0, maxsize); 90 int nread = read(fd, buf, maxsize); 91 if (nread <= 0) 92 { 93 if (nread == 0) 94 { 95 printf("client %d close socket. ", fd); 96 } 97 else 98 { 99 printf("client %d read error ", fd); 100 } 101 close(fd); 102 event_del(epollfd, fd, EPOLLIN); 103 return nread; 104 } 105 106 printf("recv from client %d :%s", fd, buf); 107 // set read to write 108 event_mod(epollfd, fd, EPOLLOUT); 109 } 110 111 int do_write(int epollfd, int fd, char* buf, int maxsize) 112 { 113 int nread = write(fd, buf, maxsize); 114 if (nread < 0) 115 { 116 printf("client %d write error ", fd); 117 close(fd); 118 event_del(epollfd, fd, EPOLLIN); 119 return nread; 120 } 121 122 printf("send to client %d :%s", fd, buf); 123 // set write to read 124 event_mod(epollfd, fd, EPOLLIN); 125 }
echo client,从终端输入字符串,然后发送给server,并接收server返回的字符串,答应到终端。
1 int main(void) 2 { 3 int sockfd = -1; 4 int epollfd = -1; 5 struct sockaddr_in svraddr; 6 struct epoll_event events[EVENT_SIZE]; 7 int nready = 0; 8 char buf[BUF_SIZE]; 9 int i = 0; 10 int fd = 0; 11 12 if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 13 { 14 perror("socket error"); 15 return -1; 16 } 17 18 memset(&svraddr, 0, sizeof(svraddr)); 19 svraddr.sin_family = AF_INET; 20 svraddr.sin_port = htons(44888); 21 inet_pton(AF_INET, "127.0.0.1", &svraddr.sin_addr); 22 23 if (connect(sockfd, (struct sockaddr* )&svraddr, sizeof(svraddr)) < 0) 24 { 25 perror("connect error"); 26 return -1; 27 } 28 printf("connect to server: <127.0.0.1:44888> "); 29 30 if ((epollfd = epoll_create(EVENT_SIZE)) < 0) 31 { 32 perror("bind error"); 33 return -1; 34 } 35 event_add(epollfd, STDIN_FILENO, EPOLLIN); 36 37 while (1) 38 { 39 nready = epoll_wait(epollfd, events, EVENT_SIZE, -1); 40 //printf("EVENT SIZE %d ", nready); 41 for (i = 0; i < nready; i++) 42 { 43 fd = events[i].data.fd; 44 if (events[i].events & EPOLLIN) 45 { 46 //printf("IN EVENT %d:%d ", fd, EPOLLIN); 47 do_read(epollfd, fd, sockfd, buf, BUF_SIZE - 1); 48 } 49 else if (events[i].events & EPOLLOUT) 50 { 51 //printf("OUT EVENT %d:%d ", fd, EPOLLOUT); 52 do_write(epollfd, fd, sockfd, buf, BUF_SIZE - 1); 53 } 54 else 55 { 56 printf("unused fd %d, event %d", fd, events[i].events); 57 } 58 } 59 } 60 61 return 0; 62 } 63 64 int do_read(int epollfd, int fd, int sockfd, char* buf, int maxsize) 65 { 66 memset(buf, 0, maxsize); 67 int nread = read(fd, buf, maxsize); 68 if (nread <= 0) 69 { 70 if (nread == 0) 71 { 72 printf("client %d close socket. ", fd); 73 } 74 else 75 { 76 printf("client %d read error ", fd); 77 } 78 close(fd); 79 event_del(epollfd, fd, EPOLLIN); 80 return nread; 81 } 82 83 if (fd == STDIN_FILENO) // std input 84 { 85 printf("cli input %d: %s", fd, buf); 86 event_add(epollfd, sockfd, EPOLLOUT); 87 } 88 else 89 { 90 printf("read from sock %d: %s", fd, buf); 91 event_del(epollfd, sockfd, EPOLLIN); 92 event_add(epollfd, STDOUT_FILENO, EPOLLOUT); 93 } 94 } 95 96 int do_write(int epollfd, int fd, int sockfd, char* buf, int maxsize) 97 { 98 int nread = write(fd, buf, maxsize); 99 if (nread < 0) 100 { 101 printf("client %d write error ", fd); 102 close(fd); 103 event_del(epollfd, fd, EPOLLIN); 104 return nread; 105 } 106 107 if (fd == STDOUT_FILENO) 108 { 109 printf("write to stdin %d: %s", fd, buf); 110 event_del(epollfd, fd, EPOLLOUT); 111 } 112 else 113 { 114 printf("write to sock %d: %s", fd, buf); 115 event_mod(epollfd, fd, EPOLLIN); 116 } 117 }