zoukankan      html  css  js  c++  java
  • mongodb源码分析查询

         在之前的一篇文章中,介绍了mongodb的主程序入口main()的执行流程,其实main只是实始化一些参数信息并做了些后台线程任务的启动工作(包括数据准备和恢复),并最终启动一个线程进行循环侦听。今天将会介绍在mongodb中数据查询 (find)的流程,以了解mongodb是如果对message进行拆包分析,以及数据进行表扫描及索引使用的。
        
         好了,开始今天的正文吧!

         这里继续昨天的代码浏览过程,从connThread函数说起,看了上一篇文章的朋友都清楚了该函数主要工作就是不断循环[while ( 1 )]获取当前客户端发来的信息(上面已封装成了message)并将其信息进行分析,并根据相应操作标志位确定当前操作是CRUD或构建索引等[assembleResponse()],如果一些正常,则向客户端发送应答信息。而如果客户端连接提交了一个查询操作(也包括CUD及其它操作)的话,那么它就会调用assembleResponse方法来进行相关操作的处理,该方法声明如下(instance.cpp第224行):
        
        // 直接请求包括'end'字符,则返回false    
        void assembleResponse( Message &/*客户端传来的(操作)信息*/,
                               DbResponse 
    &dbresponse,/*响应结构体,用于绑定要响应的数据及状态*/
                               
    const SockAddr &client ) {

        
    // 获取操作符枚举信息
        int op = m.operation();

        
        注:枚举定义如下
        enum Operations {
            opReply 
    = 1,     /* reply. responseTo is set. */
            dbMsg 
    = 1000,    /* generic msg command followed by a string */
            dbUpdate 
    = 2001/* update object */
            dbInsert 
    = 2002,  //dbGetByOID = 2003,
            dbQuery = 2004,
            dbGetMore 
    = 2005,
            dbDelete 
    = 2006,
            dbKillCursors 
    = 2007
        };


          接着它会判断是否为$cmd命令,即以.$cmd为开头,形如 db.$cmd.findOne({getlasterror: 1}),并对一些特殊指令进行单独处理,包括inprog,killop,unlock。
            bool isCommand = false;
            
    const char *ns = m.singleData()->_data + 4;
            
    if ( op == dbQuery ) {
                
    if( strstr(ns, ".$cmd") ) {
                    isCommand 
    = true;
                    opwrite(m);
                    
    if( strstr(ns, ".$cmd.sys.") ) {
                        
    if( strstr(ns, "$cmd.sys.inprog") ) {
                            inProgCmd(m, dbresponse);
                            
    return;
                        }
                        
    if( strstr(ns, "$cmd.sys.killop") ) {
                            killOp(m, dbresponse);
                            
    return;
                        }
                        
    if( strstr(ns, "$cmd.sys.unlock") ) {
                            unlockFsync(ns, m, dbresponse);
                            
    return;
                        }
                    }
                }
                
    else {
                    opread(m);
                }
            }
            
    else if( op == dbGetMore ) {
                opread(m);
            }


          接着就是获取当前线程连接的客户端对象,如下:
        Client& c = cc();

          该方法实现代码如下:
        /** get the Client object for this thread. */
        inline Client
    & cc() {
            Client 
    * c = currentClient.get();
            assert( c );
            
    return *c;
        }

         其主要用内联函数方式获取当前客户端操作的线程信息,而该线程默认就是上一篇文章中所创建的那个:
        
        Client::initThread("initandlisten");


         因为mongodb会为每一个客户端DB操作创建一个线程 Client对象,我个人把它理解为服务端持有的对应(每)客户端的操作对象。其主体函数如下:

        boost::thread_specific_ptr<Client> currentClient; //thread_specific_ptr对象为每个线程保持一个指针,每个线程都应该new出一个对 象交给thread_specific_ptr,当线程终结时,该对象释放。

        
    /* each thread which does db operations has a Client object in TLS.
           call this when your thread starts.
        
    */
        Client
    & Client::initThread(const char *desc, MessagingPort *mp) {
            assert( currentClient.
    get() == 0 );
            Client 
    *= new Client(desc, mp);
            currentClient.reset(c);
            mongo::lastError.initThread();
            
    return *c;
        }


         我们再回到assembleResponse函数,接下来的代码就是使用CurOp(一个提供了内部锁机制来保存当前客户端操作状态的对象)来把当前Client对象及相应操作(CRUD等)封装于其中,这样当以访问该对象进行原子操作时(Atomic)就可以通过其内置支持多线程并发访问和锁保护了。
        
        CurOp* currentOpP = c.curop();
        ......
        CurOp
    & currentOp = *currentOpP;
        currentOp.reset(client,op);

        OpDebug
    & debug = currentOp.debug();
        StringBuilder
    & ss = debug.str;
        ss 
    << opToString( op ) << " ";

        
    int logThreshold = cmdLine.slowMS;
        
    bool log = logLevel >= 1;


        接着就是执行查询语句,也就是我们今天的主角“隆重登场”了,如下:
       
    if ( op == dbQuery ) {
            
    if ( handlePossibleShardedMessage( m , &dbresponse ) ) /*查看是不是sharding状态(查询),如果是则返回啊*/
                
    return;
            receivedQuery(c , dbresponse, m ); 
    /*执行查询 */
        }
        
    else if ( op == dbGetMore ) {
           ......
        
    else {
            
    const char *ns = m.singleData()->_data + 4;
            
    char cl[256];
            nsToDatabase(ns, cl);
            
    //进行权限认证
            if! c.getAuthenticationInfo()->isAuthorized(cl) ) {
                uassert_nothrow(
    "unauthorized");
            }
            
    else {
                
    try {
                    
    if ( op == dbInsert ) {  //添加记录操作
                        receivedInsert(m, currentOp);
                    }
                    
    else if ( op == dbUpdate ) { //更新记录
                        receivedUpdate(m, currentOp);
                    }
                    
    else if ( op == dbDelete ) { //删除记录
                        receivedDelete(m, currentOp);
                    }
                    
    else if ( op == dbKillCursors ) { //删除Cursors(游标)对象
                        currentOp.ensureStarted();
                        logThreshold 
    = 10;
                        ss 
    << "killcursors ";
                        receivedKillCursors(m);
                    }
                    
    else {
                        mongo::log() 
    << "    operation isn't supported: " << op << endl;
                        currentOp.done();
                        log 
    = true;
                    }
                }
              .....
            }
        }

          上面代码中receivedQuery(c , dbresponse, m )就是执行查询功能的,而其它else分支特别是cud操作等我会专门再写文章加以解释,因为今天主要介绍查询功能,所以我们接着会分析该方法的执行逻辑,如下:

        static bool receivedQuery(Client& c, DbResponse& dbresponse, Message& m ) {
            
    bool ok = true;
            MSGID responseTo 
    = m.header()->id; /*从message提取id信息用于绑定到dbresponse.responseTo*/

            DbMessage d(m);
    /*对Message进行封装,从而初始化DbMessage实例*/
            QueryMessage q(d);
            auto_ptr
    < Message > resp( new Message() );

            CurOp
    & op = *(c.curop()); //获取当前Client线程对象执行的操作(支持线程安全)

            
    try {
                dbresponse.exhaust 
    = runQuery(m, q, op, *resp); /*执行查询*/
                assert( 
    !resp->empty() );
            }
            
    catch ( AssertionException& e ) {
               ......
            }
            ......

            dbresponse.response 
    = resp.release();
            dbresponse.responseTo 
    = responseTo;

            
    return ok;
        }


        上面代码主要实始化一些查询对象,包括数据库操作消息(用于数据库/服务协议),查询结果消息(QueryMessage,即运行查询的请求所接收到的来自数据库的信息)等。最后运行runQuery(m, q, op, *resp)开始查询(方法位于query.cpp),该方法代码较长,主要功能如下:

         const char *runQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result) {
            StringBuilder
    & ss = curop.debug().str;
            
    //构造ParsedQuery查询对象,该对象包括查询记录数字,以及记录跳转偏移量等信息,这些值会在访问磁盘查询时使用,用法参见:query.cpp 662行的virtual void _init()方法
            shared_ptr<ParsedQuery> pq_shared( new ParsedQuery(q) );
            ParsedQuery
    & pq( *pq_shared );
            ......
            
    //对查询命令判断,指令形如abc.$cmd.findOne( { ismaster:1 } )
            if ( pq.couldBeCommand() ) {
                BufBuilder bb;
                bb.skip(
    sizeof(QueryResult));
                BSONObjBuilder cmdResBuf;
                
    //对查询权限判断,并执行相应查询指令
                if ( runCommands(ns, jsobj, curop, bb, cmdResBuf, false, queryOptions) ) {
                    ss 
    << " command: ";
                    jsobj.toString( ss );
                    curop.markCommand();
                    auto_ptr
    < QueryResult > qr;
                    qr.reset( (QueryResult 
    *) bb.buf() );
                    bb.decouple();
                    qr
    ->setResultFlagsToOk();
                    qr
    ->len = bb.len();
                    ss 
    << " reslen:" << bb.len();
                    qr
    ->setOperation(opReply);
                    qr
    ->cursorId = 0;
                    qr
    ->startingFrom = 0;
                    qr
    ->nReturned = 1;
                    result.setData( qr.release(), 
    true );
                }
                
    else {
                    uasserted(
    13530"bad or malformed command request?");
                }
                
    return 0;
            }

            
    /* 普通查询分支(非指令式操作,也就是我们用c#客户端链接查询方式)*/
            ......
            BSONObj order 
    = pq.getOrder();
            BSONObj query 
    = pq.getFilter();

            
    /* 对查询对象大小进行判断,过滤错误的查询对象(为0)*/
            
    if ( query.objsize() == 0 ) {
                
    out() << "Bad query object?\n  jsobj:";
                
    out() << jsobj.toString() << "\n  query:";
                
    out() << query.toString() << endl;
                uassert( 
    10110 , "bad query object"false);
            }

            
    /* 声明读锁 */
            mongolock lk(
    false);

            Client::Context ctx( ns , dbpath , 
    &lk );
            ......
            
    /*对查询对象及选项进行过滤, 比如:ns,_id值,索引等*/
            
    if ( ! (explain || pq.showDiskLoc()) && isSimpleIdQuery( query ) && !pq.hasOption( QueryOption_CursorTailable ) ) {
                
    bool nsFound = false;
                
    bool indexFound = false;

                BSONObj resObject;
                Client
    & c = cc();
                
    bool found = Helpers::findById( c, ns , query , resObject , &nsFound , &indexFound );
                
    if ( nsFound == false || indexFound == true ) {
                    ......
                    
    return false;
                }
            }
            .....
            
    //定义扫描器(类定义位于queryoptimizer.h),在看代码过程中发现MultiPlanScanner主要用于提供$or查询支持,
            
    //语法形如:db.foo.find( { name : "bob" , $or : [ { a : 1 } , { b : 2 } ] } )
            
    //更多内容参见:http://www.mongodb.org/display/DOCS/OR+operations+in+query+expressions
            auto_ptr< MultiPlanScanner > mps( new MultiPlanScanner( ns, query, order, &hint, !explain, pq.getMin(), pq.getMax(), falsetrue ) );
            .....
            ExplainBuilder eb;
            UserQueryOp original( pq, result, eb, curop );
            shared_ptr
    < UserQueryOp > o = mps->runOp( original ); /*执行查询*/

            UserQueryOp 
    &dqo = *o;
            
    if ( ! dqo.complete() )
                
    throw MsgAssertionException( dqo.exception() );
            
    if ( explain ) {
                dqo.finishExplain( explainSuffix );
            }
            ......

            
    /*设置查询结果*/
            QueryResult 
    *qr = (QueryResult *) result.header();
            qr
    ->cursorId = cursorid;
            qr
    ->setResultFlagsToOk();
            
    // qr->len is updated automatically by appendData()
            ss << " reslen:" << qr->len;
            qr
    ->setOperation(opReply);
            qr
    ->startingFrom = 0;
            qr
    ->nReturned = n;

            
    /*查询耗时统计*/
            
    int duration = curop.elapsedMillis();
            
    bool dbprofile = curop.shouldDBProfile( duration );
            
    if ( dbprofile || duration >= cmdLine.slowMS ) {
                ss 
    << " nscanned:" << nscanned << ' ';
                
    if ( ntoskip )
                    ss 
    << " ntoskip:" << ntoskip;
                
    if ( dbprofile )
                    ss 
    << " \nquery: ";
                ss 
    << jsobj.toString() << ' ';
            }
            ss 
    << " nreturned:" << n;
            
    return exhaust;
        }


          读到这里,发现系统除了把查询消息之类的信息一股脑塞给了MultiPlanScanner之后就运行了runOP方法之外,竟然还没看到mongodb是如果查询数据库文件的,看来mongodb还真挺卖关子,没变法,只有继续往下挖代码,下面是runOP方法代码(位于queryoptimizer.cpp文件730行):

        shared_ptr< QueryOp > MultiPlanScanner::runOp( QueryOp &op ) {
            shared_ptr
    < QueryOp > ret = runOpOnce( op ); /*先运行一次查询*/
            
    while!ret->stopRequested() && mayRunMore() ) { /*当前查询请求未停止并且有$or查询关键字时*/
                ret 
    = runOpOnce( *ret );//再次运行查询
            }
            
    return ret;
        }


        看来runOpOnce方法是用于进行单次非or查询的,看一下代码就明白了,如下:
       
    shared_ptr< QueryOp > MultiPlanScanner::runOpOnce( QueryOp &op ) {
            massert( 
    13271"can't run more ops", mayRunMore() );
            
    if ( !_or ) { /*如当前查询不是or,则运行*/
                
    ++_i;
                
    return _currentQps->runOp( op );
            }
            
    ++_i;
            auto_ptr
    < FieldRangeSet > frs( _fros.topFrs() );/*(表)字段对象集合*/
            auto_ptr
    < FieldRangeSet > originalFrs( _fros.topFrsOriginal() );
            BSONElement hintElt 
    = _hint.firstElement();
            
    //创建查询计划集合
            _currentQps.reset( new QueryPlanSet( _ns, frs, originalFrs, _query, BSONObj(), &hintElt, _honorRecordedPlan, BSONObj(), BSONObj(), _bestGuessOnly, _mayYield ) );
            
    //设置查询计划要调用的查询方法及相关参数
            shared_ptr< QueryOp > ret( _currentQps->runOp( op ) );
            
    if ( ret->qp().willScanTable() ) {/*设置表扫描标识*/
                _tableScanned 
    = true;
            }
            
    //pop出or谓词/子句
            _fros.popOrClause( ret->qp().indexed() ? ret->qp().indexKey() : BSONObj() );
            
    return ret;
        }

        上面方面最终都是调用 _currentQps->runOp( op )来执行查询操作,下面就是方法的代码:    

        shared_ptr< QueryOp > QueryPlanSet::runOp( QueryOp &op ) {
            
    if ( _usingPrerecordedPlan ) { /*该变量貌似“是否使用预先记录的计划”,也就是索引*/
                Runner r( 
    *this, op );
                shared_ptr
    < QueryOp > res = r.run();
                ......
            }
            Runner r( 
    *this, op );
            
    return r.run();
        }


        上面代码主要是定义声明Runner实例并运行它,Runner本身为strcut类型,主要是用于对执行步骤进行封装(形成依次执行的操作流),这里不再多述了。下面是其r.run()方法的定义:
        
         shared_ptr< QueryOp > QueryPlanSet::Runner::run() {
            ......
            
    for( vector< shared_ptr< QueryOp > >::iterator i = ops.begin(); i != ops.end(); ++i ) {
                initOp( 
    **i ); //初始化操作,声明如下
                if ( (*i)->complete() )
                    
    return *i;
            }
            ......
        }

        
    void QueryPlanSet::Runner::initOp( QueryOp &op ) {
            GUARD_OP_EXCEPTION( op, op.init() );
        }


        上面op.init操作主要最终会执行下面方法(位于query.cpp 662行), 该方法会用查询条件构造一个游标,该游标记录着遍历数据集方式,查询起始位置等信息等
        
         virtual void _init() {
               ......
                    _c 
    = qp().newCursor( DiskLoc() , _pq.getNumToReturn() + _pq.getSkip() );/*构造*/
                    _capped 
    = _c->capped();

                    
    // setup check for if we can only use index to extract
                    if ( _c->modifiedKeys() == false && _c->isMultiKey() == false && _pq.getFields() ) {
                        _keyFieldsOnly.reset( _pq.getFields()
    ->checkKey( _c->indexKeyPattern() ) );
                    }
                }
              ......        
        }


        下面是其函数的代码(queryoptimizer.cpp 168 行):        
        
        shared_ptr<Cursor> QueryPlan::newCursor( const DiskLoc &startLoc , int numWanted ) const {
             .....
            
    if ( !_index ) { //非索引扫描
                if ( _fbs.nNontrivialRanges() )
                    checkTableScanAllowed( _fbs.ns() );
                
    return findTableScan( _fbs.ns(), _order, startLoc ); /*进行表扫描*/
            }
            .....        
        }


        findTableScan方法(pdfile.cpp 687行)即开始表扫描指定磁盘位置信息,并根据相关条件指定相应类型的游标信息。  
       
    shared_ptr<Cursor> findTableScan(const char *ns, const BSONObj& order, const DiskLoc &startLoc) {
            BSONElement el 
    = order.getField("$natural"); // e.g., { $natural : -1 }

            
    if ( el.number() >= 0 )
                
    return DataFileMgr::findAll(ns, startLoc);  /*startLoc开始位置*/

            ......
        }

        返回的游标类型为Cursor,但findAll方法里构造的是BasicCursor,相应代码(pdfile.cpp 639行):
        shared_ptr<Cursor> DataFileMgr::findAll(const char *ns, const DiskLoc &startLoc) {
            NamespaceDetails 
    * d = nsdetails( ns );
            
    if ( ! d )
                
    return shared_ptr<Cursor>(new BasicCursor(DiskLoc()));
            .....
            
            
    return shared_ptr<Cursor>(new BasicCursor( e->firstRecord ));
        }


        BasicCursor构造函数比较有意思,其引入了AdvanceStrategy对象指针,这个策略指针定义访问物理磁盘文件的方式,其操作单元是DiskLoc(DiskLoc实例对象实际是一个双向链接),访问方法虽然只有next一种,但mongodb却用它实现了向前和后转两种访问方式(详情参见cursor.cpp),如下:
        BasicCursor(DiskLoc dl, const AdvanceStrategy *_s = forward()) : curr(dl), s( _s ), _nscanned() {
                incNscanned();
                init();
        }

         
    /* these will be used outside of mutexes - really functors - thus the const */
        
    class Forward : public AdvanceStrategy {
            
    virtual DiskLoc next( const DiskLoc &prev ) const {
                
    return prev.rec()->getNext( prev );
            }
        } _forward;

        
    class Reverse : public AdvanceStrategy {
            
    virtual DiskLoc next( const DiskLoc &prev ) const {
                
    return prev.rec()->getPrev( prev );
            }
        } _reverse;


        上面的 prev.rec()方法调用最终会执行下面函数调用流程:
      
        //pdfile.h
        inline Record* DiskLoc::rec() const {
            
    return DataFileMgr::getRecord(*this);
        }
        inline Record
    * DataFileMgr::getRecord(const DiskLoc& dl) {
            assert( dl.a() 
    != -1 );
            
    return cc().database()->getFile(dl.a())->recordAt(dl);
        }
        
         而最后“cc().database()->getFile(dl.a())->recordAt(dl)”方法会最终从数据库文件 mongodfile中获取记录信息(详见database.cpp):
        //pdfile.h
        inline Record* MongoDataFile::recordAt(DiskLoc dl) {
            
    int ofs = dl.getOfs();
            
    if( ofs < DataFileHeader::HeaderSize ) badOfs(ofs); // will uassert - external call to keep out of the normal code path
            return (Record*) (p()+ofs);
        }

        
         兜了一大圈,头都快大了,不是吗?呵呵。另外Record,DiskLoc这两个与数据访问/存储相关类以后会抽时间介绍。

         好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍客户端发起Insert操作时,Mongodb的执行流程和B树的相应部分实现。

        原文链接:http://www.cnblogs.com/daizhj/archive/2011/03/18/1988288.html
        作者: daizhj, 代震军   
        微博: http://t.sina.com.cn/daizhj
       
    Tags: mongodb,c++,source code
  • 相关阅读:
    Docker容器常用操作命令(镜像的上传、下载、导入、导出、创建、删除、修改、启动等)详解
    [20191213]toad 12下BIND_AWARE提示无效.txt
    [20191206]隐含参数_db_always_check_system_ts.txt
    [20191127]表 full Hash Value的计算.txt
    [20191127]探究等待事件的本源4.txt
    [20191126]探究等待事件的本源2.txt
    [20191125]oracel SQL parsing function qcplgte 2.txt
    [20191125]探究等待事件的本源.txt
    [20191122]oracel SQL parsing function qcplgte.txt
    [20191119]探究ipcs命令输出2.txt
  • 原文地址:https://www.cnblogs.com/daizhj/p/1988288.html
Copyright © 2011-2022 走看看