zoukankan      html  css  js  c++  java
  • twemproxy接收流程探索——剖析twemproxy代码正编

    本文旨在帮助大家探索出twemproxy接收流程的代码逻辑框架,有些具体的实现需要我们在未来抽空去探索或者大家自行探索。在这篇文章开始前,大家要做好一个小小的心理准备,由于twemproxy代码是一份优秀的c语言代码,为此,在twemproxy的代码中会大篇幅使用c指针。但是不论是普通类型的指针还是函数指针,都可以让我们这些c语言使用者大饱眼福,生出一种“原来还可以这样写!!!”的快感。

    数据结构

    在探索twemproxy接收流程之前,我们必须对一些我们会用到的数据结构进行说明,以便我们更好地去探索,这边在讲解结构时,仅仅讲解与twemproxy接收流程相关的代码,其他代码暂时不进行剖析。

    mbuf

    在nc_mbuf.h里

    1 struct mbuf {
    2     uint32_t           magic;   /* mbuf magic (const) 这个值不是很理解是什么意思,一般是0xdeadbeef*/
    3     STAILQ_ENTRY(mbuf) next;    /* next mbuf 下一块mbuf,代码里所有的mbuf几乎都是以单向链表的形式存储的*/
    4     uint8_t            *pos;    /* read marker 表示这块mbuf已经读到那个字节了*/
    5     uint8_t            *last;   /* write marker 表示这块mbuf已经写到哪个字节*/
    6     uint8_t            *start;  /* start of buffer (const) 表示这块mbuf的起始位置*/
    7     uint8_t            *end;    /* end of buffer (const) 表示这块mbuf的结束位置*/
    8 };
    9 STAILQ_HEAD(mhdr, mbuf);    /*mhdr是mbuf单向队列的队列头部*/

    这里要对mbuf解释几句,这里涉及到nc_mbuf.c里的代码:

    1.mbuf的每一块可以通过配置规定其大小 ,可以说每一块mbuf的大小都是一个固定值,为此在生成时mbuf会去申请一个固定大小的内存,如果这个大小是mbuf_chunk_size,那么end = start + mbuf_chunk_size - sizeof(struct mbuf),为此start,end,以及magic都是定值。

    2.mbuf在申请后一般不会被释放,在使用完后会被放入static struct mhdr free_mbufq这个队列中,一旦要使用mbuf时首先从free_mbufq中取出未使用的mbuf,如果这个队列为空时,它才会去向系统申请新的mbuf。

    msg

    在nc_message.h里

     1 struct msg {
     2    /*
     3     ...
     4    */
     5     struct conn          *owner;          /* message owner - client | server 服务端或客户端连接*/
     6    /*
     7     ...
     8    */
     9     struct mhdr          mhdr;            /* message mbuf header mbuf单向队列的队列头部*/
    10     uint32_t             mlen;            /* message length mbuf字节长度*/
    11    /*
    12     ...
    13    */
    14     uint8_t              *pos;            /* parser position marker 现在解析到哪个个字节*/
    15     msg_parse_t          parser;          /* message parser 消息解析函数指针*/
    16     msg_parse_result_t   result;          /* message parsing result 消息解析结果*/
    17    /*
    18     ...
    19    */
    20 
    21 };

    msg是用来存储每一条发送过来的redis包的内容,一般一个msg对应一个redis包,所有收发网络数据都存储在mhdr中。

    conn

    在connection.h中

     1 struct conn {
     2    /*
     3     ...
     4    */
     5    int                 sd;              /* socket descriptor 套接字描述符*/    
     6    /*
     7     ...
     8    */
     9     conn_recv_t         recv;            /* recv (read) handler 接收msg函数指针*/
    10     conn_recv_next_t    recv_next;       /* recv next message handler 接收下一个msg的函数指针*/
    11     conn_recv_done_t    recv_done;       /* read done handler 接收完成的函数指针*/
    12    /*
    13     ...
    14    */
    15     size_t              recv_bytes;      /* received (read) bytes 接收数据的字节数*/
    16     size_t              send_bytes;      /* sent (written) bytes 发送数据的字节数*/
    17    /*
    18     ...
    19    */
    20     err_t               err;             /* connection errno 接受数据错误*/
    21     unsigned            recv_active:1;   /* recv active? 是否在接收数据*/
    22     unsigned            recv_ready:1;    /* recv ready? 是否准备接收数据*/
    23    /*
    24     ...
    25    */
    26     unsigned            eof:1;           /* eof? aka passive close? 数据读到尾部*/
    27     unsigned            done:1;          /* done? aka close? 完成数据接收*/
    28     unsigned            redis:1;         /* redis?           网络协议是不是redis*/
    29    /*
    30     ...
    31    */
    32 };

     conn是与服务端或客户端的连接,用于管理连接上的所有事件和网络数据

    接收流程

    首先看下主要流程,很简单的代码在nc_message.c中的msg_recv

     1 rstatus_t
     2 msg_recv(struct context *ctx, struct conn *conn)
     3 {
     4     rstatus_t status;
     5     struct msg *msg;
     6 
     7     ASSERT(conn->recv_active);
     8 
     9     conn->recv_ready = 1;//表示准备接收网络数据
    10     do {
    11         msg = conn->recv_next(ctx, conn, true);
    12         if (msg == NULL) {
    13             return NC_OK;
    14         }
    15 
    16         status = msg_recv_chain(ctx, conn, msg);//接收函数链,在这个流程中会改变conn->recv_ready的值,表示本次接收流程终止
    17         if (status != NC_OK) {
    18             return status;
    19         }
    20     } while (conn->recv_ready);//一旦不准备接收网络数据,就停止
    21 
    22     return NC_OK;
    23 }

     在这个代码中我们会发现一个conn->recv_next,目前我们只要知道它是准备接收下一个msg的函数,不需要知道他的具体实现,因为他在《twemproxy代码框架概述——剖析twemproxy代码前编》提到的客户层服务层扮演的角色是不同的,为此,实现也是不同的,这里主要指的是《twemproxy代码框架概述——剖析twemproxy代码前编》提到的模块1模块3,在这里我们居然看到了c语言的代码里出现了一个在面向对象语言中才有的特性——多态,在下面几篇文章的探索中会讲到,不小心做了广告,请无视上面的部分内容。

    接下来我们来看msg_recv函数中的msg_recv_chain,同样也是一个框架

     1 static rstatus_t
     2 msg_recv_chain(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     rstatus_t status;
     5     struct msg *nmsg;
     6     struct mbuf *mbuf;
     7     size_t msize;
     8     ssize_t n;
     9     
    10     mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);//找到目前收到mbuf队列的最后一个mbuf
    11     //如果这个mbuf满了或者为空,则取得一个空的mbuf,加入到msg->mhdr队列中
    12     if (mbuf == NULL || mbuf_full(mbuf)) {
    13         mbuf = mbuf_get();
    14         if (mbuf == NULL) {
    15             return NC_ENOMEM;
    16         }
    17         mbuf_insert(&msg->mhdr, mbuf);
    18         msg->pos = mbuf->pos;//这时解析指针指向该mbuf的读取指针
    19     }
    20     ASSERT(mbuf->end - mbuf->last > 0);
    21     msize = mbuf_size(mbuf); //计算剩余的mbuf的值msize
    22 
    23     n = conn_recv(conn, mbuf->last, msize);//读取最大为msize的网络数据
    24     if (n < 0) {
    25         if (n == NC_EAGAIN) {
    26             return NC_OK;
    27         }
    28         return NC_ERROR;
    29     }
    30 
    31     ASSERT((mbuf->last + n) <= mbuf->end);
    32 
    33     mbuf->last += n; //将写指针偏移到正确的位置
    34     msg->mlen += (uint32_t)n;
    35     //解析网络数据内容,在其中将网络数据分成不同的msg,因为网络包可能黏合,可能会接收到不同的redis包
    36     for (;;) {
    37         status = msg_parse(ctx, conn, msg);//解析网络数据完成分包
    38         if (status != NC_OK) {
    39             return status;
    40         }
    41 
    42         /* get next message to parse */
    43         nmsg = conn->recv_next(ctx, conn, false);
    44         if (nmsg == NULL || nmsg == msg) {
    45             /* no more data to parse */
    46             break;
    47         }
    48 
    49         msg = nmsg;//使指针指向下一个包
    50     }
    51 
    52     return NC_OK;
    53 }

    在前面我们看到在代码中大量使用了断言ASSERT如ASSERT(mbuf->end - mbuf->last > 0),就表示该内存还没有被写满,查看这些断言会使我们对代码有更好的认识。同时,它也是一个很好的代码习惯

    接着就是在connection.c中的接受函数conn_recv,比较简单,一些对于收发网络数据遇到的情况的处理值得学习

     1 ssize_t
     2 conn_recv(struct conn *conn, void *buf, size_t size)
     3 {
     4     ssize_t n;
     5 
     6     ASSERT(buf != NULL);
     7     ASSERT(size > 0);
     8     ASSERT(conn->recv_ready);
     9 
    10     for (;;) {
    11         n = nc_read(conn->sd, buf, size);//相当于read函数
    12 
    13         log_debug(LOG_VERB, "recv on sd %d %zd of %zu", conn->sd, n, size);
    14         //如果收到的数据不为空,一旦收到数据小于size,表示没有更多的数据能被读取,为此将conn->recv_ready = 0
    15         if (n > 0) {
    16             if (n < (ssize_t) size) {
    17                 conn->recv_ready = 0;
    18             }
    19             conn->recv_bytes += (size_t)n;
    20             return n;
    21         }
    22          //如果收到的数据为空,表示没有更多的数据能被读取,为此将conn->recv_ready = 0
    23         if (n == 0) {
    24             conn->recv_ready = 0;
    25             conn->eof = 1;
    26             log_debug(LOG_INFO, "recv on sd %d eof rb %zu sb %zu", conn->sd,
    27                       conn->recv_bytes, conn->send_bytes);
    28             return n;
    29         }
    30         //如果收发数据出现不是EINTR的错误,表示收发数据断链或者遇到错误,为此也将conn->recv_ready = 0
    31         if (errno == EINTR) {
    32             log_debug(LOG_VERB, "recv on sd %d not ready - eintr", conn->sd);
    33             continue;
    34         } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
    35             conn->recv_ready = 0;
    36             log_debug(LOG_VERB, "recv on sd %d not ready - eagain", conn->sd);
    37             return NC_EAGAIN;
    38         } else {
    39             conn->recv_ready = 0;
    40             conn->err = errno;
    41             log_error("recv on sd %d failed: %s", conn->sd, strerror(errno));
    42             return NC_ERROR;
    43         }
    44     }
    45 
    46     NOT_REACHED();
    47 
    48     return NC_ERROR;
    49 }

    下面就是解析分包框架msg_parse

     1 static rstatus_t
     2 msg_parse(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     rstatus_t status;
     5 
     6     if (msg_empty(msg)) {
     7         /* no data to parse */
     8         conn->recv_done(ctx, conn, msg, NULL);
     9         return NC_OK;
    10     }
    11 
    12     msg->parser(msg);//解析函数器,这个我们会在后续的文章中提到,即完整的redis协议解析流程
    13 
    14     switch (msg->result) {
    15     case MSG_PARSE_OK:
    16         status = msg_parsed(ctx, conn, msg);//解析一个包完成,进行分包
    17         break;
    18 
    19     case MSG_PARSE_REPAIR:
    20         status = msg_repair(ctx, conn, msg);//将受到的网络数据分到不同的buffer中
    21         break;
    22 
    23     case MSG_PARSE_AGAIN:
    24         status = NC_OK;
    25         break;
    26 
    27     default:
    28         status = NC_ERROR;
    29         conn->err = errno;
    30         break;
    31     }
    32 
    33     return conn->err != 0 ? NC_ERROR : status;
    34 }

    在这个代码中我们又会发现一个conn->recv_done,目前我们只要知道它是接收结束的函数,同样不需要知道他的具体实现,因为它也是在《twemproxy代码框架概述——剖析twemproxy代码前编》提到的客户层服务层扮演的角色是不同的,为此,实现也是不同的,这里主要指的是《twemproxy代码框架概述——剖析twemproxy代码前编》提到的模块1模块3

    下面就是msg_parsed,用于解析一个包完成后分包

     1 static rstatus_t
     2 msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     struct msg *nmsg;
     5     struct mbuf *mbuf, *nbuf;
     6 
     7     mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
     8     if (msg->pos == mbuf->last) {//正好结束分包
     9         /* no more data to parse */
    10         conn->recv_done(ctx, conn, msg, NULL);
    11         return NC_OK;
    12     }
    13 
    14     /*
    15      * Input mbuf has un-parsed data. Split mbuf of the current message msg
    16      * into (mbuf, nbuf), where mbuf is the portion of the message that has
    17      * been parsed and nbuf is the portion of the message that is un-parsed.
    18      * Parse nbuf as a new message nmsg in the next iteration.
    19      */
    20     //下面的所有工作就是把mbuf收到的网络数据,将不属于这个包msg的而属于下个包nmsg的内容分割出去放到下一个包nmsg
    21     nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
    22     if (nbuf == NULL) {
    23         return NC_ENOMEM;
    24     }
    25 
    26     nmsg = msg_get(msg->owner, msg->request, conn->redis);
    27     if (nmsg == NULL) {
    28         mbuf_put(nbuf);
    29         return NC_ENOMEM;
    30     }
    31     mbuf_insert(&nmsg->mhdr, nbuf);
    32     nmsg->pos = nbuf->pos;
    33 
    34     /* update length of current (msg) and new message (nmsg)*/
    35     nmsg->mlen = mbuf_length(nbuf);
    36     msg->mlen -= nmsg->mlen;
    37 
    38     conn->recv_done(ctx, conn, msg, nmsg);
    39 
    40     return NC_OK;
    41 }

    上面的流程可以用图1表示,我们可以看到图1中的mbuf收到了两个包的数据,分别是一个包msg(红色)的结尾和一个包nmsg(黄色)的开始,根据我们前文的说法一个msg对应一个包,为此必须把这个mbuf分割到到两个msg中。

    图1.分包示意图

     

    最后是分muf的msg_repair

     1 static rstatus_t
     2 msg_repair(struct context *ctx, struct conn *conn, struct msg *msg)
     3 {
     4     struct mbuf *nbuf;
     5     //取出一个新的nbuf去读取下轮的网络数据
     6     nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
     7     if (nbuf == NULL) {
     8         return NC_ENOMEM;
     9     }
    10     mbuf_insert(&msg->mhdr, nbuf);
    11     msg->pos = nbuf->pos;
    12 
    13     return NC_OK;
    14 }

    在redis包中可能会存在多key的情况,一个msg中的mbuf具体是怎么存的,还需要完成对于redis协议的解读,我们才能明白为什么需要msg_repair,,在这里稍稍挖个坑。目前我们可以理解为它产生了一个新的nbuf去读下一轮的网络数据。

    这样我们完成了整个接收流程的探索,至于发送流程需要在下几个篇章中完成。

    总结

    本文完成了对于twemproxy整个接收流程的探索,首先介绍了相关的数据结构——mbuf、msg以及conn,在下面的日子里我们会更多地去了解它们,在未来的解析中它们是主角,接着分析了接收流程中的各个函数msg_repair、msg_parse、msg_parsed、msg_recv_chain、msg_recv以及conn_recv,最后较为介绍了它们在接收中的作用,当然稍稍挖了几个坑,表示以后再填。下面我们会着重探索twemproxy的redis协议解析和twemproxy发送流程,敬请期待!!

     

    另外,对于博文有问题的请大家在评论中留言与博主讨论,博主会及时回复的!!!!

  • 相关阅读:
    POJ 3037 Skiing(Dijkstra)
    HDU 1875 畅通工程再续(kruskal)
    HDU 1233 还是畅通工程(Kruskal)
    Java实现 LeetCode 754 到达终点数字(暴力+反向)
    Java实现 LeetCode 754 到达终点数字(暴力+反向)
    Java实现 LeetCode 754 到达终点数字(暴力+反向)
    Java实现 LeetCode 753 破解保险箱(递归)
    Java实现 LeetCode 753 破解保险箱(递归)
    Java实现 LeetCode 753 破解保险箱(递归)
    Java实现 LeetCode 752 打开转盘锁(暴力)
  • 原文地址:https://www.cnblogs.com/onlyac/p/6274498.html
Copyright © 2011-2022 走看看