zoukankan      html  css  js  c++  java
  • cube.js 实现自定义驱动的方式

    基本就是一个套路,开发可以参考官方的driver

    需要提供的几个接口实现

    • BaseQuery
    • BaseDriver

    接口实现说明

    • BaseDriver
      官方的说法是需要query 以及testConnection 同时release 也是比较重要的
      同时推荐基于generic-pool 进行连接池的处理(当然对于已经支持链接池的客户端可以不用处理)
      默认包含的方法

       

    • BaseQuery
      这个还是比较重要的,关于cube 的sql 转换都会用到
      包含的方法很多截图了部分
    • 一个约定
      如果我们的参考了{dbType}-cubejs-driver 格式命名我们的npm包,就可以自动注册了(基于npm 路径查找)
      同时如果需要使用,需要配置方言,同时我们的cube.js server 启动也是需要配置的
      参考配置
     
       驱动添加方言
       class MyDriver extends BaseDriver { 
                static dialectClass() {
                   return MyQuery;
                }
        }
    cube.js 配置
    module.exports = {
        devServer: true,
        dbType: ({ dataSource } = {}) => {
            return 'dalong';
        },
        //  自己的方方言的定义
        dialectFactory: (dataSource) => {
            return MyQuery
        },

    一个参考实现

    基于mysql 的实现

    • pacakge.json
    {
      "dependencies": {
        "@cubejs-backend/query-orchestrator": "^0.26.2",
        "@cubejs-backend/schema-compiler": "^0.26.0",
        "moment-timezone": "^0.5.32",
        "mysql": "^2.18.1"
      },
      "name": "dalong-cubejs-driver",
      "version": "1.0.0",
      "public": true,
      "main": "index.js",
      "author": "rong <1141591465@qq.com>",
      "license": "MIT"
    }
    • MyDriver
    const { BaseDriver } = require("@cubejs-backend/query-orchestrator")
    const genericPool = require('generic-pool');
    const { promisify } = require('util');
    const mysql = require('mysql');
    const MyQuery = require("./MyQuery");
    class MyDriver extends BaseDriver {
        constructor(config) {
            super();
            const { pool, ...restConfig } = config || {};
            this.config = {
                host: process.env.CUBEJS_DB_HOST,
                database: process.env.CUBEJS_DB_NAME,
                port: process.env.CUBEJS_DB_PORT,
                user: process.env.CUBEJS_DB_USER,
                password: process.env.CUBEJS_DB_PASS,
                socketPath: process.env.CUBEJS_DB_SOCKET_PATH,
                timezone: 'Z',
                ssl: this.getSslOptions(),
                dateStrings: true,
                ...restConfig,
            };
            this.pool = genericPool.createPool({
                create: async () => {
                    const conn = mysql.createConnection(this.config);
                    const connect = promisify(conn.connect.bind(conn));
                    if (conn.on) {
                        conn.on('error', () => {
                            conn.destroy();
                        });
                    }
                    conn.execute = promisify(conn.query.bind(conn));
                    await connect();
                    return conn;
                },
                validate: async (connection) => {
                    try {
                        await connection.execute('SELECT 1');
                    } catch (e) {
                        this.databasePoolError(e);
                        return false;
                    }
                    return true;
                },
                destroy: (connection) => promisify(connection.end.bind(connection))(),
            }, {
                min: 0,
                max: process.env.CUBEJS_DB_MAX_POOL && parseInt(process.env.CUBEJS_DB_MAX_POOL, 10) || 8,
                evictionRunIntervalMillis: 10000,
                softIdleTimeoutMillis: 30000,
                idleTimeoutMillis: 30000,
                testOnBorrow: true,
                acquireTimeoutMillis: 20000,
                ...pool
            });
        }
       //  这个比较重要
        static dialectClass() {
            return MyQuery;
        }
        withConnection(fn) {
            const self = this;
            const connectionPromise = this.pool.acquire();
            let cancelled = false;
            const cancelObj = {};
            const promise = connectionPromise.then(async conn => {
                const [{ connectionId }] = await conn.execute('select connection_id() as connectionId');
                cancelObj.cancel = async () => {
                    cancelled = true;
                    await self.withConnection(async processConnection => {
                        await processConnection.execute(`KILL ${connectionId}`);
                    });
                };
                return fn(conn)
                    .then(res => this.pool.release(conn).then(() => {
                        if (cancelled) {
                            throw new Error('Query cancelled');
                        }
                        return res;
                    }))
                    .catch((err) => this.pool.release(conn).then(() => {
                        if (cancelled) {
                            throw new Error('Query cancelled');
                        }
                        throw err;
                    }));
            });
            promise.cancel = () => cancelObj.cancel();
            return promise;
        }
        async testConnection() {
            const conn = await this.pool._factory.create();
            try {
                return await conn.execute('SELECT 1');
            } finally {
                await this.pool._factory.destroy(conn);
            }
        }
        query(query, values) {
            return this.withConnection(db => this.setTimeZone(db)
                .then(() => db.execute(query, values))
                .then(res => res));
        }
        setTimeZone(db) {
            return db.execute(`SET time_zone = '${this.config.storeTimezone || '+00:00'}'`, []);
        }
    }
    module.exports = MyDriver
     

    MyQuery

    const moment = require('moment-timezone');
    const { BaseFilter, BaseQuery } = require('@cubejs-backend/schema-compiler');
    const GRANULARITY_TO_INTERVAL = {
        day: (date) => `DATE_FORMAT(${date}, '%Y-%m-%dT00:00:00.000')`,
        week: (date) => `DATE_FORMAT(date_add('1900-01-01', interval TIMESTAMPDIFF(WEEK, '1900-01-01', ${date}) WEEK), '%Y-%m-%dT00:00:00.000')`,
        hour: (date) => `DATE_FORMAT(${date}, '%Y-%m-%dT%H:00:00.000')`,
        minute: (date) => `DATE_FORMAT(${date}, '%Y-%m-%dT%H:%i:00.000')`,
        second: (date) => `DATE_FORMAT(${date}, '%Y-%m-%dT%H:%i:%S.000')`,
        month: (date) => `DATE_FORMAT(${date}, '%Y-%m-01T00:00:00.000')`,
        year: (date) => `DATE_FORMAT(${date}, '%Y-01-01T00:00:00.000')`
    };
    class MysqlFilter extends BaseFilter {
        likeIgnoreCase(column, not, param) {
            return `${column}${not ? ' NOT' : ''} LIKE CONCAT('%', ${this.allocateParam(param)}, '%')`;
        }
    }
    class MysqlQuery extends BaseQuery {
        newFilter(filter) {
            return new MysqlFilter(this, filter);
        }
        convertTz(field) {
            return `CONVERT_TZ(${field}, @@session.time_zone, '${moment().tz(this.timezone).format('Z')}')`;
        }
        timeStampCast(value) {
            return `TIMESTAMP(convert_tz(${value}, '+00:00', @@session.time_zone))`;
        }
        inDbTimeZone(date) {
            return this.inIntegrationTimeZone(date).clone().utc().format(moment.HTML5_FMT.DATETIME_LOCAL_MS);
        }
        dateTimeCast(value) {
            return `TIMESTAMP(${value})`;
        }
        subtractInterval(date, interval) {
            return `DATE_SUB(${date}, INTERVAL ${interval})`;
        }
        addInterval(date, interval) {
            return `DATE_ADD(${date}, INTERVAL ${interval})`;
        }
        timeGroupedColumn(granularity, dimension) {
            return `CAST(${GRANULARITY_TO_INTERVAL[granularity](dimension)} AS DATETIME)`;
        }
        escapeColumnName(name) {
            return `\`${name}\``;
        }
        seriesSql(timeDimension) {
            const values = timeDimension.timeSeries().map(
                ([from, to]) => `select '${from}' f, '${to}' t`
            ).join(' UNION ALL ');
            return `SELECT TIMESTAMP(dates.f) date_from, TIMESTAMP(dates.t) date_to FROM (${values}) AS dates`;
        }
        concatStringsSql(strings) {
            return `CONCAT(${strings.join(', ')})`;
        }
        wrapSegmentForDimensionSelect(sql) {
            return `IF(${sql}, 1, 0)`;
        }
        preAggregationTableName(cube, preAggregationName, skipSchema) {
            const name = super.preAggregationTableName(cube, preAggregationName, skipSchema);
            if (name.length > 64) {
                throw new UserError(`MySQL can not work with table names that longer than 64 symbols. Consider using the 'sqlAlias' attribute in your cube and in your pre-aggregation definition for ${name}.`);
            }
            return name;
        }
    }
    module.exports = MysqlQuery
    • 代码集成使用
      配置cube.js
     
    // Cube.js configuration options: https://cube.dev/docs/config
    const {MyDriver,MyQuery} = require("dalong-cubejs-driver");
    module.exports = {
        devServer: true,
        dbType: ({ dataSource } = {}) => {
            return 'dalong';
        },
        dialectFactory: (dataSource) => {
            return MyQuery
        },
        scheduledRefreshTimer: true,
        telemetry: false,
        apiSecret: "8ddfaa8939c2edc6fda42c7b4c4e5ee68ca0c8e39189a10ab1919f71de1afeecea1c0bd5efea5a51bb0d85498ada0fe2ffe8e8467580a27ef5eb44c6414c5065",
        driverFactory: ({ dataSource } = {}) => {
            return new MyDriver({
                user:"root",
                database: "gitbase",
                port: 3306,
                host: "127.0.0.1",
                readOnly: true
            });
        }
    };

    说明

    以上是一个简单的使用,核心还是参考了mysql的,同时遵循了cube.js driver 的一些规范,可以方便的进行扩展使用集成使用
    对于具体自己开发的driver 需要配置方言的原因可以查看代码,同时我们也会发现官方文档对于如何进行driver的开发并不是很
    清楚,文档还是需要完善的
    packages/cubejs-schema-compiler/src/adapter/QueryBuilder.js

     
    export const createQuery = (compilers, dbType, queryOptions) => {
      if (!queryOptions.dialectClass && !ADAPTERS[dbType]) {
        return null;
      }
      let externalQueryClass = queryOptions.externalDialectClass;
      if (!externalQueryClass && queryOptions.externalDbType) {
        if (!ADAPTERS[queryOptions.externalDbType]) {
          throw new Error(`Dialect for '${queryOptions.externalDbType}' is not found`);
        }
        externalQueryClass = ADAPTERS[queryOptions.externalDbType];
      }
      return new (queryOptions.dialectClass || ADAPTERS[dbType])(compilers, {
        ...queryOptions,
        externalQueryClass
      });
    };

    参考资料

    https://github.com/cube-js/cube.js/blob/master/CONTRIBUTING.md#implementing-sql-dialect

  • 相关阅读:
    Easyui datagrid 修改分页组件的分页提示信息为中文
    Easyui datagrid 实现表格记录拖拽
    Java:内部类
    算法导论:Trie字典树
    算法导论:找零钱问题
    lintcode:组成最大的数
    lintcode:验证二叉查找树
    lintcode:将二叉查找树转换成双链表
    lintcode:二叉树的路径和
    lintcode:字符串置换
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/14369653.html
Copyright © 2011-2022 走看看