  • [Apache Doris] Apache Doris Metadata Design and a Walkthrough of the DDL Source Code

    Metadata design

    image

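    As shown in the figure above, Doris metadata stores four main kinds of data:

    • User data information, including databases, table schemas, and tablet (sharding) information.
    • Job information, such as load jobs, Clone jobs, and SchemaChange jobs.
    • User and privilege information.
    • Cluster and node information.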

    image

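    Metadata directory

    The metadata directory is specified by the FE configuration item meta_dir.
    The bdb/ directory holds the bdbje data.
    The image/ directory holds the image files.
    image.[logid] is the latest image file. The logid suffix is the id of the last log entry contained in the image.

    image.ckpt is the image file currently being written. If the write succeeds, it is renamed to image.[logid] and replaces the old image file.

    The VERSION file records the cluster_id, which uniquely identifies a Doris cluster. It is a 32-bit integer generated randomly the first time the leader starts. A cluster id can also be set explicitly through the FE configuration item cluster_id.

    The ROLE file records the FE's own role. There are only two roles, FOLLOWER and OBSERVER. FOLLOWER means the FE is an electable node. (Note: even the leader node has the role FOLLOWER.)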
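
    The image naming rule above can be illustrated with a small sketch. It is not Doris code: the class and method names are hypothetical, and it only assumes that file names follow the image.[logid] pattern described above, with image.ckpt skipped because it may still be incomplete.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not Doris code: picks the newest image.[logid] from a file listing.
class ImageFileSketch {
    static final String PREFIX = "image.";

    // Returns the largest logid among names shaped like image.[logid], or -1 if there is none.
    static long latestLogId(List<String> fileNames) {
        long latest = -1;
        for (String name : fileNames) {
            if (!name.startsWith(PREFIX)) {
                continue;
            }
            String suffix = name.substring(PREFIX.length());
            // Skip image.ckpt (still being written) and any other non-numeric suffix.
            if (suffix.isEmpty() || !suffix.chars().allMatch(Character::isDigit)) {
                continue;
            }
            latest = Math.max(latest, Long.parseLong(suffix));
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList("image.1024", "image.ckpt", "image.2048");
        System.out.println("latest image logid = " + latestLogId(listing));
    }
}
```

    Taking the maximum logid rather than the newest modification time keeps the choice tied to the log position the image actually covers.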

    Reading the DDL-related source code

    Starting the MySQL service

    org.apache.doris.qe.QeService

    if (nioEnabled) {
        mysqlServer = new NMysqlServer(port, scheduler);
    } else {
        mysqlServer = new MysqlServer(port, scheduler);
    }
    

    DDL call flow

    image

    org.apache.doris.qe.ConnectProcessor#dispatch: command identification

    switch (command) {
        case COM_INIT_DB:
            handleInitDb();
            break;
        case COM_QUIT:
            handleQuit();
            break;
        case COM_QUERY:
            handleQuery();
            break;
        case COM_FIELD_LIST:
            handleFieldList();
            break;
        case COM_PING:
            handlePing();
            break;
        default:
            ctx.getState().setError("Unsupported command(" + command + ")");
            LOG.warn("Unsupported command(" + command + ")");
            break;
    }

    org.apache.doris.qe.ConnectProcessor#analyze: lexical and syntactic parsing

    // Parse statement with parser generated by CUP&FLEX
    SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
    SqlParser parser = new SqlParser(input);
    

    The raw statement string is read from the connection.
    • Lexer definition file: fe/fe-core/src/main/jflex/sql_scanner.flex
    • Grammar definition file: fe/fe-core/src/main/cup/sql_parser.cup

    All statement classes:

    StatementBase     [vim org/apache/doris/analysis/StatementBase.java +33]
          ├── ExportStmt    [vim org/apache/doris/analysis/ExportStmt.java +48]
          ├── ImportColumnsStmt     [vim org/apache/doris/analysis/ImportColumnsStmt.java +21]
          ├── ImportDeleteOnStmt    [vim org/apache/doris/analysis/ImportDeleteOnStmt.java +19]
          ├── ImportSequenceStmt    [vim org/apache/doris/analysis/ImportSequenceStmt.java +19]
          ├── ImportWhereStmt       [vim org/apache/doris/analysis/ImportWhereStmt.java +19]
          ├── KillStmt      [vim org/apache/doris/analysis/KillStmt.java +19]
          ├── SetStmt       [vim org/apache/doris/analysis/SetStmt.java +24]
          ├── UseStmt       [vim org/apache/doris/analysis/UseStmt.java +33]
          ├── QueryStmt     [vim org/apache/doris/analysis/QueryStmt.java +38]
          │   ├── SelectStmt        [vim org/apache/doris/analysis/SelectStmt.java +65]
          │   └── SetOperationStmt  [vim org/apache/doris/analysis/SetOperationStmt.java +36]
          ├── ShowStmt      [vim org/apache/doris/analysis/ShowStmt.java +22]
          │   ├── AdminShowConfigStmt       [vim org/apache/doris/analysis/AdminShowConfigStmt.java +33]
          │   ├── AdminShowDataSkewStmt     [vim org/apache/doris/analysis/AdminShowDataSkewStmt.java +32]
          │   ├── AdminShowReplicaDistributionStmt  [vim org/apache/doris/analysis/AdminShowReplicaDistributionStmt.java +34]
          │   ├── AdminShowReplicaStatusStmt        [vim org/apache/doris/analysis/AdminShowReplicaStatusStmt.java +39]
          │   ├── DescribeStmt      [vim org/apache/doris/analysis/DescribeStmt.java +54]
          │   ├── HelpStmt  [vim org/apache/doris/analysis/HelpStmt.java +26]
          │   ├── ShowAlterStmt     [vim org/apache/doris/analysis/ShowAlterStmt.java +46]
          │   ├── ShowAuthorStmt    [vim org/apache/doris/analysis/ShowAuthorStmt.java +23]
          │   ├── ShowBackendsStmt  [vim org/apache/doris/analysis/ShowBackendsStmt.java +30]
          │   ├── ShowBackupStmt    [vim org/apache/doris/analysis/ShowBackupStmt.java +38]
          │   ├── ShowBrokerStmt    [vim org/apache/doris/analysis/ShowBrokerStmt.java +30]
          │   ├── ShowCharsetStmt   [vim org/apache/doris/analysis/ShowCharsetStmt.java +23]
          │   ├── ShowClusterStmt   [vim org/apache/doris/analysis/ShowClusterStmt.java +34]
          │   ├── ShowCollationStmt [vim org/apache/doris/analysis/ShowCollationStmt.java +24]
          │   ├── ShowColumnStatsStmt       [vim org/apache/doris/analysis/ShowColumnStatsStmt.java +28]
          │   ├── ShowColumnStmt    [vim org/apache/doris/analysis/ShowColumnStmt.java +28]
          │   ├── ShowCreateDbStmt  [vim org/apache/doris/analysis/ShowCreateDbStmt.java +36]
          │   ├── ShowCreateFunctionStmt    [vim org/apache/doris/analysis/ShowCreateFunctionStmt.java +32]
          │   ├── ShowCreateRoutineLoadStmt [vim org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java +24]
          │   ├── ShowCreateTableStmt       [vim org/apache/doris/analysis/ShowCreateTableStmt.java +29]
          │   ├── ShowDataStmt      [vim org/apache/doris/analysis/ShowDataStmt.java +56]
          │   ├── ShowDbIdStmt      [vim org/apache/doris/analysis/ShowDbIdStmt.java +29]
          │   ├── ShowDbStmt        [vim org/apache/doris/analysis/ShowDbStmt.java +27]
          │   ├── ShowDeleteStmt    [vim org/apache/doris/analysis/ShowDeleteStmt.java +31]
          │   ├── ShowDynamicPartitionStmt  [vim org/apache/doris/analysis/ShowDynamicPartitionStmt.java +29]
          │   ├── ShowEncryptKeysStmt       [vim org/apache/doris/analysis/ShowEncryptKeysStmt.java +32]
          │   ├── ShowEnginesStmt   [vim org/apache/doris/analysis/ShowEnginesStmt.java +23]
          │   ├── ShowEventsStmt    [vim org/apache/doris/analysis/ShowEventsStmt.java +23]
          │   ├── ShowExportStmt    [vim org/apache/doris/analysis/ShowExportStmt.java +40]
          │   ├── ShowFrontendsStmt [vim org/apache/doris/analysis/ShowFrontendsStmt.java +30]
          │   ├── ShowFunctionsStmt [vim org/apache/doris/analysis/ShowFunctionsStmt.java +32]
          │   ├── ShowGrantsStmt    [vim org/apache/doris/analysis/ShowGrantsStmt.java +32]
          │   ├── ShowIndexStmt     [vim org/apache/doris/analysis/ShowIndexStmt.java +32]
          │   ├── ShowLoadProfileStmt       [vim org/apache/doris/analysis/ShowLoadProfileStmt.java +27]
          │   ├── ShowLoadStmt      [vim org/apache/doris/analysis/ShowLoadStmt.java +42]
          │   ├── ShowLoadWarningsStmt      [vim org/apache/doris/analysis/ShowLoadWarningsStmt.java +36]
          │   ├── ShowMigrationsStmt        [vim org/apache/doris/analysis/ShowMigrationsStmt.java +31]
          │   ├── ShowOpenTableStmt [vim org/apache/doris/analysis/ShowOpenTableStmt.java +23]
          │   ├── ShowPartitionIdStmt       [vim org/apache/doris/analysis/ShowPartitionIdStmt.java +29]
          │   ├── ShowPartitionsStmt        [vim org/apache/doris/analysis/ShowPartitionsStmt.java +49]
          │   ├── ShowPluginsStmt   [vim org/apache/doris/analysis/ShowPluginsStmt.java +23]
          │   ├── ShowProcStmt      [vim org/apache/doris/analysis/ShowProcStmt.java +32]
          │   ├── ShowProcedureStmt [vim org/apache/doris/analysis/ShowProcedureStmt.java +23]
          │   ├── ShowProcesslistStmt       [vim org/apache/doris/analysis/ShowProcesslistStmt.java +24]
          │   ├── ShowQueryProfileStmt      [vim org/apache/doris/analysis/ShowQueryProfileStmt.java +27]
          │   ├── ShowRepositoriesStmt      [vim org/apache/doris/analysis/ShowRepositoriesStmt.java +25]
          │   ├── ShowResourcesStmt [vim org/apache/doris/analysis/ShowResourcesStmt.java +37]
          │   ├── ShowRestoreStmt   [vim org/apache/doris/analysis/ShowRestoreStmt.java +38]
          │   ├── ShowRolesStmt     [vim org/apache/doris/analysis/ShowRolesStmt.java +29]
          │   ├── ShowRollupStmt    [vim org/apache/doris/analysis/ShowRollupStmt.java +28]
          │   ├── ShowRoutineLoadStmt       [vim org/apache/doris/analysis/ShowRoutineLoadStmt.java +34]
          │   ├── ShowRoutineLoadTaskStmt   [vim org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java +32]
          │   ├── ShowSmallFilesStmt        [vim org/apache/doris/analysis/ShowSmallFilesStmt.java +32]
          │   ├── ShowSnapshotStmt  [vim org/apache/doris/analysis/ShowSnapshotStmt.java +29]
          │   ├── ShowSqlBlockRuleStmt      [vim org/apache/doris/analysis/ShowSqlBlockRuleStmt.java +31]
          │   ├── ShowStatusStmt    [vim org/apache/doris/analysis/ShowStatusStmt.java +23]
          │   ├── ShowStreamLoadStmt        [vim org/apache/doris/analysis/ShowStreamLoadStmt.java +39]
          │   ├── ShowSyncJobStmt   [vim org/apache/doris/analysis/ShowSyncJobStmt.java +33]
          │   ├── ShowTableIdStmt   [vim org/apache/doris/analysis/ShowTableIdStmt.java +30]
          │   ├── ShowTableStatsStmt        [vim org/apache/doris/analysis/ShowTableStatsStmt.java +32]
          │   ├── ShowTableStatusStmt       [vim org/apache/doris/analysis/ShowTableStatusStmt.java +35]
          │   ├── ShowTableStmt     [vim org/apache/doris/analysis/ShowTableStmt.java +34]
          │   ├── ShowTabletStmt    [vim org/apache/doris/analysis/ShowTabletStmt.java +39]
          │   ├── ShowTransactionStmt       [vim org/apache/doris/analysis/ShowTransactionStmt.java +35]
          │   ├── ShowTrashDiskStmt [vim org/apache/doris/analysis/ShowTrashDiskStmt.java +33]
          │   ├── ShowTrashStmt     [vim org/apache/doris/analysis/ShowTrashStmt.java +36]
          │   ├── ShowTriggersStmt  [vim org/apache/doris/analysis/ShowTriggersStmt.java +23]
          │   ├── ShowUserPropertyStmt      [vim org/apache/doris/analysis/ShowUserPropertyStmt.java +42]
          │   ├── ShowUserStmt      [vim org/apache/doris/analysis/ShowUserStmt.java +25]
          │   ├── ShowVariablesStmt [vim org/apache/doris/analysis/ShowVariablesStmt.java +29]
          │   ├── ShowViewStmt      [vim org/apache/doris/analysis/ShowViewStmt.java +39]
          │   ├── ShowWarningStmt   [vim org/apache/doris/analysis/ShowWarningStmt.java +23]
          │   └── ShowWhiteListStmt [vim org/apache/doris/analysis/ShowWhiteListStmt.java +23]
          ├── TransactionStmt       [vim org/apache/doris/analysis/TransactionStmt.java +22]
          │   ├── TransactionBeginStmt      [vim org/apache/doris/analysis/TransactionBeginStmt.java +24]
          │   ├── TransactionCommitStmt     [vim org/apache/doris/analysis/TransactionCommitStmt.java +19]
          │   └── TransactionRollbackStmt   [vim org/apache/doris/analysis/TransactionRollbackStmt.java +19]
          ├── UnsupportedStmt       [vim org/apache/doris/analysis/UnsupportedStmt.java +22]
          │   └── EmptyStmt [vim org/apache/doris/analysis/EmptyStmt.java +19]
          └── DdlStmt       [vim org/apache/doris/analysis/DdlStmt.java +19]
              ├── AdminCancelRepairTableStmt        [vim org/apache/doris/analysis/AdminCancelRepairTableStmt.java +33]
              ├── AdminCheckTabletsStmt     [vim org/apache/doris/analysis/AdminCheckTabletsStmt.java +33]
              ├── AdminCleanTrashStmt       [vim org/apache/doris/analysis/AdminCleanTrashStmt.java +34]
              ├── AdminRepairTableStmt      [vim org/apache/doris/analysis/AdminRepairTableStmt.java +33]
              ├── AdminSetConfigStmt        [vim org/apache/doris/analysis/AdminSetConfigStmt.java +32]
              ├── AdminSetReplicaStatusStmt [vim org/apache/doris/analysis/AdminSetReplicaStatusStmt.java +30]
              ├── AlterClusterStmt  [vim org/apache/doris/analysis/AlterClusterStmt.java +29]
              ├── AlterColumnStatsStmt      [vim org/apache/doris/analysis/AlterColumnStatsStmt.java +33]
              ├── AlterDatabasePropertyStmt [vim org/apache/doris/analysis/AlterDatabasePropertyStmt.java +24]
              ├── AlterDatabaseQuotaStmt    [vim org/apache/doris/analysis/AlterDatabaseQuotaStmt.java +30]
              ├── AlterDatabaseRename       [vim org/apache/doris/analysis/AlterDatabaseRename.java +34]
              ├── AlterRoutineLoadStmt      [vim org/apache/doris/analysis/AlterRoutineLoadStmt.java +34]
              ├── AlterSqlBlockRuleStmt     [vim org/apache/doris/analysis/AlterSqlBlockRuleStmt.java +31]
              ├── AlterSystemStmt   [vim org/apache/doris/analysis/AlterSystemStmt.java +28]
              ├── AlterTableStatsStmt       [vim org/apache/doris/analysis/AlterTableStatsStmt.java +33]
              ├── AlterTableStmt    [vim org/apache/doris/analysis/AlterTableStmt.java +37]
              ├── CancelLoadStmt    [vim org/apache/doris/analysis/CancelLoadStmt.java +26]
              ├── CreateClusterStmt [vim org/apache/doris/analysis/CreateClusterStmt.java +33]
              ├── CreateDataSyncJobStmt     [vim org/apache/doris/analysis/CreateDataSyncJobStmt.java +36]
              ├── CreateDbStmt      [vim org/apache/doris/analysis/CreateDbStmt.java +31]
              ├── CreateEncryptKeyStmt      [vim org/apache/doris/analysis/CreateEncryptKeyStmt.java +30]
              ├── CreateFileStmt    [vim org/apache/doris/analysis/CreateFileStmt.java +35]
              ├── CreateFunctionStmt        [vim org/apache/doris/analysis/CreateFunctionStmt.java +47]
              ├── CreateMaterializedViewStmt        [vim org/apache/doris/analysis/CreateMaterializedViewStmt.java +43]
              ├── CreateRepositoryStmt      [vim org/apache/doris/analysis/CreateRepositoryStmt.java +28]
              ├── CreateResourceStmt        [vim org/apache/doris/analysis/CreateResourceStmt.java +32]
              ├── CreateRoleStmt    [vim org/apache/doris/analysis/CreateRoleStmt.java +28]
              ├── CreateRoutineLoadStmt     [vim org/apache/doris/analysis/CreateRoutineLoadStmt.java +48]
              ├── CreateSqlBlockRuleStmt    [vim org/apache/doris/analysis/CreateSqlBlockRuleStmt.java +37]
              ├── CreateTableAsSelectStmt   [vim org/apache/doris/analysis/CreateTableAsSelectStmt.java +26]
              ├── CreateTableLikeStmt       [vim org/apache/doris/analysis/CreateTableLikeStmt.java +31]
              ├── CreateTableStmt   [vim org/apache/doris/analysis/CreateTableStmt.java +56]
              ├── CreateUserStmt    [vim org/apache/doris/analysis/CreateUserStmt.java +36]
              ├── DeleteStmt        [vim org/apache/doris/analysis/DeleteStmt.java +35]
              ├── DropClusterStmt   [vim org/apache/doris/analysis/DropClusterStmt.java +31]
              ├── DropDbStmt        [vim org/apache/doris/analysis/DropDbStmt.java +30]
              ├── DropEncryptKeyStmt        [vim org/apache/doris/analysis/DropEncryptKeyStmt.java +28]
              ├── DropFileStmt      [vim org/apache/doris/analysis/DropFileStmt.java +34]
              ├── DropFunctionStmt  [vim org/apache/doris/analysis/DropFunctionStmt.java +27]
              ├── DropMaterializedViewStmt  [vim org/apache/doris/analysis/DropMaterializedViewStmt.java +29]
              ├── DropRepositoryStmt        [vim org/apache/doris/analysis/DropRepositoryStmt.java +27]
              ├── DropResourceStmt  [vim org/apache/doris/analysis/DropResourceStmt.java +27]
              ├── DropRoleStmt      [vim org/apache/doris/analysis/DropRoleStmt.java +28]
              ├── DropSqlBlockRuleStmt      [vim org/apache/doris/analysis/DropSqlBlockRuleStmt.java +30]
              ├── DropTableStmt     [vim org/apache/doris/analysis/DropTableStmt.java +28]
              ├── DropUserStmt      [vim org/apache/doris/analysis/DropUserStmt.java +27]
              ├── EnterStmt [vim org/apache/doris/analysis/EnterStmt.java +25]
              ├── GrantStmt [vim org/apache/doris/analysis/GrantStmt.java +39]
              ├── InsertStmt        [vim org/apache/doris/analysis/InsertStmt.java +66]
              ├── InstallPluginStmt [vim org/apache/doris/analysis/InstallPluginStmt.java +31]
              ├── LinkDbStmt        [vim org/apache/doris/analysis/LinkDbStmt.java +31]
              ├── LoadStmt  [vim org/apache/doris/analysis/LoadStmt.java +45]
              ├── MigrateDbStmt     [vim org/apache/doris/analysis/MigrateDbStmt.java +29]
              ├── PauseRoutineLoadStmt      [vim org/apache/doris/analysis/PauseRoutineLoadStmt.java +26]
              ├── PauseSyncJobStmt  [vim org/apache/doris/analysis/PauseSyncJobStmt.java +22]
              ├── RecoverDbStmt     [vim org/apache/doris/analysis/RecoverDbStmt.java +33]
              ├── RecoverPartitionStmt      [vim org/apache/doris/analysis/RecoverPartitionStmt.java +32]
              ├── RecoverTableStmt  [vim org/apache/doris/analysis/RecoverTableStmt.java +32]
              ├── ResumeRoutineLoadStmt     [vim org/apache/doris/analysis/ResumeRoutineLoadStmt.java +26]
              ├── ResumeSyncJobStmt [vim org/apache/doris/analysis/ResumeSyncJobStmt.java +22]
              ├── RevokeStmt        [vim org/apache/doris/analysis/RevokeStmt.java +32]
              ├── SetUserPropertyStmt       [vim org/apache/doris/analysis/SetUserPropertyStmt.java +31]
              ├── StopRoutineLoadStmt       [vim org/apache/doris/analysis/StopRoutineLoadStmt.java +23]
              ├── StopSyncJobStmt   [vim org/apache/doris/analysis/StopSyncJobStmt.java +22]
              ├── SyncStmt  [vim org/apache/doris/analysis/SyncStmt.java +22]
              ├── TruncateTableStmt [vim org/apache/doris/analysis/TruncateTableStmt.java +27]
              ├── UninstallPluginStmt       [vim org/apache/doris/analysis/UninstallPluginStmt.java +28]
              ├── UpdateStmt        [vim org/apache/doris/analysis/UpdateStmt.java +35]
              ├── AbstractBackupStmt        [vim org/apache/doris/analysis/AbstractBackupStmt.java +36]
              │   ├── BackupStmt    [vim org/apache/doris/analysis/BackupStmt.java +29]
              │   └── RestoreStmt   [vim org/apache/doris/analysis/RestoreStmt.java +33]
              ├── BaseViewStmt      [vim org/apache/doris/analysis/BaseViewStmt.java +39]
              │   ├── AlterViewStmt [vim org/apache/doris/analysis/AlterViewStmt.java +31]
              │   └── CreateViewStmt        [vim org/apache/doris/analysis/CreateViewStmt.java +33]
              └── CancelStmt        [vim org/apache/doris/analysis/CancelStmt.java +19]
                  ├── CancelAlterSystemStmt [vim org/apache/doris/analysis/CancelAlterSystemStmt.java +28]
                  ├── CancelAlterTableStmt  [vim org/apache/doris/analysis/CancelAlterTableStmt.java +31]
                  └── CancelBackupStmt      [vim org/apache/doris/analysis/CancelBackupStmt.java +30]
    

    org.apache.doris.qe.StmtExecutor#execute(TUniqueId)

    analyze(context.getSessionVariable().toThrift()); // semantic analysis
    if (isForwardToMaster()) {
        forwardToMaster();  // forward to the Master
        if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
            context.setQueryId(masterOpExecutor.getQueryId());
        }
        return;
    } else {
        LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
    }
    
    
    // execute the command (earlier branches omitted)
    else if (parsedStmt instanceof DdlStmt) {
        handleDdlStmt();
    } else if (parsedStmt instanceof ShowStmt) {
        handleShow();
    } else if (parsedStmt instanceof KillStmt) {
        handleKill();
    }
    

    Semantic analysis

    This step checks that the statement is semantically valid.

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
    
        super.analyze(analyzer);
        tableName.analyze(analyzer);   
        
        checkTblPriv(ConnectContext.get(), tableName.getDb(),
            tableName.getTbl(), PrivPredicate.CREATE);
            
        analyzeEngineName();  
          
        keysDesc.analyze(columnDefs);
        
        for (ColumnDef columnDef : columnDefs) {
            columnDef.analyze(engineName.equals("olap"));
        }   
        
        partitionDesc.analyze(columnDefs, properties);
        
        distributionDesc.analyze(columnSet);
    }
    

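    • Checks that names are legal
    • Checks that the user has the required privilege
    • Checks that the partitions are legal
    • Checks that the column types are legal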

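    Forwarding

    An FE is a Master, a Follower, or an Observer.
    Only the Master is able to modify metadata.
    Every operation that modifies metadata must be forwarded to the Master for execution.
    Forwarding types:
    FORWARD_NO_SYNC
    FORWARD_WITH_SYNC
    NO_FORWARD
    DDL statements use FORWARD_WITH_SYNC.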
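
    The three forwarding types can be read as two flags: whether to forward at all, and whether to wait for synchronization afterwards. The sketch below is a simplified, hypothetical model rather than the Doris class. The enum, its field names, and the reading of "sync" as "wait until the local FE has replayed the Master's journal" are all assumptions made for illustration.

```java
// Simplified, hypothetical model of the three forwarding modes; not the Doris class itself.
enum ForwardModeSketch {
    FORWARD_NO_SYNC(true, false),
    FORWARD_WITH_SYNC(true, true),
    NO_FORWARD(false, false);

    final boolean forwardToMaster;
    final boolean waitForJournalSync;

    ForwardModeSketch(boolean forwardToMaster, boolean waitForJournalSync) {
        this.forwardToMaster = forwardToMaster;
        this.waitForJournalSync = waitForJournalSync;
    }

    // Describes what an FE does with a statement, given whether it is the Master.
    static String route(ForwardModeSketch mode, boolean isMaster) {
        if (isMaster || !mode.forwardToMaster) {
            return "execute locally";
        }
        return mode.waitForJournalSync
                ? "forward to master, then wait for local replay to catch up"
                : "forward to master";
    }

    public static void main(String[] args) {
        System.out.println(route(FORWARD_WITH_SYNC, false));
        System.out.println(route(FORWARD_WITH_SYNC, true));
        System.out.println(route(NO_FORWARD, false));
    }
}
```

    Under this reading, waiting after a forwarded DDL lets the client that issued it see the new table on the FE it is connected to as soon as the statement returns.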

    Command execution

    org.apache.doris.qe.DdlExecutor#execute()

    // run the function that matches the statement type
    
    if (ddlStmt instanceof CreateTableStmt) {
        catalog.createTable((CreateTableStmt) ddlStmt);
    } 
    

    Several table types are supported. Apart from olap tables, all of them are mapping tables onto external data sources.

    if (engineName.equals("olap")) {
        createOlapTable(db, stmt);
        return;
    } else if (engineName.equals("odbc")) {
        createOdbcTable(db, stmt);
        return;
    } else if (engineName.equals("mysql")) {
        createMysqlTable(db, stmt);
        return;
    } else if (engineName.equals("broker")) {
        createBrokerTable(db, stmt);
        return;
    } else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
        createEsTable(db, stmt);
        return;
    } else if (engineName.equalsIgnoreCase("hive")) {
        createHiveTable(db, stmt);
        return;
    }
    

    org.apache.doris.catalog.Catalog#createOlapTable

    // convert the statement (syntax) object into metadata objects
    String tableName = stmt.getTableName();
    LOG.debug("begin create olap table: {}", tableName);
    
    // create columns
    List<Column> baseSchema = stmt.getColumns();
    validateColumns(baseSchema);
    
    // create partition info
    PartitionDesc partitionDesc = stmt.getPartitionDesc();
    PartitionInfo partitionInfo = null;
    
    // create the table object
    long tableId = Catalog.getCurrentCatalog().getNextId();
    OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo,
            distributionInfo, indexes);
            
    // create the Partition object
    if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
        // this is a 1-level partitioned table
        // use table name as partition name
        String partitionName = tableName;
        long partitionId = partitionNameToId.get(partitionName);
        // create partition
        Partition partition = createPartitionWithIndices();  // arguments omitted in this excerpt
        olapTable.addPartition(partition);
    }
        
        
    // add the metadata and persist it
    Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
    

    org.apache.doris.catalog.Catalog#createPartitionWithIndices

    A partition is the concrete entity behind a table.

    image

    // create base index first.
    Preconditions.checkArgument(baseIndexId != -1);
    MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
    
    // create partition with base index
    Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
    
    // add to index map
    Map<Long, MaterializedIndex> indexMap = new HashMap<>();
    indexMap.put(baseIndexId, baseIndex);
    
    // create rollup index if has
    for (long indexId : indexIdToMeta.keySet()) {
        if (indexId == baseIndexId) {
            continue;
        }
    
        MaterializedIndex rollup = new MaterializedIndex(indexId, IndexState.NORMAL);
        indexMap.put(indexId, rollup);
    }
    
    for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
        long indexId = entry.getKey();
        MaterializedIndex index = entry.getValue();
        MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);

        // create tablets
        int schemaHash = indexMeta.getSchemaHash();
        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
        createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, versionHash,
                replicaAlloc, tabletMeta, tabletIdSet);

        // add create replica task for olap
        short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
        TStorageType storageType = indexMeta.getStorageType();
        List<Column> schema = indexMeta.getSchema();
        KeysType keysType = indexMeta.getKeysType();
        int totalTaskNum = index.getTablets().size() * totalReplicaNum;
        MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
        AgentBatchTask batchTask = new AgentBatchTask();
        for (Tablet tablet : index.getTablets()) {
            long tabletId = tablet.getId();
            for (Replica replica : tablet.getReplicas()) {
                long backendId = replica.getBackendId();
                countDownLatch.addMark(backendId, tabletId);
                CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
                        partitionId, indexId, tabletId,
                        shortKeyColumnCount, schemaHash,
                        version, versionHash,
                        keysType,
                        storageType, storageMedium,
                        schema, bfColumns, bfFpp,
                        countDownLatch,
                        indexes,
                        isInMemory,
                        tabletType);
                task.setStorageFormat(storageFormat);
                batchTask.addTask(task);
                // add to AgentTaskQueue for handling finish report.
                // not for resending task
                AgentTaskQueue.addTask(task);
            }
        }
        AgentTaskExecutor.submit(batchTask);
    }
    

    Overall flow:

    • Create the Partition object
    • Create the MaterializedIndex objects
    • For each MaterializedIndex, create its Tablets
    • Create the replicas and send the tasks to the BEs
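
    The nesting in this flow (Partition, MaterializedIndex, Tablet, Replica) can be sketched with plain classes. Everything here is hypothetical and heavily reduced. In particular, the round-robin placement of replicas on backends is an assumption of the sketch, not how Doris chooses backends. The point is only that one create-replica task is needed per replica, so the task count is the product of indexes, tablets per index, and replicas per tablet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down mirror of the Partition -> MaterializedIndex -> Tablet -> Replica nesting.
class PartitionSketch {
    static class Replica {
        final long backendId;
        Replica(long backendId) { this.backendId = backendId; }
    }

    static class Tablet {
        final List<Replica> replicas = new ArrayList<>();
    }

    static class Index {
        final List<Tablet> tablets = new ArrayList<>();
    }

    final List<Index> indexes = new ArrayList<>();

    // Builds one partition; replicas go round-robin over backends (an assumption of this sketch).
    static PartitionSketch build(int indexCount, int bucketNum, int replicaNum, int backendCount) {
        PartitionSketch partition = new PartitionSketch();
        long next = 0;
        for (int i = 0; i < indexCount; i++) {
            Index index = new Index();
            for (int b = 0; b < bucketNum; b++) {
                Tablet tablet = new Tablet();
                for (int r = 0; r < replicaNum; r++) {
                    tablet.replicas.add(new Replica(next++ % backendCount));
                }
                index.tablets.add(tablet);
            }
            partition.indexes.add(index);
        }
        return partition;
    }

    // One create-replica task is needed per replica.
    int createReplicaTaskCount() {
        int count = 0;
        for (Index index : indexes) {
            for (Tablet tablet : index.tablets) {
                count += tablet.replicas.size();
            }
        }
        return count;
    }

    public static void main(String[] args) {
        PartitionSketch partition = build(2, 3, 3, 4);
        System.out.println("create-replica tasks: " + partition.createReplicaTaskCount());
    }
}
```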
    
    // estimate timeout
    long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
    timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
    try {
        ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        LOG.warn("InterruptedException: ", e);
        ok = false;
    }
    

    The code above waits for the BEs to finish the tasks.
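
    The waiting step can be reproduced with the standard java.util.concurrent.CountDownLatch. This is a minimal sketch under assumptions: the parameters perTabletTimeoutSec and maxTimeoutSec are hypothetical stand-ins for the two Config values in the excerpt, and plain threads play the role of BEs reporting back. Doris itself uses its own MarkedCountDownLatch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "estimate a timeout, then wait until every task has reported".
class CreateWaitSketch {
    // The timeout grows with the number of tasks but is capped by a maximum.
    static long estimateTimeoutMs(long perTabletTimeoutSec, long maxTimeoutSec, int totalTaskNum) {
        long timeout = perTabletTimeoutSec * 1000L * totalTaskNum;
        return Math.min(timeout, maxTimeoutSec * 1000L);
    }

    // Starts one thread per task; each counts the latch down once, like a BE finish report.
    static boolean runAndWait(int totalTaskNum, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(totalTaskNum);
        for (int i = 0; i < totalTaskNum; i++) {
            Thread worker = new Thread(latch::countDown);
            worker.setDaemon(true);
            worker.start();
        }
        try {
            // true if the count reached zero before the timeout elapsed
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        long timeoutMs = estimateTimeoutMs(1, 60, 6);
        System.out.println("timeout ms = " + timeoutMs + ", all finished = " + runAndWait(6, timeoutMs));
    }
}
```

    Capping the timeout keeps a table with a very large number of tablets from blocking the CREATE TABLE call for an unbounded time.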

    org.apache.doris.catalog.Database#createTableWithLock

    idToTable.put(table.getId(), table);
    nameToTable.put(table.getName(), table);
    lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
    
    if (!isReplay) {
        // Write edit log
        CreateTableInfo info = new CreateTableInfo(fullQualifiedName, table);
        Catalog.getCurrentCatalog().getEditLog().logCreateTable(info);
    }
    if (table.getType() == TableType.ELASTICSEARCH) {
        Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable) table);
    }
    

    1. Add the table to the Database object
    2. Check whether this call is a replay
    3. If it is not, write the metadata log (edit log)

    Flow summary:

    image

    FE and BE interaction

    image

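    FE-BE interaction flow diagram

    1. The FE sends the tasks
    2. The BEs execute them
    3. The BEs report the results
    4. The FE aggregates the results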

    AgentBatchTask batchTask = new AgentBatchTask();
    for (Tablet tablet : index.getTablets()) {
        long tabletId = tablet.getId();
        for (Replica replica : tablet.getReplicas()) {
            long backendId = replica.getBackendId();
            countDownLatch.addMark(backendId, tabletId);
            CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
                    partitionId, indexId, tabletId,
                    shortKeyColumnCount, schemaHash,
                    version, versionHash,
                    keysType,
                    storageType, storageMedium,
                    schema, bfColumns, bfFpp,
                    countDownLatch,
                    indexes,
                    isInMemory,
                    tabletType);
            task.setStorageFormat(storageFormat);
            batchTask.addTask(task);
            // add to AgentTaskQueue for handling finish report.
            // not for resending task
            AgentTaskQueue.addTask(task);
        }
    }
    AgentTaskExecutor.submit(batchTask);
    

    AgentBatchTask:
    Collects tasks and groups them by BE

    AgentTaskExecutor:
    Sends the AgentBatchTask

    AgentTaskQueue:
    Handles the reports of finished tasks
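
    "Groups them by BE" amounts to a map from backend id to a list of tasks, so that each BE can receive all of its tasks in a single request. A minimal sketch, with hypothetical class and method names rather than the real AgentBatchTask:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a batch that groups tasks by the backend that must run them.
class BatchTaskSketch {
    static class Task {
        final long backendId;
        final long tabletId;

        Task(long backendId, long tabletId) {
            this.backendId = backendId;
            this.tabletId = tabletId;
        }
    }

    private final Map<Long, List<Task>> backendIdToTasks = new HashMap<>();

    // Appends the task to the list for its backend, creating the list on first use.
    void addTask(Task task) {
        backendIdToTasks.computeIfAbsent(task.backendId, id -> new ArrayList<>()).add(task);
    }

    int backendCount() {
        return backendIdToTasks.size();
    }

    int taskCount(long backendId) {
        List<Task> tasks = backendIdToTasks.get(backendId);
        return tasks == null ? 0 : tasks.size();
    }

    public static void main(String[] args) {
        BatchTaskSketch batch = new BatchTaskSketch();
        batch.addTask(new Task(10001L, 1L));
        batch.addTask(new Task(10002L, 1L));
        batch.addTask(new Task(10001L, 2L));
        System.out.println("backends = " + batch.backendCount()
                + ", tasks on 10001 = " + batch.taskCount(10001L));
    }
}
```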

    Receiving tasks on the BE
    be/src/agent/agent_server.cpp
    Receives the tasks

    // resend request when something is wrong (BE may need some logic to guarantee idempotence).
    void AgentServer::submit_tasks(TAgentResult& agent_result,
                                   const std::vector<TAgentTaskRequest>& tasks) {
        Status ret_st;
        // TODO check master_info here if it is the same with that of heartbeat rpc
        if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
            Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
            ret_st.to_thrift(&agent_result.status);
            return;
        }
    
        for (auto task : tasks) {
            VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
            TTaskType::type task_type = task.task_type;
            int64_t signature = task.signature;
    
    #define HANDLE_TYPE(t_task_type, work_pool, req_member)                         \
        case t_task_type:                                                           \
            if (task.__isset.req_member) {                                          \
                work_pool->submit_task(task);                                       \
            } else {                                                                \
                ret_st = Status::InvalidArgument(strings::Substitute(               \
                        "task(signature=$0) has wrong request member", signature)); \
            }                                                                       \
            break;
    
           ...
    
        ret_st.to_thrift(&agent_result.status);
    }
    

    Worker thread

    while (_is_work) {
        TAgentTaskRequest agent_task_req;
        TCreateTabletReq create_tablet_req;
        {
            lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
            while (_is_work && _tasks.empty()) {
                _worker_thread_condition_variable.wait();
            }
            if (!_is_work) {
                return;
            }
            // take a task from the queue
            agent_task_req = _tasks.front();
            create_tablet_req = agent_task_req.create_tablet_req;
            _tasks.pop_front();
        }

        // execute it
        OLAPStatus create_status = _env->storage_engine()->create_tablet(create_tablet_req);

        TFinishTaskRequest finish_task_request;
        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
        finish_task_request.__set_backend(_backend);
        finish_task_request.__set_report_version(_s_report_version);
        finish_task_request.__set_task_type(agent_task_req.task_type);
        finish_task_request.__set_signature(agent_task_req.signature);
        finish_task_request.__set_task_status(task_status);
        // report the result
        _finish_task(finish_task_request);
    }
    

    Handling task reports

    org.apache.doris.service.FrontendServiceImpl#finishTask
    org.apache.doris.master.MasterImpl#finishTask
    FE and BE communicate over Thrift.

    Error handling

    org.apache.doris.task.AgentTaskQueue stores the tasks that are currently executing.

    image

    org.apache.doris.master.ReportHandler#handleReport
    org.apache.doris.master.ReportHandler#taskReport
    BE: Report tasks/olap tablet/disk state to the master server
    The FE master processes the reported tasks and retries those that have timed out.

    private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
        ...
        
        // to escape sending duplicate agent task to be
        if (task.shouldResend(taskReportTime)) {
            batchTask.addTask(task);
        }
        ...
    }
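
    The retry decision can be sketched as follows. This is a hypothetical reconstruction, not the ReportHandler code. It assumes that the FE compares the tasks it still has queued for a backend against the set the BE reports as running, and resends only those that are missing on the BE and older than a wait threshold. All names and the threshold parameter are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of "resend tasks the BE no longer knows about, but not too eagerly".
class ResendSketch {
    // A task is resent only after it has been outstanding longer than the wait threshold.
    static boolean shouldResend(long createTimeMs, long nowMs, long resendWaitMs) {
        return nowMs - createTimeMs > resendWaitMs;
    }

    // queuedCreateTimes: task signature -> creation time of tasks the FE still considers unfinished.
    // runningOnBackend: task signatures the BE reported as running.
    static List<Long> pickTasksToResend(Map<Long, Long> queuedCreateTimes, Set<Long> runningOnBackend,
                                        long nowMs, long resendWaitMs) {
        List<Long> resend = new ArrayList<>();
        // TreeMap gives a stable, sorted iteration order for the result.
        for (Map.Entry<Long, Long> entry : new TreeMap<>(queuedCreateTimes).entrySet()) {
            boolean missingOnBackend = !runningOnBackend.contains(entry.getKey());
            if (missingOnBackend && shouldResend(entry.getValue(), nowMs, resendWaitMs)) {
                resend.add(entry.getKey());
            }
        }
        return resend;
    }

    public static void main(String[] args) {
        Map<Long, Long> queued = new TreeMap<>();
        queued.put(1L, 0L);     // old, but the BE reports it as running
        queued.put(2L, 0L);     // old and unknown to the BE
        queued.put(3L, 9000L);  // unknown to the BE, but created only recently
        Set<Long> running = new java.util.HashSet<>();
        running.add(1L);
        System.out.println("resend: " + pickTasksToResend(queued, running, 10000L, 5000L));
    }
}
```

    The wait threshold is what the "to escape sending duplicate agent task" comment in the excerpt guards against: a task that was sent moments ago may simply not have shown up in the BE's report yet.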
    

    Metadata persistence

    image

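    The edit log is similar to a WAL (write-ahead log).

    BDBJE (Berkeley DB Java Edition) serves as the distributed KV store.

    Metadata persistence: org.apache.doris.catalog.Database#createTableWithLock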

    public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
        ...
        // update the in-memory state
        nameToTable.put(table.getName(), table);
        // Write edit log
        // build the metadata log entry
        CreateTableInfo info = new CreateTableInfo(fullQualifiedName, table);
        // write the metadata log entry
        Catalog.getCurrentCatalog().getEditLog().logCreateTable(info);
        ...
    }
    

    Metadata replay

    Metadata replay happens when the FE leader synchronizes metadata to the other FE nodes.

    image

    The log entries are replayed one by one.
    The metadata is rebuilt in memory.
    org.apache.doris.catalog.Catalog#replayCreateTable

    public void replayCreateTable(String dbName, Table table) {
        Database db = this.fullNameToDb.get(dbName);
        db.createTableWithLock(table, true, false);
        ...
    }
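
    The relationship between the write path and the replay path can be shown with a toy model. It is a sketch under assumptions, not Doris code: a plain list of strings stands in for the edit log, the "CREATE_TABLE name id" entry format is invented for the example, and table names are assumed to contain no spaces. What it does preserve is the isReplay flag: the same method updates memory in both paths, but only the original operation appends to the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of "apply in memory, log unless replaying, rebuild by replay".
class EditLogSketch {
    final Map<String, Long> nameToTable = new HashMap<>();
    final List<String> editLog = new ArrayList<>();  // stands in for the persisted journal

    // Shared by the normal path (isReplay = false) and the replay path (isReplay = true).
    boolean createTable(String name, long tableId, boolean isReplay) {
        if (nameToTable.containsKey(name)) {
            return false;
        }
        nameToTable.put(name, tableId);
        if (!isReplay) {
            // Only the original operation writes a log entry; replay must not log again.
            editLog.add("CREATE_TABLE " + name + " " + tableId);
        }
        return true;
    }

    // Rebuilds in-memory state by applying the journal entries in order.
    void replay(List<String> journal) {
        for (String entry : journal) {
            String[] parts = entry.split(" ");
            if (parts.length == 3 && parts[0].equals("CREATE_TABLE")) {
                createTable(parts[1], Long.parseLong(parts[2]), true);
            }
        }
    }

    public static void main(String[] args) {
        EditLogSketch leader = new EditLogSketch();
        leader.createTable("t1", 10001L, false);
        leader.createTable("t2", 10002L, false);

        EditLogSketch follower = new EditLogSketch();
        follower.replay(leader.editLog);
        System.out.println("same state: " + follower.nameToTable.equals(leader.nameToTable));
    }
}
```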
    

    How to implement a new statement

    fe/fe-core/src/main/cup/sql_parser.cup (grammar file)

    KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name
            LPAREN column_definition_list:columns COMMA index_definition_list:indexes RPAREN opt_engine:engineName
            opt_keys:keys
            opt_comment:tableComment
            opt_partition:partition
            opt_distribution:distribution
            opt_rollup:index
            opt_properties:tblProperties
            opt_ext_properties:extProperties
    {:
        RESULT = new CreateTableStmt(ifNotExists, isExternal, name, columns, indexes, engineName, keys, partition,
        distribution, tblProperties, extProperties, tableComment, index);
    :}
    

    fe/fe-core/src/main/jflex/sql_scanner.flex (lexer file)

    keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE));
    keywordMap.put("cross", new Integer(SqlParserSymbols.KW_CROSS));
    keywordMap.put("cube", new Integer(SqlParserSymbols.KW_CUBE));
    keywordMap.put("current", new Integer(SqlParserSymbols.KW_CURRENT));
    keywordMap.put("current_user", new Integer(SqlParserSymbols.KW_CURRENT_USER));
    keywordMap.put("data", new Integer(SqlParserSymbols.KW_DATA));
    keywordMap.put("database", new Integer(SqlParserSymbols.KW_DATABASE));
    

    Generating the lexer and parser code:

    cd fe/ && mvn clean install -DskipTests

    The build generates:
    • SqlScanner.java
    • SqlParser.java
    • SqlParserSymbols.java
    
    

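    Summary of the steps to implement a new statement:

    1. Define the statement in the lexer and grammar files
    2. Implement the corresponding statement class, e.g. CreateTableStmt
    3. Implement the method that modifies the metadata, e.g. Catalog.createTable()
    4. Define the metadata log class for the operation, e.g. CreateTableInfo
    5. Implement writing the metadata log
    6. Implement the corresponding replay method, e.g. Catalog.replayCreateTable()
    Author: chaplinthink. Interests: big data, infrastructure, system design. A big data engineer who loves learning and sharing.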
  • Original article: https://www.cnblogs.com/bigdata1024/p/15574434.html