zoukankan      html  css  js  c++  java
  • [转]ceph网络通信模块_以monitor模块为例

    [摘自http://yuedu.163.com/book_reader/cda64592f9a746559e528c40ec3ceb1b_4/abff4ebcba54472188f22b8b4778cc7e_5

    https://blog.csdn.net/zhq5515/article/details/49814941]

    1.src/msg

    是客户端和服务器间通信的底层模块,用来在客户端和服务器间发送和接收请求。

    在src/msg 目录下:首先定义了一个网络通信的框架,完成通信接口和具体实现的分离,子目录Simple,Async,XIO分别是三种不同的实现方式。

    simple:每一个网络连接都会创建两个线程,一个负责接收,一个负责发送。

    Async模式:使用了基于事件的I/O多路复用模式。是目前网络通信中广泛采用的方式,但是在ceph中,还实验阶段??

    XIO:使用了开源的网络通信库accelio来实现。实验阶段??

    1.1 相关的类

    1.1 .1 message

    类Message: src/msg/Message.h  Message.cc

    是所有消息的基类,任何要发送的消息都要继承该类。格式如图所示:

    header user_data

    footer

    1   ceph_msg_header  header;      // headerelope
    2   ceph_msg_footer  footer;
    3   bufferlist       payload;  // "front" unaligned blob
    4   bufferlist       middle;   // "middle" unaligned blob
    5   bufferlist       data;     // data payload (page-alignment will be preserved where possible)

    ceph_msg_header 是消息头,定义了消息传输相关的元数据;

    ceph_msg_footer为消息的尾部,附加了一些crc校验数据和消息结束标记。

    消息带的数据分别保存在payload,middle,data这三个bufferlist中。

    payload一般保存ceph操作相关的元数据;

    middle目前没有使用到;

    data一般为读写的数据。

    1.1.2 connection

    src/msg/connection.h

    是端(port)对端的socket的链接的封装。其最重要的接口是可以发送消息。

     1.1.3 messenger

    是整个网络抽象模块,定义了网络模块的基本api接口。

    该类作为消息的发布者, 各个 Dispatcher 子类作为消息的订阅者, Messenger 收到消息之后,通过 Pipe 读取消息,然后转给 Dispatcher 处理

    SimpleMessenger
    Messenger 接口的实现

    1.1.4 dispatcher

    src/msg/dispatcher.h

    dispatcher是消息分发的接口。

    server端注册dispatcher类用于把接收到的message请求分发给具体处理的应用层;client需要实现一个dispatcher函数用于处理接收到的ACK应对消息。

    该类是订阅者的基类,具体的订阅后端继承该类,初始化的时候通过 Messenger::add_dispatcher_tail/head 注册到 Messenger::dispatchers. 收到消息后,通知改类处理。Monitor, osd等都是继承自dispatcher。

    1.1.5 Accepter 

    监听 peer 的请求, 有新请求时, 调用 SimpleMessenger::add_accept_pipe() 创建新的 Pipe 到 SimpleMessenger::pipes 来处理该请求

    1.1.6 Pipe

    用于消息的读取和发送,该类主要有两个组件,Pipe::Reader 和 Pipe::Writer, 分别用来处理 消息的读取和发送. 这两个类都是 class Thread 的子类,意味这每次处理消息都会有两个 线程被分别创建.

    消息被 Pipe::Reader 读取后,该线程会通知注册到 Messenger::dispatchers 中的某一个 Dispatcher(如 Monitor) 处理, 处理完成之后将回复的消息放到 SimpleMessenger::Pipe::out_q 中,供 Pipe::Writer 来处理发送

    1.1.7 DispatchQueue

    该类用来缓存收到的消息, 然后唤醒 DispatchQueue::dispatch_thread 线程找到后端的 Dispatch 处理消息。

    1.2 详细过程

    1.3 monitor 例子

    初始化

     1 int main(int argc, char *argv[])
     2 {
     3     // 创建一个 Messenger 对象,由于 Messenger 是抽象类,不能直接实例化,提供了一个
     4     // ::create 的方法来创建子类,目前 Ceph 所有模块使用 SimpleMessenger
     5     Messenger *messenger = Messenger::create(g_ceph_context,
     6                                              entity_name_t::MON(rank),
     7                                              "mon",
     8                                              0);
     9 
    10     /**
    11      * 执行 socket() -> bind() -> listen() 等一系列动作, 执行流程如下:
    12      SimpleMessenger::bind()
    13          --> Accepter::bind()
    14              socket() -> bind() -> listen()
    15     */
    16     err = messenger->bind(ipaddr);
    17 
    18     // 创建一个 Dispatch 的子类对象, 这里是 Monitor
    19     mon = new Monitor(g_ceph_context, g_conf->name.get_id(), store, 
    20                       messenger, &monmap);
    21 
    22     // 启动 Reaper 线程
    23     messenger->start();
    24 
    25     /**
    26      * a). 初始化 Monitor 模块
    27      * b). 通过 SimpleMessenger::add_dispatcher_tail() 注册自己到
    28      * SimpleMessenger::dispatchers 中, 流程如下:
    29      * Messenger::add_dispatcher_tail()
    30      *      --> ready()
    31      *        --> dispatch_queue.start()(新 DispatchQueue 线程)
    32               --> Accepter::start()(启动start线程)
    33      *            --> accept?? accepter::entry
    34      *                --> SimpleMessenger::add_accept_pipe
    35      *                    --> Pipe::start_reader 启动reader_thread,reader_thread调用reader()函数
    36      *                        --> Pipe::reader()
    37      * 在 ready() 中: 通过 Messenger::reader(),
    38      * 1) DispatchQueue 线程会被启动,用于缓存收到的消息消息
    39      * 2) Accepter 线程启动,开始监听新的连接请求.
    40      */
    41     mon->init();
    42 
    43     // 进入 mainloop, 等待退出
    44     messenger->wait();
    45     return 0;
    46 }

    消息处理

    收到连接请求

    请求的监听和处理由 SimpleMessenger::ready –> Accepter::entry 实现

     1 void SimpleMessenger::ready()
     2 {
     3     // 启动 DispatchQueue 线程
     4     dispatch_queue.start();
     5 
     6     lock.Lock();
     7     // 启动 Accepter 线程监听客户端连接, 见下面的 Accepter::entry
     8     if (did_bind)
     9         accepter.start();
    10     lock.Unlock();
    11 }
    12 
    13 void *Accepter::entry()
    14 {
    15     struct pollfd pfd;
    16     // listen_sd 是 Accepter::bind()中创建绑定的 socket
    17     pfd.fd = listen_sd;
    18     pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
    19     while (!done) {
    20         int r = poll(&pfd, 1, -1);
    21         if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
    22             break;
    23         if (done) break;
    24         entity_addr_t addr;
    25         socklen_t slen = sizeof(addr.ss_addr());
    26         int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
    27         if (sd >= 0) {
    28             // 调用 SimpleMessenger::add_accept_pipe() 处理这个连接
    29             msgr->add_accept_pipe(sd);
    30         } 
    31     }
    32     return 0;
    33 }

    随后创建pipe()开始消息的处理

     1 Pipe *SimpleMessenger::add_accept_pipe(int sd)
     2 {
     3     lock.Lock();
     4     Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
     5     p->sd = sd;
     6     p->pipe_lock.Lock();
     7     // 
     8     /**
     9      * 调用 Pipe::start_reader() 开始读取消息, 将会创建一个读线程开始处理.
    10      * Pipe::start_reader() --> Pipe::reader
    11      */
    12     p->start_reader();
    13     p->pipe_lock.Unlock();
    14     pipes.insert(p);
    15     accepting_pipes.insert(p);
    16     lock.Unlock();
    17     return p;
    18 }

    创建消息读取和发送线程

    处理消息由 Pipe::start_reader() –> Pipe::reader() 开始,此时已经是在 Reader 线程中. 首先会调用 accept() 做一些简答的处理然后创建 Writer() 线程,等待发送回复 消息. 然后读取消息, 读取完成之后, 将收到的消息封装在 Message 中,交由 dispatch_queue() 处理.

    dispatch_queue() 找到注册者,将消息转交给它处理,处理完成唤醒 Writer() 线程发送回复消息.

     1 void Pipe::reader()
     2 {
     3     /**
     4      * Pipe::accept() 会调用 Pipe::start_writer() 创建 writer 线程, 进入 writer 线程
     5      * 后,会 cond.Wait() 等待被激活,激活的流程看下面的说明. Writer 线程的创建见后后面
     6      * Pipe::accept() 的分析
     7      */
     8     if (state == STATE_ACCEPTING) {
     9         accept();
    10     }
    11 
    12     while (state != STATE_CLOSED &&
    13            state != STATE_CONNECTING) {
    14         // 读取消息类型,某些消息会马上激活 writer 线程先处理
    15         if (tcp_read((char*)&tag, 1) < 0) {
    16             continue;
    17         }
    18         if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
    19             continue;
    20         }
    21         if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
    22             continue;
    23         }
    24         if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
    25             continue;
    26         }
    27         if (tag == CEPH_MSGR_TAG_ACK) {
    28             continue;
    29         }
    30         else if (tag == CEPH_MSGR_TAG_MSG) {
    31             // 收到 MSG 消息
    32             Message *m = 0;
    33             // 将消息读取到 new 到的 Message 对象
    34             int r = read_message(&m, auth_handler.get());
    35 
    36             // 先激活 writer 线程 ACK 这个消息
    37             cond.Signal();  // wake up writer, to ack this
    38 
    39             // 如果该次请求是可以延迟处理的请求,将 msg 放到 Pipe::DelayedDelivery::delay_queue, 
    40             // 后面通过相关模块再处理
    41             // 注意,一般来讲收到的消息分为三类:
    42             // 1. 直接可以在 reader 线程中处理,如上面的 CEPH_MSGR_TAG_ACK
    43             // 2. 正常处理, 需要将消息放入 DispatchQueue 中,由后端注册的消息处理,然后唤醒发送线程发送
    44             // 3. 延迟发送, 下面的这种消息, 由定时时间决定什么时候发送
    45             if (delay_thread) {
    46                 utime_t release;
    47                 if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
    48                     release = m->get_recv_stamp();
    49                     release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
    50                     lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
    51                 }
    52                 delay_thread->queue(release, m);
    53             } else {
    54 // 正常处理的消息,放到 Pipe::DispatchQueue *in_q 中, 以下是整个消息的流程
    55 // DispatchQueue::enqueue()
    56 //     --> mqueue.enqueue() -> cond.Signal()(激活唤醒                         DispatchQueue::dispatch_thread 线程) 
    57 //         --> DispatchQueue::dispatch_thread::entry() 该线程得到唤醒
    58 //             --> Messenger::ms_deliver_XXX
    59 //                 --> 具体的 Dispatch 实例, 如 Monitor::ms_dispatch()
    60 //                     --> Messenger::send_message()
    61 //                         --> SimpleMessenger::submit_message()
    62 //                             --> Pipe::_send()
    63 //                                 --> Pipe::out_q[].push_back(m) -> cond.Signal 激活 writer 线程
    64 //                                     --> ::sendmsg()//发送到 socket
    65                 in_q->enqueue(m, m->get_priority(), conn_id);
    66             }
    67         } 
    68 
    69         else if (tag == CEPH_MSGR_TAG_CLOSE) {
    70             cond.Signal();
    71             break;
    72         }
    73         else {
    74             ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
    75             pipe_lock.Lock();
    76             fault(true);
    77         }
    78     }
    79 }

    Pipe::accept() 做一些简单的协议检查和认证处理,之后创建 Writer() 线程: Pipe::start_writer() –> Pipe::Writer

     1 int Pipe::accept()
     2 {
     3     ldout(msgr->cct,10) << "accept" << dendl;
     4     // 检查自己和对方的协议版本等信息是否一致等操作
     5     // ......
     6 
     7     while (1) {
     8         // 协议检查等操作
     9         // ......
    10 
    11         /**
    12          * 通知注册者有新的 accept 请求过来,如果 Dispatcher 的子类有实现
    13          * Dispatcher::ms_handle_accept(),则会调用该方法处理
    14          */
    15         msgr->dispatch_queue.queue_accept(connection_state.get());
    16 
    17         // 发送 reply 和认证相关的消息
    18         // ......
    19 
    20         if (state != STATE_CLOSED) {
    21             /**
    22              * 前面的协议检查,认证等都完成之后,开始创建 Writer() 线程等待注册者
    23              * 处理完消息之后发送
    24              * 
    25              */
    26             start_writer();
    27         }
    28         ldout(msgr->cct,20) << "accept done" << dendl;
    29 
    30         /**
    31          * 如果该消息是延迟发送的消息, 且相关的发送线程没有启动,启动之
    32          * Pipe::maybe_start_delay_thread()
    33          *     --> Pipe::DelayedDelivery::entry()
    34          */
    35         maybe_start_delay_thread();
    36         return 0;   // success.
    37     }
    38 }

    随后 Writer 线程等待被唤醒发送回复消息

     1 void Pipe::writer()
     2 {
     3     while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
     4         if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
     5             (is_queued() || in_seq > in_seq_acked)) {
     6 
     7             // 对 keepalive, keepalive2, ack 包的处理
     8             // ......
     9 
    10             // 从 Pipe::out_q 中得到一个取出包准备发送
    11             Message *m = _get_next_outgoing();
    12             if (m) {
    13                 // 对包进行一些加密处理
    14                 m->encode(features, !msgr->cct->_conf->ms_nocrc);
    15 
    16                 // 包头
    17                 ceph_msg_header& header = m->get_header();
    18                 ceph_msg_footer& footer = m->get_footer();
    19 
    20                 // 取出要发送的二进制数据
    21                 bufferlist blist = m->get_payload();
    22                 blist.append(m->get_middle());
    23                 blist.append(m->get_data());
    24 
    25                 // 发送包: Pipe::write_message() --> Pipe::do_sendmsg --> ::sendmsg()
    26                 ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
    27                 int rc = write_message(header, footer, blist);
    28                 m->put();
    29             }
    30             continue;
    31         }
    32 
    33         // 等待被 Reader 或者 Dispatcher 唤醒
    34         ldout(msgr->cct,20) << "writer sleeping" << dendl;
    35         cond.Wait(pipe_lock);
    36     }
    37 }

    消息的处理

    Reader 线程将消息交给 dispatch_queue 处理,流程如下:

    Pipe::reader() –> Pipe::in_q->enqueue()

     1 void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
     2 {
     3     Mutex::Locker l(lock);
     4     ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
     5     add_arrival(m);
     6     // 将消息按优先级放入 DispatchQueue::mqueue 中
     7     if (priority >= CEPH_MSG_PRIO_LOW) {
     8         mqueue.enqueue_strict(
     9             id, priority, QueueItem(m));
    10     } else {
    11         mqueue.enqueue(
    12             id, priority, m->get_cost(), QueueItem(m));
    13     }
    14     // 唤醒 DispatchQueue::entry() 处理消息
    15     cond.Signal();
    16 }
    17 
    18 void DispatchQueue::entry()
    19 {
    20     while (true) {
    21         while (!mqueue.empty()) {
    22             QueueItem qitem = mqueue.dequeue();
    23             Message *m = qitem.get_message();
    24             /**
    25              * 交给 Messenger::ms_deliver_dispatch() 处理,后者会找到
    26              * Monitor/OSD 等的 ms_deliver_dispatch() 开始对消息的逻辑处理
    27              * Messenger::ms_deliver_dispatch()
    28              *     --> Monitor::ms_dispatch()
    29              */
    30             msgr->ms_deliver_dispatch(m);
    31         }
    32         if (stop)
    33             break;
    34 
    35         // 等待被 DispatchQueue::enqueue() 唤醒
    36         cond.Wait(lock);
    37     }
    38     lock.Unlock();
    39 }

    下面简单看一下在订阅者的模块中消息是怎样被放入 Pipe::out_q 中的:

     1 Messenger::ms_deliver_dispatch()
     2     --> Monitor::ms_dispatch()
     3         --> Monitor::_ms_dispatch
     4             --> Monitor::dispatch
     5                 --> Monitor::handle_mon_get_map
     6                     --> Monitor::send_latest_monmap
     7                         --> SimpleMessenger::send_message()
     8                             --> SimpleMessenger::_send_message()
     9                                 --> SimpleMessenger::submit_message()
    10                                     --> Pipe::_send()
     1 bool Monitor::_ms_dispatch(Message *m)
     2 {
     3     ret = dispatch(s, m, src_is_mon);
     4 
     5     if (s) {
     6         s->put();
     7     }
     8 
     9     return ret;
    10 }
    11 
    12 bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
    13 {
    14     switch (m->get_type()) {
    15     case CEPH_MSG_MON_GET_MAP:
    16         handle_mon_get_map(static_cast<MMonGetMap*>(m));
    17         break;
    18     // ......
    19     default:
    20         ret = false;
    21     }
    22     return ret;
    23 }
    24 
    25 void Monitor::handle_mon_get_map(MMonGetMap *m)
    26 {
    27     send_latest_monmap(m->get_connection().get());
    28     m->put();
    29 }
    30 
    31 void Monitor::send_latest_monmap(Connection *con)
    32 {
    33     bufferlist bl;
    34     monmap->encode(bl, con->get_features());
    35     /**
    36      * SimpleMessenger::send_message()
    37      *     --> SimpleMessenger::_send_message()
    38      *         --> SimpleMessenger::submit_message()
    39      *             --> Pipe::_send()
    40      */
    41     messenger->send_message(new MMonMap(bl), con);
    42 }
    43 
    44 void Pipe::_send(Message *m)
    45 {
    46     assert(pipe_lock.is_locked());
    47     out_q[m->get_priority()].push_back(m);
    48     // 唤醒 Writer 线程
    49     cond.Signal();
    50 }

    由上面的所有分析,除了订阅者/发布者设计模式,对网络包的处理上采用的是古老的 生产者消费者问题 线程模型,每次新的请求就会有创建一对收/发线程用来处理消息的接受 发送,如果有大规模的请求,线程的上下文切换会带来大量的开销,性能可能产生瓶颈。

  • 相关阅读:
    linux随记
    springboot-2
    netty-lean1
    nginx
    自定义启动器
    arrayList add
    Mybatis 转义符
    idea 闪退 但是启动的服务还在解决办法
    java 通过map根据list某个字段进行合并
    java list的深拷贝
  • 原文地址:https://www.cnblogs.com/yi-mu-xi/p/10144362.html
Copyright © 2011-2022 走看看