zoukankan      html  css  js  c++  java
  • 1024个读出线程的测试结果

    接收端1024个读出线程,每个线程处理一个socket, 先select,再recv.

    发送端部署在4个发送节点:cmm02node02、cmm02node03、cmm02node06、cmm02node07,每个节点256个发送线程。

    socket缓冲区: 发送缓冲区16MB, 接收缓冲区8MB

    用 bw.sh 统计得到的接收端的总带宽为: 16.9Gb/s.

    发送端单个线程的带宽随时间的变化情况如下图所示:可以满足要求。

    统计带宽代码:bw.sh

    发送端代码:server1bak.c

    接收端代码:client1_multi_select.c

     #include <stdio.h>
     #include <stdlib.h>

     #include <errno.h>
     #include <pthread.h>
     #include <time.h>
     #include <unistd.h>

     #include <arpa/inet.h>
     #include <netinet/in.h>
     #include <sys/ioctl.h>
     #include <sys/select.h>
     #include <sys/socket.h>
     #include <sys/time.h>

     #include <string>
     14 
     // Benchmark configuration.
     //
     // main() splits SOCKNUM connections evenly across SERVER_NUM servers
     // (SOCKNUM / SERVER_NUM each) and hands SOCKET_PER_THREAD of them to each of
     // THREAD_NUM receive threads, so SOCKNUM must be divisible by SERVER_NUM and
     // THREAD_NUM * SOCKET_PER_THREAD must not exceed SOCKNUM.
     #define PORT 33333              // TCP port every server is contacted on

     #define SOCKNUM 1024            // total number of client sockets
     #define THREAD_NUM 1024         // number of receive threads
     #define SOCKET_PER_THREAD 1     // sockets serviced by one thread
     #define SERVER_NUM 4            // number of sending servers
     #define MSGSIZE 2048            // bytes read per recvdata() call
     22 
     23 typedef struct{
     24     int sock[SOCKET_PER_THREAD];
     25 }ARG;
     26 
     // Print the receive and send buffer sizes currently reported for `sock`
     // by getsockopt(). A failed query is reported with perror() instead of
     // printing a value that was never written.
     void printOpt(int sock)
     {
         int opt = 0;
         socklen_t len = sizeof(opt);

         if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &opt, &len) == -1)
         {
             perror("getsockopt SO_RCVBUF");
         }
         else
         {
             printf("recv buf size: %d\n", opt);
         }

         // `len` is a value-result argument, so reset it before the second query.
         len = sizeof(opt);
         if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, &opt, &len) == -1)
         {
             perror("getsockopt SO_SNDBUF");
         }
         else
         {
             printf("send buf size: %d\n", opt);
         }
     }
     38 
     39 
     // Configure `fd` for the benchmark: enable SO_REUSEADDR, then request
     // 8 MiB for both SO_SNDBUF and SO_RCVBUF.
     //
     // Returns 0 when all three options were accepted. On the first failure the
     // error is reported with perror() and -1 is returned; later options are not
     // attempted.
     int SetSocketOptions(int fd)
     {
         static const int c_so_rcvbuf = 8*1024*1024;
         static const int c_so_sndbuf = 8*1024*1024;
         int sockopt = 1;

         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt)) == -1)
         {
             perror("set reuseaddr error");
             return -1;
         }

         sockopt = c_so_sndbuf;
         if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) == -1)
         {
             perror("set so_sndbuf error");
             return -1;
         }

         sockopt = c_so_rcvbuf;
         if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) == -1)
         {
             perror("set so_rcvbuf error");
             return -1;
         }

         // The caller compares the result against -1, so success must return a
         // defined value rather than falling off the end of the function.
         return 0;
     }
     68 
     69 
     // Non-blocking check for pending data on one socket.
     //
     // Returns 1 when at least one byte is queued for reading, 0 in every other
     // case (nothing pending, end of stream, invalid descriptor, or a failed
     // system call). Nothing is consumed from the socket.
     //
     // When select() flags the socket readable but no bytes are queued, the
     // condition is reported as a probable EOF and the descriptor is closed.
     // The caller's copy of the descriptor is not updated, so later calls with
     // the same number fail in select() and return 0.
     int poll(int socket)
     {
         if (socket < 0)
         {
             return 0;
         }

         // An fd_set only has room for descriptors below FD_SETSIZE; FD_SET on a
         // larger one writes outside the set. Such descriptors skip select() and
         // are judged by the queued byte count alone (EOF is not detected for them).
         int flagged = 0;
         if (socket < FD_SETSIZE)
         {
             fd_set recv_fds;
             struct timeval c_select_timeout = {0, 0};   // zero timeout: return at once
             FD_ZERO(&recv_fds);
             FD_SET(socket, &recv_fds);

             int sel = select(socket+1, &recv_fds, NULL, NULL, &c_select_timeout);
             if (sel < 0)
             {
                 // Report before anything can overwrite errno. The descriptor is
                 // not closed here: if it is already invalid, closing it again
                 // would be a repeated close on a stale number.
                 perror("select<0 error");
                 return 0;
             }
             if (sel == 0 || !FD_ISSET(socket, &recv_fds))
             {
                 return 0;
             }
             flagged = 1;
         }

         int length = 0;
         if (ioctl(socket, FIONREAD, &length) == -1)
         {
             // `length` was not written, so it must not be interpreted.
             printf("Error reading input size\n");
             return 0;
         }
         if (length > 0)
         {
             return 1;
         }

         if (flagged)
         {
             printf("Nothing to read, eof??\n");
             close(socket);
             perror("socket flagged but no data available probable EOF");
         }
         return 0;
     }
    131 
    132 
    133 
    134 int recvdata(int sock, char *buffer)
    135 {
    136     int msgsize = MSGSIZE;
    137     int ret;
    138     int nrecv=0;
    139     while (nrecv < msgsize)
    140     {
    141         ret = recv(sock, buffer, msgsize-nrecv, 0);
    142         if (ret < 0)
    143         {
    144             perror("recv fail");
    145             exit(1);
    146         }
    147         else
    148         {
    149             nrecv += ret;
    150         }
    151     }
    152     return nrecv;
    153 }
    154 
    155 void *recvData(void *arg)
    156 {
    157     ARG* a = (ARG*)arg;
    158     int *socket = a->sock;
    159     char buffer[MSGSIZE] = "0";
    160     int count = 0;
    161     struct  timeval  start;
    162     struct  timeval  end;
    163     unsigned long timer;
    164     gettimeofday(&start,NULL);
    165 
    166     while(1)
    167     {
    168         for(int i=0; i<SOCKET_PER_THREAD; i++)
    169         {
    170             if( poll(socket[i]) )
    171             {
    172 //          for(int num=0; num<300; num++)
    173             recvdata(socket[i], buffer);
    174             }
    175 #if 0
    176             count++;
    177             gettimeofday(&end,NULL);
    178             timer = 1000000 * (end.tv_sec-start.tv_sec)+ end.tv_usec-start.tv_usec;
    179             if(timer % 5000000==0)
    180             {
    181                 printf("timer = %ld us, %lf Gb/s
    ",timer, count*2048.0/timer/1024*8);
    182             }
    183 #endif
    184         }
    185     }
    186     return 0;
    187 }
    188 
    189 
    190 int main()
    191 {
    192     int sock[SERVER_NUM][SOCKNUM/SERVER_NUM];
    193     struct sockaddr_in addr_ser[SERVER_NUM];
    194     struct sockaddr_in addr_cli[SERVER_NUM][SOCKNUM/SERVER_NUM];
    195 
    196     std::string local_ip("192.168.250.141");
    197 
    198     std::string server_ip[SERVER_NUM] = {"192.168.250.146", "192.168.250.147", "192.168.250.142", "192.168.250.143"};
    199     //std::string server_ip[SERVER_NUM] = {"192.168.251.166", "192.168.251.167", "192.168.251.162", "192.168.251.163"}; 
    200 //  std::string server_ip[SERVER_NUM] = {"192.168.251.163"}; 
    201     for(int ser=0; ser < SERVER_NUM; ser++)
    202     {
    203     for(int i=0; i<SOCKNUM/SERVER_NUM; i++)
    204     {
    205         sock[ser][i] = socket(AF_INET, SOCK_STREAM, 0);
    206         if(sock[ser][i] < 0)
    207         {
    208             printf("%d ", i);
    209             perror("create socket fail");
    210         }
    211 
    212         addr_ser[ser].sin_family = AF_INET;
    213         addr_ser[ser].sin_port = htons(PORT);
    214         addr_ser[ser].sin_addr.s_addr = inet_addr(server_ip[ser].c_str());
    215 
    216         addr_cli[ser][i].sin_family = AF_INET;
    217         addr_cli[ser][i].sin_port = 0;
    218         addr_cli[ser][i].sin_addr.s_addr = inet_addr(local_ip.c_str());
    219 
    220 
    221         int sockopt = 1;
    222         if ( setsockopt(sock[ser][i], SOL_SOCKET, SO_REUSEADDR, (char*)&sockopt, sizeof(sockopt)) == -1 )
    223         {
    224             perror("set reuseaddr error");
    225             exit(1);
    226         }
    227 
    228 #if 1 
    229 
    230         if ( SetSocketOptions(sock[ser][i]) == -1)
    231         {
    232             perror("set socket options error");
    233             exit(1);
    234         }
    235 #endif
    236 
    237 
    238         printOpt(sock[ser][i]);
    239 
    240         if( bind(sock[ser][i], (struct sockaddr*)&addr_cli[ser][i], sizeof(addr_cli[ser][i]) ) < 0 )
    241         {
    242             perror("TCP bind: ");
    243             exit(1);
    244         }
    245         printf("bind ok!
    ");
    246 
    247         if(connect(sock[ser][i], (struct sockaddr*)&addr_ser[ser], sizeof(struct sockaddr)) < 0)
    248         {
    249             perror("connect fail:");
    250             exit(1);
    251         }
    252         printf("connect ok!
    ");
    253 
    254     }
    255 
    256     }
    257 
    258 
    259     int socket[SOCKNUM] ;
    260     int count=0;
    261     for(int i=0; i< SERVER_NUM; i++)
    262     {
    263         for(int j=0; j<SOCKNUM/SERVER_NUM; j++)
    264         {
    265             socket[count++] = sock[i][j];
    266         }
    267     }
    268 
    269     pthread_t tid[THREAD_NUM];
    270     ARG a[THREAD_NUM];
    271     for(int i=0; i<THREAD_NUM; i++)
    272     {
    273         for(int j=0; j<SOCKET_PER_THREAD; j++)
    274         {
    275         a[i].sock[j] = socket[i*SOCKET_PER_THREAD+j];
    276         }
    277         pthread_create(&tid[i], 0, recvData, (void *)&a[i]);
    278     }
    279 
    280     for(int i=0; i<SOCKNUM; i++)
    281     {
    282         pthread_join(tid[i], 0);
    283     }
    284 
    285     return 0;
    286 }
  • 相关阅读:
    第四次课堂作业
    12周课后作业
    12周上机(5.21)
    11周周五课后作业
    11周上机作业(5.14)
    第十周(5.7)上机
    第九周4.30上机作业
    第八周周五课后作业
    4.23 第八周上机作业
    4.17课后作业
  • 原文地址:https://www.cnblogs.com/zengtx/p/6803678.html
Copyright © 2011-2022 走看看