由于没时间搞mongodb透传,之前写mongodb的proxy程序时候都是用自带的驱动(mongoclient),然后自己定协议,让其他语言比如php、c++、java、C#去实现,然后做通讯,用了一段时间也没什么问题,但是总觉得很不完美,最近终于个人时间充裕一些,打开mongodb的源码,对应网上牛人的源码解析,开始实现mongodb的协议透传,无废话,上代码:
#include <stdio.h> #include <sys/wait.h> #include "mongo/util/net/message_port.h" #include "mongo/util/net/message_server.h" #include "DzhMessageServer.h" #include "DzhMessageHandler.h" using namespace std; using namespace mongo; int main(int argc, const char *argv[]) { MessageServer::Options opts; opts.port = 28686; opts.ipList = "10.15.107.155"; DzhMessageHandler handler; DzhMessageServer svr(opts, &handler); svr.run(); return 0; }
这是proxy服务的代码入口,mongodb的服务端是基于boost::asio的,所以这段代码看着眼熟吧,DzhMessageServer 实现的是网络服务,DzhMessageHandler实现的是事件处理,MessageServer::Options.port是服务端口,MessageServer::Options.ipList是本机IP地址,代码清晰,一看就懂。
#pragma once #include "mongo/util/net/message.h" #include "mongo/util/net/message_port.h" #include "mongo/util/net/message_server.h" #include "mongo/util/net/listen.h" #include "mongo/db/lasterror.h" #include <boost/thread.hpp> using namespace mongo; using namespace boost; class DzhMessageServer:public MessageServer , public Listener { public: DzhMessageServer(const MessageServer::Options& opts, MessageHandler* handler): Listener("" , opts.ipList, opts.port), m_handler(handler) { } void HandleIncomingMsg(void* arg) { scoped_ptr<HandleIncomingMsgParam> himArg(static_cast<HandleIncomingMsgParam*>(arg)); MessagingPort* inPort = himArg->inPort; MessageHandler* handler = himArg->handler; inPort->psock->setLogLevel(1); scoped_ptr<MessagingPort> p( inPort ); string otherSide; Message m; try { LastError * le = new LastError(); lastError.reset( le ); // lastError now has ownership otherSide = p->psock->remoteString(); handler->connected( p.get() ); while ( ! inShutdown() ) { m.reset(); p->psock->clearCounters(); if ( ! p->recv(m) ) { if( !cmdLine.quiet ){ int conns = 0;//Listener::globalTicketHolder.used()-1; const char* word = (conns == 1 ? " connection" : " connections"); log() << "end connection " << otherSide << " (" << conns << word << " now open)" << endl; } p->shutdown(); break; } handler->process( m , p.get() , le ); //networkCounter.hit( p->psock->getBytesIn() , p->psock->getBytesOut() ); } } catch ( AssertionException& e ) { log() << "AssertionException handling request, closing client connection: " << e << endl; p->shutdown(); } catch ( SocketException& e ) { log() << "SocketException handling request, closing client connection: " << e << endl; p->shutdown(); } catch ( const DBException& e ) { // must be right above std::exception to avoid catching subclasses log() << "DBException handling request, closing client connection: " << e << endl; p->shutdown(); } catch ( std::exception &e ) { error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; dbexit( EXIT_UNCAUGHT ); } catch ( ... ) { error() << "Uncaught exception, terminating" << endl; dbexit( EXIT_UNCAUGHT ); } handler->disconnected( p.get() ); } void run() { initAndListen(); } void setAsTimeTracker() { Listener::setAsTimeTracker(); } void acceptedMP(MessagingPort *p) { cout << "acceptedMP start" << endl; HandleIncomingMsgParam* himParam = new HandleIncomingMsgParam(p, m_handler); boost::thread thr(boost::bind(&DzhMessageServer::HandleIncomingMsg, this, himParam)); cout << "acceptedMP end" << endl; } private: MessageHandler* m_handler; struct HandleIncomingMsgParam { HandleIncomingMsgParam(MessagingPort* inPort, MessageHandler* handler): inPort(inPort), handler(handler) { } MessagingPort* inPort; MessageHandler* handler; }; };
这里实现的基本上是消息协议格式的获取和转换,具体处理在DzhMessageHandler中
#pragma once #include "mongo/util/net/message_server.h" #include "mongo/db/dbmessage.h" using namespace mongo; class DzhMessageHandler:public MessageHandler { public: DzhMessageHandler() { } void connected( AbstractMessagingPort* p ) { cout << "connected start" << endl; MessagingPort* portptr = (MessagingPort*)p; if(portptr) { cout << "portptr->remote():" << portptr->remote().toString() << endl; } else { cout << "connected error!" << endl; } cout << "connected end" << endl; } void process( Message& m , AbstractMessagingPort* p , LastError * err ) { cout << "process start" << endl; cout << "process Message:" << m.toString() << endl; DbMessage dm(m); QueryMessage q(dm); try { cout << "ns:" << q.ns << ".return:" << q.ntoreturn << endl; cout << "query:" << q.query.toString(0,0) << endl; BSONObj ret; if(q.query.binaryEqual(BSON("ping" << 1)) || q.query.binaryEqual(BSON("ismaster" << 1))) { ret = BSON("ok" << 1); } else if(q.query.binaryEqual(BSON("buildinfo" << 1))) { vector<string> arrVersion; arrVersion.push_back("2"); arrVersion.push_back("2"); arrVersion.push_back("0"); arrVersion.push_back("0"); ret = BSON( "version" << "2.2.0" << "gitVersion" << "f5e83eae9cfbec7fb7a071321928f00d1b0c5207" << "sysInfo" << "Linux ip-10-2-29-40 2.6.21.7-2.ec2.v1.2.fc8xen #1 SMP Fri Nov 20 17:48:28 EST 2009 x86_64 BOOST_LIB_VERSION=1_49" << "versionArray" << arrVersion << "bits" << 64 << "maxBsonObjectSize" << 16777216 << "ok" << 1); } BufBuilder buf; ret.appendSelfToBufBuilder(buf); replyToQuery(0, p, m, buf.buf(), buf.len(), 1, 0, 0); } catch( AssertionException & e ) { // if ( r.expectResponse() ) { // BSONObj err = BSON( "$err" << e.what() << "code" << e.getCode() ); // replyToQuery( ResultFlag_ErrSet, p , m , err ); // } } catch ( DBException& e ) { log() << "DBException in process: " << e.what() << endl; // le->raiseError( e.getCode() , e.what() ); // m.header()->id = r.id(); // if ( r.expectResponse() ) { // BSONObjBuilder b; // b.append("$err",e.what()).append("code",e.getCode()); // if( !e._shard.empty() ) { // b.append("shard",e._shard); // } // replyToQuery( ResultFlag_ErrSet, p , m , b.obj() ); // } } cout << "process end" << endl; } void disconnected( AbstractMessagingPort* p ) { cout << "disconnected start" << endl; cout << "disconnected end" << endl; } };
处理核心在process中,里面是我写的测试代码,嫌碍眼的可以删掉,为什么会有ping、ismaster、buildinfo命令的捕获呢,我在跟踪C#驱动与Mongodb服务交互的时候,C#驱动会首先提交这三个命令,待确认均ok后,然后才发具体的find命令做查询动作,当然我这里只是实验性的,你也可以用Scope直接从mongodb查结果。
需要的库文件:
mongoclient boost_system boost_thread boost_program_options boost_date_time boost_filesystem
有什么问题可以联系我
Q:7984759