接收端1024个读出线程,每个线程处理一个socket, 先select,再recv.
发送端布置在4个发送结点:cmm02node02, cmm02node03, cmm02node06, cmm02node07, 每个节点256个发送线程,
socket缓冲区: 发送缓冲区16MB, 接收缓冲区8MB
用 bw.sh 统计得到的接收端的总带宽为: 16.9Gb/s.
发送端单个线程的带宽随时间的变化情况如下图所示:可以满足要求。
统计带宽代码:bw.sh
发送端代码:server1bak.c
接收端代码:client1_multi_select.c
1 #include <stdio.h> 2 #include <stdlib.h> 3 4 #include<pthread.h> 5 #include <unistd.h> 6 #include <time.h> 7 8 #include<sys/ioctl.h> 9 #include <sys/socket.h> 10 #include <arpa/inet.h> 11 #include <netinet/in.h> 12 #include<sys/time.h> 13 #include<string> 14 15 #define PORT 33333 16 17 #define SOCKNUM 1024 18 #define THREAD_NUM 1024 19 #define SOCKET_PER_THREAD 1 20 #define SERVER_NUM 4 21 #define MSGSIZE 2048 22 23 typedef struct{ 24 int sock[SOCKET_PER_THREAD]; 25 }ARG; 26 27 void printOpt(int sock) 28 { 29 int opt; 30 socklen_t len = sizeof(int); 31 getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &opt, &len); 32 printf("recv buf size: %d ", opt) ; 33 34 getsockopt(sock, SOL_SOCKET, SO_SNDBUF, &opt, &len); 35 printf("send buf size: %d ", opt) ; 36 37 } 38 39 40 int SetSocketOptions(int fd) 41 { 42 int sockopt = 0; 43 int SOCKET_ERROR = -1; 44 static const int c_so_rcvbuf = 8*1024*1024; 45 static const int c_so_sndbuf = 8*1024*1024; 46 47 sockopt = 1; 48 if ( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 49 { 50 perror("set reuseaddr error"); 51 return -1; 52 } 53 54 sockopt = c_so_sndbuf; 55 if ( setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 56 { 57 perror("set so_sndbuf error"); 58 return -1; 59 } 60 61 sockopt = c_so_rcvbuf; 62 if ( setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&sockopt, sizeof(sockopt)) == SOCKET_ERROR ) 63 { 64 perror("set so_rcvbuf error"); 65 return -1; 66 } 67 } 68 69 70 int poll(int socket) 71 { 72 fd_set recv_fds; 73 struct timeval c_select_timeout = {0, 0}; 74 FD_ZERO(&recv_fds); 75 FD_SET(socket, &recv_fds); 76 77 int sel = select(socket+1, &recv_fds, NULL, NULL, &c_select_timeout); 78 if(sel == 0) 79 { 80 return 0; 81 } 82 else if(sel > 0) 83 { 84 if( FD_ISSET(socket, &recv_fds) ) // socket is readable 85 { 86 int length; 87 int status = ioctl(socket, FIONREAD, &length); 88 if(status == -1) 89 { 90 printf("Error reading input size "); 91 } 92 if(length) 93 { 94 //printf("data length in socket buffer: %lf MB ", length/1024.0/1024.0); 95 return 1; 96 } 97 else 98 { 99 printf("Nothing to read, eof?? "); 100 if(socket != -1) 101 { 102 close(socket); 103 socket = -1; 104 } 105 perror("socket flagged but no data available probable EOF"); 106 return 0; 107 } 108 109 } 110 else 111 { 112 if(socket != -1) 113 { 114 close(socket); 115 socket = -1; 116 } 117 perror("FD_ISSET == zero"); 118 } 119 120 } 121 else 122 { 123 if(socket != -1) 124 { 125 close(socket); 126 socket = -1; 127 } 128 perror("select<0 error"); 129 } 130 } 131 132 133 134 int recvdata(int sock, char *buffer) 135 { 136 int msgsize = MSGSIZE; 137 int ret; 138 int nrecv=0; 139 while (nrecv < msgsize) 140 { 141 ret = recv(sock, buffer, msgsize-nrecv, 0); 142 if (ret < 0) 143 { 144 perror("recv fail"); 145 exit(1); 146 } 147 else 148 { 149 nrecv += ret; 150 } 151 } 152 return nrecv; 153 } 154 155 void *recvData(void *arg) 156 { 157 ARG* a = (ARG*)arg; 158 int *socket = a->sock; 159 char buffer[MSGSIZE] = "0"; 160 int count = 0; 161 struct timeval start; 162 struct timeval end; 163 unsigned long timer; 164 gettimeofday(&start,NULL); 165 166 while(1) 167 { 168 for(int i=0; i<SOCKET_PER_THREAD; i++) 169 { 170 if( poll(socket[i]) ) 171 { 172 // for(int num=0; num<300; num++) 173 recvdata(socket[i], buffer); 174 } 175 #if 0 176 count++; 177 gettimeofday(&end,NULL); 178 timer = 1000000 * (end.tv_sec-start.tv_sec)+ end.tv_usec-start.tv_usec; 179 if(timer % 5000000==0) 180 { 181 printf("timer = %ld us, %lf Gb/s ",timer, count*2048.0/timer/1024*8); 182 } 183 #endif 184 } 185 } 186 return 0; 187 } 188 189 190 int main() 191 { 192 int sock[SERVER_NUM][SOCKNUM/SERVER_NUM]; 193 struct sockaddr_in addr_ser[SERVER_NUM]; 194 struct sockaddr_in addr_cli[SERVER_NUM][SOCKNUM/SERVER_NUM]; 195 196 std::string local_ip("192.168.250.141"); 197 198 std::string server_ip[SERVER_NUM] = {"192.168.250.146", "192.168.250.147", "192.168.250.142", "192.168.250.143"}; 199 //std::string server_ip[SERVER_NUM] = {"192.168.251.166", "192.168.251.167", "192.168.251.162", "192.168.251.163"}; 200 // std::string server_ip[SERVER_NUM] = {"192.168.251.163"}; 201 for(int ser=0; ser < SERVER_NUM; ser++) 202 { 203 for(int i=0; i<SOCKNUM/SERVER_NUM; i++) 204 { 205 sock[ser][i] = socket(AF_INET, SOCK_STREAM, 0); 206 if(sock[ser][i] < 0) 207 { 208 printf("%d ", i); 209 perror("create socket fail"); 210 } 211 212 addr_ser[ser].sin_family = AF_INET; 213 addr_ser[ser].sin_port = htons(PORT); 214 addr_ser[ser].sin_addr.s_addr = inet_addr(server_ip[ser].c_str()); 215 216 addr_cli[ser][i].sin_family = AF_INET; 217 addr_cli[ser][i].sin_port = 0; 218 addr_cli[ser][i].sin_addr.s_addr = inet_addr(local_ip.c_str()); 219 220 221 int sockopt = 1; 222 if ( setsockopt(sock[ser][i], SOL_SOCKET, SO_REUSEADDR, (char*)&sockopt, sizeof(sockopt)) == -1 ) 223 { 224 perror("set reuseaddr error"); 225 exit(1); 226 } 227 228 #if 1 229 230 if ( SetSocketOptions(sock[ser][i]) == -1) 231 { 232 perror("set socket options error"); 233 exit(1); 234 } 235 #endif 236 237 238 printOpt(sock[ser][i]); 239 240 if( bind(sock[ser][i], (struct sockaddr*)&addr_cli[ser][i], sizeof(addr_cli[ser][i]) ) < 0 ) 241 { 242 perror("TCP bind: "); 243 exit(1); 244 } 245 printf("bind ok! "); 246 247 if(connect(sock[ser][i], (struct sockaddr*)&addr_ser[ser], sizeof(struct sockaddr)) < 0) 248 { 249 perror("connect fail:"); 250 exit(1); 251 } 252 printf("connect ok! "); 253 254 } 255 256 } 257 258 259 int socket[SOCKNUM] ; 260 int count=0; 261 for(int i=0; i< SERVER_NUM; i++) 262 { 263 for(int j=0; j<SOCKNUM/SERVER_NUM; j++) 264 { 265 socket[count++] = sock[i][j]; 266 } 267 } 268 269 pthread_t tid[THREAD_NUM]; 270 ARG a[THREAD_NUM]; 271 for(int i=0; i<THREAD_NUM; i++) 272 { 273 for(int j=0; j<SOCKET_PER_THREAD; j++) 274 { 275 a[i].sock[j] = socket[i*SOCKET_PER_THREAD+j]; 276 } 277 pthread_create(&tid[i], 0, recvData, (void *)&a[i]); 278 } 279 280 for(int i=0; i<SOCKNUM; i++) 281 { 282 pthread_join(tid[i], 0); 283 } 284 285 return 0; 286 }