zoukankan      html  css  js  c++  java
  • twemproxy源码分析

      twemproxy是twitter开源的redis/memcached 代理,数据分片提供取模,一致性哈希等手段,维护和后端server的长连接,自动踢除server,恢复server,提供专门的状态监控端口供外部工具获取状态监控信息。代码写的比较漂亮,学习了一些Nginx的东西,比如每个请求的处理分为多个阶段,IO模型方面,采用单线程收发包,基于epoll事件驱动模型。文档中提到的Zero Copy技术,通过将消息指针在3个队列之间流转实现比较巧妙。本文主要分析twemproxy的核心部分,即一个请求的从接收到最后发送响应给客户端的流程。

    一、大体流程

       twemproxy后端支持多个server pool,为每个server pool分配一个监听端口用于接收客户端的连接。客户端和proxy建立连接记作client_conn,发起请求,proxy读取数据,放入req_msg中,设置msg的owner为client_conn,proxy根据策略从server pool中选取一个server并且建立连接记作server_conn,然后转发req_forward,将req_msg指针放入client_conn的output队列中,同时放入server_conn的input队列,然后触发server_conn的写事件,server_conn的写回调函数会从input队列中取出req_msg发送给对应的后端server,发送完成后将req_msg放入server_conn的output队列,当req_msg的响应rsp_msg回来后,调用rsp_filter(用于判断消息是否为空,是否消息可以不用回复等)和rsp_forward,将req_msg从server_conn的output队列中取出,建立req_msg和rsp_msg的对应关系(通过msg的peer字段),通过req_msg的owner找到client_conn,然后启动client_conn的写事件,client_conn的写回调函数从client_conn的output队列中取出req_msg,然后通过peer字段拿到对应的rsp_msg,将其发出去。至此,一次请求从被proxy接收到最后响应给client结束。

       可以看出,整个流程,req_msg内容只有一份,req_msg指针在三个队列中的顺序是:

            1. req_msg => client_conn.outputq

            2. req_msg => server_conn.inputq

            3. server_conn.inputq => req_msg

            4. req_msg => server_conn.outputq

            5. server_conn.outputq => req_msg

            6. client_conn.outputq => req_msg

       总体来说,proxy既需要接收客户端的连接,也需要维护和后端server的长连接,根据从客户端收到的req根据特定策略选择后端一台server进行转发,同一个客户端的连接上的不同的请求可能会转发到后端不同的server。

       

      

    二、基础数据结构:

      后端的每个redis server对应一个server结构:

    struct server {
          uint32_t           idx;           /* server index */
          struct server_pool *owner;        /* owner pool */      // 每个server属于一个server pool                                                                                                                                  
          struct string      pname;         /* name:port:weight (ref in conf_server) */
          struct string      name;          /* name (ref in conf_server) */
          uint16_t           port;          /* port */      
          uint32_t           weight;        /* weight */    
          int                family;        /* socket family */
          socklen_t          addrlen;       /* socket length */
          struct sockaddr    *addr;         /* socket address (ref in conf_server) */                                                                                                                            
      
          uint32_t           ns_conn_q;     /* # server connection */  // 下面队列中conn的个数
          struct conn_tqh    s_conn_q;      /* server connection q */ // proxy和这个redis server之间维护的连接队列  
      
          int64_t            next_retry;    /* next retry time in usec */ // proxy有踢除后端server机制,当proxy给某台server转发请求出错次数达到server_failure_limit次,则next_retry微妙内不会请求该server。可配。
          uint32_t           failure_count; /* # consecutive failures */
      };  

      twemproxy中使用一堆宏来定义队列等数据结构,如上面struct conn_tqh,nc_connection.h中有定义TAILQ_HEAD(conn_tqh, conn),TAILQ_HEAD宏定义如下:

     /*   
       * Tail queue declarations. 
       */  
      #define TAILQ_HEAD(name, type)                                                                                                                                                                            
      struct name {                                                           
          struct type *tqh_first; /* first element */                         
          struct type **tqh_last; /* addr of last next element */             
          TRACEBUF                                                            
      }

    可以看出结构体是通过宏来定义的,非常恶心,看代码ctags 找不到。conn_tqh是一个队列头部结构体,嵌入到一个server中,链表中每个元素是一个conn结构体,内嵌入一个TAILQ_ENTRY,用于将conn串入server的队列中。宏定义如下:

     #define TAILQ_ENTRY(type)                                               
      struct {                                                                
          struct type *tqe_next;  /* next element */                          
          struct type **tqe_prev; /* address of previous next element */      
          TRACEBUF                                                            
      } 

    各个field的关系如下图所示:

     

    看一个很重要的结构conn,可以表示client和proxy之间的connection,也可以表示proxy和redis server之间的connection:

    struct conn {
          TAILQ_ENTRY(conn)  conn_tqe;      /* link in server_pool / server / free q */ // 队列entry字段,用于和其他的conn串起来
          void               *owner;        /* connection owner - server_pool / server */ // 每个连接属于一个server
      
          int                sd;            /* socket descriptor */
          int                family;        /* socket address family */
          socklen_t          addrlen;       /* socket length */
          struct sockaddr    *addr;         /* socket address (ref in server or server_pool) */
      
          struct msg_tqh     imsg_q;        /* incoming request Q */  //从名字看出,和conn_tqh类似,这里也是一个消息队列,从连接读入的数据会组织成msg,push到这个消息队列,msg由mbuf队列组成用来存储具体的数据。
          struct msg_tqh     omsg_q;        /* outstanding request Q */ // 需要往这个连接中写的msg push到这个消息队列
          struct msg         *rmsg;         /* current message being rcvd */ //从连接上读到的数据往rmsg指向的msg里面填
          struct msg         *smsg;         /* current message being sent */ //当前正在写的msg指针
      
          conn_recv_t        recv;          /* recv (read) handler */ //读事件触发时回调
          conn_recv_next_t   recv_next;     /* recv next message handler */ //实际读数据之前,调这个函数来得到当前正在使用的msg
          conn_recv_done_t   recv_done;     /* read done handler */ // 每次接收到一个完整的消息后,回调
          conn_send_t        send;          /* send (write) handler */ //写事件触发时回调
          conn_send_next_t   send_next;     /* write next message handler */ 实际写数据之前,定位当前要写的msg
          conn_send_done_t   send_done;     /* write done handler */ //发送完一个msg则回调一次
          conn_close_t       close;         /* close handler */ //
          conn_active_t      active;        /* active? handler */
      
          conn_ref_t         ref;           /* connection reference handler */ // 得到一个连接后,将连接加入相应的队列
          conn_unref_t       unref;         /* connection unreference handler */
      
          conn_msgq_t        enqueue_inq;   /* connection inq msg enqueue handler */  //这四个队列用于存放msg的指针,和Zero Copy密切相关,后续详述
          conn_msgq_t        dequeue_inq;   /* connection inq msg dequeue handler */
          conn_msgq_t        enqueue_outq;  /* connection outq msg enqueue handler */
          conn_msgq_t        dequeue_outq;  /* connection outq msg dequeue handler */
      
          size_t             recv_bytes;    /* received (read) bytes */ //该连接上读了多少数据
          size_t             send_bytes;    /* sent (written) bytes */  
      
          uint32_t           events;        /* connection io events */
          err_t              err;           /* connection errno */
          unsigned           recv_active:1; /* recv active? */                                                                                                                                                   
          unsigned           recv_ready:1;  /* recv ready? */
          unsigned           send_active:1; /* send active? */
          unsigned           send_ready:1;  /* send ready? */
      
          unsigned           client:1;      /* client? or server? */ //连接属于proxy和client之间时,client为1,连接属于proxy和后端server之间时,client为0
          unsigned           proxy:1;       /* proxy? */ // listen fd封装在conn中时,proxy置1,响应的recv回调函数accept连接
          unsigned           connecting:1;  /* connecting? */
          unsigned           connected:1;   /* connected? */
          unsigned           eof:1;         /* eof? aka passive close? */
          unsigned           done:1;        /* done? aka close? */
          unsigned           redis:1;       /* redis? */ //后端server是redis还是memcached
      };

     二、

    不管是proxy accept了Client的连接从而分配一个conn结构,还是proxy主动和后端server建立连接从而分配一个conn结构,都调用conn_get()函数,如下:

    struct conn *conn_get(void *owner, bool client, bool redis)                                                                                                                                                             
    {
          struct conn *conn;
      
          conn = _conn_get();
          if (conn == NULL) {
              return NULL;
          }
      
          /* connection either handles redis or memcache messages */
          conn->redis = redis ? 1 : 0;
      
          conn->client = client ? 1 : 0;
      
          if (conn->client) {
              /*
               * client receives a request, possibly parsing it, and sends a
               * response downstream.
               */
              conn->recv = msg_recv;  // 从conn读数据
              conn->recv_next = req_recv_next; // 在真正从conn读数据之前,需要分配一个req_msg,用于承载读进来的数据
              conn->recv_done = req_recv_done; //每次读完一个完整的消req_msg被调用
      
              conn->send = msg_send; // 将从server收到的响应rsp_msg发给客户端
              conn->send_next = rsp_send_next; // 每次发送rsp_msg之前需要首先确定从哪个开始发
              conn->send_done = rsp_send_done; // 每次发送完成一个rsp_msg给客户端,调一次
      
              conn->close = client_close; //用于proxy断开和client的半连接
              conn->active = client_active;
      
              conn->ref = client_ref; //获取conn后将conn丢进客户端连接队列
              conn->unref = client_unref;
      
              conn->enqueue_inq = NULL;
              conn->dequeue_inq = NULL;
              conn->enqueue_outq = req_client_enqueue_omsgq; // proxy每次接收到一个client发过来的req_msg,将req_msg入conn的output 队列
              conn->dequeue_outq = req_client_dequeue_omsgq; // 给客户端发送完rsp_msg后将其对应的req_msg从conn的output队列中删除
          } else {
              /*
               * server receives a response, possibly parsing it, and sends a
               * request upstream.
               */
              conn->recv = msg_recv; 
              conn->recv_next = rsp_recv_next; //从后端server接收数据之前需要先得到一个rsp_msg,用于承载读到的数据
              conn->recv_done = rsp_recv_done; // 每次读完一个完整的rsp_msg,则回调
              conn->send = msg_send; // 将req_msg往后端server发
              conn->send_next = req_send_next; // 确定发哪个req_msg
              conn->send_done = req_send_done; // 每转发完一个即回调
      
              conn->close = server_close;
              conn->active = server_active;
      
              conn->ref = server_ref;
              conn->unref = server_unref;
      
              conn->enqueue_inq = req_server_enqueue_imsgq; // proxy将需要转发的req_msg放入对应后端server连接的input队列
              conn->dequeue_inq = req_server_dequeue_imsgq; //proxy从input队列中取出req_msg发送给后端server完成后,需要将req_msg从这个后端连接的input队列中删除
              conn->enqueue_outq = req_server_enqueue_omsgq; // 继上一步,需要将req_msg放入到后端连接的output队列
              conn->dequeue_outq = req_server_dequeue_omsgq; // 收到后端server的rsp_msg后,将rsp_msg对应的req_msg从连接的output队列删除
          }
      
          conn->ref(conn, owner);
      
          log_debug(LOG_VVERB, "get conn %p client %d", conn, conn->client);
      
          return conn;
      }

     如果该连接是proxy和后端server建立的,则client为false,否则为true,如果后端server是redis,则redis为true,如果为memcached,则为false,连接上的多个读写回调函数根据传入的标记不同而不同。

    三、 请求具体处理流程

     从前面可以看出,当proxy和Client之间有数据可读时,会调用msg_recv(),如下:

    rstatus_t
      msg_recv(struct context *ctx, struct conn *conn)                                                                                                                                                           
      {    
          rstatus_t status;       
          struct msg *msg;        
      
          ASSERT(conn->recv_active);
      
          conn->recv_ready = 1;   
          do {
              msg = conn->recv_next(ctx, conn, true); //req_recv_next()
              if (msg == NULL) {  
                  return NC_OK;
              }
      
              status = msg_recv_chain(ctx, conn, msg);
              if (status != NC_OK) {
                  return status;
              }
          } while (conn->recv_ready);    
      
          return NC_OK;
      }

    代码很短,其实就是反复的做两件事:req_recv_next和msg_recv_chain

    req_recv_next获取当前用于接收数据的msg,设置到conn->rmsg中,并且返回rmsg,然后传给msg_recv_chain,每次重新接收一个完整的请求时,rmsg为空,如果某次读取只读取了请求的一部分,则rmsg不为空,下次读取时数据继续追加到上一次的msg中。

    重点看msg_recv_chain()做的事情:

    1. 从conn->rmsg中拿出最后一个mbuf(一个msg的数据实际上存在mbuf中,一个msg可以包含多个mbuf,同样通过队列组织),下一次读最多读最后一个mbuf的剩余空间大小,如果最后一个mbuf满了,分配一个新的,插入到rmsg的mbuf队列尾部

    2. 循环从conn读数据,如果读取到的数据小于参入的buf大小,设置conn->recv_ready为0,表示后续没有数据要读了,外部的while(conn->recv_ready)循环退出。同样,如果读操作返回0表示客户端主动断开连接,将conn的eof标记置位,同时recv_ready也清0

    3. 调用msg_parse()解析rmsg数据,如果成功解析到一条完整的命令,则继续调用msg_parsed(msg),由于msg由多个mbuf组成,并且TCP是流式协议,所以一次读可能接收到了多条完整的命令,甚至是部分命令。这时,msg_parsed(msg)会将后面这些多余的数据拷贝到一个新的mbuf中,并且产生一个新的msg,作为conn的rmsg。由于一次读可能会读到多条命令,这就是为什么msg_recv_chain()中有下面这个循环:

    for (;;) {
              status = msg_parse(ctx, conn, msg); //每次解析一条命令                                                                                                                                                     
              if (status != NC_OK) {
                   return status;
               }
      
              /* get next message to parse */
              nmsg = conn->recv_next(ctx, conn, false);
              if (nmsg == NULL || nmsg == msg) {
                 /* no more data to parse */
                 break;
              }
              msg = nmsg;
          }

     4. 同时,每次调msg_parse(ctx,conn,msg)解析出一条新的命令后,都会回调req_recv_done()方法。这个方法对请求进行过滤(req_filter)和转发(req_forward)给后端server。

     5. msg_recv_chain()完成。

    回到msg_recv,根据conn->recv_ready来判断是否连接中还有未读数据,有则继续读,parse。。

    下面看请求转发给后端函数 req_forward():

     1. 将解析出来的msg(以后将从客户端发过来的msg叫做req_msg)push进conn的output队列

     2. 根据key和策略从server pool中选择一个server,并且获得和server的连接,记作server_conn,将req_msg push到server_conn的输入队列,起server_conn的写事件

    至此,req_msg从client接收到解析到转发结束。下面看发消息给后端server,从后端server接收响应,将响应回复给客户端的过程

    起server_conn写事件后,回调函数msg_send(struct context *ctx, struct conn *conn):

      rstatus_t
      msg_send(struct context *ctx, struct conn *conn)                                                                                                                                                           
      {    
          rstatus_t status;       
          struct msg *msg;        
      
          ASSERT(conn->send_active);
      
          conn->send_ready = 1;   
          do {
              msg = conn->send_next(ctx, conn); 
              if (msg == NULL) {  
                  /* nothing to send */          
                  return NC_OK;   
              }
      
              status = msg_send_chain(ctx, conn, msg);
              if (status != NC_OK) {
                  return status;
              }
      
          } while (conn->send_ready);

    可以看出和接收函数msg_recv类似。req_send_next()从server_conn的input队列中拿出一个待发消息赋值给conn->smsg。然后调用msg_send_chain()对消息进行实际的发送。

    msg_send_chain()流程如下:

    1. 准备NC_IOV_MAX个iov,遍历smsg的mbuf队列,每个mbuf的数据用一个iov指向。

    2. 循环调用req_send_next()不断的取出待发送的smsg,将其加入到一个局部消息队列send_msgq,同时遍历smsg的mbuf队列,每个mbuf用一个iov指向,直到NC_IOV_MAX个iov被装满,或者没有消息需要发送了,记录下装进去的消息总大小,记作nsend。

    3. 循环往fd上写,返回成功写的大小nsent,遍历send_msgq中的msg,和msg中的mbuf队列,将已发送成功的mbuf置为空,并且将发送了部分msg的pos指向第一个未发送的字节。并且,如果一个msg的mbuf队列中的所有的mbuf都发送完成了,则将调用req_send_done(),将这个msg(指针)从这个连接的input队列中删除,并且放入到连接的output队列中。从这里可以看出,只有一个msg的所有的mbuf都被发送出去了才会从input队列中删除,如果只发送了部分mbuf,这些mbuf会被标记为空,下次继续发送这个msg时,会略过空的msg,实现一个msg过大或者网络阻塞导致需要多次发送才能发出一个msg的情况。

    4. 至此,发送流程分析完成。

    Redis接收到消息,处理返回,proxy接收响应,和proxy接收client的数据类似,同样调用msg_recv()

      rstatus_t
      msg_recv(struct context *ctx, struct conn *conn)                                                                                                                                                           
      {    
          rstatus_t status;       
          struct msg *msg;        
      
          ASSERT(conn->recv_active);
      
          conn->recv_ready = 1;   
          do {
              msg = conn->recv_next(ctx, conn, true); //rsp_recv_next()
              if (msg == NULL) {  
                  return NC_OK;   
              }
      
              status = msg_recv_chain(ctx, conn, msg);
              if (status != NC_OK) {
                  return status;
              }
          } while (conn->recv_ready);    
      
          return NC_OK;
      }

    只是,在这里,conn->recv_next指向的函数是rsp_recv_next(),不是req_recv_next()。同理,接收回消息的处理函数是rsp_recv_done(),不是req_recv_done()

    rsp_recv_next():

    1. 如果当前conn的rmsg为空,则分配一个新的返回,否则返回当前这个rmsg

    2. 将上一步返回的rmsg传给msg_recv_chain()处理。这个函数之前在proxy接收client请求时分析了,不再赘述。

    收到一个完整的响应rsp_msg后,调rsp_recv_done():

    1. 同样和前面类似,在这里调用rsp_filter()和rsp_forward(),而不是req_filter()和req_forward()

    重点说rsp_forward():

    1. 从连接的output队列中弹出第一个元素,记作req_msg,这个msg即是目前收到的这个msg(rsp_msg)相对应的请求msg

    2. 将req_msg的标记done置为1,表示这个请求已完成。

    3. 通过以下两个语句将这两个msg建立对应关系:

    pmsg->peer = msg; //pmsg为req_msg,msg为rsp_msg
    msg->peer = pmsg

    4. 从pmsg的owner可以找到所属的client conn,然后启client conn的写事件。

    5. 至此,proxy从后端server接收响应分析完成。

    最后,看一下proxy将响应写给client的流程。

    类似,调用msg_send():

    rstatus_t
      msg_send(struct context *ctx, struct conn *conn)
      {
          rstatus_t status;
          struct msg *msg;
      
          ASSERT(conn->send_active);
      
          conn->send_ready = 1;
          do {
              msg = conn->send_next(ctx, conn); // rsp_send_next()
              if (msg == NULL) {
                  /* nothing to send */
                  return NC_OK;
              }
      
              status = msg_send_chain(ctx, conn, msg);
              if (status != NC_OK) {
                  return status;
              }
      
          } while (conn->send_ready);
      
          return NC_OK;
      }               

    同理,在这里,conn->send_next函数指针指向rsp_send_next():

    1. 从client conn的output队列中拿出msg,记作req_msg,从req_msg的peer字段将相应的rsp_msg拿出来,放在conn的smsg上待发送

    发出完成后,调用rsp_send_done(),主要做的事就是将req_msg从client conn的output队列中删除。

  • 相关阅读:
    .NetCore Grpc 客服端 工厂模式配置授权
    DOCKER 拉取 dotnet 镜像太慢 docker pull mcr.microsoft.com too slow
    Introducing .NET 5
    VSCode 出现错误 System.IO.IOException: The configured user limit (128) on the number of inotify instances has been reached.
    Omnisharp VsCode Attaching to remote processes
    zookeeper3.5.5 centos7 完全分布式 搭建随记
    Hadoop2.7.7 centos7 完全分布式 配置与问题随记
    MySQL索引 索引分类 最左前缀原则 覆盖索引 索引下推 联合索引顺序
    SQL基础随记3 范式 键
    MySQL调优 优化需要考虑哪些方面
  • 原文地址:https://www.cnblogs.com/foxmailed/p/3623817.html
Copyright © 2011-2022 走看看