zoukankan      html  css  js  c++  java
  • mongodb协议透传

    由于之前没时间研究mongodb协议透传,写mongodb的proxy程序时都是用自带的驱动(mongoclient),再自己定义一套协议,让其他语言(比如php、c++、java、C#)去实现,然后进行通讯。这样用了一段时间也没什么问题,但总觉得不够完美。最近个人时间终于充裕了一些,于是打开mongodb的源码,对照网上牛人的源码解析,开始实现mongodb的协议透传。废话不多说,上代码:

    #include <stdio.h>
    #include <sys/wait.h>
    #include "mongo/util/net/message_port.h"
    #include "mongo/util/net/message_server.h"
    #include "DzhMessageServer.h"
    #include "DzhMessageHandler.h"
    
    using namespace std;
    using namespace mongo;
    
    int main(int argc, const char *argv[]) 
    {
        MessageServer::Options opts;
        opts.port = 28686;
        opts.ipList = "10.15.107.155";
        
        DzhMessageHandler handler;        
        DzhMessageServer svr(opts, &handler);    
        svr.run();
    
        return 0;
    }

    这是proxy服务的代码入口,mongodb的服务端是基于boost::asio的,所以这段代码看着眼熟吧,DzhMessageServer 实现的是网络服务,DzhMessageHandler实现的是事件处理,MessageServer::Options.port是服务端口,MessageServer::Options.ipList是本机IP地址,代码清晰,一看就懂。

    #pragma once
    
    #include "mongo/util/net/message.h"
    #include "mongo/util/net/message_port.h"
    #include "mongo/util/net/message_server.h"
    #include "mongo/util/net/listen.h"
    #include "mongo/db/lasterror.h"
    #include <boost/thread.hpp>
    
    using namespace mongo;
    using namespace boost;
    
    /**
     * Thread-per-connection proxy server.
     *
     * Combines the MessageServer interface (run / setAsTimeTracker) with mongo's
     * Listener. Each connection handed to acceptedMP() is served on its own
     * detached boost::thread by HandleIncomingMsg(), which forwards every
     * received Message to the MessageHandler supplied at construction.
     */
    class DzhMessageServer:public MessageServer , public Listener
    {
    public:
        /**
         * @param opts     listening options; only ipList and port are read here
         * @param handler  callback object; stored as a raw pointer and never
         *                 deleted by this class, so it must stay alive while
         *                 connection threads are running
         */
        DzhMessageServer(const MessageServer::Options& opts, MessageHandler* handler):
                                Listener("" , opts.ipList, opts.port), 
                                m_handler(handler) 
        {
            
        }
        
        /**
         * Per-connection thread body: receives messages from one client until
         * the peer disconnects, an error occurs or the process is shutting down.
         *
         * @param arg  heap-allocated HandleIncomingMsgParam; this function takes
         *             ownership of it and of the MessagingPort it refers to.
         */
        void HandleIncomingMsg(void* arg)
        {
            // Take ownership of the parameter block and of the port before using
            // either of them, so both are released on every exit path.
            scoped_ptr<HandleIncomingMsgParam> himArg(static_cast<HandleIncomingMsgParam*>(arg));
            scoped_ptr<MessagingPort> p( himArg->inPort );
            MessageHandler* handler = himArg->handler;
            
            p->psock->setLogLevel(1);
            
            string otherSide;
            
            Message m;
            try
            {
                LastError * le = new LastError();
                lastError.reset( le ); // lastError now has ownership
    
                otherSide = p->psock->remoteString();
            
                handler->connected( p.get() );
                
                while ( ! inShutdown() ) {
                    m.reset();
                    p->psock->clearCounters();
                    if ( ! p->recv(m) ) {
                        // recv() failed: treat it as the end of this connection.
                        if( !cmdLine.quiet ){
                            // The open-connection count is not tracked here, so
                            // the log line always reports 0.
                            int conns = 0;//Listener::globalTicketHolder.used()-1;
                            const char* word = (conns == 1 ? " connection" : " connections");
                            log() << "end connection " << otherSide << " (" << conns << word << " now open)" << endl;
                        }
                        p->shutdown();
                        break;
                    }
                    handler->process( m , p.get() , le );
                    //networkCounter.hit( p->psock->getBytesIn() , p->psock->getBytesOut() );
                }
            }
            catch ( AssertionException& e ) {
                log() << "AssertionException handling request, closing client connection: " << e << endl;
                p->shutdown();
            }
            catch ( SocketException& e ) {
                log() << "SocketException handling request, closing client connection: " << e << endl;
                p->shutdown();
            }
            catch ( const DBException& e ) { // must be right above std::exception to avoid catching subclasses
                log() << "DBException handling request, closing client connection: " << e << endl;
                p->shutdown();
            }
            catch ( std::exception &e ) {
                error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl;
                dbexit( EXIT_UNCAUGHT );
            }
            catch ( ... ) {
                error() << "Uncaught exception, terminating" << endl;
                dbexit( EXIT_UNCAUGHT );
            }
            
            handler->disconnected( p.get() );
        }
        
        /** Starts the server by delegating to initAndListen(). */
        void run() 
        {
            initAndListen();
        }
        
        /** Forwards to Listener::setAsTimeTracker(). */
        void setAsTimeTracker()
        {
            Listener::setAsTimeTracker();
        }
        
        /**
         * Starts a dedicated, detached thread that serves the connection `p`
         * (presumably called by Listener for each accepted connection).
         *
         * On success ownership of `p` passes to the new thread. If the thread
         * cannot be started, the connection is shut down and released here so
         * that neither the port nor the parameter block leaks.
         */
        void acceptedMP(MessagingPort *p)
        {
            cout << "acceptedMP start" << endl;
            
            HandleIncomingMsgParam* himParam = 0;
            try
            {
                himParam = new HandleIncomingMsgParam(p, m_handler);
                boost::thread thr(boost::bind(&DzhMessageServer::HandleIncomingMsg, this, himParam));
                // Detach explicitly: destroying a joinable boost::thread is not
                // guaranteed to detach it in every Boost.Thread configuration.
                thr.detach();
            }
            catch ( std::exception& e )
            {
                // An exception here means the thread never started, so nothing
                // else owns the parameter block or the port.
                log() << "can't create connection thread, closing connection: " << e.what() << endl;
                delete himParam;
                p->shutdown();
                delete p;
            }
            
            cout << "acceptedMP end" << endl;
        }
        
    private:
        // Message callback shared by all connection threads; not owned.
        MessageHandler* m_handler;
        
        // Bundle handed from acceptedMP() to the connection thread.
        struct HandleIncomingMsgParam {
            HandleIncomingMsgParam(MessagingPort* inPort,  MessageHandler* handler):
                inPort(inPort), handler(handler) {
            }
    
            MessagingPort* inPort;
            MessageHandler* handler;
        };
    };

    这里实现的基本上是消息协议格式的获取和转换,具体处理在DzhMessageHandler中

    #pragma once
    
    #include "mongo/util/net/message_server.h"
    #include "mongo/db/dbmessage.h"
    
    using namespace mongo;
    
    class DzhMessageHandler:public MessageHandler
    {
    public:
        DzhMessageHandler()
        {
            
        }
        
        void connected( AbstractMessagingPort* p )
        {
            cout << "connected start" << endl;
            MessagingPort* portptr = (MessagingPort*)p;
            if(portptr)
            {
                cout << "portptr->remote():" << portptr->remote().toString() << endl;
            }
            else
            {
                cout << "connected error!" << endl;
            }
            cout << "connected end" << endl;
        }
    
        void process( Message& m , AbstractMessagingPort* p , LastError * err )
        {
            cout << "process start" << endl;
            
            cout << "process Message:" << m.toString() << endl;
                    
            DbMessage dm(m);        
            QueryMessage q(dm);
            
            try
            {
                cout << "ns:" << q.ns << ".return:" << q.ntoreturn << endl;
                cout << "query:" << q.query.toString(0,0) << endl;
                
                BSONObj ret;
                if(q.query.binaryEqual(BSON("ping" << 1)) ||
                    q.query.binaryEqual(BSON("ismaster" << 1)))
                {
                    ret = BSON("ok" << 1);
                }
                else if(q.query.binaryEqual(BSON("buildinfo" << 1)))
                {
                    vector<string> arrVersion;
                    arrVersion.push_back("2");
                    arrVersion.push_back("2");
                    arrVersion.push_back("0");
                    arrVersion.push_back("0");
                    
                    ret = BSON( "version" << "2.2.0" <<
                                "gitVersion" << "f5e83eae9cfbec7fb7a071321928f00d1b0c5207" <<
                                "sysInfo" << "Linux ip-10-2-29-40 2.6.21.7-2.ec2.v1.2.fc8xen #1 SMP Fri Nov 20 17:48:28 EST 2009 x86_64 BOOST_LIB_VERSION=1_49" << 
                                "versionArray" << arrVersion <<
                                "bits" << 64 << "maxBsonObjectSize" << 16777216 << "ok" << 1);
                }
                
                BufBuilder buf;
                ret.appendSelfToBufBuilder(buf);
                
                replyToQuery(0, p, m, buf.buf(), buf.len(), 1, 0, 0);
                
            }
            catch( AssertionException & e )
            {
                // if ( r.expectResponse() ) {
                    // BSONObj err = BSON( "$err" << e.what() << "code" << e.getCode() );
                    // replyToQuery( ResultFlag_ErrSet, p , m , err );
                // }
            }
            catch ( DBException& e )
            {
                log() << "DBException in process: " << e.what() << endl;
    
                // le->raiseError( e.getCode() , e.what() );
    
                // m.header()->id = r.id();
    
                // if ( r.expectResponse() ) {
                    // BSONObjBuilder b;
                    // b.append("$err",e.what()).append("code",e.getCode());
                    // if( !e._shard.empty() ) {
                        // b.append("shard",e._shard);
                    // }
                    // replyToQuery( ResultFlag_ErrSet, p , m , b.obj() );
                // }
            }
            
            cout << "process end" << endl;
        }
    
        void disconnected( AbstractMessagingPort* p )
        {
            cout << "disconnected start" << endl;
            
            cout << "disconnected end" << endl;
        }
    };

    处理核心在process中,里面是我写的测试代码,嫌碍眼的可以删掉。为什么会捕获ping、ismaster、buildinfo这三个命令呢?因为我在跟踪C#驱动与Mongodb服务的交互时发现,C#驱动会首先提交这三个命令,待确认均ok后,才会发送具体的find命令做查询。当然我这里只是实验性的,你也可以用Scope直接从mongodb查结果。

    需要的库文件:

    mongoclient boost_system boost_thread boost_program_options boost_date_time boost_filesystem

    有什么问题可以联系我

    Q:7984759

  • 相关阅读:
    P/Invoke应用
    OC第八节——目录操作和文件管理
    OC第七节——内存管理
    OC第六节—— 继承与类别
    被拒原因——You have selected the Kids Category for your app, but it does not include the required privacy policy. Please update your app metadata to include a privacy policy URL and ensure that the URL yo
    OC第五节 ——点语法和@property
    OC第四节——NSDictionary和NSMutableDictionary
    OC第三节——NSArray和NSMutableArray
    OC第二节 —— NSString和NSMutableString
    OC第一节 —— 类和对象
  • 原文地址:https://www.cnblogs.com/sohoer2003/p/3088650.html
Copyright © 2011-2022 走看看