zoukankan      html  css  js  c++  java
  • epoll 惊群处理

      1 #include <sys/types.h>
      2 #include <sys/socket.h>
      3 #include <sys/epoll.h>
      4 #include <netdb.h>
      5 #include <string.h>
      6 #include <stdio.h>
      7 #include <unistd.h>
      8 #include <fcntl.h>
      9 #include <stdlib.h>
     10 #include <errno.h>
     11 #include <sys/wait.h>
     12 #include <unistd.h>
     13 #include <semaphore.h>
     14 #include <sys/shm.h>
     15 #define IP   "127.0.0.1"
     16 #define PORT  8888
     17 #define PROCESS_NUM 4
     18 #define MAXEVENTS 64
     19 
     20 static int
     21 create_and_bind ()
     22 {
     23     int fd = socket (PF_INET, SOCK_STREAM, 0);
     24     struct sockaddr_in serveraddr;
     25     serveraddr.sin_family = AF_INET;
     26     inet_pton (AF_INET, IP, &serveraddr.sin_addr);
     27     serveraddr.sin_port = htons (PORT);
     28     bind (fd, (struct sockaddr *) &serveraddr, sizeof (serveraddr));
     29     return fd;
     30 }
     31 
     32 static int
     33 make_socket_non_blocking (int sfd)
     34 {
     35     int flags, s;
     36     flags = fcntl (sfd, F_GETFL, 0);
     37     if (flags == -1)
     38     {
     39         perror ("fcntl");
     40         return -1;
     41     }
     42     flags |= O_NONBLOCK;
     43     s = fcntl (sfd, F_SETFL, flags);
     44     if (s == -1)
     45     {
     46         perror ("fcntl");
     47         return -1;
     48     }
     49     return 0;
     50 }
     51 
     52 void
     53 worker (int sfd, int efd, struct epoll_event *events, int k, sem_t * sem)
     54 {
     55     /* The event loop */
     56     struct epoll_event event;
     57     // struct epoll_event *events;
     58     efd = epoll_create (MAXEVENTS);
     59     if (efd == -1)
     60     {
     61         perror ("epoll_create");
     62         abort ();
     63     }
     64     int epoll_lock = 0;
     65     while (1)
     66     {
     67         int n, i;
     68         int s;
     69         event.data.fd = sfd;
     70         event.events = EPOLLIN;
     71         if (0 == sem_trywait (sem))
     72         {
     73             //拿到锁的进程将listen 描述符加入epoll
     74             if (!epoll_lock)
     75             {
     76                 fprintf (stderr, "%d  >>>get lock
    ", k);
     77                 s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
     78                 if (s == -1)
     79                 {
     80                     perror ("epoll_ctl");
     81                     abort ();
     82                 }
     83                 epoll_lock = 1;
     84             }
     85         }
     86         else
     87         {
     88             fprintf (stderr, "%d not lock
    ", k);
     89             //没有拿到锁的进程 将lisfd 从epoll 中去掉
     90             if (epoll_lock)
     91             {
     92                 fprintf (stderr, "worker  %d return from epoll_wait!
    ", k);
     93                 if (-1 == epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event))
     94                 {
     95                     if (errno == ENOENT)
     96                     {
     97                         fprintf (stderr, "EPOLL_CTL_DEL
    ");
     98                     }
     99                 }
    100                 epoll_lock = 0;
    101             }
    102         }
    103         //epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
    104         // fprintf(stderr, "ok
    ");
    105         //不能设置为-1  为了能让拿不到锁的进程再次拿到锁
    106         n = epoll_wait (efd, events, MAXEVENTS, 300);
    107         for (i = 0; i < n; i++)
    108         {
    109             if (sfd == events[i].data.fd)
    110             {
    111                 /* We have a notification on the listening socket, which means one or more incoming connections. */
    112                 struct sockaddr in_addr;
    113                 socklen_t in_len;
    114                 int infd;
    115                 char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    116                 in_len = sizeof in_addr;
    117                 while ((infd = accept (sfd, &in_addr, &in_len)) > 0)
    118                 {
    119                     fprintf(stderr, "get one
    ");
    120                     close (infd);
    121                 }
    122             }
    123         }
    124         if (epoll_lock)
    125         {
    126             //这里将锁释放
    127             sem_post (sem);
    128             epoll_lock = 0;
    129             epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event);
    130         }
    131     }
    132 }
    133 
    134 int
    135 main (int argc, char *argv[])
    136 {
    137     int shmid;
    138     sem_t *acctl;
    139     //建立共享内存
    140     shmid = shmget (IPC_PRIVATE, sizeof (sem_t), 0600);
    141     acctl = (sem_t *) shmat (shmid, 0, 0600);
    142     //进程间信号量初始化   要用到上面的共享内存
    143     sem_init (acctl, 1, 1);
    144     int sfd, s;
    145     int efd;
    146     // struct epoll_event event;
    147     // struct epoll_event *events;
    148     sfd = create_and_bind ();
    149     if (sfd == -1)
    150     {
    151         abort ();
    152     }
    153     s = make_socket_non_blocking (sfd);
    154     if (s == -1)
    155     {
    156         abort ();
    157     }
    158     s = listen (sfd, SOMAXCONN);
    159     if (s == -1)
    160     {
    161         perror ("listen");
    162         abort ();
    163     }
    164     efd = 0;
    165     int k;
    166     for (k = 0; k < PROCESS_NUM; k++)
    167     {
    168         printf ("Create worker %d
    ", k + 1);
    169         int pid = fork ();
    170         if (pid == 0)
    171         {
    172             struct epoll_event *events;
    173             events = calloc (MAXEVENTS, sizeof (struct epoll_event));
    174             worker (sfd, efd, events, k, acctl);
    175             break;
    176         }
    177     }
    178     int status;
    179     wait (&status);
    180     close (sfd);
    181     return EXIT_SUCCESS;
    182 }
    183 /*
    184  * 这里处理惊群 用到了进程的锁(信号量, 共享内存), 根据试验的结果多个进程时accept接收客户端连接的效率并没有提高太多
    185  * 但是处理其他可读可写(非监听描述符)时, 要比单个进程要快很多。
    186 */
    187 
    188  
    View Code

      在早期的kernel中, 多线程或多进程调用accept就会出现如下情况, 当前多个进程阻塞在accept中, 此时有客户端连接时, 内核就会通知阻塞在accept的所有进程, 这时就会造成惊群现象, 也就是所有accept都会返回 但是只有一个能拿到有效的文件描述符, 其他进程最后都会返回无效描述符。但在linux kernel 版本2.6 以上时, accept惊群的问题已经解决, 大致方案就是选一个阻塞在accept的进程返回。

      但是在IO复用中, select/poll/epoll 还是存在这种现象,其原因就是这些阻塞函数造成了以上同样的问题。这里就给出了类似Nginx的解决方案, 给监听描述符竞争枷锁, 保证只有一个进程处理监听描述符。 这里还可以控制锁的频率,如果一个进程接受连接的数量达到一定数量就不再申请锁。  这里要注意的是epoll_create的位置要在fork之后的子进程中, 这是因为若在父进程中create 那么fork之后子进程保留这个描述符副本,epoll_create其实是内核中建立的文件系统 保存在内核中, 那么其子进程都会共用这个文件系统, 那么多任务的epoll_ctl就会出现问题。子进程中建立各自的epoll fd 就可以避免这种情况。

  • 相关阅读:
    ntohs, ntohl, htons,htonl的比较和详解【转】
    Device Tree 详解【转】
    浅析Linux DeviceTree【转】
    【spring boot】spring boot 拦截器
    【jQuery】jQuery/js 判断字符串是否JSON字符串
    【java】java中的 &= 和 |= 和 ^= 的区别
    zabbix创建触发器、action,发送报警邮件
    html iframe禁用右键
    mysql数据库mysqldump方式备份
    JDK8新特性
  • 原文地址:https://www.cnblogs.com/MaAce/p/7755749.html
Copyright © 2011-2022 走看看