zoukankan      html  css  js  c++  java
  • 第15章 高并发服务器编程(3)_事件驱动模型

    4. 事件驱动模型:epoll

    4.1 epoll简介

    (1)epoll是Linux内核为处理大批量的socket而改进的poll,相对于select/poll来说,epoll更加灵活。它使用一个文件描述符来管理多个socket。

    (2)epoll之所以高效,是因为它将用户关心的socket事件存放到内核的一个事件表中。而不是像select/poll每次调用都需要重复传入fd_set。比如当一个事件发生(如读事件),epoll无须遍历整个被监听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入就绪队列的描述符集就行了。

    4.2 epoll的工作机制

     

    (1)调用epoll_create,内核会创建一个eventpoll结构体。该结构体中的rbr成员是一颗红黑树存储着所有添加到epoll中需要监控的事件。而rdlist成员是一个双向链表存放着将要通过epoll_wait返回给用户的满足条件的事件

    (2)调用epoll_ctl(),会向epoll对象中添加、删除或修改感兴趣的事件。

    (3)当调用epoll_wait检查是否有事件发生时,内核会从eventpoll对象中的rdlist双向链表中检查是否有元素。如果有,则会把事件复制到用户态,同时将事件数量返回给用户。

    4.3 epoll接口

    (1)创建和关闭epoll对象

    头文件

    #include <sys/epoll.h>

    函数

    int epoll_create(int size);

    int close(int epollfd); //关闭epoll对象

    参数

    size:用于告诉内核要监听的数目。注意,size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。

    返回值

    成功时返回epoll对象的文件描述符。失败返回-1

    功能

    创建一个epoll对象

    (2)操作epoll对象

    头文件

    #include <sys/epoll.h>

    函数

    int epoll_ctl (int epfd, int op, int fd, struct epoll_event* event);

    参数

    epfd:由epoll_create()的返回值

    op:表示要将fd添加到epoll对象或从epoll对象删除/修改fd等。

    添加:EPOLL_CTL_ADD,删除:EPOLL_CTL_DEL,修改:EPOLL_CTL_MOD

    fd:需要监听的fd(文件描述符)

    struct epoll_event{  //告诉内核需要监听什么事件。
        __uint32_t events;  //如EPOLLIN,EPOLLOUT等。
        epoll_data_t data;  //用户自定义的数据
    };

    返回值

    成功时返回epoll对象的文件描述符。失败返回-1

    备注

    struct epoll_event中的events成员变量的取值:

    (1)EPOLLIN:表示对应的文件描述符可以读(包括对端socket正常关闭)

    (2)EPOLLOUT:表示对应的文件描述符可写

    (3)EPOLLPRI:表示对应的文件描述符有紧急的数据可读(带外数据的到来)

    (4)EPOLLERR:表示对应的文件描述符发生错误

    (5)EPOLLHUP:表示对应的文件描述符被挂断

    (6)EPOLLET:将epoll设置为边缘触发

    (7)EPOLLONESHOT:只监听一次,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到epoll队列里。

    (3)等待IO事件

    头文件

    #include <sys/epoll.h>

    函数

    int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

    参数

    epfd:由epoll_create()的返回值

    events:用来接收从内核得到事件的集合。

    maxevents:最多返回多少个事件

    timeout:超时时间(毫秒):0会立即返回。

    返回值

    成功时返回需要处理的事件数目,0表示超时

    功能

    等待IO事件

    4.4 epoll的工作模式

    (1)LT模式默认模式,同时支持block和non-block socket。当epoll_wait检测到socket事件发生并将此事件通知应用程序,用户可以对这个就绪的socket进行IO操作,也可以,可以不立即处理该事件。如果你不作任何操作,在下次调用epoll_wait时,内核会继续通知应用程序

    (2)ET模式:边缘模式只支持non-block socket。在这种模式下,只有当socket从未就结果变为就绪时,内核才会通知应用程序以后就不再通知,直到应用程序对该socket做了某些操作,使得该socket不再处理就绪状态。值得注意的是,如果不对这个socket作IO操作,内核不会发送更多的通知(only once)

    【编程实验】echo服务器

     

    (1)主线程创建epoll对象,并注册listent socket的“读就绪”事件到epoll请求队列中。

    (2)主线程调用epoll_wait等待事件发生。如果此时有用户连接进来,则会调知主线程,并调用我们设置的handle_accept函数。

    (3)handle_accept函数中调用accept系统函数来接受请求,并将新的socket的“读就绪”事件插入epoll对象的请求队列中,等待客户端发送数据过来。

    (4)如果服务器收到客户端发送的数据,主线程会从epoll_wait中返回,并将数据分派给handle_event函数去做(这里可以开启一个工作线程来完成!)

    (5)handle_event接收并处理数据,然后准备好发送缓冲区,再注册“写就绪”事件。当系统检测到可以写数据后,就会调用sendData去发送数据。

    (6)数据发送完后,重新注册“读就绪”事件,主线程调用epoll_wait等待客户端发送新的数据过来。

    【注意】在Reactor模式中,没必要区分所谓的“读工作线程”和“写工作线程”。

    //myevent.h

    #ifndef __MYEVENT_H__
    #define __MYEVENT_H__
    
    typedef void (ev_callback)(int fd, int events, void* arg);
    typedef struct _tag_myevent
    {
        int fd;
        ev_callback*  callback;
        int events;  //事件类型(如EPOLLIN、EPOLLOUT等,也可按位或)
        void* arg;
        int status; //1:in epoll wait list, 0 not in;
        long last_active;   //last active time
    
        char buff[512];
        int len, s_offset; //标示发送和接收缓冲区当前的大小
    }myevent_s;
    
    void copyData(myevent_s* src, myevent_s* obj);
    void event_set(myevent_s* ev, int fd, int events, ev_callback* callback);
    void event_add(int epollFd, myevent_s* ev);
    void event_del(int epollFd, myevent_s* ev);
    
    #endif

    //myevent.c

    #include "myevent.h"
    #include <memory.h>
    #include <time.h>
    #include <sys/epoll.h>
    
    //拷贝数据
    void copyData(myevent_s* src, myevent_s* obj)
    {
        memcpy(obj->buff, src->buff, sizeof(obj->buff));
        obj->len = src->len;
           
        obj->s_offset = src->s_offset;
    }
    
    //自定义事件的封装(包含事件发生时的回调函数、事件活跃时间等信息)
    void event_set(myevent_s* ev, int fd, int events, ev_callback* callback)
    {
        ev->fd = fd;
        ev->callback = callback;
        ev->events = events;
        ev->arg = ev;
        ev->status = 0;
        ev->last_active = time(NULL);
    
        memset(ev->buff, 0, sizeof(ev->buff));
        ev->len = 0;
        ev->s_offset = 0;
    }
    
    //向epoll对象中添加或修改事件
    void event_add(int epollFd, myevent_s* ev)
    {
        struct epoll_event epv={0, {0}};
        int op;
        epv.data.ptr = ev; //用户数据,将自定义ev绑定到内核epoll_event上
        epv.events = ev->events;
       
        op = (ev->status == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
        ev->status = 1;
    
        if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0){
            perror("event_add error");
        }
    }
    
    //将指定的事件从epoll对象中删除
    void event_del(int epollFd, myevent_s* ev)
    {
        struct epoll_event epv ={0, {0}};
        if(ev->status != 1) return; //1: in epoll, 0: not in
    
        epv.data.ptr = ev;
        ev->status = 0;
        
        if(epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv) < 0){
            perror("event_del error");
        }
    }

    //epoll.c

    #include "myevent.h"
    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <netdb.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <memory.h>
    #include <time.h>
    
    #define IPADDRESS   "127.0.0.1"
    #define PORT         8888
    #define MAX_EVENTS   100
    
    //全局变量
    int g_epollFd;
    myevent_s g_Events[MAX_EVENTS + 1]; //g_Events[MAX_EVENTS]用于保存listen fd
    
    /*函数声明*/
    //设置为非阻塞模式
    static int set_nonblock(int sockfd);
    //创建并绑定套接字
    static int socket_bind(const char* ip, int port);
    //IO多路复用epoll
    static void do_epoll(int listenfd);
    //事件处理函数
    static void handle_events(myevent_s* ev);
    //accept回调函数
    static void handle_accept(int fd, int events, void* arg);
    //接收数据
    static void recvData(int fd, int events, void* arg);
    //发送数据
    static void sendData(int fd, int events, void* arg);
    
    int main(int argc, char* argv[])
    {
        int listenfd;
        listenfd = socket_bind(IPADDRESS, PORT);
           
        //监听连接
        listen(listenfd, 5);
    
        do_epoll(listenfd);
    
        return 0;
    }
    
    //设置为非阻塞模式
    int set_nonblock(int sockfd)
    {
        int iret = -1;
        int opts;
        opts = fcntl(sockfd, F_GETFL);
        if(opts < 0){
            perror("fcntl error");
            return -1;
        }
    
        opts |= O_NONBLOCK;
        if((iret = fcntl(sockfd, F_SETFL, opts)) < 0){
            perror("fcntl error");
        }
    
        return iret;
    }
    
    //创建套接字并进行绑定
    int socket_bind(const char* ip, int port)
    {
        int listenfd = -1;
        struct sockaddr_in servaddr;
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        
        if(listen < 0){
            perror("socket error");
            exit(1);
        }
        
        memset(&servaddr, 0, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        inet_pton(AF_INET, ip, &servaddr.sin_addr);
        servaddr.sin_port = htons(port);
    
        if(bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0){
            perror("bind error");
            exit(1);
        }
        
       // set_nonblock(listenfd); //设置为非阻塞模式
    
        return listenfd;
    }
    
    //IO多路复用epoll
    void do_epoll(int listenfd)
    {
        //创建epoll对象
        g_epollFd = epoll_create(MAX_EVENTS);
        
        //将listen socket加入到epoll对象中
        event_set(&g_Events[MAX_EVENTS], listenfd, EPOLLIN, handle_accept);
        event_add(g_epollFd, &g_Events[MAX_EVENTS]);
    
        struct epoll_event events[MAX_EVENTS];
        printf("server is running:%s(%d)
    ", IPADDRESS, PORT);
    
        int checkPos = 0;
        while(1){
            //1.检查是否超时(只检查前面的100个连接)
            long now = time(NULL);
            int i = 0;
            for(; i<100; i++, checkPos++){ //不检测查listen fd
                if(checkPos == MAX_EVENTS)
                    checkPos = 0;
    
                if(g_Events[checkPos].status != 1) continue;
    
                long duration = now - g_Events[checkPos].last_active;
                if(duration >=60){ //设置不活动超过60秒为超时
                    close(g_Events[checkPos].fd);
                    event_del(g_epollFd, &g_Events[checkPos]);
                    printf("[fd=%d] timeout.
    ", g_Events[checkPos].fd);   
                }
            }
    
            //2.获取己经准备好的socket事件
            int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000); //返回值为发生的事件数量
            if(fds < 0){
                perror("epoll_wait error, exit
    ");
                exit(1);
            }
            
            //处理事件
            for(i=0; i<fds; i++){
                myevent_s* ev = (myevent_s*)events[i].data.ptr;
                if((events[i].events & EPOLLIN) || (events[i].events & EPOLLOUT))
                    handle_events(ev); //可以将任务分派到新线程中处理。本例为简单起见,直接在主线程处理
            }
        }
    
        //关闭epoll对象
        close(g_epollFd);
    }
    
    //事件处理函数
    void handle_events(myevent_s* ev)
    {
         ev->callback(ev->fd, ev->events, ev);  
    }
    
    //接受客户端连接
    void handle_accept(int fd, int events, void* arg)
    {
        struct sockaddr_in cliaddr;
        socklen_t len = sizeof(cliaddr);
        int nfd, i;
        
        //accept系统调用
        if((nfd = accept(fd, (struct sockaddr*)&cliaddr, &len)) < 0){
            perror("accept error");
            return;
        }
    
        //检查连接数是否己经达到上限
        do{ //使用do...while(0)是个技巧,可以代替goto
            //查找是否仍有可用连接数
            for(i=0; i<MAX_EVENTS; i++){
                if(g_Events[i].status == 0)
                break;
            }
    
            if(i == MAX_EVENTS){ //最后一个为listen fd
                printf("max connection limit[%d]
    ", MAX_EVENTS);
                break;  //跳出do...while
            }
    
            //找到可用连接,设置非阻塞模式
            //if(set_nonblock(nfd) < 0)
            //    break;  //跳出do...while
            
            //添加一个与客户通信的socket描述符事件
            event_set(&g_Events[i], nfd, EPOLLIN, recvData);
            event_add(g_epollFd, &g_Events[i]);
            
        }while(0);
    }
    
    //接收数据
    void recvData(int fd, int events, void* arg)
    {
        myevent_s* ev = (myevent_s*)arg;
        int len;
    
        //接收数据
        len = recv(fd, ev->buff + ev->len, sizeof(ev->buff) - ev->len, 0);
        event_del(g_epollFd, ev);
        
        if(len > 0){
            ev->len += len;
            ev->buff[len] = '';
            printf("Client[%d]:%s
    ", fd, ev->buff);
    
            //设置写事件
            myevent_s tmp;
            copyData(ev, &tmp);
            event_set(ev, fd, EPOLLOUT, sendData);
            copyData(&tmp, ev);
            event_add(g_epollFd, ev);
        }else if(len == 0){
             close(ev->fd);
             printf("[fd=%d], closed gracefully.
    ", fd);
        }else{
            close(ev->fd);
            printf("recv [fd=%d] error.
    ",fd);
        }
    }
    
    //发送数据
    void sendData(int fd, int events, void* arg){
        myevent_s* ev = (myevent_s*)arg;
        int len;
        //发送数据
        len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0);
        if(len > 0){
            printf("send[fd=%d], [%d<->%d]%s
    ", fd, len, ev->len, ev->buff);
            ev->s_offset += len;
            if(ev->s_offset == ev->len){
                //设置读事件
                event_del(g_epollFd, ev);
                event_set(ev, fd, EPOLLIN, recvData);
                event_add(g_epollFd, ev);
            }
        }else{
            close(ev->fd);
            event_del(g_epollFd, ev);
            printf("send[fd=%d] error.
    ", fd);
        }
    }
    /*输出结果
     [root@localhost 15.AdvNet]# bin/epoll2  
     server is running:127.0.0.1(8888)
     Client[5]:abcdef
     send[fd=5], [512<->512]abcdef
     Client[5]:1234567
     send[fd=5], [512<->512]1234567
     ^C
     */

    //echo_tcp_client.c(与上一例相同)

    #include <netdb.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <memory.h>
    
    int main(int argc, char* argv[])
    {
        if(argc < 3){
            printf("usage: %s ip port
    ", argv[0]);
            exit(1);
        }
    
        /*步骤1: 创建socket(套接字)*/
        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if(sockfd < 0){
            perror("socket error");
        }
    
        //往servAddr中填入ip、port和地址族类型
        struct sockaddr_in servAddr;
        memset(&servAddr, 0, sizeof(servAddr));
        servAddr.sin_family = AF_INET;
        servAddr.sin_port = htons(atoi(argv[2]));
        //将ip地址转换成网络字节序后填入servAdd中
        inet_pton(AF_INET, argv[1], &servAddr.sin_addr.s_addr);
    
        /*步骤2: 客户端调用connect函数连接到服务器端*/
        if(connect(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0){
            perror("connect error");
            exit(1);
        }
    
        /*步骤3: 调用自定义的协议处理函数和服务端进行双向通信*/
        char buff[512];
        size_t size;
        char* prompt = ">";
    
        while(1){
            memset(buff, 0, sizeof(buff));
            write(STDOUT_FILENO, prompt, 1);
            size = read(STDIN_FILENO, buff, sizeof(buff));
            if(size < 0) continue;
    
            buff[size-1] = '';
            //将键盘输入的内容发送到服务端
            if(write(sockfd, buff, sizeof(buff)) < 0){
                perror("write error");
                continue;
            }else{
                memset(buff, 0, sizeof(buff));
                //读取来自服务端的消息
                if(read(sockfd, buff, sizeof(buff)) < 0){
                    perror("read error");
                    continue;
                }else{
                    printf("%s
    ", buff);
                }
            }
        }
    
        /*关闭套接字*/
        close(sockfd);
    }
  • 相关阅读:
    R基础-适合于纯小白
    endnote将参考文献导入word中
    百度学术导入endnote出现choose an import filter解决
    数据梳理、降维--主成分分析、超易懂实例及R语言实现
    R语言输出高质量图片
    方向导数,偏导数,梯度
    开通博客
    存储引擎
    消息队列的两种模型
    消息队列的应用场景
  • 原文地址:https://www.cnblogs.com/5iedu/p/6698727.html
Copyright © 2011-2022 走看看