zoukankan      html  css  js  c++  java
  • epoll中et+多线程模式中很重要的EPOLL_ONESHOT实验

    因为et模式需要循环读取,但是在读取过程中,如果有新的事件到达,很可能触发了其他线程来处理这个socket,那就乱了。

    EPOLL_ONESHOT就是用来避免这种情况。注意在一个线程处理完一个socket的数据,也就是触发EAGAIN errno时候,就应该重置EPOLL_ONESHOT的flag,这时候,新到的事件,就可以重新进入触发流程了。

    注:EPOLL_ONESHOT的原理其实是,每次触发事件之后,就将事件注册从fd上清除了,也就不会再被追踪到;下次需要用epoll_ctl的EPOLL_CTL_MOD来手动加上才行。

    服务器代码如下:

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <assert.h>
    
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 1024
    struct fds {
            int epollfd;
            int sockfd;
    };
    
    int setnonblocking(int fd) {
            int old_option = fcntl(fd, F_GETFL);
            int new_option = old_option | O_NONBLOCK;
            fcntl(fd, F_SETFL, new_option);
            return old_option;
    }
    
    
    void addfd(int epollfd, int fd, bool oneshot) {
            epoll_event event;
            event.data.fd = fd;
            event.events = EPOLLIN | EPOLLET;
            if (oneshot) {
                    event.events |= EPOLLONESHOT;
            }
            epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
            setnonblocking(fd);
    }
    
    void reset_oneshot(int epollfd, int fd) {
            epoll_event event;
            event.data.fd = fd;
            event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
            epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
    }
    
    void* worker(void *arg) {
            int sockfd = ((fds*)arg)->sockfd;
            int epollfd = ((fds*)arg)->epollfd;
            pthread_t pid = pthread_self();
    
            printf("start new thread %u to recv data on fd: %d
    ", pid, sockfd);
            char buf[BUFFER_SIZE];
            memset(buf, '', BUFFER_SIZE);
    
            while(1) {
                    int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
                    if (ret == 0) {
                            close(sockfd);
                            printf("foreiner closed the connection
    ");
                            break;
                    }
                    else if (ret < 0) {
                            if (errno == EAGAIN) {
                                    reset_oneshot(epollfd, sockfd);
                                    printf("EAGAIN read later
    ");
                                    break;
                            }
                    }
                    else {
                            buf[ret] = '';
                            printf("thread %u get content: %s
    ", pid, buf);
                            printf("thread %u about to sleep
    ", pid);
                            sleep(5);
                            printf("thread %u back from sleep
    ", pid);
                    }
            }
            //printf("end thread %u receiving data on fd: %d
    ", pid, sockfd);
    
    }
    
    int main(int argc, char *argv[]) {
            if (argc <= 1) {
                    printf("usage: %s port_number [ip_address]
    ", basename(argv[0]));
                    return 1;
            }
            int port = atoi(argv[1]);
    
            int ret = 0;
            sockaddr_in address;
            bzero(&address, sizeof(address));
            address.sin_family = AF_INET;
            if (argc >= 3) {
                    const char *ip =argv[2];
                    inet_pton(AF_INET, ip, &address.sin_addr);
            }
            else {
                    address.sin_addr.s_addr = INADDR_ANY;
            }
            address.sin_port = htons(port);
            int listenfd = socket(PF_INET, SOCK_STREAM, 0);
            assert(listenfd >= 0);
    
            ret = bind(listenfd, (sockaddr*)&address, sizeof(address));
            assert(ret != -1);
    
            ret = listen(listenfd, 5);
            assert(ret != -1);
    
            epoll_event events[MAX_EVENT_NUMBER];
            int epollfd = epoll_create(5);
            assert(epollfd != -1);
    
            addfd(epollfd, listenfd, false);
    
            while(1) {
                    int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
                    if (ret < 0) {
                            printf("epoll failure
    ");
                            break;
                    }
    
                    for (int i=0; i<ret; i++) {
                            int sockfd = events[i].data.fd;
                            if (sockfd == listenfd) {
                                    sockaddr_in client_address;
                                    socklen_t client_addrlength = sizeof(client_address);
                                    int connfd = accept(listenfd, (sockaddr*)&client_address,
                                                            &client_addrlength);
    
                                    addfd(epollfd, connfd, true);
                                    printf("new connection is added to epollfd
    ");
                            }
                            else if (events[i].events & EPOLLIN) {
                                    pthread_t thread;
                                    fds fds_for_new_worker;
                                    fds_for_new_worker.epollfd = epollfd;
                                    fds_for_new_worker.sockfd = sockfd;
                                    // new thread
                                    pthread_create(&thread, NULL, worker,
                                                    (void*)&fds_for_new_worker);
                            }
                            else {
                                    printf("something else happened
    ");
                            }
                    }
            }
    
            close(listenfd);
            return 0;
    
    }

    以下是使用telnet客户端发送的文本,匀速敲入代码:

    $telnet 127.0.0.1 12346
    Trying 127.0.0.1...
    Connected to 127.0.0.1.
    Escape character is '^]'.
    hi1
    hi2
    hi3
    hi4
    hi5
    hi6
    hi7
    hi8
    Connection closed by foreign host.

    以下是服务器的运行和输出:

    $./epoll_oneshot 12346
    new connection is added to epollfd
    start new thread 1734051584 to recv data on fd: 5
    thread 1734051584 get content: hi1
    
    thread 1734051584 about to sleep
    thread 1734051584 back from sleep
    thread 1734051584 get content: hi2
    hi3
    
    thread 1734051584 about to sleep
    thread 1734051584 back from sleep
    thread 1734051584 get content: hi4
    hi5
    hi6
    
    thread 1734051584 about to sleep
    thread 1734051584 back from sleep
    thread 1734051584 get content: hi7
    
    thread 1734051584 about to sleep
    thread 1734051584 back from sleep
    EAGAIN read later
    start new thread 1723561728 to recv data on fd: 5
    thread 1723561728 get content: hi8
    
    thread 1723561728 about to sleep
    thread 1723561728 back from sleep
    EAGAIN read later
    ^C

    最后用Ctrl+C来结束服务器。

    可以看出,在hi7文本和hi8文本之间,服务器收到了EAGAIN,表示读取告一段落。而之后的线程id也换成了新线程id。在hi7之前,因为每次服务器sleep结束之后,都还有没有读完的数据,所以线程id始终没有变,始终是同一个线程处理数据。

    另外,要注意的是,EPOLL_ONESHOT既可以在et下也可以在lt下设置。效果是一样的,都是同一个fd上面的相同事件只会触发一次。上面是et的例子,对于lt,如果设置了EPOLL_ONESHOT,也是需要把数据读完,然后重置event。而不能像原始lt编程方式那样依赖于事件通知来读取数据了。

  • 相关阅读:
    sql交集、差集、并集
    控件自适应文本宽度
    pivot列行转换,自动计算分组,解决groupby问题
    echart-scatter使用散点图,带坐标和项目名称
    下载文件根据浏览器判断文件名,解决兼容性问题
    sql中类型转换涉及的性能差异之convert和cast
    js使用正则表达式对文本框进行限制输入
    Aspose.Words.dll根据模板生成word详解
    Windows服务开发
    SqlBulkCopy学习(导入海量数据的类)
  • 原文地址:https://www.cnblogs.com/charlesblc/p/5538363.html
Copyright © 2011-2022 走看看