zoukankan      html  css  js  c++  java
  • beanstalk源码剖析——网络框架

    在beanstalk中,网络处理模块没有使用第三方库,而是作者自己实现的一个模块,总计代码不到100行。

    是epoll的使用典范。

    1. epoll_event

    struct epoll_event结构如下:
    struct epoll_event {
      __uint32_t events;  /* Epoll events */
      epoll_data_t data;  /* User data variable */
    };
    typedef union epoll_data {
                    void ptr;
                    int fd;
                    __uint32_t u32;
                    __uint64_t u64;
          } epoll_data_t;

    beanstalk中将epoll_data存储一个socket抽象,结构如下:

    socket抽象

    struct Socket { int fd; //socket句柄 Handle f; //回调处理函数 void *x; //回调函数输入参数 int added; //是否加入epoll };

      

    2. epoll处理

    epoll操作包括包括创建epoll对象,将socket加入epoll对象,获取准备好的socket。

    epoll
    int
    sockinit(void) //epoll初始化,非常简单,没有特殊处理
    {
        epfd = epoll_create(1);
        if (epfd == -1) {
            twarn("epoll_create");
            return -1;
        }
        return 0;
    }
    
    
    int
    sockwant(Socket *s, int rw) //管理epoll的socket,增删改
    {
        int op;
        struct epoll_event ev = {};
        //epoll操作
        if (!s->added && !rw) {
            return 0;
        } else if (!s->added && rw) {
            s->added = 1;
            op = EPOLL_CTL_ADD;
        } else if (!rw) {
            op = EPOLL_CTL_DEL;
        } else {
            op = EPOLL_CTL_MOD;
        }
        //epoll_event的事件和数据,使用的是epoll默认的LT(level-trigger )模式
        switch (rw) {
        case 'r':
            ev.events = EPOLLIN;
            break;
        case 'w':
            ev.events = EPOLLOUT;
            break;
        }
        ev.events |= EPOLLRDHUP | EPOLLPRI;
        ev.data.ptr = s;
    
        return epoll_ctl(epfd, op, s->fd, &ev);
    }
    
    
    int
    socknext(Socket **s, int64 timeout) //获取一个触发epoll事件的socket
    {
        int r;
        struct epoll_event ev;
    
        r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000));
        if (r == -1 && errno != EINTR) {  //由于epoll_wait同步等待,有可能被信号中断,返回EINTR错误
            twarn("epoll_wait");
            exit(1);
        }
        //将事件转化为抽象的操作
        if (r) {
            *s = ev.data.ptr;
            if (ev.events & (EPOLLHUP|EPOLLRDHUP)) {
                return 'h';
            } else if (ev.events & EPOLLIN) {
                return 'r';
            } else if (ev.events & EPOLLOUT) {
                return 'w';
            }
        }
        return 0;

    3. epoll在程序中的回调

        在beanstalk中,socket抽象包括两类:

        a. 监听socket,回调函数就是接受处理srvaccept

        b.和客户端连接socket,回调函数就是协议处理prothandle

        整体来说流程如下:

        a. Server负责初始化epoll;并将自己的监听socket加入epoll;

        b. 当客户端有连接的时候,监听socket通过接受处理srvaccept和客户端建立socket连接,并将其加入epoll;

        c. 时钟处理通过定时任务prottick,把超时的客户端连接socket加入epoll。

        这里的加入epoll,第一个是直接调用socketwant加入的,其他两种都是将socket放入协议处理文件的静态变量dirty中,

        然后调用update_conns()来完成的。

    4. 网络处理流程总结

        a. Server创建epoll;

        b. 监听socket加入epoll,接受客户端连接h_accept、连接管理update_conns、关闭连接connclose调用socketwant处理socket请求;

        c. 在连接Conn.state为STATE_CLOSE时,或者事件fd和连接fd不同时,或者在update_conns中调用socketwant失败时,调用关闭连接;

        d. 接受客户端连接h_accept、协议处理prothandle、定时处理prottick三个函数,函数返回之前都要调用连接管理函数update_conns;

    5.. 网络连接接受处理

     1 void
     2 h_accept(const int fd, const short which, Server *s)
     3 {
     4     Conn *c;
     5     int cfd, flags, r;
     6     socklen_t addrlen;
     7     struct sockaddr_in6 addr;
     8 
     9     addrlen = sizeof addr;
    10     cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
    11     if (cfd == -1) {
    12         if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
    13         update_conns();
    14         return;
    15     }
    16     if (verbose) {
    17         printf("accept %d
    ", cfd);
    18     }
    19 
    20     flags = fcntl(cfd, F_GETFL, 0);
    21     if (flags < 0) {
    22         twarn("getting flags");
    23         close(cfd);
    24         if (verbose) {
    25             printf("close %d
    ", cfd);
    26         }
    27         update_conns();
    28         return;
    29     }
    30 
    31     r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
    32     if (r < 0) {
    33         twarn("setting O_NONBLOCK");
    34         close(cfd);
    35         if (verbose) {
    36             printf("close %d
    ", cfd);
    37         }
    38         update_conns();
    39         return;
    40     }
    41 
    42     c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
    43     if (!c) {
    44         twarnx("make_conn() failed");
    45         close(cfd);
    46         if (verbose) {
    47             printf("close %d
    ", cfd);
    48         }
    49         update_conns();
    50         return;
    51     }
    52     c->srv = s;
    53     c->sock.x = c;
    54     c->sock.f = (Handle)prothandle;
    55     c->sock.fd = cfd;
    56 
    57     r = sockwant(&c->sock, 'r');
    58     if (r == -1) {
    59         twarn("sockwant");
    60         close(cfd);
    61         if (verbose) {
    62             printf("close %d
    ", cfd);
    63         }
    64         update_conns();
    65         return;
    66     }
    67     update_conns();
    68 }
    接受处理
  • 相关阅读:
    提问回顾
    个人阅读作业+个人总结
    结对项目-数独程序扩展
    个人作业-Week 3
    个人作业-Week 2
    个人项目-数独程序
    个人作业-Week 1
    第0次博客作业
    2017[BUAA软工]第0次个人作业
    [2017BUAA软工]提问回顾
  • 原文地址:https://www.cnblogs.com/blockcipher/p/3279331.html
Copyright © 2011-2022 走看看