zoukankan      html  css  js  c++  java
  • twemproxy发送流程探索——剖析twemproxy代码正编

    本文想要完成对twemproxy发送流程——msg_send的探索,对于twemproxy发送流程的数据结构已经在《twemproxy接收流程探索——剖析twemproxy代码正编》介绍过了,msg_send和msg_recv的流程大致类似。请在阅读代码时,查看注释,英文注释是作者对它的代码的注解,中文注释是我自己的感悟。

    函数msg_send

     1 rstatus_t
     2 msg_send(struct context *ctx, struct conn *conn)
     3 {
     4     rstatus_t status;
     5     struct msg *msg;
     6     /*表示活跃的发送状态*/
     7     ASSERT(conn->send_active);
     8     /*表示准备发送*/
     9     conn->send_ready = 1;
    10     do {
    11         /*获取下一次发送的msg开头*/
    12         msg = conn->send_next(ctx, conn);
    13         if (msg == NULL) {
    14             /* nothing to send */
    15             return NC_OK;
    16         }
    17         /*发送框架,在此框架内conn->send_ready会改变*/
    18         status = msg_send_chain(ctx, conn, msg);
    19         if (status != NC_OK) {
    20             return status;
    21         }
    22 
    23     } while (conn->send_ready);
    24 
    25     return NC_OK;
    26 }

    发送框架msg_send_chain

    由于在发送时,其底层采用writev的高效发送方式,难免出现数据发送到一边,系统的发送队列已满的情况,面对这种尴尬的情况,你应该如何处理?twemproxy的作者给出了自己的方式。

      1 static rstatus_t
      2 msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
      3 {
      4     struct msg_tqh send_msgq;            /* send msg q */
      5     struct msg *nmsg;                    /* next msg */
      6     struct mbuf *mbuf, *nbuf;            /* current and next mbuf */
      7     size_t mlen;                         /* current mbuf data length */
      8     struct iovec *ciov, iov[NC_IOV_MAX]; /* current iovec */
      9     struct array sendv;                  /* send iovec */
     10     size_t nsend, nsent;                 /* bytes to send; bytes sent */
     11     size_t limit;                        /* bytes to send limit */
     12     ssize_t n;                           /* bytes sent by sendv */
     13 
     14     TAILQ_INIT(&send_msgq);
     15 
     16     array_set(&sendv, iov, sizeof(iov[0]), NC_IOV_MAX);
     17 
     18     /* preprocess - build iovec */
     19 
     20     nsend = 0;
     21     /*
     22      * readv() and writev() returns EINVAL if the sum of the iov_len values
     23      * overflows an ssize_t value Or, the vector count iovcnt is less than
     24      * zero or greater than the permitted maximum.
     25      */
     26     limit = SSIZE_MAX;
     27 
     28     /*
     29      *send_msgq是一个临时的发送队列,将当前能进行发送的msg,即处理完的msg
     30      *进行存储。发送队列仅仅自后面处理时能让调用者以msg的buf为单位处理。
     31      *sendv是一个字符串数组,由于发送底层采用的函数是writev,为此sendv将发
     32      *送的数据都存储在一起,sendv才是真正发送的数据内存。
     33      */
     34     for (;;) {
     35         ASSERT(conn->smsg == msg);
     36 
     37         TAILQ_INSERT_TAIL(&send_msgq, msg, m_tqe);
     38 
     39         for (mbuf = STAILQ_FIRST(&msg->mhdr);
     40              mbuf != NULL && array_n(&sendv) < NC_IOV_MAX && nsend < limit;
     41              mbuf = nbuf) {
     42             nbuf = STAILQ_NEXT(mbuf, next);
     43             /*
     44              *发送的信息是否为空,即发送开始的字节位置是否和结束位置一致。
     45              *在处理redis多key命令的mget,mdel,mset以及memcached多key命令
     46              *get,gets时,由于分片的原因,分片后的msg也会在客户端发送队列
     47              *中。在分片处理完要发送后,这些分片的msg应该不能被发送,为此,
     48              *对于分片的msg的pos进行了将msg的发送量置为空,这边的sendv在添
     49              *加发送内容时,忽视了这些分片。
     50              */
     51             if (mbuf_empty(mbuf)) {
     52                 continue;
     53             }
     54 
     55             mlen = mbuf_length(mbuf);
     56             if ((nsend + mlen) > limit) {
     57                 mlen = limit - nsend;
     58             }
     59 
     60             ciov = array_push(&sendv);
     61             ciov->iov_base = mbuf->pos;
     62             ciov->iov_len = mlen;
     63 
     64             nsend += mlen;
     65         }
     66 
     67         /*超过发送限制*/
     68         if (array_n(&sendv) >= NC_IOV_MAX || nsend >= limit) {
     69             break;
     70         }
     71 
     72         /*不存在发送内容*/
     73         msg = conn->send_next(ctx, conn);
     74         if (msg == NULL) {
     75             break;
     76         }
     77     }
     78 
     79     /*
     80      * (nsend == 0) is possible in redis multi-del
     81      * see PR: https://github.com/twitter/twemproxy/pull/225
     82      */
     83 
     84     /*发送函数conn_sendv*/
     85     conn->smsg = NULL;
     86     if (!TAILQ_EMPTY(&send_msgq) && nsend != 0) {
     87         n = conn_sendv(conn, &sendv, nsend);
     88     } else {
     89         n = 0;
     90     }
     91 
     92     nsent = n > 0 ? (size_t)n : 0;
     93 
     94     /* postprocess - process sent messages in send_msgq */
     95     /*
     96      *由于其发送函数底层采用writev,在发送过程中可能存在发送中断或者发送
     97      *数据没有全部发出的情况,为此需要通过实际发送的字节数nsent来确认系统
     98      *实际上发送到了哪一个msg的哪一个mbuf的哪一个字节pos,以便下一次从pos
     99      *开始发送实际的内容,以免重复发送相同的内容,导致不可见的错误。
    100      */
    101     for (msg = TAILQ_FIRST(&send_msgq); msg != NULL; msg = nmsg) {
    102         nmsg = TAILQ_NEXT(msg, m_tqe);
    103 
    104         TAILQ_REMOVE(&send_msgq, msg, m_tqe);
    105 
    106         /*发送内容为空,进行发送完的处理*/
    107         if (nsent == 0) {
    108             if (msg->mlen == 0) {
    109                 conn->send_done(ctx, conn, msg);
    110             }
    111             continue;
    112         }
    113 
    114         /* adjust mbufs of the sent message */
    115         for (mbuf = STAILQ_FIRST(&msg->mhdr); mbuf != NULL; mbuf = nbuf) {
    116             nbuf = STAILQ_NEXT(mbuf, next);
    117 
    118             if (mbuf_empty(mbuf)) {
    119                 continue;
    120             }
    121 
    122             mlen = mbuf_length(mbuf);
    123             if (nsent < mlen) {
    124                 /* mbuf was sent partially; process remaining bytes later */
    125                 /*此处确认了实际上发送到了哪一个msg的哪一个mbuf的哪一个字节pos*/
    126                 mbuf->pos += nsent;
    127                 ASSERT(mbuf->pos < mbuf->last);
    128                 nsent = 0;
    129                 break;
    130             }
    131 
    132             /* mbuf was sent completely; mark it empty */
    133             mbuf->pos = mbuf->last;
    134             nsent -= mlen;
    135         }
    136 
    137         /* message has been sent completely, finalize it */
    138         if (mbuf == NULL) {
    139             conn->send_done(ctx, conn, msg);
    140         }
    141     }
    142 
    143     ASSERT(TAILQ_EMPTY(&send_msgq));
    144 
    145     if (n >= 0) {
    146         return NC_OK;
    147     }
    148 
    149     return (n == NC_EAGAIN) ? NC_OK : NC_ERROR;
    150 }

     发送函数conn_sendv

     writev作为一个高效的网络io,它的正确用法一直是个问题,这里给出了twemproxy的作者给出了自己正确的注解。对于其的异常处理值得借鉴

     1 ssize_t
     2 conn_sendv(struct conn *conn, struct array *sendv, size_t nsend)
     3 {
     4     ssize_t n;
     5 
     6     ASSERT(array_n(sendv) > 0);
     7     ASSERT(nsend != 0);
     8     ASSERT(conn->send_ready);
     9 
    10     for (;;) {
    11         /*这里的nc_writev就是writev*/
    12         n = nc_writev(conn->sd, sendv->elem, sendv->nelem);
    13 
    14         log_debug(LOG_VERB, "sendv on sd %d %zd of %zu in %"PRIu32" buffers",
    15                   conn->sd, n, nsend, sendv->nelem);
    16 
    17         if (n > 0) {
    18             /*
    19              *已发送数据长度比待发送数据长度小,说明系统发送队列已满或者不
    20              *可写,此刻需要停止发送数据。
    21              */
    22             if (n < (ssize_t) nsend) {
    23                 conn->send_ready = 0;
    24             }
    25             conn->send_bytes += (size_t)n;
    26             return n;
    27         }
    28 
    29         if (n == 0) {
    30             log_warn("sendv on sd %d returned zero", conn->sd);
    31             conn->send_ready = 0;
    32             return 0;
    33         }
    34         /*
    35          *EINTR表示由于信号中断,没发送成功任何数据,此刻需要停止发送数据。
    36          *EAGAIN以及EWOULDBLOCK表示系统发送队列已满或者不可写,为此没发送
    37          *成功任何数据,此刻需要停止发送数据,等待下次发送。
    38          *除了上述两种错误,其他的错误为连接出现了问题需要停止发送数据并
    39          *进行断链操作,conn->err非零时在程序流程中会触发断链。
    40          */
    41         if (errno == EINTR) {
    42             log_debug(LOG_VERB, "sendv on sd %d not ready - eintr", conn->sd);
    43             continue;
    44         } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
    45             conn->send_ready = 0;
    46             log_debug(LOG_VERB, "sendv on sd %d not ready - eagain", conn->sd);
    47             return NC_EAGAIN;
    48         } else {
    49             conn->send_ready = 0;
    50             conn->err = errno;
    51             log_error("sendv on sd %d failed: %s", conn->sd, strerror(errno));
    52             return NC_ERROR;
    53         }
    54     }
    55 
    56     NOT_REACHED();
    57 
    58     return NC_ERROR;
    59 }

    小结

    在这短短的数百行代码中,我们获知了msg_send的简单过程,最最重要的是我们知道了writev函数的发送内容处理和异常处理,特别是它如教科书般的异常处理方式使我收益良多。

  • 相关阅读:
    MySQL动态添删改列字段
    关于javascript在子页面中函数无法调试问题的解决
    <T> T[] toArray(T[] a);
    MERGE INTO
    eclipse不能新建server
    关于tomcat7下websocket不能使用
    myeclipse启动tomcat报错cannot find a free socket for debugger
    checkbox提交多组数据到action
    Struts2 Action中的方法命名不要以get开头
    浅谈C#中的接口和抽象类
  • 原文地址:https://www.cnblogs.com/onlyac/p/6361524.html
Copyright © 2011-2022 走看看