zoukankan      html  css  js  c++  java
  • twemproxy接收流程探索——剖析twemproxy代码正编

    本文旨在帮助大家探索出twemproxy接收流程的代码逻辑框架,有些具体的实现需要我们在未来抽空去探索或者大家自行探索。在这篇文章开始前,大家要做好一个小小的心理准备,由于twemproxy代码是一份优秀的c语言代码,为此,在twemproxy的代码中会大篇幅使用c指针。但是不论是普通类型的指针还是函数指针,都可以让我们这些c语言使用者大饱眼福,生出一种“原来还可以这样写!!!”的快感。

    数据结构

    在探索twemproxy接收流程之前,我们必须对一些我们会用到的数据结构进行说明,以便我们更好地去探索,这边在讲解结构时,仅仅讲解与twemproxy接收流程相关的代码,其他代码暂时不进行剖析。

    mbuf

    在nc_mbuf.h里

    1 struct mbuf {
    2     uint32_t           magic;   /* mbuf magic (const) 这个值不是很理解是什么意思,一般是0xdeadbeef*/
    3     STAILQ_ENTRY(mbuf) next;    /* next mbuf 下一块mbuf,代码里所有的mbuf几乎都是以单向链表的形式存储的*/
    4     uint8_t            *pos;    /* read marker 表示这块mbuf已经读到那个字节了*/
    5     uint8_t            *last;   /* write marker 表示这块mbuf已经写到哪个字节*/
    6     uint8_t            *start;  /* start of buffer (const) 表示这块mbuf的起始位置*/
    7     uint8_t            *end;    /* end of buffer (const) 表示这块mbuf的结束位置*/
    8 };
    9 STAILQ_HEAD(mhdr, mbuf);    /*mhdr是mbuf单向队列的队列头部*/

    这里要对mbuf解释几句,这里涉及到nc_mbuf.c里的代码:

    1.mbuf的每一块可以通过配置规定其大小 ,可以说每一块mbuf的大小都是一个固定值,为此在生成时mbuf会去申请一个固定大小的内存,如果这个大小是mbuf_chunk_size,那么end = start + mbuf_chunk_size - sizeof(struct mbuf),为此start,end,以及magic都是定值。

    2.mbuf在申请后一般不会被释放,在使用完后会被放入static struct mhdr free_mbufq这个队列中,一旦要使用mbuf时首先从free_mbufq中取出未使用的mbuf,如果这个队列为空时,它才会去向系统申请新的mbuf。

    msg

    在nc_message.h里

     1 struct msg {
     2    /*
     3     ...
     4    */
     5     struct conn          *owner;          /* message owner - client | server 服务端或客户端连接*/
     6    /*
     7     ...
     8    */
     9     struct mhdr          mhdr;            /* message mbuf header mbuf单向队列的队列头部*/
    10     uint32_t             mlen;            /* message length mbuf字节长度*/
    11    /*
    12     ...
    13    */
    14     uint8_t              *pos;            /* parser position marker 现在解析到哪个个字节*/
    15     msg_parse_t          parser;          /* message parser 消息解析函数指针*/
    16     msg_parse_result_t   result;          /* message parsing result 消息解析结果*/
    17    /*
    18     ...
    19    */
    20 
    21 };

    msg是用来存储每一条发送过来的redis包的内容,一般一个msg对应一个redis包,所有收发网络数据都存储在mhdr中。

    conn

    在connection.h中

     1 struct conn {
     2    /*
     3     ...
     4    */
     5    int                 sd;              /* socket descriptor 套接字描述符*/    
     6    /*
     7     ...
     8    */
     9     conn_recv_t         recv;            /* recv (read) handler 接收msg函数指针*/
    10     conn_recv_next_t    recv_next;       /* recv next message handler 接收下一个msg的函数指针*/
    11     conn_recv_done_t    recv_done;       /* read done handler 接收完成的函数指针*/
    12    /*
    13     ...
    14    */
    15     size_t              recv_bytes;      /* received (read) bytes 接收数据的字节数*/
    16     size_t              send_bytes;      /* sent (written) bytes 发送数据的字节数*/
    17    /*
    18     ...
    19    */
    20     err_t               err;             /* connection errno 接受数据错误*/
    21     unsigned            recv_active:1;   /* recv active? 是否在接收数据*/
    22     unsigned            recv_ready:1;    /* recv ready? 是否准备接收数据*/
    23    /*
    24     ...
    25    */
    26     unsigned            eof:1;           /* eof? aka passive close? 数据读到尾部*/
    27     unsigned            done:1;          /* done? aka close? 完成数据接收*/
    28     unsigned            redis:1;         /* redis?           网络协议是不是redis*/
    29    /*
    30     ...
    31    */
    32 };

     conn是与服务端或客户端的连接,用于管理连接上的所有事件和网络数据

    接收流程

    首先看下主要流程,很简单的代码在nc_message.c中的msg_recv

     1 rstatus_t
     2 msg_recv(struct context *ctx, struct conn *conn)
     3 {
     4     rstatus_t status;
     5     struct msg *msg;
     6 
     7     ASSERT(conn->recv_active);
     8 
     9     conn->recv_ready = 1;//表示准备接收网络数据
    10     do {
    11         msg = conn->recv_next(ctx, conn, true);
    12         if (msg == NULL) {
    13             return NC_OK;
    14         }
    15 
    16         status = msg_recv_chain(ctx, conn, msg);//接收函数链,在这个流程中会改变conn->recv_ready的值,表示本次接收流程终止
    17         if (status != NC_OK) {
    18             return status;
    19         }
    20     } while (conn->recv_ready);//一旦不准备接收网络数据,就停止
    21 
    22     return NC_OK;
    23 }

     在这个代码中我们会发现一个conn->recv_next,目前我们只要知道它是准备接收下一个msg的函数,不需要知道他的具体实现,因为他在《twemproxy代码框架概述——剖析twemproxy代码前编》提到的客户层服务层扮演的角色是不同的,为此,实现也是不同的,这里主要指的是《twemproxy代码框架概述——剖析twemproxy代码前编》提到的模块1模块3,在这里我们居然看到了c语言的代码里出现了一个在面向对象语言中才有的特性——多态,在下面几篇文章的探索中会讲到,不小心做了广告,请无视上面的部分内容。

    接下来我们来看msg_recv函数中的msg_recv_chain,同样也是一个框架

     1 static rstatus_t
     2 msg_recv_chain(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     rstatus_t status;
     5     struct msg *nmsg;
     6     struct mbuf *mbuf;
     7     size_t msize;
     8     ssize_t n;
     9     
    10     mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);//找到目前收到mbuf队列的最后一个mbuf
    11     //如果这个mbuf满了或者为空,则取得一个空的mbuf,加入到msg->mhdr队列中
    12     if (mbuf == NULL || mbuf_full(mbuf)) {
    13         mbuf = mbuf_get();
    14         if (mbuf == NULL) {
    15             return NC_ENOMEM;
    16         }
    17         mbuf_insert(&msg->mhdr, mbuf);
    18         msg->pos = mbuf->pos;//这时解析指针指向该mbuf的读取指针
    19     }
    20     ASSERT(mbuf->end - mbuf->last > 0);
    21     msize = mbuf_size(mbuf); //计算剩余的mbuf的值msize
    22 
    23     n = conn_recv(conn, mbuf->last, msize);//读取最大为msize的网络数据
    24     if (n < 0) {
    25         if (n == NC_EAGAIN) {
    26             return NC_OK;
    27         }
    28         return NC_ERROR;
    29     }
    30 
    31     ASSERT((mbuf->last + n) <= mbuf->end);
    32 
    33     mbuf->last += n; //将写指针偏移到正确的位置
    34     msg->mlen += (uint32_t)n;
    35     //解析网络数据内容,在其中将网络数据分成不同的msg,因为网络包可能黏合,可能会接收到不同的redis包
    36     for (;;) {
    37         status = msg_parse(ctx, conn, msg);//解析网络数据完成分包
    38         if (status != NC_OK) {
    39             return status;
    40         }
    41 
    42         /* get next message to parse */
    43         nmsg = conn->recv_next(ctx, conn, false);
    44         if (nmsg == NULL || nmsg == msg) {
    45             /* no more data to parse */
    46             break;
    47         }
    48 
    49         msg = nmsg;//使指针指向下一个包
    50     }
    51 
    52     return NC_OK;
    53 }

    在前面我们看到在代码中大量使用了断言ASSERT如ASSERT(mbuf->end - mbuf->last > 0),就表示该内存还没有被写满,查看这些断言会使我们对代码有更好的认识。同时,它也是一个很好的代码习惯

    接着就是在connection.c中的接受函数conn_recv,比较简单,一些对于收发网络数据遇到的情况的处理值得学习

     1 ssize_t
     2 conn_recv(struct conn *conn, void *buf, size_t size)
     3 {
     4     ssize_t n;
     5 
     6     ASSERT(buf != NULL);
     7     ASSERT(size > 0);
     8     ASSERT(conn->recv_ready);
     9 
    10     for (;;) {
    11         n = nc_read(conn->sd, buf, size);//相当于read函数
    12 
    13         log_debug(LOG_VERB, "recv on sd %d %zd of %zu", conn->sd, n, size);
    14         //如果收到的数据不为空,一旦收到数据小于size,表示没有更多的数据能被读取,为此将conn->recv_ready = 0
    15         if (n > 0) {
    16             if (n < (ssize_t) size) {
    17                 conn->recv_ready = 0;
    18             }
    19             conn->recv_bytes += (size_t)n;
    20             return n;
    21         }
    22          //如果收到的数据为空,表示没有更多的数据能被读取,为此将conn->recv_ready = 0
    23         if (n == 0) {
    24             conn->recv_ready = 0;
    25             conn->eof = 1;
    26             log_debug(LOG_INFO, "recv on sd %d eof rb %zu sb %zu", conn->sd,
    27                       conn->recv_bytes, conn->send_bytes);
    28             return n;
    29         }
    30         //如果收发数据出现不是EINTR的错误,表示收发数据断链或者遇到错误,为此也将conn->recv_ready = 0
    31         if (errno == EINTR) {
    32             log_debug(LOG_VERB, "recv on sd %d not ready - eintr", conn->sd);
    33             continue;
    34         } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
    35             conn->recv_ready = 0;
    36             log_debug(LOG_VERB, "recv on sd %d not ready - eagain", conn->sd);
    37             return NC_EAGAIN;
    38         } else {
    39             conn->recv_ready = 0;
    40             conn->err = errno;
    41             log_error("recv on sd %d failed: %s", conn->sd, strerror(errno));
    42             return NC_ERROR;
    43         }
    44     }
    45 
    46     NOT_REACHED();
    47 
    48     return NC_ERROR;
    49 }

    下面就是解析分包框架msg_parse

     1 static rstatus_t
     2 msg_parse(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     rstatus_t status;
     5 
     6     if (msg_empty(msg)) {
     7         /* no data to parse */
     8         conn->recv_done(ctx, conn, msg, NULL);
     9         return NC_OK;
    10     }
    11 
    12     msg->parser(msg);//解析函数器,这个我们会在后续的文章中提到,即完整的redis协议解析流程
    13 
    14     switch (msg->result) {
    15     case MSG_PARSE_OK:
    16         status = msg_parsed(ctx, conn, msg);//解析一个包完成,进行分包
    17         break;
    18 
    19     case MSG_PARSE_REPAIR:
    20         status = msg_repair(ctx, conn, msg);//将受到的网络数据分到不同的buffer中
    21         break;
    22 
    23     case MSG_PARSE_AGAIN:
    24         status = NC_OK;
    25         break;
    26 
    27     default:
    28         status = NC_ERROR;
    29         conn->err = errno;
    30         break;
    31     }
    32 
    33     return conn->err != 0 ? NC_ERROR : status;
    34 }

    在这个代码中我们又会发现一个conn->recv_done,目前我们只要知道它是接收结束的函数,同样不需要知道他的具体实现,因为它也是在《twemproxy代码框架概述——剖析twemproxy代码前编》提到的客户层服务层扮演的角色是不同的,为此,实现也是不同的,这里主要指的是《twemproxy代码框架概述——剖析twemproxy代码前编》提到的模块1模块3

    下面就是msg_parsed,用于解析一个包完成后分包

     1 static rstatus_t
     2 msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     struct msg *nmsg;
     5     struct mbuf *mbuf, *nbuf;
     6 
     7     mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
     8     if (msg->pos == mbuf->last) {//正好结束分包
     9         /* no more data to parse */
    10         conn->recv_done(ctx, conn, msg, NULL);
    11         return NC_OK;
    12     }
    13 
    14     /*
    15      * Input mbuf has un-parsed data. Split mbuf of the current message msg
    16      * into (mbuf, nbuf), where mbuf is the portion of the message that has
    17      * been parsed and nbuf is the portion of the message that is un-parsed.
    18      * Parse nbuf as a new message nmsg in the next iteration.
    19      */
    20     //下面的所有工作就是把mbuf收到的网络数据,将不属于这个包msg的而属于下个包nmsg的内容分割出去放到下一个包nmsg
    21     nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
    22     if (nbuf == NULL) {
    23         return NC_ENOMEM;
    24     }
    25 
    26     nmsg = msg_get(msg->owner, msg->request, conn->redis);
    27     if (nmsg == NULL) {
    28         mbuf_put(nbuf);
    29         return NC_ENOMEM;
    30     }
    31     mbuf_insert(&nmsg->mhdr, nbuf);
    32     nmsg->pos = nbuf->pos;
    33 
    34     /* update length of current (msg) and new message (nmsg)*/
    35     nmsg->mlen = mbuf_length(nbuf);
    36     msg->mlen -= nmsg->mlen;
    37 
    38     conn->recv_done(ctx, conn, msg, nmsg);
    39 
    40     return NC_OK;
    41 }

    上面的流程可以用图1表示,我们可以看到图1中的mbuf收到了两个包的数据,分别是一个包msg(红色)的结尾和一个包nmsg(黄色)的开始,根据我们前文的说法一个msg对应一个包,为此必须把这个mbuf分割到到两个msg中。

    图1.分包示意图

     

    最后是分muf的msg_repair

     1 static rstatus_t
     2 msg_repair(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     struct mbuf *nbuf;
     5     //取出一个新的nbuf去读取下轮的网络数据
     6     nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
     7     if (nbuf == NULL) {
     8         return NC_ENOMEM;
     9     }
    10     mbuf_insert(&msg->mhdr, nbuf);
    11     msg->pos = nbuf->pos;
    12 
    13     return NC_OK;
    14 }

    在redis包中可能会存在多key的情况,一个msg中的mbuf具体是怎么存的,还需要完成对于redis协议的解读,我们才能明白为什么需要msg_repair,,在这里稍稍挖个坑。目前我们可以理解为它产生了一个新的nbuf去读下一轮的网络数据。

    这样我们完成了整个接收流程的探索,至于发送流程需要在下几个篇章中完成。

    总结

    本文完成了对于twemproxy整个接收流程的探索,首先介绍了相关的数据结构——mbuf、msg以及conn,在下面的日子里我们会更多地去了解它们,在未来的解析中它们是主角,接着分析了接收流程中的各个函数msg_repair、msg_parse、msg_parsed、msg_recv_chain、msg_recv以及conn_recv,最后较为介绍了它们在接收中的作用,当然稍稍挖了几个坑,表示以后再填。下面我们会着重探索twemproxy的redis协议解析和twemproxy发送流程,敬请期待!!

     

    另外,对于博文有问题的请大家在评论中留言与博主讨论,博主会及时回复的!!!!

  • 相关阅读:
    CSS盒子模型
    getContextPath、getServletPath、getRequestURI、request.getRealPath的区别
    MYSQL中的CASE WHEN END AS
    单点登录的精华总结
    git&github
    June 21st 2017 Week 25th Wednesday
    June 20th 2017 Week 25th Tuesday
    June 19th 2017 Week 25th Monday
    June 18th 2017 Week 25th Sunday
    June 17th 2017 Week 24th Saturday
  • 原文地址:https://www.cnblogs.com/onlyac/p/6274498.html
Copyright © 2011-2022 走看看