zoukankan      html  css  js  c++  java
  • 我理解的epoll(三)多线程模式下的ET

    ET模式下,需要循环从缓存中读取,直到返回EAGAIN(没有数据可读)后,一个被通知的事件才算处理结束。如果还在读取过程中,同一个连接上又有新的事件到来,触发其他线程处理同一个socket,就乱了。EPOLLONESHOT就是用来避免这种情况发生的。将事件设置为EPOLLONESHOT后,事件每触发一次,内核就会停止对该FD的事件通知(FD本身仍保留在epoll的监听集合中,并不会从rbtree中删除),直到处理线程用epoll_ctl(EPOLL_CTL_MOD)重新设置事件后,才会再次通知。

    下面是我模仿别人写的一段代码:

      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <unistd.h>
      4 #include <errno.h>
      5 #include <string.h>
      6 #include <fcntl.h>
      7 #include <libgen.h>
      8 #include <assert.h>
      9 #include <stdbool.h>
     10 
     11 #include <sys/types.h>
     12 #include <sys/socket.h>
     13 #include <netinet/in.h>
     14 #include <arpa/inet.h>
     15 #include <sys/epoll.h>
     16 #include <pthread.h>
     17 
     /* Capacity of the event array handed to epoll_wait(). */
     #define MAX_EVENT_NUMBER 1024
     /* Size in bytes of each worker thread's receive buffer. */
     #define BUFFER_SIZE 1024

     /* Values for add_fd()'s flag argument: register with EPOLLONESHOT or not. */
     #define ONE_SHOT true
     #define NOT_ONE_SHOT false
     23 
     /* Argument bundle handed to a worker thread through pthread_create(). */
     struct data {
         int epollfd;    /* epoll instance the connection is registered with */
         int sockfd;     /* connected socket the worker thread reads from */
     };
     28 
     /*
      * Put fd into non-blocking mode by OR-ing O_NONBLOCK into its file status
      * flags; all other status flags are preserved.
      *
      * fd: an open file descriptor.
      *
      * The function returns nothing, so failures are reported with perror().
      * If the current flags cannot be read, the descriptor is left untouched
      * rather than having a bogus flag word written back to it.
      */
     void setnonblockling(int fd)
     {
         int flags = fcntl(fd, F_GETFL);
         if (flags == -1) {
             perror("fcntl(F_GETFL)");
             return;
         }

         if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
             perror("fcntl(F_SETFL)");
     }
     35 
     /*
      * Register fd with the epoll instance epollfd for edge-triggered read
      * readiness (EPOLLIN | EPOLLET) and switch it to non-blocking mode.
      *
      * epollfd: epoll instance to register with.
      * fd:      descriptor to watch.
      * flag:    when true, EPOLLONESHOT is added to the event mask.
      *
      * The function returns nothing; a failed epoll_ctl() is reported with
      * perror().
      */
     void add_fd(int epollfd, int fd, bool flag)
     {
         struct epoll_event event;
         memset(&event, 0, sizeof event);    /* no uninitialised bytes go to the kernel */
         event.data.fd = fd;
         event.events = EPOLLIN | EPOLLET;   /* readable events, edge-triggered */
         if (flag)
             event.events |= EPOLLONESHOT;

         /*
          * Make the descriptor non-blocking BEFORE it becomes visible to epoll:
          * once registered, an event may be delivered and another thread may
          * start its read-until-EAGAIN loop, which would hang on a descriptor
          * that is still in blocking mode.
          */
         setnonblockling(fd);

         if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1)
             perror("epoll_ctl(EPOLL_CTL_ADD)");
     }
     47 
     /*
      * Re-arm fd on the epoll instance epollfd by re-installing the event mask
      * EPOLLIN | EPOLLET | EPOLLONESHOT with EPOLL_CTL_MOD. A descriptor
      * registered with EPOLLONESHOT reports no further events after one has
      * been delivered until its mask is set again like this.
      *
      * epollfd: epoll instance fd is already registered with.
      * fd:      descriptor to re-arm.
      *
      * The function returns nothing; a failed epoll_ctl() is reported with
      * perror() so that a connection that could not be re-armed does not go
      * silent without a trace.
      */
     void reset_oneshot(int epollfd, int fd)
     {
         struct epoll_event event;
         memset(&event, 0, sizeof event);
         event.data.fd = fd;
         event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;

         if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1)
             perror("epoll_ctl(EPOLL_CTL_MOD)");
     }
     55 
     56 void *threadMain(void *instance)
     57 {
     58     struct data *data = (struct data *)instance;
     59     int epollfd = data->epollfd;
     60     int sockfd = data->sockfd;
     61     pthread_t pid = pthread_self();    //pid or tid, confused
     62 
     63     printf("start new thread %u to recv data on fd: %d
    ", pid, sockfd);
     64     char buf[BUFFER_SIZE];
     65     memset(buf, 0, sizeof(buf));
     66 
     67     for(;;) {
     68         int n = recv(sockfd, buf, BUFFER_SIZE-1, 0);
     69         if (n == 0) {
     70             close(sockfd);
     71             printf("foreiner closed the connection
    ");
     72             break;
     73         }
     74         else if (n < 0) {
     75             if (errno == EAGAIN) {
     76                 reset_oneshot(epollfd, sockfd);
     77                 printf("EAGAIN. Read later
    ");
     78                 break;
     79             }
     80         }
     81         else {
     82             buf[n] = '';
     83             printf("thread %u get content: %s
    ", pid, buf);
     84             printf("thread %u about to sleep
    ", pid);
     85             sleep(5);
     86             printf("thread %u back from sleep
    ", pid);
     87         }
     88     }
     89 
     90     printf("end thread %u receiving data on fd: %d
    ", pid, sockfd);
     91 }
     92 
     93 int main(int argc, char **argv)
     94 {
     95     if (argc <= 1) {
     96         printf("usage: %s port_number [ip_address]
    ", basename(argv[0]));
     97         return 1;
     98     }
     99 
    100     int ret;
    101     int listenfd;
    102     int port = atoi(argv[1]);
    103     struct sockaddr_in address;
    104     bzero(&address, sizeof(address));
    105     address.sin_family = AF_INET;
    106     if (argc == 3) {
    107         const char *ip = argv[2];
    108         inet_pton(AF_INET, ip, &address.sin_addr);
    109     }
    110     else {
    111         address.sin_addr.s_addr = INADDR_ANY;
    112     }
    113     address.sin_port = htons(port);
    114     printf("port = %d
    ", port);
    115     listenfd = socket(PF_INET, SOCK_STREAM, 0);
    116     assert(listenfd >= 0);
    117 
    118     ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
    119     assert(ret != -1);
    120 
    121     ret = listen(listenfd, 5);
    122     assert(ret != -1);
    123 
    124     struct epoll_event events[MAX_EVENT_NUMBER];
    125     int epollfd = epoll_create(5);
    126     assert(epollfd != -1);
    127 
    128     /* 需要反复监听listenfd */
    129     add_fd(epollfd, listenfd, NOT_ONE_SHOT);
    130 
    131     while(1) {
    132         int fd_number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);    //设置为永久阻塞
    133         if (fd_number < 0) {
    134             perror("epoll wait failed
    ");
    135             break;
    136         }
    137 
    138         for (int i = 0; i < fd_number; i++) {
    139             int sockfd = events[i].data.fd;
    140             /* listenfd上有新事件,则创建新的fd,并加入监听队列 */
    141             if (sockfd == listenfd) {
    142                 struct sockaddr_in client_addr;
    143                 socklen_t client_socklen = sizeof(client_addr);
    144                 int connfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_socklen);
    145                 add_fd(epollfd, connfd, ONE_SHOT);
    146             }
    147             else if (events[i].events & EPOLLIN) {
    148                 pthread_t tid;
    149                 struct data instance;
    150                 instance.epollfd = epollfd;
    151                 instance.sockfd = sockfd;
    152                 /* 一个连接对应一个线程,最好写成threadpool-BlockingQueue模式 */
    153                 pthread_create(&tid, NULL, threadMain, (void *)&instance);
    154             }
    155             else {
    156                 printf("something else happened
    ");
    157             }
    158         }
    159     }
    160 
    161     close(listenfd);
    162     return 0;
    163 }
  • 相关阅读:
    Python3之format
    xml文件整理
    某系统采集2018
    sublime+python3 中文环境
    python3文本读取与写入常用代码
    redis3.0集群使用发现的一些问题
    mysql字符集
    redis3.0集群搭建
    安装Maven、nexus
    一键安装mysql5.6
  • 原文地址:https://www.cnblogs.com/howo/p/8672690.html
Copyright © 2011-2022 走看看