zoukankan      html  css  js  c++  java
  • cube.js driver&&query&& cube schema 关系&&简单说明

    从术语上应该是没有直接关系的,但是实际运行的时候彼此是有依赖的
    driver 主要进行 sql 的查询处理,同时进行连接管理的处理,query 进行
    sql 生成的处理(创建实际需要的sql 查询),cube schema 主要是定义
    cube 的规则,query 实际上也是对于schema 抽象语言的sql 处理提供工具
    如果阅读了api gateway 的处理机制,会发现cube.js 对于load 的处理流程如下

    api gateway load 处理机制

    • 解析请求(RequestContext)
    • 基于compiler(类似基于单例模式) 获取cube schema 的元数据定义生成查询sql (需要自定义驱动query的支持)
    • 基于OrchestratorApi 进行sql 的查询执行(需要driver的支持)

    代码说明

    • load
     
    /**
     * Handles a load request: normalizes the incoming query, generates SQL for
     * every normalized query through the compiler API, executes the SQL through
     * the adapter API and passes the transformed result(s) to `res`.
     *
     * @param query - Raw query parameter; parsed with `parseQueryParam`.
     * @param context - Request context; `context.requestId` is forwarded to the
     *   compiler and to query execution.
     * @param res - Callback that receives the response payload. It gets the
     *   first result only, unless `props.queryType === 'multi'`, in which case
     *   it gets `{ queryType, results, pivotQuery, slowQuery }`.
     * @param props - Remaining request properties; only `queryType` is read here.
     *
     * Any error (including the `UserError` raised for an unsupported query type)
     * is routed to `this.handleError` instead of being rethrown.
     */
    public async load({ query, context, res, ...props }: any) {
        const requestStarted = new Date();
        try {
          query = this.parseQueryParam(query);
          this.log({
            type: 'Load Request',
            query
          }, context);
          const [queryType, normalizedQueries] = await this.getNormalizedQueries(query, context);

          // Reject clients that cannot consume this query type up front, before
          // any SQL is generated or executed and before success is logged.
          if (queryType !== QUERY_TYPE.REGULAR_QUERY && props.queryType == null) {
            throw new UserError(`'${queryType}' query type is not supported by the client. Please update the client.`);
          }

          // Uses the compiler API to compile the cube schema and generate the SQL
          // queries (with placeholders; the actual execution later depends on the
          // query's condition values).
          const [metaConfigResult, ...sqlQueries] = await Promise.all(
            [
              this.getCompilerApi(context).metaConfig({ requestId: context.requestId })
            ].concat(normalizedQueries.map(
              async (normalizedQuery) => {
                const loadRequestSQLStarted = new Date();
                const sqlQuery = await this.getCompilerApi(context).getSql(
                  this.coerceForSqlQuery(normalizedQuery, context)
                );
                this.log({
                  type: 'Load Request SQL',
                  duration: this.duration(loadRequestSQLStarted),
                  query: normalizedQuery,
                  sqlQuery
                }, context);
                return sqlQuery;
              }
            ))
          );

          const results = await Promise.all(normalizedQueries.map(async (normalizedQuery, index) => {
            const sqlQuery = sqlQueries[index];
            const annotation = prepareAnnotation(metaConfigResult, normalizedQuery);
            const aliasToMemberNameMap = sqlQuery.aliasNameToMember;
            const toExecute = {
              ...sqlQuery,
              query: sqlQuery.sql[0],
              values: sqlQuery.sql[1],
              continueWait: true,
              renewQuery: normalizedQuery.renewQuery,
              requestId: context.requestId
            };
            // The adapter API here is effectively the OrchestratorApi; its core
            // job is to execute the SQL query.
            const response = await this.getAdapterApi(context).executeQuery(toExecute);
            const flattenAnnotation = {
              ...annotation.measures,
              ...annotation.dimensions,
              ...annotation.timeDimensions
            };
            return {
              query: normalizedQuery,
              data: transformData(
                aliasToMemberNameMap,
                flattenAnnotation,
                response.data,
                normalizedQuery,
                queryType
              ),
              lastRefreshTime: response.lastRefreshTime && response.lastRefreshTime.toISOString(),
              // Debug details are only exposed outside of production.
              ...(process.env.NODE_ENV === 'production' ? undefined : {
                refreshKeyValues: response.refreshKeyValues,
                usedPreAggregations: response.usedPreAggregations
              }),
              annotation,
              slowQuery: Boolean(response.slowQuery)
            };
          }));

          // Derived from the awaited results instead of mutating a shared flag
          // from inside the parallel callbacks.
          const slowQuery = results.some((result) => result.slowQuery);

          this.log({
            type: 'Load Request Success',
            query,
            duration: this.duration(requestStarted)
          }, context);
          if (props.queryType === 'multi') {
            res({
              queryType,
              results,
              pivotQuery: getPivotQuery(queryType, normalizedQueries),
              slowQuery
            });
          } else {
            res(results[0]);
          }
        } catch (e) {
          this.handleError({
            e, context, query, res, requestStarted
          });
        }
      }
    • getSql 的方法定义
      具体操作packages/cubejs-server-core/src/core/CompilerApi.js
      使用了包装的compiler
     
      async getSql(query, options) {
        options = options || {};
        const { includeDebugInfo } = options;
        const dbType = this.getDbType();
        const compilers = await this.getCompilers({ requestId: query.requestId });
        let sqlGenerator = this.createQueryByDataSource(compilers, query);
        if (!sqlGenerator) {
          throw new Error(`Unknown dbType: ${dbType}`);
        }
        const dataSource = compilers.compiler.withQuery(sqlGenerator, () => sqlGenerator.dataSource);
        if (dataSource !== 'default' && dbType !== this.getDbType(dataSource)) {
          // TODO consider more efficient way than instantiating query
          sqlGenerator = this.createQueryByDataSource(
            compilers,
            query,
            dataSource
          );
          if (!sqlGenerator) {
            throw new Error(`Can't find dialect for '${dataSource}' data source: ${this.getDbType(dataSource)}`);
          }
        }
        //  packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js 方法定义
        return compilers.compiler.withQuery(sqlGenerator, () => ({
          external: sqlGenerator.externalPreAggregationQuery(),
          sql: sqlGenerator.buildSqlAndParams(),
          timeDimensionAlias: sqlGenerator.timeDimensions[0] && sqlGenerator.timeDimensions[0].unescapedAliasName(),
          timeDimensionField: sqlGenerator.timeDimensions[0] && sqlGenerator.timeDimensions[0].dimension,
          order: sqlGenerator.order,
          cacheKeyQueries: sqlGenerator.cacheKeyQueries(),
          preAggregations: sqlGenerator.preAggregations.preAggregationsDescription(),
          dataSource: sqlGenerator.dataSource,
          aliasNameToMember: sqlGenerator.aliasNameToMember,
          rollupMatchResults: includeDebugInfo ?
            sqlGenerator.preAggregations.rollupMatchResultDescriptions() : undefined,
          canUseTransformedQuery: sqlGenerator.preAggregations.canUseTransformedQuery()
        }));
      }
    • adapterApi 的初始化
      主要是api gateway 依赖的,基于缓存进行存储,由server-core 进行创建
     
      protected apiGateway() {
        if (!this.apiGatewayInstance) {
          this.apiGatewayInstance = new ApiGateway(
            this.options.apiSecret,
            this.getCompilerApi.bind(this),
            this.getOrchestratorApi.bind(this),
            this.logger, {
              standalone: this.standalone,
              dataSourceStorage: this.orchestratorStorage,
              basePath: this.options.basePath,
              checkAuthMiddleware: this.options.checkAuthMiddleware,
              checkAuth: this.options.checkAuth,
              queryTransformer: this.options.queryTransformer,
              extendContext: this.options.extendContext,
              refreshScheduler: () => new RefreshScheduler(this),
            }
          );
        }
        return this.apiGatewayInstance;
      }

    getOrchestratorApi 的处理 (主要是基于配置定义driver 实例,同时cache方便复用)

      public getOrchestratorApi(context: DriverContext): OrchestratorApi {
        const orchestratorId = this.contextToOrchestratorId(context);
        if (this.orchestratorStorage.has(orchestratorId)) {
          return this.orchestratorStorage.get(orchestratorId);
        }
        const driverPromise = {};
        let externalPreAggregationsDriverPromise;
        const orchestratorApi = this.createOrchestratorApi({
          getDriver: async (dataSource) => {
            if (!driverPromise[dataSource || 'default']) {
              orchestratorApi.addDataSeenSource(dataSource);
              const driver = await this.options.driverFactory({ ...context, dataSource });
              if (driver.setLogger) {
                driver.setLogger(this.logger);
              }
              driverPromise[dataSource || 'default'] = driver.testConnection().then(() => driver).catch(e => {
                driverPromise[dataSource || 'default'] = null;
                throw e;
              });
            }
            return driverPromise[dataSource || 'default'];
          },
          getExternalDriverFactory: this.options.externalDriverFactory && (async () => {
            if (!externalPreAggregationsDriverPromise) {
              const driver = await this.options.externalDriverFactory(context);
              if (driver.setLogger) {
                driver.setLogger(this.logger);
              }
              externalPreAggregationsDriverPromise = driver.testConnection().then(() => driver).catch(e => {
                externalPreAggregationsDriverPromise = null;
                throw e;
              });
            }
            return externalPreAggregationsDriverPromise;
          }),
          redisPrefix: orchestratorId,
          orchestratorOptions: this.orchestratorOptions(context)
        });
        this.orchestratorStorage.set(orchestratorId, orchestratorApi);
        return orchestratorApi;
      }
     

    说明

    cube.js 流程机制并不是很难,通过跟踪代码我们可以有整体的了解,后边会介绍下关于编译部分以及
    OrchestratorApi部分的说明,编译部分cube.js 的设计还是很巧妙的利用了babel 以及ast 处理,同时为了
    进行数据的隔离, 使用了node 的vm 实例进行编译处理

  • 相关阅读:
    Codeforces 1265A Beautiful String
    1039 Course List for Student (25)
    1038 Recover the Smallest Number (30)
    1037 Magic Coupon (25)
    1024 Palindromic Number (25)
    1051 Pop Sequence (25)
    1019 General Palindromic Number (20)
    1031 Hello World for U (20)
    1012 The Best Rank (25)
    1011 World Cup Betting (20)
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/14375508.html
Copyright © 2011-2022 走看看