zoukankan      html  css  js  c++  java
  • 深入mongoDB(1)mongod的线程模型与网络框架

    最近工作需要开始研究mongoDB,我准备从其源代码角度,对于mongod和mongos服务的架构、sharding策略、 replicaset策略、数据同步容灾、索引等机制做一个本质性的了解。其代码约20万行(我研究的是 2.0.6版本源码),本篇先从mongod的 启动流程说起,它本是一个多线程程序,所以本文在于说明mongod有多少个线程,每个线程的意义所在。希望大家阅读本文时关注在mongod的外围框 架,暂不涉及数据文件的组织、索引B树的组织等,仅focus in在网络框架、线程模型上。

    弄清楚这点的好处很明显:之后就可以有的放矢的研究mongod某个模块究竟是如何实现的,可以快速的跳到相应的类中阅读源码,解决我们在产品中的实际问题。我认为这是研究其庞大源码一个好的开始。

    在说明mongod前,须了解mongoDB大量代码是基于boost库构建的,因此这里先行对boost库建立线程做个简单的了解。

    1、boost库如何建立线程

    boost::thread是boost中跨平台的多线程库,mongoDB创建线程时大多数情况下是使用thread库的(少量情况直接调用pthread_create方法),主要使用了以下两种方式:

    (1)直接运行让线程运行func

    例如durThread线程:

    void durThread() {

    while( !inShutdown() ) { ... }

    }

    boost::thread t(durThread);

    (2)在类中定义静态的run方法,调用thread创建线程

        class FileAllocator : boost::noncopyable {
            static void run( FileAllocator * fa );

            void FileAllocator::start() {
                 boost::thread t( boost::bind( &FileAllocator::run , this ) );
            }
        };

    2、mongod的入口

    mongod的入口main函数在src/mongo/db/db.cpp文件中,我画了个简单的活动图简要介绍其启动流程:

    如上图所示,这里出现了12个固定线程,还没有包括mongod运行以后处理请求时派生出来的线程,如下所示:

    –      interruptThread

    –      DataFileSync::run

    –      FileAllocator::run

    –      durThread

    –      SnapshotThread::run

    –      ClientCursorMonitor::run

    –      PeriodicTask::Runner::run

    –      TTLMonitor::run

    –      replSlaveThread

    –      replMasterThread

    –      webServerThread

    –      处理数据库请求的主线程

    如果不属于任何replica set,那么至少有10个固定线程(去除 replSlaveThread和 replMasterThread)。

    下面我们先讨论这10个固定的线程,再讨论性能非常弱的监听web事件的线程是怎样处理请求的,最后讨论性能稍好一点的主服务线程是怎样处理请求的。

    3、5个基于BackgroundJob类实现的工作线程

    这5个线程分别是DataFileSync,SnapshotThread, ClientCursorMonitor, TTLMonitor, PeriodicTask,类图如下所示:

    上面这5个类也是用boost::threadfunction方法创建线程运行的,它们继承了BackgroundJob类,使用go方法启动线程执行jobBody就是在启动线程执行run方法,如下所示:

    1. BackgroundJob& BackgroundJob::go() {  
    2.     boost::thread t( boost::bind( &BackgroundJob::jobBody , this, _status ) );  
    3.     return *this;  
    4. }  
    5.   
    6. void BackgroundJob::jobBody( boost::shared_ptr<JobStatus> status ) {  
    7.     ...  
    8.     run();  
    9.     ...  
    10. }     


    这些线程的意义如下:

    DataFileSync 主要在调用MemoryMappedFile::flush方法将内存中的数据刷到磁盘上。 我们知道,mongodb是调用mmap把磁盘中的数据映射到内存中的,所以必须有一个机制时刻的刷数据到硬盘才能保证可靠性,多久刷一次是与 syncdelay参数相关的。

    SnapshotThread将生成快照文件帮助快速恢复。

    ClientCursorMonitor将管理用户的游标,每4秒调用一次idleTimeReport()方法,每一分钟调用sayMemoryStatus()方法。

    TTLMonitor管理TTL,通过调用doTTLForDB()方法检查所有db。

    PeriodicTask将从动态数组std::vector<PeriodicTask* > _tasks中获取周期性任务执行。

    4、5个直接提供全局方法执行的线程

    FileAllocator用于分配新文件,它决定分配文件的大小,例如用翻倍的方式。

    interruptThread只处理信号量。

    durThread做批量提交和回滚工作。

    replSlaveThread是当前结点作为secondary时的同步线程。

    replMasterThread是当前结点作为master时的同步线程。

    5、web监听线程

    mongod是如何处理web请求的呢?它是通过网络框架中的核心类Listerner实现的,类图如下所示:

    怎么理解这幅类图呢?

    首先看 Listener类,它负责监听、创建新连接,其工作步骤如下:

    a、创建socket句柄,绑定端口,监听

    b、调用select检测新连接事件

    c、对检测到的事件调用accept建立新连接

    d、调用void Listener::acceptedMP(MessagingPort*mp)方法处理新连接,谁重新实现acceptedMP方法谁决定处理方式

    这个Listener类既用于处理web请求,也用于处理普通的数据库请求。

    OK, 现在我们看web请求是如何处理的。MiniWebServer类继承了Listener类,它重新实现了acceptedMP方法,开始接收TCP流, 解析HTTP协议,同时还会负责组装HTTP响应包并发送TCP流到客户端。那么实际完成http请求的类是谁呢?它是继承了MiniWebServer 类的DbWebServer类。这个类重新实现了doRequest方法,它会在完整接收到HTTP请求后被调用,HTTP请求的处理过程不在本篇的讨论 范围内,这里略过。但我们清楚了,这个线程采用同步的阻塞的方式处理请求,它意味着它同一时刻只能处理一个web请求,并发能力超级弱,还好web请求只 是mongod的副业,仅用于查询状态。

    6、主监听线程和数据请求的处理线程

    处理数据库请求的是上图中的PortMessageServer 类,它运行在主线程中。

    我们先看看PortMessageServer 类是如何实现acceptedMP方法的:

    1. virtual voidacceptedMP(MessagingPort * p) {  
    2.     if ( !connTicketHolder.tryAcquire() ) {  
    3.         sleepmillis(2); // otherwisewe'll hard loop  
    4.         return;  
    5.     }  
    6.      …  
    7.         int failed =pthread_create(&thread, &attrs, (void*(*)(void*)) &pms::threadRun,p);  
    8.         …  
    9. }  


    很清晰,它开启了一个线程独立的执行这个请求。虽然这种方式依然性能极差:大量的进程间上下文切换在等着我们,但总比web请求处理要好多了,而且mongod的并发能力本来就不是它的长项。

    对于每个新连接,都会有类封装成对象,如下:

    接下来pms::threadRun方法是在处理MessagingPort对象。

    下面看看pms::threadRun方法中做了些什么:

    1. void threadRun( MessagingPort *inPort) {  
    2.     TicketHolderReleaserconnTicketReleaser( &connTicketHolder );  
    3.     Message m;  
    4.     try {  
    5.         LastError * le = newLastError();  
    6.         lastError.reset( le ); //lastError now has ownership  
    7.         handler->connected( p.get());  
    8.         while ( ! inShutdown() ) {  
    9.             if ( ! p->recv(m) ) {  
    10.                 p->shutdown();  
    11.                 break;  
    12.             }  
    13.             handler->process( m ,p.get() , le );  
    14.         }  
    15.     }  
    16.     handler->disconnected( p.get());  
    17. }  


    可以看到,它会在这个连接上接收完整的请求,之后会调用handler的process方法。这个handler又是什么呢?如下图所示:

    所以,普通的数据库请求是由MyMessageHandler的process方法处理的。这个方法里也只是个封装,真正处理业务的是全局方法assembleResponse。

    assembleResponse方法中会按照8种操作方式分别的调用DataFileMgr中的方法处理实际文件,例如:

    1. enum Operations {  
    2.     opReply = 1,     /* reply. responseTo is set. */  
    3.     dbMsg = 1000,    /* generic msg command followed by a string */  
    4.     dbUpdate = 2001, /* update object */  
    5.     dbInsert = 2002,  
    6.     //dbGetByOID = 2003,  
    7.     dbQuery = 2004,  
    8.     dbGetMore = 2005,  
    9.     dbDelete = 2006,  
    10.     dbKillCursors = 2007  
    11. };  


    在方法中有类似这样的代码在调用实际的业务类处理操作:

    1. else if ( op == dbInsert ) {  
    2.     receivedInsert(m, currentOp);  
    3. }  
    4. else if ( op == dbUpdate ) {  
    5.     receivedUpdate(m, currentOp);  
    6. }  
    7. else if ( op == dbDelete ) {  
    8.     receivedDelete(m, currentOp);  
    9. }  


    当然本篇志不在此,下篇我们再讨论索引和数据文件的操作。

  • 相关阅读:
    build.gradle文件详解<转> 推荐
    openGL 环境配置
    手写 大整数
    FOJ有奖月赛-2016年8月(daxia专场之过四题方有奖)
    2.1 基本计数方法
    第7章 代码
    第7章 高级数据结构的编程实验
    3.3 字符串(1)
    2016_NENU_CS_3
    3.2 区间信息的维护与查询
  • 原文地址:https://www.cnblogs.com/SequoiaDB/p/3274189.html
Copyright © 2011-2022 走看看