59.1 介绍
前面介绍的函数如,recv、send、read 和 write 等函数都是阻塞性函数,若资源没有准备好,则调用该函数的进程将进入阻塞状态。我们可以使用 I/O 多路复用来解决此问题(即解决并发)。
- I/O 多路复用的方式主要有两种实现方法
- fcntl 函数实现(非阻塞方式)
- select 函数实现
59.1.1 fcntl 非阻塞方式——I/O多路复用/转换
59.2 例子
59.2.1 动态数组模块
vector_fd.c
1 #include <malloc.h> 2 #include <stdio.h> 3 #include <assert.h> 4 #include <stdlib.h> 5 #include <memory.h> 6 #include "vector_fd.h" 7 8 static void encapacity(vector_fd *vfd) 9 { 10 if(vfd->counter >= vfd->max_counter){ 11 int *fds = (int *)calloc(vfd->counter + 5, sizeof(int)); 12 assert(fds != NULL); 13 memcpy(fds, vfd->fd, sizeof(int) * vfd->counter); 14 free(vfd->fd); 15 vfd->fd = fds; 16 vfd->max_counter += 5; 17 } 18 } 19 20 static int indexof(vector_fd *vfd, int fd) 21 { 22 int i = 0; 23 for(; i < vfd->counter; i++){ 24 if(vfd->fd[i] == fd) return i; 25 } 26 27 return -1; 28 } 29 30 31 vector_fd *create_vector_fd(void) 32 { 33 vector_fd *vfd = (vector_fd *)calloc(1, sizeof(vector_fd)); 34 assert(vfd != NULL); 35 36 vfd->fd = (int *)calloc(5, sizeof(int)); 37 assert(vfd->fd != NULL); 38 vfd->counter = 0; 39 vfd->max_counter = 0; 40 41 return vfd; 42 } 43 44 45 void destroy_vector_fd(vector_fd *vfd) 46 { 47 assert(vfd != NULL); 48 free(vfd->fd); 49 free(vfd); 50 } 51 52 int get_fd(vector_fd *vfd, int index) 53 { 54 assert(vfd != NULL); 55 if(index < 0 || index > vfd->counter - 1) return 0; 56 57 return vfd->fd[index]; 58 } 59 60 void remove_fd(vector_fd *vfd, int fd) 61 { 62 assert(vfd != NULL); 63 int index = indexof(vfd, fd); 64 if(index == -1) return; 65 int i = index; 66 for(; i < vfd->counter - 1; i++){ 67 vfd->fd[i] = vfd->fd[i + 1]; 68 } 69 vfd->counter--; 70 } 71 72 void add_fd(vector_fd *vfd, int fd) 73 { 74 assert(vfd != NULL); 75 encapacity(vfd); 76 vfd->fd[vfd->counter++] = fd; 77 }
vector_fd.h
1 #ifndef __VECTOR_FD_H__ 2 #define __VECTOR_FD_H__ 3 4 typedef struct { 5 int *fd; 6 int counter; 7 int max_counter; 8 }vector_fd; 9 10 extern vector_fd *create_vector_fd(void); 11 extern void destroy_vector_fd(vector_fd *); 12 extern int get_fd(vector_fd *, int index); 13 extern void remove_fd(vector_fd *, int fd); 14 extern void add_fd(vector_fd *, int fd); 15 16 #endif
编译成模块:gcc -o obj/vector_fd.o -Iinclude -c src/vector_fd.c
59.2.2 服务器端
echo_tcp_server_fcntl.c
1 #include <netdb.h> 2 #include <netinet/in.h> 3 #include <sys/socket.h> 4 #include <sys/wait.h> 5 #include <unistd.h> 6 #include <string.h> 7 #include <stdio.h> 8 #include <stdlib.h> 9 #include <memory.h> 10 #include <signal.h> 11 #include <fcntl.h> 12 #include <time.h> 13 #include <arpa/inet.h> 14 #include <errno.h> 15 #include <pthread.h> 16 #include "vector_fd.h" 17 18 vector_fd *vfd; 19 int sockfd; 20 21 void sig_handler(int signo) 22 { 23 if(signo == SIGINT){ 24 printf("server close "); 25 /** 步骤6: 关闭 socket */ 26 close(sockfd); 27 /** 销毁动态数组 */ 28 destroy_vector_fd(vfd); 29 exit(1); 30 } 31 } 32 33 /** 34 * fd 对应于某个连接的客户端,和某一个连接的客户端进行双向通信(非阻塞方式) 35 */ 36 void do_service(int fd) 37 { 38 char buff[512]; 39 memset(buff, 0, sizeof(buff)); 40 41 /** 42 * 因为采用非阻塞方式,若读不到数据直接返回, 43 * 直接服务于下一个客户端, 44 * 因此不需要判断 size < 0 的情况 */ 45 ssize_t size = read(fd, buff, sizeof(buff)); 46 47 if(size == 0){ 48 /** 客户端已经关闭连接 */ 49 char info[] = "client closed"; 50 write(STDOUT_FILENO, info, sizeof(info)); 51 /** 从动态数组中删除对应的 fd */ 52 remove_fd(vfd, fd); 53 /** 关闭对应客户端的 socket */ 54 close(fd); 55 } 56 else if(size > 0){ 57 write(STDOUT_FILENO, buff, sizeof(buff)); 58 if(write(fd, buff, size) < 0){ 59 if(errno == EPIPE){ 60 /** 客户端关闭连接 */ 61 perror("write error"); 62 remove_fd(vfd, fd); 63 close(fd); 64 } 65 perror("protocal error"); 66 } 67 } 68 } 69 70 void out_addr(struct sockaddr_in *clientaddr) 71 { 72 char ip[16]; 73 memset(ip, 0, sizeof(ip)); 74 int port = ntohs(clientaddr->sin_port); 75 inet_ntop(AF_INET, &clientaddr->sin_addr.s_addr, ip, sizeof(ip)); 76 printf("%s(%d) connected! ", ip, port); 77 } 78 79 void *th_fn(void *arg) 80 { 81 int i; 82 while(1){ 83 i = 0; 84 /** 遍历动态数组中的 socket 描述符 */ 85 for(; i < vfd->counter; i++){ 86 do_service(get_fd(vfd, i)); 87 } 88 } 89 90 return (void *)0; 91 } 92 93 int main(int argc, char *argv[]) 94 { 95 if(argc < 2){ 96 printf("usage: %s #port ", argv[0]); 97 exit(1); 98 } 99 100 if(signal(SIGINT, sig_handler) == SIG_ERR){ 101 perror("signal sigint error"); 102 exit(1); 103 } 104 105 106 /** 步骤1: 创建 socket(套接字) 107 * 注: socket 创建在内核中,是一个结构体. 108 * AF_INET: IPV4 109 * SOCK_STREAM: tcp 协议 110 * AF_INET6: IPV6 111 */ 112 sockfd = socket(AF_INET, SOCK_STREAM, 0); 113 if(sockfd < 0){ 114 perror("socket error"); 115 exit(1); 116 } 117 118 /** 119 * 步骤2: 调用 bind 函数将 socket 和地址(包括 ip、port)进行绑定 120 */ 121 struct sockaddr_in serveraddr; 122 memset(&serveraddr, 0, sizeof(struct sockaddr_in)); 123 /** 往地址中填入 ip、port、internet 地址族类型 */ 124 serveraddr.sin_family = AF_INET; ///< IPV4 125 serveraddr.sin_port = htons(atoi(argv[1])); ///< 填入端口 126 serveraddr.sin_addr.s_addr = INADDR_ANY; ///< 填入 IP 地址 127 if(bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr_in))){ 128 perror("bind error"); 129 exit(1); 130 } 131 132 /** 133 * 步骤3: 调用 listen 函数启动监听(指定 port 监听) 134 * 通知系统去接受来自客户端的连接请求 135 * (将接受到的客户端连接请求放置到对应的队列中) 136 * 第二个参数: 指定队列的长度 137 */ 138 if(listen(sockfd, 10) < 0){ 139 perror("listen error"); 140 exit(1); 141 } 142 143 /** 创建放置套接字描述符 fd 的动态数组 */ 144 vfd = create_vector_fd(); 145 146 /** 设置线程的分离属性 */ 147 pthread_t th; 148 pthread_attr_t attr; 149 pthread_attr_init(&attr); 150 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 151 int err; 152 if((err = pthread_create(&th, &attr, th_fn, (void *)0)) != 0){ 153 perror("pthread create error"); 154 exit(1); 155 } 156 pthread_attr_destroy(&attr); 157 158 /** 159 * 1)主控线程获得客户端的链接,将新的 socket 描述符放置到动态数组中 160 * 2)启动的子线程负责遍历动态数组中 socket 161 * 描述符,并和对应的客户端进行双向通信(采用非阻塞方式读写) 162 */ 163 struct sockaddr_in clientaddr; 164 socklen_t len = sizeof(clientaddr); 165 166 while(1){ 167 /** 168 * 步骤4: 调用 accept 函数从队列中获得一个客户端的请求连接, 并返回新的 169 * socket 描述符 170 * 注意: 若没有客户端连接,调用此函数后会阻塞, 直到获得一个客户端的连接 171 */ 172 /** 主控线程负责调用 accept 去获得客户端的连接 */ 173 int fd = accept(sockfd, (struct sockaddr *)&clientaddr, &len); 174 if(fd < 0){ 175 perror("accept error"); 176 continue; 177 } 178 179 out_addr(&clientaddr); 180 181 /** 将读写修改为非阻塞方式 */ 182 int val; 183 fcntl(fd, F_GETFL, &val); ///< 获取原来的状态标志 184 val |= O_NONBLOCK; 185 fcntl(fd, F_SETFL, val); ///< 添加新的状态标志 186 187 /** 将返回的新的 socket 描述符加入到动态数组中 */ 188 add_fd(vfd, fd); 189 190 } 191 192 return 0; 193 }
编译:
gcc -o bin/echo_tcp_server_fcntl -Iinclude obj/vector_fd.o src/echo_tcp_server_fcntl.c -lpthread
59.2.3 客户端
echo_tcp_client_fcntl.c
1 #include <sys/types.h> 2 #include <stdlib.h> 3 #include <stdio.h> 4 #include <memory.h> 5 #include <unistd.h> 6 #include <sys/socket.h> 7 #include <netdb.h> 8 #include <signal.h> 9 #include <string.h> 10 #include <time.h> 11 #include <arpa/inet.h> 12 13 14 int main(int argc, char *argv[]) 15 { 16 if(argc < 3){ 17 printf("usage: %s ip port ", argv[0]); 18 exit(1); 19 } 20 21 /** 步骤1: 创建 socket */ 22 int sockfd = socket(AF_INET, SOCK_STREAM, 0); 23 if(sockfd < 0){ 24 perror("socket error"); 25 exit(1); 26 } 27 28 /** 往 serveraddr 中填入 ip、port 和地址族类型(ipv4) */ 29 struct sockaddr_in serveraddr; 30 memset(&serveraddr, 0, sizeof(struct sockaddr_in)); 31 serveraddr.sin_family = AF_INET; 32 serveraddr.sin_port = htons(atoi(argv[2])); 33 /** 将 ip 地址转换成网络字节序后填入 serveraddr 中 */ 34 inet_pton(AF_INET, argv[1], &serveraddr.sin_addr.s_addr); 35 36 /** 37 * 步骤2: 客户端调用 connect 函数连接到服务器端 38 */ 39 if(connect(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr_in)) < 0){ 40 perror("connect error"); 41 exit(1); 42 } 43 44 /** 步骤3: 调用 IO 函数(read/write)和服务器端进行双向通信 */ 45 char buff[512]; 46 ssize_t size; 47 char *prompt = "==>"; 48 while(1){ 49 memset(buff, 0, sizeof(buff)); 50 write(STDOUT_FILENO, prompt, 3); 51 size = read(STDIN_FILENO, buff, sizeof(buff)); 52 if(size < 0) continue; 53 buff[size - 1] = '