zoukankan      html  css  js  c++  java
  • cube.js driver&&query&& cube schema 关系&&简单说明

    从术语上看三者没有直接关系,但是实际运行的时候彼此是有依赖的:
    driver 主要负责 sql 的查询执行,同时负责连接管理;query 负责
    sql 的生成(创建实际需要的 sql 查询);cube schema 主要是定义
    cube 的规则,query 实际上是为 schema 抽象语言到 sql 的转换提供工具。
    如果阅读了 api gateway 的处理机制,会发现 cube.js 对于 load 的处理流程如下

    api gateway load 处理机制

    • 解析请求(RequestContext)
    • 基于compilerApi(类似单例模式) 获取cube schema 的元数据定义并生成查询sql (需要驱动对应的自定义query 支持)
    • 基于OrchestratorApi 进行sql 的查询执行(需要driver的支持)

    代码说明

    • load
     
    /**
     * Handles a load request: normalizes the incoming query, obtains the SQL
     * for every normalized query from the compiler API, executes each one
     * through the adapter API and passes the transformed result(s) to `res`.
     *
     * @param query   Raw query parameter; parsed with `parseQueryParam`.
     * @param context Request context; `context.requestId` is forwarded to the
     *                compiler and to query execution.
     * @param res     Callback that receives the response payload.
     * @param props   Remaining request properties; only `props.queryType` is
     *                read here (`'multi'` selects the multi-result response).
     *
     * Errors are not rethrown: anything thrown inside is forwarded to
     * `this.handleError` together with the request details.
     */
    public async load({ query, context, res, ...props }: any) {
        const requestStarted = new Date();
        try {
          query = this.parseQueryParam(query);
          this.log({
            type: 'Load Request',
            query
          }, context);
          const [queryType, normalizedQueries] = await this.getNormalizedQueries(query, context);
          // Fail fast: a client that did not send `queryType` cannot consume a
          // non-regular result, so reject before generating or executing any SQL
          // (and before a misleading "Load Request Success" entry is logged).
          if (queryType !== QUERY_TYPE.REGULAR_QUERY && props.queryType == null) {
            throw new UserError(`'${queryType}' query type is not supported by the client. Please update the client.`);
          }
          // Ask the compiler API, in parallel, for the meta config and for the SQL
          // of every normalized query. Each `sqlQuery.sql` is a pair of
          // [query text, parameter values]; the values are only applied when the
          // query is executed below.
          const [metaConfigResult, ...sqlQueries] = await Promise.all(
            [
              this.getCompilerApi(context).metaConfig({ requestId: context.requestId })
            ].concat(normalizedQueries.map(
              async (normalizedQuery, index) => {
                const loadRequestSQLStarted = new Date();
                const sqlQuery = await this.getCompilerApi(context).getSql(
                  this.coerceForSqlQuery(normalizedQuery, context)
                );
                this.log({
                  type: 'Load Request SQL',
                  duration: this.duration(loadRequestSQLStarted),
                  query: normalizedQueries[index],
                  sqlQuery
                }, context);
                return sqlQuery;
              }
            ))
          );
          // Becomes true as soon as any single response reports `slowQuery`.
          let slowQuery = false;
          const results = await Promise.all(normalizedQueries.map(async (normalizedQuery, index) => {
            const sqlQuery = sqlQueries[index];
            const annotation = prepareAnnotation(metaConfigResult, normalizedQuery);
            const aliasToMemberNameMap = sqlQuery.aliasNameToMember;
            const toExecute = {
              ...sqlQuery,
              query: sqlQuery.sql[0],
              values: sqlQuery.sql[1],
              continueWait: true,
              renewQuery: normalizedQuery.renewQuery,
              requestId: context.requestId
            };
            // Per the original author's note, the adapter API here is the
            // OrchestratorApi; this call performs the actual SQL execution.
            const response = await this.getAdapterApi(context).executeQuery(toExecute);
            const flattenAnnotation = {
              ...annotation.measures,
              ...annotation.dimensions,
              ...annotation.timeDimensions
            };
            slowQuery = slowQuery || Boolean(response.slowQuery);
            return {
              query: normalizedQuery,
              data: transformData(
                aliasToMemberNameMap,
                flattenAnnotation,
                response.data,
                normalizedQuery,
                queryType
              ),
              lastRefreshTime: response.lastRefreshTime && response.lastRefreshTime.toISOString(),
              // Refresh keys and pre-aggregation usage are left out of the
              // response when NODE_ENV is 'production'.
              ...(process.env.NODE_ENV === 'production' ? undefined : {
                refreshKeyValues: response.refreshKeyValues,
                usedPreAggregations: response.usedPreAggregations
              }),
              annotation,
              slowQuery: Boolean(response.slowQuery)
            };
          }));
          this.log({
            type: 'Load Request Success',
            query,
            duration: this.duration(requestStarted)
          }, context);
          if (props.queryType === 'multi') {
            res({
              queryType,
              results,
              pivotQuery: getPivotQuery(queryType, normalizedQueries),
              slowQuery
            });
          } else {
            // Clients that did not ask for 'multi' get only the first result.
            res(results[0]);
          }
        } catch (e) {
          this.handleError({
            e, context, query, res, requestStarted
          });
        }
      }
    • getSql 的方法定义
      具体操作packages/cubejs-server-core/src/core/CompilerApi.js
      使用了包装的compiler
     
      async getSql(query, options) {
        options = options || {};
        const { includeDebugInfo } = options;
        const dbType = this.getDbType();
        const compilers = await this.getCompilers({ requestId: query.requestId });
        let sqlGenerator = this.createQueryByDataSource(compilers, query);
        if (!sqlGenerator) {
          throw new Error(`Unknown dbType: ${dbType}`);
        }
        const dataSource = compilers.compiler.withQuery(sqlGenerator, () => sqlGenerator.dataSource);
        if (dataSource !== 'default' && dbType !== this.getDbType(dataSource)) {
          // TODO consider more efficient way than instantiating query
          sqlGenerator = this.createQueryByDataSource(
            compilers,
            query,
            dataSource
          );
          if (!sqlGenerator) {
            throw new Error(`Can't find dialect for '${dataSource}' data source: ${this.getDbType(dataSource)}`);
          }
        }
        //  packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js 方法定义
        return compilers.compiler.withQuery(sqlGenerator, () => ({
          external: sqlGenerator.externalPreAggregationQuery(),
          sql: sqlGenerator.buildSqlAndParams(),
          timeDimensionAlias: sqlGenerator.timeDimensions[0] && sqlGenerator.timeDimensions[0].unescapedAliasName(),
          timeDimensionField: sqlGenerator.timeDimensions[0] && sqlGenerator.timeDimensions[0].dimension,
          order: sqlGenerator.order,
          cacheKeyQueries: sqlGenerator.cacheKeyQueries(),
          preAggregations: sqlGenerator.preAggregations.preAggregationsDescription(),
          dataSource: sqlGenerator.dataSource,
          aliasNameToMember: sqlGenerator.aliasNameToMember,
          rollupMatchResults: includeDebugInfo ?
            sqlGenerator.preAggregations.rollupMatchResultDescriptions() : undefined,
          canUseTransformedQuery: sqlGenerator.preAggregations.canUseTransformedQuery()
        }));
      }
    • adapterApi 的初始化
      主要是api gateway 依赖的,基于缓存进行存储,由server-core 进行创建
     
      protected apiGateway() {
        if (!this.apiGatewayInstance) {
          this.apiGatewayInstance = new ApiGateway(
            this.options.apiSecret,
            this.getCompilerApi.bind(this),
            this.getOrchestratorApi.bind(this),
            this.logger, {
              standalone: this.standalone,
              dataSourceStorage: this.orchestratorStorage,
              basePath: this.options.basePath,
              checkAuthMiddleware: this.options.checkAuthMiddleware,
              checkAuth: this.options.checkAuth,
              queryTransformer: this.options.queryTransformer,
              extendContext: this.options.extendContext,
              refreshScheduler: () => new RefreshScheduler(this),
            }
          );
        }
        return this.apiGatewayInstance;
      }

    getOrchestratorApi 的处理 (主要是基于配置定义driver 实例,同时cache方便复用)

      public getOrchestratorApi(context: DriverContext): OrchestratorApi {
        const orchestratorId = this.contextToOrchestratorId(context);
        if (this.orchestratorStorage.has(orchestratorId)) {
          return this.orchestratorStorage.get(orchestratorId);
        }
        const driverPromise = {};
        let externalPreAggregationsDriverPromise;
        const orchestratorApi = this.createOrchestratorApi({
          getDriver: async (dataSource) => {
            if (!driverPromise[dataSource || 'default']) {
              orchestratorApi.addDataSeenSource(dataSource);
              const driver = await this.options.driverFactory({ ...context, dataSource });
              if (driver.setLogger) {
                driver.setLogger(this.logger);
              }
              driverPromise[dataSource || 'default'] = driver.testConnection().then(() => driver).catch(e => {
                driverPromise[dataSource || 'default'] = null;
                throw e;
              });
            }
            return driverPromise[dataSource || 'default'];
          },
          getExternalDriverFactory: this.options.externalDriverFactory && (async () => {
            if (!externalPreAggregationsDriverPromise) {
              const driver = await this.options.externalDriverFactory(context);
              if (driver.setLogger) {
                driver.setLogger(this.logger);
              }
              externalPreAggregationsDriverPromise = driver.testConnection().then(() => driver).catch(e => {
                externalPreAggregationsDriverPromise = null;
                throw e;
              });
            }
            return externalPreAggregationsDriverPromise;
          }),
          redisPrefix: orchestratorId,
          orchestratorOptions: this.orchestratorOptions(context)
        });
        this.orchestratorStorage.set(orchestratorId, orchestratorApi);
        return orchestratorApi;
      }
     

    说明

    cube.js 流程机制并不是很难,通过跟踪代码我们可以有整体的了解,后边会介绍下关于编译部分以及
    OrchestratorApi部分的说明,编译部分cube.js 的设计还是很巧妙的利用了babel 以及ast 处理,同时为了
    进行数据的隔离, 使用了node 的vm 实例进行编译处理

  • 相关阅读:
    TYVJ P1092 麻将
    BZOJ 1020——[SHOI2008]安全的航线flight
    JSOI2008 火星人prefix
    Silverlight下“DataGrid”和“Pdf”导出
    Jquery实现“Iframe”页面切换
    遍历表,执行存储过程的方法
    “JS”和“Aspx”之注册“JS脚本”、刷新页面、TreeNode下JS连接设置
    WeatherWebService
    WCF方法“异步调用”的“同步问题”
    “Silverlight”中获取“HTML元素和参数”及JS交互
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/14375508.html
Copyright © 2011-2022 走看看