zoukankan      html  css  js  c++  java
  • 轻松应对C10k问题

    http://blog.csdn.net/u011011917/article/details/17203539

    传统的、教科书里的I/O复用等待函数select/poll在处理数以万计的客户端连接时,往往出现效率低下甚至完全瘫痪,,这被称为C10K 问题。

    本文尝试着以一个最简单的单线程epoll程序为基础,轻松应对收发数据不频繁的过万客户端并发连接。并以此回顾C10K问题,介绍应对C10K问题的本质方法,线程模式的选择,介绍服务器编程流行的Reactor模式。  顺带介绍怎么应对socket句柄耗尽。

    以下是一段使用Epoll的代码。用它可以轻松应对不繁忙的过万客户端连接。

    为了让它可以工作,需要修改 sudovi /etc/security/limits.conf 增加如下行: 

    * soft nofile 65535

    * hard nofile 65535

     

    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. int main(int argc,char** argv)  
    2. {  
    3.     //Socket初始化  
    4.     socket(AF_INET,SOCK_STREAM,0);  
    5.     bind( listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr))  
    6.         listen(listenfd,5);   
    7.   
    8.     make_socket_non_blocking(listenfd)  
    9.   
    10.         //Create epoll  
    11.         epollfd = epoll_create1(EPOLL_CLOEXEC);   
    12.     event.data.fd = listenfd;  
    13.     event.events = EPOLLIN | EPOLLET;  
    14.     val = epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&event  
    15.         //应对句柄耗尽问题,实现开无用一个句柄候着  
    16.         idleFd = open("/dev/null", O_RDONLY | O_CLOEXEC);  
    17.     for(;;)  
    18.     {  
    19.         //阻塞等待epoll事件  
    20.         int nfds = epoll_wait(epollfd,events,MAXEVENTS,-1);  
    21.         //有nfds个句柄待处理  
    22.         for(i = 0; i < nfds; i++)  
    23.         {  
    24.             if((events[i].events & EPOLLERR)  
    25.                 ||(events[i].events & EPOLLHUP)  
    26.                 ||(!(events[i].events & EPOLLIN))  
    27.                 )  
    28.             {//出错处理  
    29.                 close(events[i].data.fd);  
    30.                 continue;  
    31.             }  
    32.             else if(listenfd == events[i].data.fd)  
    33.             {//accept,一定要accept 到errno ==EAGAIN 为止  
    34.                 while(1)  
    35.                 {  
    36.                     infd = accept(events[i].data.fd,(struct sockaddr*)&in_addr,&in_len);  
    37.                     if(infd < 0)  
    38.                     {  
    39.                         switch (errno)   
    40.                         {  
    41.                         case EAGAIN:  
    42.                         case ECONNABORTED:  
    43.                         case EINTR:  
    44.                         case EPROTO:   
    45.                         case EPERM:  
    46.                             {  
    47.                                 //忽略  
    48.                             }  
    49.                             break;  
    50.                         case EMFILE: // 句柄耗尽  
    51.                             // 先关闭空句柄,再将本次连接的句柄accept进来关掉它,再打开空句柄候着。  
    52.                             close(idleFd);                              
    53.                             idleFd= accept(events[i].data.fd,(struct sockaddr*)&in_addr,&in_len);                             
    54.                             close(idleFd);  
    55.                             idleFd = open("/dev/null",O_RDONLY | O_CLOEXEC);  
    56.                             break;  
    57.                         default:  
    58.                             //错误处理,有可能是accept期间对方断开了连接等。  
    59.                             break;  
    60.                         }  
    61.                     }  
    62.   
    63.                     val = make_socket_non_blocking(infd);  
    64.                     event.data.fd = infd;  
    65.                     event.events = EPOLLIN|EPOLLET;  
    66.                     val = epoll_ctl(epollfd,EPOLL_CTL_ADD,infd,&event);  
    67.   
    68.                 }//while(1)  
    69.                 continue;  
    70.             }//Accept  
    71.             else if(events[i].events & EPOLLIN)  
    72.             {//Read data  
    73.                 //读数据,一定要读到(errno == EAGAIN)为止  
    74.   
    75.             }  
    76.             else if(events[i].events & EPOLLOUT)  
    77.             {//发数据,一定要发完,或者发到(errno == EAGAIN)为止                ;  
    78.             }  
    79.         }  
    80.     }  
    81. }  



    这就是epoll的程序框架。很重要的一点是,它将编程的思维转变过来。由主动accept连接,主动调用send/receive改为当相应句柄数据准备好时,由操作系统通知你来处理事件,处理数据。这个就是Reactor模式的基本要点

    网上有很多关于epoll的例子,因此我不打算深入展开解释。简单提一下ET模式和LT模式,

    ET模式被称为高速模式,是当句柄可用时epoll只通知你一次,因此在代码里我们需要用循环抱住所有的事件处理,一直处理到数据为空为止。否则会收不到接下来的事件。在这个模式下,意味着如果你有大数据需要接收,或者大数据需要发送,不可避免地会对本线程的其它连接造成影响。它的好处是通过减少epoll相关的系统调用(epoll_wait,epoll_ctl)来增加效率。坏处是编程复杂,容易出错。

    LT模式是epoll的缺省处理模式,只要句柄就绪了,epoll就会不停地通知你。一直到你处理完毕为止。用它编程简单,可以均衡线程中的各个连接处理。但是用它关注EPOLLOUT时会有点小麻烦。每当socket可写时,不管你有没有数据要发送,它都会不停地通知你。于是腾讯有这么一道面试题:epoll水平触发模式(Level-Triggered);当socket可写时,会不停的触发socket可写的事件,如何处理?解决办法是只有数据要发送时才关注EPOLLOUT,一旦数据发送完毕,立刻调用epoll_ctl停止观察。

    ET模式和LT模式谁的效率更高则有待时间检验,目前缺乏相应的基准测试数据。

     

    回头来再来看看C10K问题。没错,我们已经解决它了。C10K的原文在这里,英文不好的人可以看看翻译版的。C10K最大的特点是提升机器性能并不能相应提升程序的吞吐量

    应对C10K主要有两方面的策略:

    1) 应用程序以何种方式和操作系统合作,获取I/O事件并调度多个Socket上的I/O操作?

    2) 应用程序以何种方式处理任务和线程/进程的关系?

    我们先来看策略1:

    传统的select/poll函数返回后需要对传入的句柄列表做一次扫描来确定引发事件的相应句柄,其单个任务消耗的资源和当前连接的关系是O(n),CPU占用率和并发数近似成O(n2)。

    Epoll则用一个数组只返回就绪的句柄。单个任务和连接的关系是O(1)。在有大量空闲的情况下无疑效率要高出一大截来。

    我们再来看策略2:

    本质问题时你要create几个线程,每个线程用来干什么?废话不多说,我只谈我自己的理解:再多CPU多核的环境下,效率最高的线程数量是和CPU个数*核数相等。这样可以避开操作系统线程调度的开销。

    简单的处理方式是,一个线程负责accept,其它线程负责send/receive。线程的总数固定。好处是编程简单,计算任务可以直接在send/receive线程中完成,当出现某个计算任务过大时不会把系统跑死。一般用这个方案就可以了。

    复杂方案是按千兆比特每秒来配置send/receive线程,也就是一个千兆以太网卡一个线程,其它的放入线程池,用来计算用。

    再复杂的线程配置方案则可以开一个专题来讲,半同步半异步模式、领导追随者模式等。

     

    附上自己随手写的的单线程epoll程序,是上面代码的完整版。ET模式下收到数据后返回8K内容。

     

    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
      1. #include <stdlib.h>  
      2. #include <sys/epoll.h>  
      3. #include <stdio.h>  
      4. #include <sys/socket.h>  
      5. #include <netinet/in.h>  
      6. #include <fcntl.h>  
      7. #include <unistd.h>  
      8. #include <string.h>  
      9. #include <errno.h>  
      10. #include <string.h>  
      11. #define MAXEVENTS 64  
      12. #define PORT 2981  
      13. int make_socket_non_blocking (int sfd)  
      14. {  
      15.     int flags, s;  
      16.   
      17.     flags = fcntl (sfd, F_GETFL, 0);  
      18.     if (flags == -1)  
      19.     {  
      20.         perror ("fcntl");  
      21.         return -1;  
      22.     }  
      23.   
      24.     flags |= O_NONBLOCK;  
      25.     s = fcntl (sfd, F_SETFL, flags);  
      26.     if (s == -1)  
      27.     {  
      28.         perror ("fcntl");  
      29.         return -1;  
      30.     }  
      31.   
      32.     return 0;  
      33. }  
      34. struct StruEventBuf  
      35. {  
      36.     int fd;  
      37.     char *pszData;  
      38.     int nSize;  
      39.     int nStart;  
      40. };  
      41. int main(int argc,char** argv)  
      42. {  
      43.     int listenfd = 0;  
      44.     struct sockaddr_in servaddr;  
      45.     int epollfd = 0;  
      46.     int val = 0;  
      47.     struct epoll_event event;  
      48.     struct epoll_event events[MAXEVENTS];  
      49.     int i = 0;  
      50.     int j = 0;  
      51.     int idleFd;  
      52.     int nPort = 0;  
      53.     int flag;  
      54.   
      55.     int nSendBuf=10;    // 设置为32K  
      56.   
      57.     socklen_t optlen;  
      58.     idleFd = open("/dev/null", O_RDONLY | O_CLOEXEC);  
      59.     //Create Socket and bind  
      60.     listenfd = socket(AF_INET,SOCK_STREAM,0);  
      61.     if( listenfd < 0)  
      62.     {  
      63.         printf("socket error ");  
      64.         return 1;  
      65.     }  
      66.   
      67.     bzero(events,sizeof(events));  
      68.     bzero(&servaddr,sizeof(servaddr));  
      69.     servaddr.sin_family = AF_INET;  
      70.     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);  
      71.     servaddr.sin_port = htons(PORT);  
      72.   
      73.     flag  = 1;  
      74.     optlen = sizeof(flag);  
      75.     setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,optlen);  
      76.     if( bind( listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) < 0)  
      77.     {  
      78.         printf("bind error:(%d) %s ", errno, strerror(errno));  
      79.     }  
      80.     val = listen(listenfd,1024);  
      81.     //Listen  
      82.     if(-1 == val )  
      83.     {  
      84.         printf("listen error,errno = %d, %s ",errno, strerror(errno));  
      85.         return 1;  
      86.     }  
      87.     //Set socket opention to no block  
      88.     val = fcntl(listenfd,F_GETFL,0);  
      89.     fcntl(listenfd,F_SETFL, val | O_NONBLOCK);  
      90.   
      91.     val = fcntl(STDIN_FILENO,F_GETFL,0);  
      92.     fcntl(STDIN_FILENO,F_SETFL, val | O_NONBLOCK);  
      93.   
      94.     val = fcntl(STDOUT_FILENO,F_GETFL,0);  
      95.     fcntl(STDOUT_FILENO,F_SETFL, val | O_NONBLOCK);  
      96.   
      97.     //Create epoll  
      98.     epollfd = epoll_create1(EPOLL_CLOEXEC);  
      99.     printf("epollfd = %d ",epollfd);  
      100.   
      101.     struct StruEventBuf *pStruEventBufListen = (struct StruEventBuf*)malloc(sizeof(struct StruEventBuf));  
      102.     bzero(pStruEventBufListen,sizeof(struct StruEventBuf));  
      103.     pStruEventBufListen->fd = listenfd;  
      104.     event.data.ptr = pStruEventBufListen;  
      105.     event.events = EPOLLIN | EPOLLET;  
      106.     val = epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&event);  
      107.     if(-1 == val)  
      108.     {  
      109.         printf("epoll_ctl error ");  
      110.     }  
      111.     for(;;)  
      112.     {  
      113.         int nfds = epoll_wait(epollfd,events,MAXEVENTS,-1);  
      114.         if(-1 == nfds)  
      115.         {  
      116.             printf("epoll_pwat error ");  
      117.             return 1;  
      118.         }  
      119.         for(i = 0; i < nfds; i++)  
      120.         {  
      121.             struct StruEventBuf*pStruEventBuf = (struct StruEventBuf*) events[i].data.ptr;  
      122.             int fd = pStruEventBuf->fd;  
      123.             if((events[i].events & EPOLLERR)  
      124.                 ||(events[i].events & EPOLLHUP)  
      125.                 ||(!(events[i].events & (EPOLLIN | EPOLLOUT)))  
      126.                 )  
      127.             {  
      128.                 printf("epoll error fd = %d, events = %0x, errno = %d, %s "  
      129.                     , fd,events[i].events,errno  
      130.                     ,strerror(errno));  
      131.                 close(fd);  
      132.                 continue;  
      133.             }  
      134.             else if(listenfd == fd)  
      135.             {//accept  
      136.                 while(1)  
      137.                 {  
      138.                     struct sockaddr_in in_addr;  
      139.                     socklen_t in_len;  
      140.                     int infd;  
      141.   
      142. #define NI_MAXHOST 100  
      143. #define NI_MAXSERV 100  
      144.                     char hbuf[NI_MAXHOST],sbuf[NI_MAXSERV];  
      145.   
      146.                     in_len = sizeof(in_addr);  
      147.                     infd = accept(fd,(struct sockaddr*)&in_addr,&in_len);  
      148.                     if(infd < 0)  
      149.                     {  
      150.                         switch (errno)   
      151.                         {  
      152.                         case EAGAIN:  
      153.                         case ECONNABORTED:  
      154.                         case EINTR:  
      155.                         case EPROTO: // ???  
      156.                         case EPERM:  
      157.                             {  
      158.                                 //    printf("accept expected error %d,%s ", errno,strerror(errno));  
      159.                             }  
      160.                             break;  
      161.                         case EMFILE: // per-process lmit of open file desctiptor ???  
      162.                             // expected errors  
      163.                             close(idleFd);  
      164.   
      165.                             idleFd= accept(fd,(struct sockaddr*)&in_addr,&in_len);  
      166.                             inet_ntop(AF_INET,&in_addr.sin_addr,hbuf,sizeof(hbuf));  
      167.                             nPort = ntohs(in_addr.sin_port);  
      168.                             printf("Max connection ,will close connection from %s, port %d errno = %d %s ",hbuf,nPort,errno, strerror(errno));  
      169.   
      170.                             close(idleFd);  
      171.   
      172.                             idleFd = open("/dev/null",O_RDONLY | O_CLOEXEC);  
      173.                             break;  
      174.                         case EBADF:  
      175.                         case EFAULT:  
      176.                         case EINVAL:  
      177.                         case ENFILE:  
      178.                         case ENOBUFS:  
      179.                         case ENOMEM:  
      180.                         case ENOTSOCK:  
      181.                         case EOPNOTSUPP:  
      182.                             {  
      183.                                 printf("accept unexpected error %d,%s ", errno,strerror(errno));  
      184.                             }  
      185.                             break;  
      186.                         default:  
      187.                             {  
      188.                                 printf("accept unkonw error %d,%s ", errno,strerror(errno));  
      189.                             } break;  
      190.                         }  
      191.   
      192.                         if((EAGAIN == errno)  
      193.                             ||(EWOULDBLOCK == errno))  
      194.                         {  
      195.                             //we have processed all incoming connections  
      196.                             break;  
      197.                         }  
      198.                         else  
      199.                         {  
      200.                             printf("accept error %d,%s ", errno,strerror(errno));  
      201.                             break;  
      202.                         }  
      203.                     }  
      204.                     inet_ntop(AF_INET,&in_addr.sin_addr,hbuf,sizeof(hbuf));  
      205.                     nPort = ntohs(in_addr.sin_port);  
      206.   
      207.                     printf("connection from %s, port %d  ",hbuf,nPort);  
      208.   
      209.                     /*val = getnameinfo(&in_addr,in_len,hbuf, sizeof(hbuf), 
      210.                     sbuf,sizeof(sbuf)); 
      211.                     if(0 == val) 
      212.                     { 
      213.                     printf("accepted connection on descriptor %d" 
      214.                     "(host = %s, Port= %s) ",infd,hbuf,sbuf); 
      215.                     } 
      216.                     */  
      217.                     val = make_socket_non_blocking(infd);  
      218.                     if(-1 == val)  
      219.                     {  
      220.                         printf("make socket_non_bolcking error ");  
      221.                     }  
      222.   
      223.                     printf("accepted, fd = %d ",infd);  
      224.   
      225.                     nSendBuf = 10;  
      226.                     setsockopt(infd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int));  
      227.                     struct StruEventBuf *pStruEventBuf = (struct StruEventBuf*)malloc(sizeof(struct StruEventBuf));  
      228.                     bzero(pStruEventBuf,sizeof(struct StruEventBuf));  
      229.                     pStruEventBuf->fd = infd;  
      230.                     //pStruEventBuf->pszData = (char*) malloc(8 * 1024 + 1);  
      231.                     //pStruEventBuf->nSize = 8 * 1024 + 1;  
      232.                     pStruEventBuf->nStart = 0;  
      233.                     event.data.ptr = pStruEventBuf;  
      234.                     event.events = EPOLLIN|EPOLLOUT|EPOLLET;  
      235.                     val = epoll_ctl(epollfd,EPOLL_CTL_ADD,infd,&event);  
      236.                     if(val == -1)  
      237.                     {  
      238.                         printf("epoll_ctrl error ");  
      239.                     }  
      240.                 }//while(1)  
      241.                 continue;  
      242.             }//Accept  
      243.             else if(events[i].events & EPOLLIN)  
      244.             {//Read data  
      245.                 //We have data on the fd waiting to be read.   
      246.                 //Read and display it. We must read whateveer data  
      247.                 //is available and completly, as we are running in   
      248.                 //edge-triggereed mode and won't get a notification   
      249.                 //again for the same data  
      250.   
      251.                 int done = 0;  
      252.                 struct StruEventBuf* pStruEventBuf = (struct StruEventBuf*) (events[i].data.ptr);  
      253.                 while(1)  
      254.                 {  
      255.                     ssize_t count;  
      256.                     char buf[512];  
      257.                     int fd = pStruEventBuf->fd;  
      258.                     count = read(fd, buf, sizeof(buf));  
      259.                     if(-1 == count)  
      260.                     {  
      261.                         //if(errno == EAGAIN, that means we have read all data.  
      262.                         //So goback to the main loop.  
      263.   
      264.                         if(errno != EAGAIN)  
      265.                         {  
      266.                             printf("read error ");  
      267.                             done = 1;  
      268.                         }  
      269.                         break;  
      270.                     }  
      271.                     else if(count == 0)  
      272.                     {  
      273.   
      274.                         printf("Remote has close the socket ");  
      275.                         done = 1;  
      276.                         break;  
      277.                     }  
      278.                     buf[count] = 0;  
      279.                     printf("receive %s ", buf);  
      280.                     //send(fd,buf,count,0);  
      281.                     pStruEventBuf->nSize = 8 * 1024 + 1;  
      282.                     pStruEventBuf->pszData = (char*)malloc(pStruEventBuf->nSize);  
      283.                     char *p = pStruEventBuf->pszData;  
      284.                     for(i = 0; i < 8; i++)  
      285.                     {  
      286.                         for(j = 0; j < 1023; j ++)  
      287.                         {  
      288.                             *p++ = '0' + i;  
      289.                         }  
      290.                         *p++ = ' ';  
      291.   
      292.                     }  
      293.                     //*p++ = '';  
      294.                     if( p >= pStruEventBuf->pszData + pStruEventBuf->nSize )  
      295.                     {  
      296.                         printf("ERRORRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR! ");  
      297.                     }  
      298.                     val = send(fd,pStruEventBuf->pszData, pStruEventBuf->nSize - pStruEventBuf->nStart,0);  
      299.                     if(val < 0)  
      300.                     {  
      301.                         if((-1 == val) && (errno != EAGAIN))   
      302.                         {  
      303.                             printf("write error ");  
      304.                             done = 1;  
      305.                         }  
      306.                     }  
      307.                     else  
      308.                     {  
      309.                         pStruEventBuf->nStart += val;      
      310.                     }  
      311.   
      312.                 }  
      313.                 if(done)  
      314.                 {  
      315.                     printf("closed connection on descriptor %d  ",   
      316.                         fd);  
      317.                     //Closing the descriptor will make epoll remove it from the  
      318.                     //set of descriptiors which are monitored  
      319.                     //struct StruEventBuf *p = (struct StruEventBuf*) events[i].data.ptr;     
      320.                     free(pStruEventBuf);  
      321.                     close(fd);  
      322.                 }  
      323.             }     
      324.             else if(events[i].events & EPOLLOUT)  
      325.             {  
      326.                 //struct StruEventBuf *pStruEventBuf= (struct StruEventBuf*) events[i].data.ptr;      
      327.                 while( pStruEventBuf->nStart < pStruEventBuf->nSize)  
      328.                 {  
      329.                     int nLen = pStruEventBuf->nSize - pStruEventBuf->nStart;  
      330.                     val = send(fd,pStruEventBuf->pszData + pStruEventBuf->nStart,nLen,0);  
      331.   
      332.                     if(val < nLen)  
      333.                     {  
      334.                         if(val < 0)  
      335.                         {  
      336.                             if((-1 == val) && (errno != EAGAIN))   
      337.                             {  
      338.                                 printf("write error ");  
      339.                                 ;//done = 1;  
      340.                             }  
      341.                         }  
      342.                         else  
      343.                         {  
      344.                             pStruEventBuf->nStart += val;      
      345.                             printf("Send data ");  
      346.                         }  
      347.                         break;  
      348.                     }  
      349.                     else  
      350.                     {  
      351.                         char *p = pStruEventBuf->pszData;  
      352.                         free(p);  
      353.                         pStruEventBuf->pszData = NULL;  
      354.                         pStruEventBuf->nSize = 0;  
      355.                         pStruEventBuf->nStart= 0;  
      356.                     }  
      357.                 }  
      358.             }  
      359.         }  
      360.     }  
      361.     close(epollfd);  
      362.     return 0;  
      363. }  
  • 相关阅读:
    阿里消息队列中间件 RocketMQ 源码分析 —— Message 拉取与消费(上)
    数据库中间件 ShardingJDBC 源码分析 —— SQL 解析(三)之查询SQL
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 解析(六)之删除SQL
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 解析(五)之更新SQL
    消息队列中间件 RocketMQ 源码分析 —— Message 存储
    源码圈 300 胖友的书单整理
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 路由(一)分库分表配置
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 解析(四)之插入SQL
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 路由(二)之分库分表路由
    C#中Math类的用法
  • 原文地址:https://www.cnblogs.com/jukan/p/5228311.html
Copyright © 2011-2022 走看看