zoukankan      html  css  js  c++  java
  • [转]ceph网络通信 数据结构分析1

    对于一个分布式存储系统,需要一个稳定的底层网络通信模块,用于各个节点的之间的互联互通。

    对于一个网络通信系统,要求:

    • 高性能

              性能评价的两个指标: 带宽和延迟

    • 稳定可靠

              在网络中断时,实现重连。数据不丢包


    在msg的子目录下, 分别对应三种不同的实现方式:Simple, Async, XIO

    Simple是相对比较简单,目前可以在生产环境中使用的模式。 它最大的特点是,每一个链接,都创建两个线程,一个专门用于接收,一个专门用于发送。

    这样的模式实现起来比较简单,但是对于大规模的集群部署,大量的链接会产生大量的线程,占用太多资源,影响网络的性能。

    Async模式使用了基于事件的IO 多路复用模式。这是目前比较通用的方式,没有用第三方库,实现起来比较复杂。目前还处于试验阶段。

    XIO模式:使用了开源的的网络通信模块accelio 来实现。这种需要依赖第三方库。实现起来可以简单一些。但是需要对accelio的使用方式熟悉,accelio出问题要有bug fix的能力。目前也处于试验阶段。

    特别注意的是,前两种方式只支持 tcp/ip 协议,XIO 可以支持 Infiniband网络。

    网络通信的接口

    在src/msg 的目录里,定义了网络模块的接口。
    在源代码src/msg 里实现了ceph 的网络通信模块。在msg目录下,定义了网络通信的抽象接口。

    msg/Message.cc
    msg/Message.h    定义了message

    msg/Connection.h  关于connection 的接口定义
    msg/Dispatcher.h   关于Dispatcher

    msg/Messenger.cc  定义了Messenger
    msg/Messenger.h

    msg/msg_types.cc   定义了消息的类型
    msg/msg_types.h

    Message

    Message 是所有消息的基类,任何要发送的消息,都要继承该类。对于消息,其发送格式如下:

    1  header   + user_data  +  footer

    header是消息头,类似一个消息的信封(envelope),保存消息的一些描述信息。user_data 是用于要发送的实际数据, footer是一个消息尾,保存了消息的效验信息和结束标志。

    user_data

    1    payload  + middle  + data

    用户数据在Message里 有三部分组成: payload, 一般保存ceph操作的元数据;middle 预留,目前没有使用到;data 一般为读写的数据。

    其对应的数据结构如下:

    ceph_msg_header

     1 struct ceph_msg_header {
     2     __le64 seq;       /* message seq# for this session */ 
     3                    当前session内 消息的唯一 序号
     4     __le64 tid;        /* transaction id */
     5                    消息的全局唯一的 id
     6     __le16 type;      /* message type */
     7                    消息类型
     8     __le16 priority;    /* priority.  higher value == higher priority */
     9                    优先级
    10     __le16 version;    /* version of message encoding */
    11                    版本
    12     __le32 front_len;   /* bytes in main payload */
    13                      payload 的长度
    14     __le32 middle_len;  /* bytes in middle payload */
    15                      middle 的长度
    16     __le32 data_len;    /* bytes of data payload */
    17                      data 的 长度
    18     __le16 data_off;    /* sender: include full offset;
    19                      对象的数据偏移量
    20                      receiver: mask against ~PAGE_MASK */
    21     struct ceph_entity_name src;
    22                      //消息源
    23     /* oldest code we think can decode this.  unknown if zero. */
    24     __le16 compat_version;
    25     __le16 reserved;
    26     __le32 crc;       /* header crc32c */
    27 } __attribute__ ((packed));

    ceph_msg_footer

    1 struct ceph_msg_footer {
    2     __le32 front_crc, middle_crc, data_crc;
    3                      //分别对应crc 效验码
    4     __le64  sig;      // 消息的64位  signature
    5     __u8 flags;       //结束标志
    6 } __attribute__ ((packed));

    Message

     1 class  Message{
     2   ceph_msg_header  header;      // 消息头
     3   ceph_msg_footer  footer;       // 消息尾
     4   bufferlist         payload;       // "front" unaligned blob
     5   bufferlist         middle;        // "middle" unaligned blob
     6   bufferlist       data;     // data payload (page-alignment will be preserved where possible)
     7   /* recv_stamp is set when the Messenger starts reading the
     8    * Message off the wire */
     9   utime_t recv_stamp;   //开始接收数据的时间戳
    10   /* dispatch_stamp is set when the Messenger starts calling dispatch() on
    11    * its endpoints */
    12   utime_t dispatch_stamp;  // dispatch 的时间戳
    13   /* throttle_stamp is the point at which we got throttle */
    14   utime_t throttle_stamp;
    15   /* time at which message was fully read */
    16   utime_t recv_complete_stamp;  //接收完成的时间戳
    17   ConnectionRef connection;  //链接
    18   uint32_t magic;
    19   bi::list_member_hook<> dispatch_q;  
    20    //boost::intrusive list 的 member
    21 }

    Connection

    类Connection 就对应的一个链接,它是socket的port 对port 链接的封装。其最重要的接口,就是可以发送消息

     1 struct Connection : public RefCountedObject {
     2   mutable Mutex lock;    //锁包括 Connection的所有字段
     3   Messenger *msgr;     
     4   RefCountedObject *priv;  //私有数据
     5   int peer_type;             //链接的类型
     6   entity_addr_t peer_addr;    //对方的地址
     7   utime_t last_keepalive, last_keepalive_ack;  //最后一次发送keeplive的时间 和最后一次接受keepalive的时间
     8 private:
     9   uint64_t features;           //一些feature的标志位
    10 public:
    11   bool failed;       // true if we are a lossy connection that has failed.
    12   int rx_buffers_version;  //接收缓存区的版本
    13   map<ceph_tid_t,pair<bufferlist,int> > rx_buffers;  //接收缓冲区
    14          ceph_tid --> (buffer, rx_buffers_version)
    15 }

    其最重要的功能,就是发送消息的接口

      virtual int send_message(Message *m) = 0;

    Dispatcher

    类Dispatcher 是接收消息的接口。 其接收消息的接口为:

    1 virtual bool ms_dispatch(Message *m) = 0;
    2 virtual void ms_fast_dispatch(Message *m);

    无论是Server端,还是Client 端, 都需要实现一个Dispatcher 函数,对于Server 来接收请求,对应client 端,来接收ack应答。

    Messenger

    Messenger 是整个网络模块功能类的抽象。其定义了网络模块的基本功能接口。网络模块对外提供的基本的功能,就是能在节点之间发送消息。

    向一个节点发送消息

    virtual int send_message(Message *m, const entity_inst_t& dest) = 0;

    注册一个,用来接收消息

    void add_dispatcher_head(Dispatcher *d)

    网络模块的使用

    通过下面的最基本的服务器和客户端的程序的展示,了解如何调用网络通信来完成收发请求的功能。

    Server 程序

    其源代码在 test/simple_server.cc里,这里只展示有关网络部分的核心流程。

    1.调用 Messenger的函数create 创建一个Messenger的实例,g_conf->ms_type为实现的类型,目前有三种方式,simple,async,xio.

    1  messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
    2                   entity_name_t::MON(-1),
    3                   "simple_server",
    4                   0 /* nonce */);

    2.设置 messenger 的属性

    1 messenger->set_magic(MSG_MAGIC_TRACE_CTR);
    2 messenger->set_default_policy(
    3   Messenger::Policy::stateless_server(CEPH_FEATURES_ALL, 0));

    3.对于 server,需要bind 服务端地址

    1 r = messenger->bind(bind_addr);
    2 if (r < 0)
    3     goto out;
    4 common_init_finish(g_ceph_context);

    4.创建一个Dispatcher,并添加到Messenger

    1 dispatcher = new SimpleDispatcher(messenger);
    2 messenger->add_dispatcher_head(dispatcher); 

    5.启动messenger

    1 messenger->start();
    2 messenger->wait(); // can't be called until ready()

    SimpleDispatcher 函数里实现了ms_dispatch,用于处理接收到的各种请求消息。

    Client 程序分析

    1.调用 Messenger的函数create 创建一个Messenger的实例

    1 messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
    2                       entity_name_t::MON(-1),
    3                       "client",
    4                       getpid());
    5 
    6 messenger->set_magic(MSG_MAGIC_TRACE_CTR);
    7 messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));

    3.创建Dispatcher 类并添加,用于接收消息

    1 dispatcher = new SimpleDispatcher(messenger);
    2 messenger->add_dispatcher_head(dispatcher);
    3 dispatcher->set_active(); // this side is the pinger

    4.启动消息

    1 r = messenger->start();
    2 if (r < 0)
    3     goto out;

    5.下面开始发送请求,先获取目标server 的链接

    1 conn = messenger->get_connection(dest_server);

    6.通过Connection来发送请求消息。需要注意的是,这里的消息发送都是异步发送,请求的ack应对消息回来后在Dispatcher的 ms_dispatch或者ms_fast_dispatch里处理。

     1 Message *m;
     2 for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
     3   /* add a data payload if asked */
     4   if (! n_dsize) {
     5     m = new MPing();
     6   } else {
     7     m = new_simple_ping_with_data("simple_client", n_dsize);
     8   }
     9   conn->send_message(m);
    10 }

    Simple

    Simple 是ceph里比较早,目前也比较稳定,在生产环境中使用的网络通信模块。

    SimpleMessager

     1 Accepter accepter;    
     2  用于接受客户端的链接请求
     3 DispatchQueue dispatch_queue; 
     4  接收到的请求的消息分发队列
     5  bool did_bind;   
     6    是否绑定
     7   /// counter for the global seq our connection protocol uses
     8   __u32 global_seq;
     9   /// lock to protect the global_seq
    10   ceph_spinlock_t global_seq_lock;
    11   /**
    12    * hash map of addresses to Pipes
    13    *
    14    * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
    15    * invalid and can be replaced by anyone holding the msgr lock
    16    */
    17   ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
    18   /**
    19    * list of pipes are in teh process of accepting
    20    *
    21    * These are not yet in the rank_pipe map.
    22    */
    23   set<Pipe*> accepting_pipes;
    24   /// a set of all the Pipes we have which are somehow active
    25   set<Pipe*>      pipes;
    26   /// a list of Pipes we want to tear down
    27   list<Pipe*>     pipe_reap_queue;
    28   /// internal cluster protocol version, if any, for talking to entities of the same type.
    29   int cluster_protocol;

    Accepter

    类Accepter 用来在server端监听,接受链接。其继承了Thread类,本身是一个线程,可以不断的监听server 的端口。

    DispatchQueue

    DispatchQueue 类用于把接收到的请求保存在内部, 通过其内部的线程,调用SimpleMessenger 类注册的 Dispatch 类的处理函数来处理相应的消息。

    class DispatchQueue {
      ......
      mutable Mutex lock;
      Cond cond;
      class QueueItem {
        int type;
        ConnectionRef con;
    MessageRef m;
    ......
      };
      PrioritizedQueue<QueueItem, uint64_t> mqueue;    //基于优先级的 优先队列
      set<pair<double, Message*> > marrival;  
         集合 (recv_time, message) 
      map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
        消息 到  所在集合位置的 映射
    };

    其内部的mqueue 为优先级队列,用来保存消息, marriaval 保存了接收到的消息。marrival_map 保存消息在 集合中的位置。

    函数DispatchQueue::enqueue 用来把接收到的消息添加到 队列中,函数DispatchQueue::entry 为线程的处理函数,用于调用用户注册的Dispatcher类相应的处理函数来处理消息 。

    Pipe

    类Pipe 是PipeConnection的具体实现类。其实现了两个端口之间类似管道的通信接口。

    对于每一个pipe,内部有一个Reader线程 和 一个Writer 线程,分别用来处理有关这个Pipe的消息接收和发送。线程DelayedDelivery用于故障注入测试使用。

    类Pipe的数据结构介绍如下:

     1 SimpleMessenger *msgr;   //   msgr的指针
     2 uint64_t conn_id;     //分配给Pipe 自己唯一的id
     3 char *recv_buf;         //接收缓存区
     4 int recv_max_prefetch;   //接收缓冲区一次预期的最大值
     5 int recv_ofs;            //接收的偏移量
     6 int recv_len;            //接收的长度
     7 int sd;               //pipe 对应的 socked fd
     8 struct iovec msgvec[IOV_MAX];   //发送消息的 iovec 结构
     9 int port;       //链接短裤
    10 int peer_type;  //链接对方的类型
    11 entity_addr_t peer_addr;  //对方地址
    12 Messenger::Policy policy;   //策略
    13 Mutex pipe_lock;
    14 int state;            //当前链接的状态
    15 atomic_t state_closed;    // non-zero iff state = STATE_CLOSED
    16 PipeConnectionRef connection_state;   //PipeConnection 的引用
    17 utime_t backoff;         // backoff time
    18 bool reader_running, reader_needs_join;
    19 bool reader_dispatching;    /// reader thread is dispatching without pipe_lock
    20 bool notify_on_dispatch_done;   /// something wants a signal when dispatch done
    21 bool writer_running;
    22 map<int, list<Message*> > out_q;  // priority queue for outbound msgs
    23    准备发送的消息 优先队列
    24 DispatchQueue *in_q;   //接收消息的DispatchQueue
    25 list<Message*> sent;   //要发送的消息
    26 Cond cond;
    27 bool send_keepalive;
    28 bool send_keepalive_ack;
    29 utime_t keepalive_ack_stamp;
    30 bool halt_delivery;     //if a pipe's queue is destroyed, stop adding to it
    31 __u32 connect_seq, peer_global_seq;
    32 uint64_t out_seq;      发送消息的序列号 
    33 uint64_t in_seq, in_seq_acked;  接收到消息序号和 应对的序号

    消息的发送

    1.当发送一个消息时,首先要通过Messenger类,获取对应的 Connection
      conn = messenger->get_connection(dest_server);

    具体到 SimpleMessenger的实现:


    首先比较,如果dest.addr 是my_inst.addr,就直接返回 local_connection
    调用函数_lookup_pipe 在已经存在的Pipe中查找,如果找到,就直接返回pipe->connection_state,否则调用函数connect_rank 创建一个Pipe,并加入到msgr的register_pipe 里


    2.当获得一个Connection之后,就可以调用 Connection 的 发送函数,发送消息
          conn->send_message(m);

    其最终调用了SimpleMessenger::submit_message 函数


    如果Pipe 不为空,并且状态不是Pipe::STATE_CLOSED 状态,调用函数pipe->_send 把发送的消息添加到out_q 发送队列里,触发发送线程
    如果Pipe 为空,就调用connect_rank 创建Pipe,并把消息添加到out_q 中


    3.发送线程writer把消息发送出去
    通过步骤2,要发送的消息Messae已经保存在相应Pipe的out_q队列里。并触发了发送线程。每个Pipe的Writer 线程负责发送out_q 的消息,其线程入口函数为Pipe::writer, 实现功能:


    调用函数_get_next_outgoing 从out_q 中获取消息
    调用函数 write_message(header, footer, blist) 把消息的header,footer,数据blist 发送出去

    消息的接收

    1.接收消息,每个Pipe对应的线程 Reader 用于接收消息

      其入口函数为  Pipe::reader, 其功能如下:

           1)判断当前的state,如果为STATE_ACCEPTING, 就调用函数Pipe::accept 来接受连接,如果不是STATE_CLOSED,并且不是     STATE_CONNECTING 状态,就接收消息
           2)先调用函数tcp_read 来接收一个tag
           3)根据tag ,来接收不同类型的消息

    1 CEPH_MSGR_TAG_KEEPALIVE 消
    2 CEPH_MSGR_TAG_KEEPALIVE2, 在CEPH_MSGR_TAG_KEEPALIVE的基础上,添加了时间
    3 CEPH_MSGR_TAG_KEEPALIVE2_ACK
    4 CEPH_MSGR_TAG_ACK
    5 CEPH_MSGR_TAG_MSG   这里才是接收的消息
    6 CEPH_MSGR_TAG_CLOSE

    2.调用函数read_message 来接收消息, 当本函数返回后,接完成了接收消息。

    3.调用函数in_q->fast_preprocess(m) 预处理消息
    4.调用函数in_q->can_fast_dispatch(m),如果可以fast dispatch,  就in_q->fast_dispatch(m)处理。 特别注意的是,fast_dispatch 并不把消息加入到 mqueue里,而是直接调用msgr->ms_fast_dispatch 函数,并最终调用注册的fast_dispatcher 函数处理。
    5.否则调用函数in_q->enqueue(m, m->get_priority(), conn_id) , 本函数把接收到的消息加入到DispatchQueue的mqueue 队列里, 由DispatchQueue的线程调用ms_dispatch处理。


    在这里,需要注意的是 ms_fast_dispath 和 ms_dispatch 两种处理的区别。ms_dispatch 是由DispatchQueue的线程处理的,它是一个单线程;ms_fast_dispatch的调用是由Pipe的接收线程直接处理的,因此性能比前者要好。

    错误处理

    网络模块,最重要的是如何处理网络错误。无论是在接收消息还是发送消息的过程中,都会出现各种异常错误,包括返回异常错误码,接收数据的magic验证不一致,接收的数据的效验验证不一致等等。 错误的原因主要是网络本身的错误(物理链路等),或者字节跳变引起的。

    处理的方法比较简单:

    • 关闭连接
    • 重新建立连接
    • 重新发送没有接受到ack应对的消息

    函数 Pipe::fault 用来处理错误

    1. 调用shutdown_socket 关闭Pipe 的socket
    2. 调用函数requeue_sent  把没有收到ack的消息重新加入发送队列,当发送队列有请求时,发送线程会不断的尝试重新连接


    ---------------------
    作者:Jack-changtao
    来源:CSDN
    原文:https://blog.csdn.net/changtao381/article/details/50915328
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    Windows_10 系统封装
    leetcode-75 颜色分类
    leetcode-922 按奇偶排序数组 II
    leetcode-905 按奇偶数排序
    UVA-10827 环面上的最大子矩阵和
    leetcode918 环形最大子数组
    leetcode-85 最大矩形
    leetcode-84 柱状图中的最大矩形
    leetcode-221 最大正方形
    leetcode-713 乘积小于k的数组
  • 原文地址:https://www.cnblogs.com/yi-mu-xi/p/10244828.html
Copyright © 2011-2022 走看看