  • MongoDB源码分析——mongod数据查询操作

    源码版本为MongoDB 2.6分支




    class MyMessageHandler : public MessageHandler {
        virtual void process( Message& m , AbstractMessagingPort* port , LastError * le) {
            while ( true ) {
                DbResponse dbresponse;
                try {
                    assembleResponse( m, dbresponse, port->remote() );
                catch ( const ClockSkewException & ) {
                    log() << "ClockSkewException - shutting down" << endl;
                    exitCleanly( EXIT_CLOCK_SKEW );

    `DbResponse dbresponse` 封装了服务器处理消息后的响应数据。在分析具体的处理流程之前，先看一个枚举类型 Operations，它定义了 MongoDB 的所有操作类型：

    // Operation codes carried by each Message (read via m.operation());
    // assembleResponse() branches on this value to choose a handler.
    enum Operations {
        opReply = 1,     /* reply. responseTo is set. */
        dbMsg = 1000,    /* generic msg command followed by a string */
        dbUpdate = 2001, /* update object */
        dbInsert = 2002, // insert documents
        //dbGetByOID = 2003,
        dbQuery = 2004,  // query; commands are also sent this way, on a "<db>.$cmd" namespace
        dbGetMore = 2005, // presumably OP_GET_MORE: fetch the next batch from an already-open cursor (not specifically data sync) — TODO confirm against the wire-protocol docs
        dbDelete = 2006, // delete documents
        dbKillCursors = 2007 // close (kill) cursors

    Message 对象中封装了当前消息的操作类型。本文只分析 dbQuery 部分，其他操作类型将在后续文章中分析。
    可以看到，process 中调用了 assembleResponse 来处理消息并填充响应对象（DbResponse dbresponse）。下面分析 assembleResponse 函数：

    int op = m.operation();
        bool isCommand = false;
        DbMessage dbmsg(m);
        if ( op == dbQuery ) {
            const char *ns = dbmsg.getns();
            if (strstr(ns, ".$cmd")) {
                isCommand = true;
                if( strstr(ns, ".$cmd.sys.") ) {
                    if( strstr(ns, "$cmd.sys.inprog") ) {
                        inProgCmd(m, dbresponse);
                    if( strstr(ns, "$cmd.sys.killop") ) {
                        killOp(m, dbresponse);
                    if( strstr(ns, "$cmd.sys.unlock") ) {
                        unlockFsync(ns, m, dbresponse);
            else {
                opread(m); //如果不是命令则记录日志

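    要读懂上面的代码，首先需要了解 MongoDB 源码中的一个概念——namespace（缩写为 ns）。一个 ns 表示某个 db 中的某个 collection，一般写作 “db name” + “.” + “collection name”，例如 test.users；如果 ns 中包含 “.$cmd”，则表示当前操作是一个命令。因此上面的代码先判断当前查询是否为命令：对于 $cmd.sys.inprog、$cmd.sys.killop、$cmd.sys.unlock 这几个特殊命令，直接调用相应函数（inProgCmd、killOp、unlockFsync）处理后返回；对于普通命令，这里只是把 isCommand 置为 true，真正的执行发生在后面的 newRunQuery 中（调用 runCommands）；不是命令的查询则调用 opread(m) 记录日志。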

        // Increment op counters.
        switch (op) {
        case dbQuery:
            if (!isCommand) {
            else {
                // Command counting is deferred, since it is not known yet whether the command
                // needs counting.
        if ( op == dbQuery ) {
            if ( handlePossibleShardedMessage( m , &dbresponse ) )
            receivedQuery(c , dbresponse, m );


    static bool receivedQuery(Client& c, DbResponse& dbresponse, Message& m ) {
        DbMessage d(m);
        QueryMessage q(d);
        auto_ptr< Message > resp( new Message() );
        CurOp& op = *(c.curop());
        try {
            NamespaceString ns(d.getns());
            cout << "receivedQuery NamespaceString : " << d.getns() << endl;
            if (!ns.isCommand()) {
                // Auth checking for Commands happens later.
                Client* client = &cc();
                Status status = client->getAuthorizationSession()->checkAuthForQuery(ns, q.query);
                audit::logQueryAuthzCheck(client, ns, q.query, status.code());
            dbresponse.exhaustNS = newRunQuery(m, q, op, *resp);
            verify( !resp->empty() );
        catch (...)
        return ok;


      * Run the query 'q' and place the result in 'result'.
    std::string newRunQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result);


        const NamespaceString nsString(ns);
        uassert(16256, str::stream() << "Invalid ns [" << ns << "]", nsString.isValid());
        // Set curop information.
        curop.debug().ns = ns;
        curop.debug().ntoreturn = q.ntoreturn;
        curop.debug().query = q.query;
        // If the query is really a command, run it.
        if (nsString.isCommand()) {
            int nToReturn = q.ntoreturn;
            uassert(16979, str::stream() << "bad numberToReturn (" << nToReturn
                                         << ") for $cmd type ns - can only be 1 or -1",
                    nToReturn == 1 || nToReturn == -1);
            BufBuilder bb;
            BSONObjBuilder cmdResBuf;
            if (!runCommands(ns, q.query, curop, bb, cmdResBuf, false, q.queryOptions)) {
                uasserted(13530, "bad or malformed command request?");
            curop.debug().iscommand = true;
            // TODO: Does this get overwritten/do we really need to set this twice?
            curop.debug().query = q.query;
            QueryResult* qr = reinterpret_cast<QueryResult*>(bb.buf());
            qr->len = bb.len();
            curop.debug().responseLength = bb.len();
            qr->cursorId = 0;
            qr->startingFrom = 0;
            qr->nReturned = 1;
            result.setData(qr, true);
            return "";


        // This is a read lock.  We require this because if we're parsing a $where, the
        // where-specific parsing code assumes we have a lock and creates execution machinery that
        // requires it.
        Client::ReadContext ctx(q.ns);
        Collection* collection = ctx.ctx().db()->getCollection( ns );
        // Parse the qm into a CanonicalQuery.
        CanonicalQuery* cq;
        Status canonStatus = CanonicalQuery::canonicalize(q, &cq);
        if (!canonStatus.isOK()) {
            uasserted(17287, str::stream() << "Can't canonicalize query: " << canonStatus.toString());
        QLOG() << "Running query:
    " << cq->toString();
        LOG(2) << "Running query: " << cq->toStringShort();
        // Parse, canonicalize, plan, transcribe, and get a runner.
        Runner* rawRunner = NULL;
        // We use this a lot below.
        const LiteParsedQuery& pq = cq->getParsed();
        // We'll now try to get the query runner that will execute this query for us. There
        // are a few cases in which we know upfront which runner we should get and, therefore,
        // we shortcut the selection process here.
        // (a) If the query is over a collection that doesn't exist, we get a special runner
        // that's is so (a runner) which doesn't return results, the EOFRunner.
        // (b) if the query is a replication's initial sync one, we get a SingleSolutinRunner
        // that uses a specifically designed stage that skips extents faster (see details in
        // exec/oplogstart.h)
        // Otherwise we go through the selection of which runner is most suited to the
        // query + run-time context at hand.
        Status status = Status::OK();
        if (collection == NULL) {
            rawRunner = new EOFRunner(cq, cq->ns());
        else if (pq.hasOption(QueryOption_OplogReplay)) {
            status = getOplogStartHack(collection, cq, &rawRunner);
        else {
            // Takes ownership of cq.
            size_t options = QueryPlannerParams::DEFAULT;
            if (shardingState.needCollectionMetadata(pq.ns())) {
                options |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
            status = getRunner(cq, &rawRunner, options);
        if (!status.isOK()) {
            // NOTE: Do not access cq as getRunner has deleted it.
            uasserted(17007, "Unable to execute query: " + status.reason());


        // This is a read lock.  We require this because if we're parsing a $where, the
        // where-specific parsing code assumes we have a lock and creates execution machinery that
        // requires it.
        Client::ReadContext ctx(q.ns);


     /** "read lock, and set my context, all in one operation" 
         *  This handles (if not recursively locked) opening an unopened database.
        class ReadContext : boost::noncopyable { 
            ReadContext(const std::string& ns, const std::string& path=storageGlobalParams.dbpath);
            Context& ctx() { return *c.get(); }
            scoped_ptr<Lock::DBRead> lk;
            scoped_ptr<Context> c;

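    ReadContext 有点像一个代理（或适配器）：它内部持有一个 Context 对象，并通过 Lock::DBRead 加上“读锁”。如果数据库尚未打开，那么在已持有写锁时它会直接创建 Context；在不处于嵌套锁的情况下，它会先临时获取全局写锁（Lock::GlobalWrite）来打开数据库，再重新加读锁并创建 Context；否则（嵌套读锁中）直接报错（错误码 15928）。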

    /** "read lock, and set my context, all in one operation" 
     *  This handles (if not recursively locked) opening an unopened database.
    Client::ReadContext::ReadContext(const string& ns, const std::string& path) {
            lk.reset( new Lock::DBRead(ns) );
            Database *db = dbHolder().get(ns, path);
            if( db ) {
                c.reset( new Context(path, ns, db) );
        // we usually don't get here, so doesn't matter how fast this part is
            if( Lock::isW() ) { 
                // write locked already
                DEV RARELY log() << "write locked on ReadContext construction " << ns << endl;
                c.reset(new Context(ns, path));
            else if( !Lock::nested() ) { 
                    Lock::GlobalWrite w;
                    Context c(ns, path);
                // db could be closed at this interim point -- that is ok, we will throw, and don't mind throwing.
                lk.reset( new Lock::DBRead(ns) );
                c.reset(new Context(ns, path));
            else { 
                uasserted(15928, str::stream() << "can't open a database from a nested read lock " << ns);



    lk.reset( new Lock::DBRead(ns) );
    c.reset(new Context(ns, path));


     // Parse the qm into a CanonicalQuery.
        CanonicalQuery* cq;
        Status canonStatus = CanonicalQuery::canonicalize(q, &cq);


        Runner* rawRunner = NULL;
        // We use this a lot below.
        const LiteParsedQuery& pq = cq->getParsed();
        // We'll now try to get the query runner that will execute this query for us. There
        // are a few cases in which we know upfront which runner we should get and, therefore,
        // we shortcut the selection process here.
        // (a) If the query is over a collection that doesn't exist, we get a special runner
        // that's is so (a runner) which doesn't return results, the EOFRunner.
        // (b) if the query is a replication's initial sync one, we get a SingleSolutinRunner
        // that uses a specifically designed stage that skips extents faster (see details in
        // exec/oplogstart.h)
        // Otherwise we go through the selection of which runner is most suited to the
        // query + run-time context at hand.
        Status status = Status::OK();
        if (collection == NULL) {
            rawRunner = new EOFRunner(cq, cq->ns());
        else if (pq.hasOption(QueryOption_OplogReplay)) {
            status = getOplogStartHack(collection, cq, &rawRunner);
        else {
            // Takes ownership of cq.
            size_t options = QueryPlannerParams::DEFAULT;
            if (shardingState.needCollectionMetadata(pq.ns())) {
                options |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
            status = getRunner(cq, &rawRunner, options);


    IDHackRunner：当查询是按 “_id” 进行的简单等值查询（如 {_id: ...}），且集合上存在 _id 索引时，使用它直接通过 _id 索引查找文档。
    MultiPlanRunner：QueryPlanner 对查询进行规划后，如果得到多个候选 QuerySolution，则使用它来比较这些候选计划，并选出最优的一个来执行查询。


        // Run the query.
        // bb is used to hold query results
        // this buffer should contain either requested documents per query or
        // explain information, but not both
        BufBuilder bb(32768);
        while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) {
            // Add result to output buffer. This is unnecessary if explain info is requested
            if (!isExplain) {
                bb.appendBuf((void*)obj.objdata(), obj.objsize());
            // Count the result.



