zoukankan      html  css  js  c++  java
  • SequoiaDB 系列之七 :源码分析之catalog节点

    这一篇紧接着上一篇SequoiaDB 系列之六 :源码分析之coord节点来讲

    在上一篇中,分析了coord转发数据包到catalog节点(也有可能是data节点,视情况而定)。这一次,我们继续分析上一篇中的rtnCoordCMDListCollectionSpace的消息包被转发到catalog节点上的处理流程。

    catalog节点的进程,同样sequoiadb进程,只是角色不一样,运行的服务有区别。

    这里就不再赘述catalog节点的启动过程。

    在SequoiaDB/engine/cat/catalogueCB.cpp文件的最后,有代码:

    sdbCatalogueCB* sdbGetCatalogueCB()
    {
       static sdbCatalogueCB s_catacb ;
       return &s_catacb ;
    }

    当sdbGetCatalogueCB()第一次被调用的时候,会初始化sdbCatalogueCB的一个静态实例。
    我们来看看sdbCatalogueCB类:

    class sdbCatalogueCB : public _IControlBlock, public _IEventHander
       {
       public:
          friend class catMainController ;
    
          typedef std::map<UINT32, string>    GRP_ID_MAP;
          typedef std::map<UINT16, UINT16>    NODE_ID_MAP;
    
          public:
             sdbCatalogueCB() ;
             virtual ~sdbCatalogueCB() ;
    
              ...
    
             _netRouteAgent* netWork()
             {
                return _pNetWork;
             }
             catMainController* getMainController()
             {
                return &_catMainCtrl ;
             }
             catCatalogueManager* getCatlogueMgr()
             {
                return &_catlogueMgr ;
             }
             catNodeManager* getCatNodeMgr()
             {
                return &_catNodeMgr ;
             }
             catDCManager* getCatDCMgr()
             {
                return &_catDCMgr ;
             }
             catLevelLockMgr* getLevelLockMgr()
             {
                return &_levelLockMgr ;
             }
    
          private:
             _netRouteAgent       *_pNetWork ;
             _MsgRouteID          _routeID ;
             std::string          _strHostName ;
             std::string          _strCatServiceName ;
             NODE_ID_MAP          _nodeIdMap ;
             NODE_ID_MAP          _sysNodeIdMap ;
             GRP_ID_MAP           _grpIdMap ;
             GRP_ID_MAP           _deactiveGrpIdMap ;
             UINT16               _iCurNodeId ;
             UINT16               _curSysNodeId ;
             UINT32               _iCurGrpId ;
    
             catMainController    _catMainCtrl ;       // 这个是本次的重点
             catCatalogueManager  _catlogueMgr ;
             catNodeManager       _catNodeMgr ;
             catDCManager         _catDCMgr ;
             catLevelLockMgr      _levelLockMgr ;
       } ;

    在sdbCatalogueCB中,有一个成员变量,其类型是catMainController。
    这个类是这样声明的:

    class catMainController : public _pmdObjBase, public _netMsgHandler,
                                 public _netTimeoutHandler
    {
          ...
    
       public:
          INT32 handleMsg( const NET_HANDLE &handle,
                           const _MsgHeader *header,
                           const CHAR *msg ) ;
          void  handleClose( const NET_HANDLE &handle, _MsgRouteID id ) ;
    
          void  handleTimeout( const UINT32 &millisec, const UINT32 &id ) ;
    
       protected:
          virtual INT32 _defaultMsgFunc ( NET_HANDLE handle,
                                          MsgHeader* msg ) ;
    
          INT32 _processMsg( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
    
          void  _dispatchDelayedOperation( BOOLEAN dispatch ) ;
    
       protected:
          INT32 _onActiveEvent( pmdEDUEvent *event ) ;
          INT32 _onDeactiveEvent( pmdEDUEvent *event ) ;
    
       protected :
          INT32 _processGetMoreMsg ( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processQueryDataGrp( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processQueryCollections( const NET_HANDLE &handle,
                                          MsgHeader *pMsg ) ;
          INT32 _processQueryCollectionSpaces ( const NET_HANDLE &handle,
                                                MsgHeader *pMsg ) ;
          INT32 _processQueryMsg( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processKillContext(const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processAuthenticate( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processAuthCrt( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processAuthDel( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processCheckRouteID( const NET_HANDLE &handle, MsgHeader *pMsg ) ;
          INT32 _processInterruptMsg( const NET_HANDLE &handle,
                                      MsgHeader *header ) ;
          INT32 _processDisconnectMsg( const NET_HANDLE &handle,
                                       MsgHeader *header ) ;
          INT32 _processQueryRequest ( const NET_HANDLE &handle,
                                       MsgHeader *pMsg,
                                       const CHAR *pCollectionName ) ;
    
       protected:
          INT32 _postMsg( const NET_HANDLE &handle, const MsgHeader *pHead ) ;
          INT32 _catBuildMsgEvent ( const NET_HANDLE &handle,
                                    const MsgHeader *pMsg,
                                    pmdEDUEvent &event ) ;
          INT32 _ensureMetadata() ;
          INT32 _createSysIndex ( const CHAR *pCollection,
                                  const CHAR *pIndex,
                                  pmdEDUCB *cb ) ;
          INT32 _createSysCollection ( const CHAR *pCollection,
                                       pmdEDUCB *cb ) ;
          void _addContext( const UINT32 &handle, UINT32 tid, INT64 contextID ) ;
          void _delContextByHandle( const UINT32 &handle ) ;
          void _delContext( const UINT32 &handle, UINT32 tid ) ;
          void _delContextByID( INT64 contextID, BOOLEAN rtnDel ) ;
    
          ...
       } ;

    根据类的继承,可以猜想到这个类具备消息处理的能力。

    这里,我们不深究到怎么收到网络消息的,我们只管怎么去处理网络消息的 :)

    catMainController继承是_pmdObjBase的虚函数_defaultMsgFunc

    INT32 catMainController::_defaultMsgFunc( NET_HANDLE handle,
                                                 MsgHeader * msg )
       {
          INT32 rc = SDB_OK ;
    
          _isDelayed = FALSE ;
          _pCatCB->getCatDCMgr()->onCommandBegin( msg ) ;
    
          if ( MSG_CAT_CATALOGUE_BEGIN < (UINT32)msg->opCode &&
               (UINT32)msg->opCode < MSG_CAT_CATALOGUE_END )
          {
             rc = _pCatCB->getCatlogueMgr()->processMsg( handle, msg ) ;
          }
          else if  ( MSG_CAT_NODE_BEGIN < (UINT32)msg->opCode &&
                     (UINT32)msg->opCode < MSG_CAT_NODE_END )
          {
             rc = _pCatCB->getCatNodeMgr()->processMsg( handle, msg ) ;
          }
          else if ( MSG_CAT_DC_BEGIN < (UINT32)msg->opCode &&
                    (UINT32)msg->opCode < MSG_CAT_DC_END )
          {
             rc = _pCatCB->getCatDCMgr()->processMsg( handle, msg ) ;
          }
          else
          {
             rc = _processMsg( handle, msg ) ;
          }
    
          _pCatCB->getCatDCMgr()->onCommandEnd( msg, rc ) ;
          return rc ;
       }

    收到网络消息包后,交给对应的消息处理对象处理。而在coord节点上,交由rtnCoordCMDListCollectionSpace命令处理后的内部消息,消息类型是 MSG_CAT_CREATE_COLLECTION_SPACE_REQ如此,这个消息会交给

    rc = _pCatCB->getCatlogueMgr()->processMsg( handle, msg ) ; 

    处理。跟进去

    INT32 catCatalogueManager::processMsg( const NET_HANDLE &handle,
                                              MsgHeader *pMsg )
       {
          INT32 rc = SDB_OK;
          PD_TRACE_ENTRY ( SDB_CATALOGMGR_PROCESSMSG ) ;
          PD_TRACE1 ( SDB_CATALOGMGR_PROCESSMSG,
                      PD_PACK_INT ( pMsg->opCode ) ) ;
    
          switch ( pMsg->opCode )
          {
          case MSG_CAT_CREATE_COLLECTION_REQ :
          case MSG_CAT_DROP_COLLECTION_REQ :
          case MSG_CAT_CREATE_COLLECTION_SPACE_REQ :
          case MSG_CAT_DROP_SPACE_REQ :
          case MSG_CAT_ALTER_COLLECTION_REQ :
          case MSG_CAT_LINK_CL_REQ :
          case MSG_CAT_UNLINK_CL_REQ :
          case MSG_CAT_SPLIT_PREPARE_REQ :
          case MSG_CAT_SPLIT_READY_REQ :
          case MSG_CAT_SPLIT_CANCEL_REQ :
          case MSG_CAT_SPLIT_START_REQ :
          case MSG_CAT_SPLIT_CHGMETA_REQ :
          case MSG_CAT_SPLIT_CLEANUP_REQ :
          case MSG_CAT_SPLIT_FINISH_REQ :
          case MSG_CAT_CRT_PROCEDURES_REQ :
          case MSG_CAT_RM_PROCEDURES_REQ :
          case MSG_CAT_CREATE_DOMAIN_REQ :
          case MSG_CAT_DROP_DOMAIN_REQ :
          case MSG_CAT_ALTER_DOMAIN_REQ :
             {
                _pCatCB->getCatDCMgr()->setImageCommand( TRUE ) ;
                rc = processCommandMsg( handle, pMsg, TRUE ) ;
                break;
             }
          case MSG_CAT_QUERY_SPACEINFO_REQ :
             {
                rc = processCommandMsg( handle, pMsg, TRUE ) ;
                break;
             }
          case MSG_CAT_QUERY_CATALOG_REQ:
             {
                rc = processQueryCatalogue( handle, pMsg ) ;
                break;
             }
          case MSG_CAT_QUERY_TASK_REQ:
             {
                rc = processQueryTask ( handle, pMsg ) ;
                break ;
             }
          default:
             {
                rc = SDB_UNKNOWN_MESSAGE;
                PD_LOG( PDWARNING, "received unknown message (opCode: [%d]%u)",
                        IS_REPLY_TYPE(pMsg->opCode),
                        GET_REQUEST_TYPE(pMsg->opCode) ) ;
                break;
             }
          }
          PD_TRACE_EXITRC ( SDB_CATALOGMGR_PROCESSMSG, rc ) ;
          return rc;
       }

    该函数表明,大部分的消息(包括MSG_CAT_CREATE_COLLECTION_SPACE_REQ),都交由

    processCommandMsg( handle, pMsg, TRUE )

    处理去了。

    显然,processCommandMsg是重点,我们看一下其具体实现:

    INT32 catCatalogueManager::processCommandMsg( const NET_HANDLE &handle,
                                                     MsgHeader *pMsg,
                                                     BOOLEAN writable )
       {
          INT32 rc = SDB_OK ;
          MsgOpQuery *pQueryReq = (MsgOpQuery *)pMsg ;
    
          PD_TRACE_ENTRY ( SDB_CATALOGMGR_PROCESSCOMMANDMSG ) ;
          MsgOpReply replyHeader ;
          rtnContextBuf ctxBuff ;
    
          INT32      opCode = pQueryReq->header.opCode ;
          BOOLEAN    fillPeerRouteID = FALSE ;
    
          INT32 flag = 0 ;
          CHAR *pCMDName = NULL ;
          INT64 numToSkip = 0 ;
          INT64 numToReturn = 0 ;
          CHAR *pQuery = NULL ;
          CHAR *pFieldSelector = NULL ;
          CHAR *pOrderBy = NULL ;
          CHAR *pHint = NULL ;
    
          replyHeader.header.messageLength = sizeof( MsgOpReply ) ;
          replyHeader.contextID = -1 ;
          replyHeader.flags = SDB_OK ;
          replyHeader.numReturned = 0 ;
          replyHeader.startFrom = 0 ;
          _fillRspHeader( &(replyHeader.header), &(pQueryReq->header) ) ;
    
          if ( MSG_CAT_SPLIT_START_REQ == opCode ||
               MSG_CAT_SPLIT_CHGMETA_REQ == opCode ||
               MSG_CAT_SPLIT_CLEANUP_REQ == opCode ||
               MSG_CAT_SPLIT_FINISH_REQ == opCode )
          {
             fillPeerRouteID = TRUE ;
          }
    
          rc = msgExtractQuery( (CHAR*)pMsg, &flag, &pCMDName, &numToSkip,
                                &numToReturn, &pQuery, &pFieldSelector,
                                &pOrderBy, &pHint ) ;
          PD_RC_CHECK( rc, PDERROR, "Failed to extract query msg, rc: %d", rc ) ;
    
          if ( writable && !pmdIsPrimary() )
          {
             rc = SDB_CLS_NOT_PRIMARY ;
             PD_LOG ( PDWARNING, "Service deactive but received command: %s,"
                      "opCode: %d", pCMDName, pQueryReq->header.opCode ) ;
             goto error ;
          }
          else if ( _pCatCB->getCatDCMgr()->isImageCommand() &&
                    !_pCatCB->isDCActive() )
          {
             rc = SDB_CAT_CLUSTER_NOT_ACTIVE ;
             goto error ;
          }
    
          switch ( pQueryReq->header.opCode )
          {
             case MSG_CAT_CREATE_COLLECTION_REQ :
                rc = processCmdCreateCL( pQuery, ctxBuff ) ;
                break ;
             case MSG_CAT_CREATE_COLLECTION_SPACE_REQ :
                rc = processCmdCreateCS( pQuery, ctxBuff ) ;
                break ;
             case MSG_CAT_SPLIT_PREPARE_REQ :
             case MSG_CAT_SPLIT_READY_REQ :
             case MSG_CAT_SPLIT_CANCEL_REQ :
             case MSG_CAT_SPLIT_START_REQ :
             case MSG_CAT_SPLIT_CHGMETA_REQ :
             case MSG_CAT_SPLIT_CLEANUP_REQ :
             case MSG_CAT_SPLIT_FINISH_REQ :
                rc = processCmdSplit( pQuery, pQueryReq->header.opCode,
                                      ctxBuff ) ;
                break ;
             case MSG_CAT_QUERY_SPACEINFO_REQ :
                rc = processCmdQuerySpaceInfo( pQuery, ctxBuff ) ;
                break ;
             case MSG_CAT_DROP_COLLECTION_REQ :
                rc = processCmdDropCollection( pQuery, pQueryReq->version ) ;
                break ;
             case MSG_CAT_DROP_SPACE_REQ :
                rc = processCmdDropCollectionSpace( pQuery ) ;
                break ;
             case MSG_CAT_ALTER_COLLECTION_REQ :
                rc = processAlterCollection( pQuery, ctxBuff ) ;
                break ;
             case MSG_CAT_CRT_PROCEDURES_REQ :
                rc = processCmdCrtProcedures( pQuery ) ;
                break ;
             case MSG_CAT_RM_PROCEDURES_REQ :
                rc = processCmdRmProcedures( pQuery ) ;
                break ;
             case MSG_CAT_LINK_CL_REQ :
                rc = processCmdLinkCollection( pQuery, ctxBuff ) ;
                break;
             case MSG_CAT_UNLINK_CL_REQ :
                rc = processCmdUnlinkCollection( pQuery, ctxBuff );
                break;
             case MSG_CAT_CREATE_DOMAIN_REQ :
                rc = processCmdCreateDomain ( pQuery ) ;
                break ;
             case MSG_CAT_DROP_DOMAIN_REQ :
                rc = processCmdDropDomain ( pQuery ) ;
                break ;
             case MSG_CAT_ALTER_DOMAIN_REQ :
                rc = processCmdAlterDomain ( pQuery ) ;
                break ;
             default :
                rc = SDB_INVALIDARG ;
                PD_LOG( PDERROR, "Recieved unknow command: %s, opCode: %d",
                        pCMDName, pQueryReq->header.opCode ) ;
                break ;
          }
    
          PD_RC_CHECK( rc, PDERROR, "Process command[%s] failed, opCode: %d, "
                       "rc: %d", pCMDName, pQueryReq->header.opCode, rc ) ;
    
       done:
          if ( fillPeerRouteID )
          {
             replyHeader.header.routeID.value = pQueryReq->header.routeID.value ;
          }
    
          if ( 0 == ctxBuff.size() )
          {
             rc = _pCatCB->netWork()->syncSend( handle, (void*)&replyHeader ) ;
          }
          else
          {
             replyHeader.header.messageLength += ctxBuff.size() ;
             replyHeader.numReturned = ctxBuff.recordNum() ;
             rc = _pCatCB->netWork()->syncSend( handle, &(replyHeader.header),
                                                (void*)ctxBuff.data(),
                                                ctxBuff.size() ) ;
          }
          PD_TRACE_EXITRC ( SDB_CATALOGMGR_PROCESSCOMMANDMSG, rc ) ;
          return rc ;
       error:
          replyHeader.flags = rc ;
          goto done ;
       }

    该函数先初始化了一个reply消息的头部,然后针对不同的消息,做对应的处理。例如我们例子中的 MSG_CAT_CREATE_COLLECTION_SPACE_REQ消息,交给 processCmdCreateCS 函数处理了;

    PS:这个函数还分发了其它消息,如 创建Collection(createCollection)的消息,以及切分(split),删除Collection(dropCollection),删除CollectionSpace(dropCollectionSpace)等等操作,都会在catalog节点上有对应的处理(因为这类操作要修改元数据)。

    回到处理MSG_CAT_CREATE_COLLECTION_SPACE_REQ上,processCmdCreateCS函数主要是传递了一下参数,具体的操作,交给了_createCS函数处理

    INT32 catCatalogueManager::_createCS( BSONObj &createObj,
                                             UINT32 &groupID )
       {
          INT32 rc               = SDB_OK ;
          string strGroupName ;
    
          const CHAR *csName     = NULL ;
          const CHAR *domainName = NULL ;
          BOOLEAN isSpaceExist   = FALSE ;
          PD_TRACE_ENTRY ( SDB_CATALOGMGR__CREATECS ) ;
    
          catCSInfo csInfo ;
          BSONObj spaceObj ;
          BSONObj domainObj ;
          vector< UINT32 >  domainGroups ;
    
          rc = _checkCSObj( createObj, csInfo ) ;
          PD_RC_CHECK( rc, PDERROR, "Check create collection space obj[%s] failed,"
                       "rc: %d", createObj.toString().c_str(), rc ) ;
          csName = csInfo._pCSName ;
          domainName = csInfo._domainName ;
    
          rc = dmsCheckCSName( csName ) ;
          PD_RC_CHECK( rc, PDERROR, "Check collection space name[%s] failed, rc: "
                       "%d", csName, rc ) ;
    
          rc = catCheckSpaceExist( csName, isSpaceExist, spaceObj, _pEduCB ) ;
          PD_RC_CHECK( rc, PDERROR, "Failed to check collection space existed, rc: "
                       "%d", rc ) ;
          PD_TRACE1 ( SDB_CATALOGMGR_CREATECS, PD_PACK_INT ( isSpaceExist ) ) ;
          PD_CHECK( FALSE == isSpaceExist, SDB_DMS_CS_EXIST, error, PDERROR,
                    "Collection space[%s] is already existed", csName ) ;
    
          if ( domainName )
          {
             rc = catGetDomainObj( domainName, domainObj, _pEduCB ) ;
             PD_RC_CHECK( rc, PDERROR, "Failed to get domain[%s] obj, rc: %d",
                          domainName, rc ) ;
             rc = catGetDomainGroups( domainObj, domainGroups ) ;
             PD_RC_CHECK( rc, PDERROR, "Get domain[%s] groups failed, rc: %d",
                          domainObj.toString().c_str(), rc ) ;
          }
    
          rc = _assignGroup( &domainGroups, groupID ) ;
          PD_RC_CHECK( rc, PDERROR, "Assign group for collection space[%s] "
                       "failed, rc: %d", csName, rc ) ;
          catGroupID2Name( groupID, strGroupName, _pEduCB ) ;
    
          {
             BSONObjBuilder newBuilder ;
             newBuilder.appendElements( csInfo.toBson() ) ;
             BSONObjBuilder sub1( newBuilder.subarrayStart( CAT_COLLECTION ) ) ;
             sub1.done() ;
    
             BSONObj newObj = newBuilder.obj() ;
    
             rc = rtnInsert( CAT_COLLECTION_SPACE_COLLECTION, newObj, 1, 0,
                             _pEduCB, _pDmsCB, _pDpsCB, _majoritySize() ) ;
             PD_RC_CHECK( rc, PDERROR, "Failed to insert collection space obj[%s] "
                          " to collection[%s], rc: %d", newObj.toString().c_str(),
                          CAT_COLLECTION_SPACE_COLLECTION, rc ) ;
          }
    
       done:
          PD_TRACE_EXITRC ( SDB_CATALOGMGR__CREATECS, rc ) ;
          return rc ;
       error:
          goto done ;
       }

    这个函数会把网络消息的内容分解,然后对创建CollectionSpace的属性,选项做检查,检查通过之后,执行 rtnInsert 把一条记录插入 CAT_COLLECTION_SPACE_COLLECTION 所定义的系统表中;否则就返回检查出错时候的错误码,回复给corrd。具体插入过程,就不详细分析了。

    PS:catalog也是一个数据库,具备多张系统元数据的表(Collection),表上存放整个数据库集群的元数据消息,例如集群有哪些节点,节点地址,节点的ID,每个集群有哪些CollectionSpace,有哪些Collection,以及整个数据库有哪些Domain,有哪些user等等。catalog描述是整个数据库集群的环境。

    其它的命令,以后也不会一一分析了。结合上一篇分析,我们分析整个创建CollectionSpace的整个流程,从client调用创建接口,coord转发消息,到catalog收到消息,处理消息。

    所有的命令或者操作,流程类似于此。涉及数据库元数据的,会先发送消息给catalog。对于CRUD消息,也会先请求catalog拿到准确的数据节点信息,然后再发送给对应的数据节点,由数据节点上执行。

    总结一下SequoiaDB的客户端操作数据库的流程:

    客户端发送请求给coord节点

       coord先揪出这个请求是做什么

          交给对应的command处理

             查询(本地缓存或者远程获取的)catalog信息

             把消息转成节点间的内部消息

             转发给catalog目标节点

             然后等待catalog处理返回数据

             [

                转发消息给数据节点

                等待数据节点处理,返回数据

             ] // 绿色标识部分,如果不涉及到数据节点,可能不存在

         再把返回数据交给处理线程

    线程把返回结果发送给client

    =====>THE END<===== 

  • 相关阅读:
    【分布计算环境学习笔记】4 Enterprise Java Bean
    超详细的秒杀架构设计,运维,了解一下【转】
    Redis的监控指标【转】
    Windows netstat 查看端口、进程占用 查看进程路径
    wireshark抓包新手使用教程【转】
    关于设置sftp 指定端口【转】
    简单聊一聊Ansible自动化运维【转】
    tomcat启动报错SEVERE: Exception loading sessions from persistent storage【转】
    彻底搞懂 Kubernetes 的底层网络,看这几张图就够了【转】
    Java设计模式之(五)——代理模式
  • 原文地址:https://www.cnblogs.com/tynia/p/catalog.html
Copyright © 2011-2022 走看看