zoukankan      html  css  js  c++  java
  • SequoiaDB 系列之六 :源码分析之coord节点

    好久不见。

    在上一篇SequoiaDB 系列之五   :源码分析之main函数,有讲述进程开始运行时,会根据自身的角色,来初始化不同的CB(控制块,control block)。

    在之前的一篇SequoiaDB 系列之四   :架构简析中,我们简单过了一遍SequoiaDB的架构和各个节点的角色。

    今天来看看SequoiaDB的coord角色。

    首先,需要有个大致的轮廓:

    coord节点主要承担代理的角色。作为SequoiaDB集群对外的接头人,它转发消息给其它节点,组合(combine)不同节点返回的数据,把结果返回给client。

    catalog节点主要存储meta数据,比如集群中有哪些组,每个组的状态;每个组上有哪些节点,有哪些集合(Collection),哪些集合是主子表等等。

    data节点主要是管理存储的数据,它接受coord转发过来的CRUD等操作,并记录同步日志(最终一致性)。

    在注册CB的函数中:

    void _pmdController::registerCB( SDB_ROLE dbrole )
    {
       if ( SDB_ROLE_DATA == dbrole )
       {
          ...
       }
       else if ( SDB_ROLE_COORD == dbrole )
       {
          PMD_REGISTER_CB( sdbGetTransCB() ) ;      // TRANS
          PMD_REGISTER_CB( sdbGetCoordCB() ) ;      // COORD
          PMD_REGISTER_CB( sdbGetFMPCB () ) ;       // FMP
       }
       ...
       // 每个节点都会注册的控制块
       PMD_REGISTER_CB( sdbGetDMSCB() ) ;           // DMS
       PMD_REGISTER_CB( sdbGetRTNCB() ) ;           // RTN
       PMD_REGISTER_CB( sdbGetSQLCB() ) ;           // SQL
       PMD_REGISTER_CB( sdbGetAggrCB() ) ;          // AGGR
       PMD_REGISTER_CB( sdbGetPMDController() ) ;   // CONTROLLER
    }
    

     coord注册这几个CB之后,就开始注册和启动服务:

    具体函数在_KRCB::init()中,不再表述。_KRCB::init()会根据节点的角色,启动不同的服务。

    客户端连接到coord,coord便会启动一个线程,为该连接服务。

     1 INT32 pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *pData )
     2 {
     3    ...
     4 
     5    while ( !cb->isDisconnected() && !pListerner->isClosed() )
     6    {
     7       SOCKET s ;
     8       rc = pListerner->accept ( &s, NULL, NULL ) ;
     9       if ( SDB_TIMEOUT == rc || SDB_TOO_MANY_OPEN_FD == rc )
    10       {
    11          rc = SDB_OK ;
    12          continue ;
    13       }
    14       if ( rc && PMD_IS_DB_DOWN )
    15       {
    16          rc = SDB_OK ;
    17          goto done ;
    18       }
    19       else if ( rc )
    20       {
    21          PD_LOG ( PDERROR, "Failed to accept socket in TcpListener(rc=%d)",
    22                   rc ) ;
    23          if ( pListerner->isClosed() )
    24          {
    25             break ;
    26          }
    27          else
    28          {
    29             continue ;
    30          }
    31       }
    32 
    33       cb->incEventCount() ;
    34       ++mondbcb->numConnects ;
    35       void *pData = NULL ;
    36       *((SOCKET *) &pData) = s ;
    37       if ( !krcb->isActive() )
    38       {
    39          ossSocket newsock ( &s ) ;
    40          newsock.close () ;
    41          continue ;
    42       }
    43 
    44       rc = eduMgr->startEDU ( EDU_TYPE_AGENT, pData, &agentEDU ) ;
    45       if ( rc )
    46       {
    47          PD_LOG( ( rc == SDB_QUIESCED ? PDWARNING : PDERROR ),
    48                  "Failed to start edu, rc: %d", rc ) ;
    49          ossSocket newsock ( &s ) ;
    50          newsock.close () ;
    51          continue ;
    52       }
    53    } //while ( ! cb->isDisconnected() )
    54 
    55    ...
    56 }

    服务线程监听到client的连接,启动一个EDU_TYPE_AGENT类型的线程,单独为client服务。

    下面讲述coord节点的最主要的功能——消息转发

    coord的启动初,会初始化一些必要的全局变量。在SequoiaDB中,会初始化很多command,拿创建集合空间来说,在文件SequoiaDB/engine/rtn/rtnCoord.cpp 中

    1 RTN_COORD_CMD_BEGIN
    2    ...
    3 
    4    RTN_COORD_CMD_ADD( COORD_CMD_LISTCOLLECTIONSPACES, rtnCoordCMDListCollectionSpace )
    5 
    6    ...
    7 RTN_COORD_OP_END

     嗯,上面的代码有点MFC中消息映射的感觉。

    来看看 RTN_COORD_CMD_ADD 宏:

    1 #define RTN_COORD_CMD_ADD( cmdName, cmdClass )  {
    2        rtnCoordCommand *pObj = SDB_OSS_NEW cmdClass();
    3        _cmdMap.insert ( COORD_CMD_MAP::value_type (cmdName, pObj ));}

    宏主要是new一个对象,再把对象插入到_cmdMap中,这样在程序初始化时候,便会有一系列的command对象存储在_cmdMap中。

    另外,对SequoiaDB而言,所有的command操作,都是在查询操作的基础上做的,服务端用一些方法区别开是真正的查询,还是command。SequoiaDB的命令,是以$开头的字符串。

    前提简述完毕,现在假设client连接上了coord,coord也创建了一个线程,为这个client服务。

      1 INT32 _pmdLocalSession::run()
      2    {
      3       INT32 rc                = SDB_OK ;
      4       UINT32 msgSize          = 0 ;
      5       CHAR *pBuff             = NULL ;
      6       INT32 buffSize          = 0 ;
      7       pmdEDUMgr *pmdEDUMgr    = NULL ;
      8 
      9       if ( !_pEDUCB )
     10       {
     11          rc = SDB_SYS ;
     12          goto error ;
     13       }
     14 
     15       pmdEDUMgr = _pEDUCB->getEDUMgr() ;
     16 
     17       while ( !_pEDUCB->isDisconnected() && !_socket.isClosed() )
     18       {
     19          _pEDUCB->resetInterrupt() ;
     20          _pEDUCB->resetInfo( EDU_INFO_ERROR ) ;
     21          _pEDUCB->resetLsn() ;
     22 
     23          rc = recvData( (CHAR*)&msgSize, sizeof(UINT32) ) ;                         // 收取数据包的前四个字节,代表该数据包有多大
     24          if ( rc )
     25          {
     26             if ( SDB_APP_FORCED != rc )
     27             {
     28                PD_LOG( PDERROR, "Session[%s] failed to recv msg size, "
     29                        "rc: %d", sessionName(), rc ) ;
     30             }
     31             break ;
     32          }
     33 
     34          if ( msgSize == (UINT32)MSG_SYSTEM_INFO_LEN )  // 如果包长度是 MSG_SYSTEM_INFO_LEN (-1),则这是一个请求系统信息包,coord会返回本机的字节序列给client
     35          {                                              // 每个连接的第一个包,一定是长度标记为 MSG_SYSTEM_INFO_LEN 的包,否则字节序不正确,所有的数据都不能保证能正确解析(万一数据库运行在大端机上呢)
     36             rc = _recvSysInfoMsg( msgSize, &pBuff, buffSize ) ;
     37             if ( rc )
     38             {
     39                break ;
     40             }
     41             rc = _processSysInfoRequest( pBuff ) ;
     42             if ( rc )
     43             {
     44                break ;
     45             }
     46 
     47             _setHandshakeReceived() ;
     48          }
     49          else if ( msgSize < sizeof(MsgHeader) || msgSize > SDB_MAX_MSG_LENGTH )  // 对包的大小做出了限制,包长超过某值或者小于某值的包,都会导致连接中断
     50          {
     51             PD_LOG( PDERROR, "Session[%s] recv msg size[%d] is less than "
     52                     "MsgHeader size[%d] or more than max msg size[%d]",
     53                     sessionName(), msgSize, sizeof(MsgHeader),
     54                     SDB_MAX_MSG_LENGTH ) ;
     55             rc = SDB_INVALIDARG ;
     56             break ;
     57          }
     58          else
     59          {
     60             pBuff = getBuff( msgSize + 1 ) ;
     61             if ( !pBuff )
     62             {
     63                rc = SDB_OOM ;
     64                break ;
     65             }
     66             buffSize = getBuffLen() ;
     67             *(UINT32*)pBuff = msgSize ;
     68             rc = recvData( pBuff + sizeof(UINT32),
     69                            msgSize - sizeof(UINT32),
     70                            PMD_RECV_DATA_AFTER_LENGTH_TIMEOUT ) ;                // 到此处,说明程序可以愉快的接受client的发送的数据包了
     71             if ( rc )
     72             {
     73                if ( SDB_APP_FORCED != rc )
     74                {
     75                   PD_LOG( PDERROR, "Session[%s] failed to recv msg[len: %u], "
     76                           "rc: %d", sessionName(), msgSize - sizeof(UINT32),
     77                           rc ) ;
     78                }
     79                break ;
     80             }
     81  
     82             _pEDUCB->incEventCount() ;
     83             pBuff[ msgSize ] = 0 ;
     84             if ( SDB_OK != ( rc = pmdEDUMgr->activateEDU( _pEDUCB ) ) )
     85             {
     86                PD_LOG( PDERROR, "Session[%s] activate edu failed, rc: %d",
     87                        sessionName(), rc ) ;
     88                break ;
     89             }
     90             rc = _processMsg( (MsgHeader*)pBuff ) ;                                     // 收到数据包,开始处理,该函数在结合代码讲解
     91             if ( rc )
     92             {
     93                break ;
     94             }
     95             if ( SDB_OK != ( rc = pmdEDUMgr->waitEDU( _pEDUCB ) ) )
     96             {
     97                PD_LOG( PDERROR, "Session[%s] wait edu failed, rc: %d",
     98                        sessionName(), rc ) ;
     99                break ;
    100             }
    101          }
    102       } // end while
    103 
    104    done:
    105       disconnect() ;
    106       return rc ;
    107    error:
    108       goto done ;
    109    }

     _processMsg方法:

     1 INT32 _pmdLocalSession::_processMsg( MsgHeader * msg )
     2    {
     3       INT32 rc          = SDB_OK ;
     4       const CHAR *pBody = NULL ;
     5       INT32 bodyLen     = 0 ;
     6       rtnContextBuf contextBuff ;
     7       INT32 opCode      = msg->opCode ;
     8 
     9       rc = _onMsgBegin( msg ) ;                                     // 对数据包做前期处理,例如改数据包是不是需要返回,(若出错)需不需要回滚,并初始化好回复的数据包头部
    10       if ( SDB_OK == rc )
    11       {
    12          rc = _processor->processMsg( msg, contextBuff,             // 我是项目经理,这个包就交给processor处理去吧,我要的是结果。
    13                                       _replyHeader.contextID,       // processor在不同的节点中,指向不同的对象(咦,这不是多态么?),因此也有不同的处理方式
    14                                       _needReply ) ;
    15          pBody     = contextBuff.data() ;                           // pBody指向要返回的数据,避免拷贝(提高执行效率)
    16          bodyLen   = contextBuff.size() ;                           // 数据长度,不表
    17          _replyHeader.numReturned = contextBuff.recordNum() ;       // 返回的数据共有多少条记录
    18          _replyHeader.startFrom = (INT32)contextBuff.getStartFrom() ; // 应该从哪一条开始读
    19          if ( SDB_OK != rc )
    20          {
    21             if ( _needRollback )                                          // 当执行过程中例如(insert, delete等),出错了,需要把数据复原
    22             {
    23                INT32 rcTmp = rtnTransRollback( eduCB(), getDPSCB() ) ;
    24                if ( rcTmp )
    25                {
    26                   PD_LOG( PDERROR, "Session[%s] failed to rollback trans "
    27                           "info, rc: %d", sessionName(), rcTmp ) ;
    28                }
    29                _needRollback = FALSE ;
    30             }
    31          }
    32       }
    33 
    34       if ( _needReply )                                              // 需要回复,那就再处理一下把
    35       {
    36          if ( rc && bodyLen == 0 )                                   // 执行过程出错,那就返回出错信息
    37          {
    38             _errorInfo = utilGetErrorBson( rc, _pEDUCB->getInfo(
    39                                            EDU_INFO_ERROR ) ) ;
    40             pBody = _errorInfo.objdata() ;
    41             bodyLen = (INT32)_errorInfo.objsize() ;
    42             _replyHeader.numReturned = 1 ;
    43          }
    44          _replyHeader.header.opCode = MAKE_REPLY_TYPE(opCode) ;            // 填充回复数据包中的字段
    45          _replyHeader.flags         = rc ;
    46          _replyHeader.header.messageLength = sizeof( _replyHeader ) +
    47                                              bodyLen ;
    48 
    49          INT32 rcTmp = _reply( &_replyHeader, pBody, bodyLen ) ;            // 把包发送给client
    50          if ( rcTmp )
    51          {
    52             PD_LOG( PDERROR, "Session[%s] failed to send response, rc: %d",
    53                     sessionName(), rcTmp ) ;
    54             disconnect() ;
    55          }
    56       }
    57 
    58       _onMsgEnd( rc, msg ) ;
    59       rc = SDB_OK ;
    60 
    61       return rc ;
    62    }

     coord节点上的processor,是pmdCoordProcessor的一个实例,是用来做数据转发的,不同于真正做数据处理的pmdDataProcessor。

     1 INT32 _pmdCoordProcessor::processMsg( MsgHeader *msg,
     2                                          rtnContextBuf &contextBuff,
     3                                          INT64 &contextID,
     4                                          BOOLEAN &needReply )
     5    {
     6       ...
     7 
     8       rc = _processCoordMsg( msg, _replyHeader, contextBuff ) ;                     // 转给另一个函数(_processCoordMsg)处理,下面讲述
     9       if ( SDB_COORD_UNKNOWN_OP_REQ == rc )
    10       {
    11          contextBuff.release() ;
    12          rc = _pmdDataProcessor::processMsg( msg, contextBuff,                     // 如果上一个函数处理后,返回的错误是一个 SDB_COORD_UNKNOWN_OP_REQ类型,则交给pmdDataProcessor处理
    13                                              contextID, needReply ) ;
    14       }
    15       ...
    16    }

     pmdCoordProcessor的处理过程

     1 INT32 _pmdCoordProcessor::_processCoordMsg( MsgHeader *msg, 
     2                                                MsgOpReply &replyHeader,
     3                                                rtnContextBuf &contextBuff )
     4    {
     5       INT32 rc = SDB_OK ;
     6       if ( NULL != _pErrorObj )
     7       {
     8          SDB_OSS_DEL _pErrorObj ;
     9          _pErrorObj = NULL ;
    10       }
    11       if ( NULL != _pResultBuff )
    12       {
    13          _pResultBuff = NULL ;
    14       }
    15       CoordCB *pCoordcb  = _pKrcb->getCoordCB();
    16       rtnCoordProcesserFactory *pProcesserFactory
    17                                         = pCoordcb->getProcesserFactory();
    18 
    19       if ( MSG_AUTH_VERIFY_REQ == msg->opCode )
    20       {
    21          rc = SDB_COORD_UNKNOWN_OP_REQ ;
    22          goto done ;
    23       }
    24       else if ( MSG_BS_INTERRUPTE == msg->opCode ||
    25                 MSG_BS_INTERRUPTE_SELF == msg->opCode ||
    26                 MSG_BS_DISCONNECT == msg->opCode )
    27       {
    28       }
    29       else if ( !getClient()->isAuthed() )                        // 没有用用户和密码登录,就收到了数据包的,就先尝试用默认的用户名和密码,先取得数据库的授权,否则无法做操作
    30       {
    31          rc = getClient()->authenticate( "", "" ) ;
    32          if ( rc )
    33          {
    34             goto done ;
    35          }
    36       }
    37 
    38       switch ( msg->opCode )                                      // 开始检查client要做什么样的操作了
    39       {
    40       case MSG_BS_GETMORE_REQ :                                   // get more操作,coord不做处理,先标记成 SDB_COORD_UNKNOWN_OP_REQ,交给其它地方处理
    41          rc = SDB_COORD_UNKNOWN_OP_REQ ;
    42          break ;
    43       case MSG_BS_QUERY_REQ:                                      // 查询操作,这个是重点。所有的command
    44          {
    45             MsgOpQuery *pQueryMsg   = ( MsgOpQuery * )msg ;
    46             CHAR *pQueryName        = pQueryMsg->name ;
    47             SINT32 queryNameLen     = pQueryMsg->nameLength ;
    48             if ( queryNameLen > 0 && '$' == pQueryName[0] )       // 如果查询的name字段,是用$开头的字符串,则认为这个是command,要走command处理
    49             {
    50                rtnCoordCommand *pCmdProcesser = 
    51                            pProcesserFactory->getCommandProcesser( pQueryMsg ) ;  // 找到command的对象,上文中有描述所有的command都在初始化的时候,存入_cmdMap中
    52                if ( NULL != pCmdProcesser ) 
    53                {
    54                   rc = pCmdProcesser->execute( ( CHAR *)msg,                    // 找到了,就开始command处理了
    55                                                msg->messageLength,
    56                                                eduCB(),
    57                                                replyHeader,
    58                                                &contextBuff ) ;
    59                   break ;
    60                }
    61             }
    62             // 如果没有找到,则走入 default代码块
    63          }
    64       default:
    65          {
    66             rtnContextBase *pContext = NULL ;
    67             rtnCoordOperator *pOperator = 
    68                            pProcesserFactory->getOperator( msg->opCode ) ;         // 交给operator处理,operator是类似于command的几个特殊的处理对象,数量比较少,此处不表
    69             rc = pOperator->execute( ( CHAR* )msg,                                 // 转发给对应的operator类实例
    70                                      msg->messageLength,
    71                                      eduCB(),
    72                                      replyHeader,
    73                                      &contextBuff ) ;
    74              ...
    75           }
    76    }

     以创建集合空间的command为例,看看 rtnCoordCMDListCollectionSpace 的 execute做了什么:

    INT32 rtnCoordCMDCreateCollectionSpace::execute( CHAR *pReceiveBuffer,
                                                        SINT32 packSize,
                                                        pmdEDUCB *cb,
                                                        MsgOpReply &replyHeader,
                                                        rtnContextBuf *buf )
       {
          ...
    
          MsgOpQuery *pCreateReq           = (MsgOpQuery *)pReceiveBuffer;                   // 构造一个 MSG_CAT_CREATE_COLLECTION_SPACE_REQ 的数据包
          pCreateReq->header.routeID.value = 0;
          pCreateReq->header.TID           = cb->getTID();
          pCreateReq->header.opCode        = MSG_CAT_CREATE_COLLECTION_SPACE_REQ;            // 数据包的类型
    
          rc = executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent,
                                    cb, NULL, NULL ) ;
          if ( rc )
          {
             PD_LOG ( PDERROR, "create collectionspace failed, rc = %d", rc ) ;
             goto error ;
          }
    
       done :
          replyHeader.flags = rc ;
          PD_TRACE_EXITRC ( SDB_RTNCOCMDCRCS_EXE, rc ) ;
          return rc;
       error :
          goto done ;
       }

     该函数的主体,构造了另外一个数据包,然后执行 executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent, cb, NULL, NULL ) ;这一句上。跟进这一函数:

     1 INT32 rtnCoordCommand::executeOnCataGroup ( CHAR *pBuffer,
     2                                                netMultiRouteAgent *pRouteAgent,
     3                                                pmdEDUCB *cb,
     4                                                rtnContextCoord *pContext,
     5                                                CoordGroupList *pGroupList,
     6                                                std::vector<BSONObj> *pReplyObjs )
     7    {
     8       INT32 rc = SDB_OK;
     9       ...
    10    retry :
    11       rc = rtnCoordGetCatGroupInfo( cb, isNeedRefresh, catGroupInfo );            // 查询catalog的信息,主要是获取到catalog组的主节点的服务地址
    12       if ( rc )
    13       {
    14          probe = 100 ;
    15          goto error ;
    16          PD_LOG ( PDERROR, "Execute on catalogue node failed, failed to get "
    17                   "catalogue group info(rc=%d)", rc );
    18       }
    19       rc = rtnCoordSendRequestToPrimary( pBuffer, catGroupInfo, sendNodes,       // 跟了这么久,做了那么多的准备,这一句才是真开始了,有兴趣可以自己看一下 :)
    20                                          pRouteAgent, MSG_ROUTE_CAT_SERVICE,
    21                                          cb );
    22       if ( rc )
    23       {
    24          probe = 200 ;
    25          goto error ;
    26       }
    27       rc = rtnCoordGetReply( cb, sendNodes, replyQue,                         // 等待并收取远程节点处理的返回信息
    28                              MAKE_REPLY_TYPE(((MsgHeader*)pBuffer)->opCode) ) ;
    29       ...
    30    }

     rtnCoordSendRequestToPrimary就不再详细跟进描述了,根据函数名,大致就可以了解一个大概,是把数据发送到指定组(此处是catalog组)的主节点。

    coord上的其它command或者operator也是采用类似的方法来转发消息给其它节点,就不再一一赘述了。

    综合全文的讲述,coord处理client请求的流程

    发送请求给coord节点

       coord先揪出这个请求是做什么

          交给对应的command处理

             查询(本地缓存或者远程获取的)catalog信息

             把消息转成节点间的内部消息

             转发给目标节点

             然后等待返回数据

         再把返回数据交给处理线程

    线程把返回结果发送给client

    =====>THE END<===== 

  • 相关阅读:
    Oracle数据库面试题【转载】
    年龄计算周岁
    丈夫的权力与妻子的职业水平
    JDK 8 and JRE 8 Supported Locales
    一笔画 奇点 偶点
    流水行船问题
    PL/SQL LOOP SAMPLE
    OpenCV——识别各省份地图轮廓
    OpenCV——轮廓面积及长度计算
    树莓派3安装opencv2程序无法运行
  • 原文地址:https://www.cnblogs.com/tynia/p/coord.html
Copyright © 2011-2022 走看看