基本就是一个套路,开发可以参考官方的driver
需要提供的几个接口实现
- BaseQuery
- BaseDriver
接口实现说明
- BaseDriver
官方的说法是需要query
以及testConnection
同时release 也是比较重要的
同时推荐基于generic-pool 进行连接池的处理(当然对于已经支持连接池的客户端可以不用处理)
默认包含的方法
- BaseQuery
这个还是比较重要的,关于cube 的sql 转换都会用到
包含的方法很多,这里只截图了部分
- 一个约定
如果我们参考 {dbType}-cubejs-driver 的格式
命名我们的npm包,就可以自动注册了(基于npm 路径查找)
同时如果需要使用,需要配置方言,同时我们的cube.js server 启动也是需要配置的
参考配置
驱动添加方言
class MyDriver extends BaseDriver {
  /**
   * Exposes the SQL dialect (query class) that belongs to this driver.
   *
   * @returns {Function} The `MyQuery` class.
   */
  static dialectClass() {
    return MyQuery;
  }
}
cube.js 配置
module.exports = {
devServer: true,
dbType: ({ dataSource } = {}) => {
return 'dalong';
},
// 自己的方方言的定义
dialectFactory: (dataSource) => {
return MyQuery
},
一个参考实现
基于mysql 的实现
- package.json
{
"dependencies": {
"@cubejs-backend/query-orchestrator": "^0.26.2",
"@cubejs-backend/schema-compiler": "^0.26.0",
"moment-timezone": "^0.5.32",
"mysql": "^2.18.1"
},
"name": "dalong-cubejs-driver",
"version": "1.0.0",
"public": true,
"main": "index.js",
"author": "rong <1141591465@qq.com>",
"license": "MIT"
}
- MyDriver
const { BaseDriver } = require("@cubejs-backend/query-orchestrator")
const genericPool = require('generic-pool');
const { promisify } = require('util');
const mysql = require('mysql');
const MyQuery = require("./MyQuery");
/**
 * Cube.js driver that talks to a MySQL server through the `mysql` client,
 * with connections managed by a `generic-pool` pool.
 */
class MyDriver extends BaseDriver {
  /**
   * @param {object} [config] Driver options. The `pool` key is merged into the
   *   generic-pool options; every other key overrides the matching
   *   environment-derived connection setting.
   */
  constructor(config) {
    super();
    // Split pool tuning from connection settings so caller-supplied values
    // (e.g. host/user/database from `driverFactory`) are actually applied.
    const { pool, ...restConfig } = config || {};
    this.config = {
      host: process.env.CUBEJS_DB_HOST,
      database: process.env.CUBEJS_DB_NAME,
      port: process.env.CUBEJS_DB_PORT,
      user: process.env.CUBEJS_DB_USER,
      password: process.env.CUBEJS_DB_PASS,
      socketPath: process.env.CUBEJS_DB_SOCKET_PATH,
      timezone: 'Z',
      ssl: this.getSslOptions(),
      dateStrings: true,
      ...restConfig,
    };
    this.pool = genericPool.createPool({
      // Opens a connection and attaches a promise-returning `execute` helper.
      create: async () => {
        const conn = mysql.createConnection(this.config);
        const connect = promisify(conn.connect.bind(conn));
        if (conn.on) {
          // Destroy the connection as soon as it reports an error.
          conn.on('error', () => {
            conn.destroy();
          });
        }
        conn.execute = promisify(conn.query.bind(conn));
        await connect();
        return conn;
      },
      // Used with `testOnBorrow`: a connection that cannot run `SELECT 1` is rejected.
      validate: async (connection) => {
        try {
          await connection.execute('SELECT 1');
        } catch (e) {
          this.databasePoolError(e);
          return false;
        }
        return true;
      },
      destroy: (connection) => promisify(connection.end.bind(connection))(),
    }, {
      min: 0,
      max: process.env.CUBEJS_DB_MAX_POOL && parseInt(process.env.CUBEJS_DB_MAX_POOL, 10) || 8,
      evictionRunIntervalMillis: 10000,
      softIdleTimeoutMillis: 30000,
      idleTimeoutMillis: 30000,
      testOnBorrow: true,
      acquireTimeoutMillis: 20000,
      ...pool,
    });
  }

  /**
   * Important: tells Cube.js which query (dialect) class goes with this driver.
   *
   * @returns {Function} The `MyQuery` class.
   */
  static dialectClass() {
    return MyQuery;
  }

  /**
   * Runs `fn` with a pooled connection and releases the connection afterwards.
   *
   * @param {function(object): Promise<*>} fn Callback receiving the connection.
   * @returns {Promise<*>} Resolves with the result of `fn`. The promise carries
   *   a `cancel()` method that issues `KILL <connectionId>` from a second
   *   pooled connection; after cancellation the promise rejects with
   *   `Error('Query cancelled')`.
   */
  withConnection(fn) {
    const self = this;
    let cancelled = false;
    const cancelObj = {};
    const promise = this.pool.acquire().then(async (conn) => {
      let result;
      try {
        const [{ connectionId }] = await conn.execute('select connection_id() as connectionId');
        cancelObj.cancel = async () => {
          cancelled = true;
          await self.withConnection(async (processConnection) => {
            await processConnection.execute(`KILL ${connectionId}`);
          });
        };
        result = await fn(conn);
      } catch (err) {
        // Release exactly once on failure, including when the connection-id
        // lookup itself fails, so the connection is never leaked.
        await this.pool.release(conn);
        throw cancelled ? new Error('Query cancelled') : err;
      }
      // Release exactly once on success, before reporting a cancellation, so
      // the cancellation error cannot trigger a second release.
      await this.pool.release(conn);
      if (cancelled) {
        throw new Error('Query cancelled');
      }
      return result;
    });
    // NOTE(review): `cancelObj.cancel` is only set once the connection id is
    // known; calling `cancel()` earlier throws a TypeError.
    promise.cancel = () => cancelObj.cancel();
    return promise;
  }

  /**
   * Opens a fresh connection outside the pool, runs `SELECT 1`, and closes it.
   *
   * @returns {Promise<*>} Result of the `SELECT 1` query.
   */
  async testConnection() {
    // Uses the pool's factory directly so the check does not borrow from the pool.
    const conn = await this.pool._factory.create();
    try {
      return await conn.execute('SELECT 1');
    } finally {
      await this.pool._factory.destroy(conn);
    }
  }

  /**
   * Executes a SQL statement on a pooled connection after setting its time zone.
   *
   * @param {string} query SQL text.
   * @param {Array} values Values passed along with the query.
   * @returns {Promise<*>} Query result; the promise exposes `cancel()` (see `withConnection`).
   */
  query(query, values) {
    return this.withConnection((db) => this.setTimeZone(db)
      .then(() => db.execute(query, values)));
  }

  /**
   * Sets the session time zone to `config.storeTimezone`, or '+00:00' when unset.
   *
   * @param {object} db Connection exposing `execute`.
   * @returns {Promise<*>} Result of the `SET time_zone` statement.
   */
  setTimeZone(db) {
    return db.execute(`SET time_zone = '${this.config.storeTimezone || '+00:00'}'`, []);
  }
}
module.exports = MyDriver
- MyQuery
const moment = require('moment-timezone');
const { BaseFilter, BaseQuery } = require('@cubejs-backend/schema-compiler');
// Builds a function that wraps a SQL date expression in DATE_FORMAT with a fixed pattern.
const dateFormatSql = (pattern) => (date) => `DATE_FORMAT(${date}, '${pattern}')`;

// Maps each supported granularity to a builder of the SQL expression that
// truncates a date expression to the start of that period.
const GRANULARITY_TO_INTERVAL = {
  day: dateFormatSql('%Y-%m-%dT00:00:00.000'),
  // Weeks are counted from 1900-01-01, then formatted like a day.
  week: (date) => dateFormatSql('%Y-%m-%dT00:00:00.000')(
    `date_add('1900-01-01', interval TIMESTAMPDIFF(WEEK, '1900-01-01', ${date}) WEEK)`
  ),
  hour: dateFormatSql('%Y-%m-%dT%H:00:00.000'),
  minute: dateFormatSql('%Y-%m-%dT%H:%i:00.000'),
  second: dateFormatSql('%Y-%m-%dT%H:%i:%S.000'),
  month: dateFormatSql('%Y-%m-01T00:00:00.000'),
  year: dateFormatSql('%Y-01-01T00:00:00.000'),
};
/**
 * Filter variant that renders "contains" conditions with LIKE and CONCAT.
 */
class MysqlFilter extends BaseFilter {
  /**
   * Builds a `LIKE CONCAT('%', <param>, '%')` condition for a column.
   *
   * @param {string} column SQL expression of the column.
   * @param {boolean} not When truthy, the condition is negated with NOT.
   * @param {*} param Value handed to `allocateParam` to obtain its placeholder.
   * @returns {string} SQL condition.
   */
  likeIgnoreCase(column, not, param) {
    const negation = not ? ' NOT' : '';
    const placeholder = this.allocateParam(param);
    return `${column}${negation} LIKE CONCAT('%', ${placeholder}, '%')`;
  }
}
/**
 * SQL dialect used by the driver: overrides the `BaseQuery` hooks that
 * generate date handling, quoting, and helper SQL.
 */
class MysqlQuery extends BaseQuery {
  /**
   * @param {object} filter Filter definition.
   * @returns {MysqlFilter} Filter bound to this query.
   */
  newFilter(filter) {
    return new MysqlFilter(this, filter);
  }

  /**
   * Converts a field from the session time zone to the query's time zone.
   *
   * @param {string} field SQL expression.
   * @returns {string} CONVERT_TZ expression.
   */
  convertTz(field) {
    return `CONVERT_TZ(${field}, @@session.time_zone, '${moment().tz(this.timezone).format('Z')}')`;
  }

  /**
   * Casts a value to TIMESTAMP, converting from '+00:00' to the session time zone.
   *
   * @param {string} value SQL expression or placeholder.
   * @returns {string} SQL expression.
   */
  timeStampCast(value) {
    return `TIMESTAMP(convert_tz(${value}, '+00:00', @@session.time_zone))`;
  }

  /**
   * Formats a date as a UTC local-datetime string with milliseconds.
   *
   * @param {*} date Value accepted by `inIntegrationTimeZone`.
   * @returns {string} Formatted date.
   */
  inDbTimeZone(date) {
    return this.inIntegrationTimeZone(date).clone().utc().format(moment.HTML5_FMT.DATETIME_LOCAL_MS);
  }

  /**
   * @param {string} value SQL expression or placeholder.
   * @returns {string} TIMESTAMP cast of the value.
   */
  dateTimeCast(value) {
    return `TIMESTAMP(${value})`;
  }

  /**
   * @param {string} date SQL date expression.
   * @param {string} interval Interval text placed after INTERVAL.
   * @returns {string} DATE_SUB expression.
   */
  subtractInterval(date, interval) {
    return `DATE_SUB(${date}, INTERVAL ${interval})`;
  }

  /**
   * @param {string} date SQL date expression.
   * @param {string} interval Interval text placed after INTERVAL.
   * @returns {string} DATE_ADD expression.
   */
  addInterval(date, interval) {
    return `DATE_ADD(${date}, INTERVAL ${interval})`;
  }

  /**
   * Truncates a dimension to the given granularity and casts it to DATETIME.
   *
   * @param {string} granularity Key of `GRANULARITY_TO_INTERVAL`.
   * @param {string} dimension SQL expression of the time dimension.
   * @returns {string} SQL expression.
   */
  timeGroupedColumn(granularity, dimension) {
    return `CAST(${GRANULARITY_TO_INTERVAL[granularity](dimension)} AS DATETIME)`;
  }

  /**
   * @param {string} name Column name.
   * @returns {string} Name wrapped in backticks.
   */
  escapeColumnName(name) {
    return `\`${name}\``;
  }

  /**
   * Builds a derived table of (date_from, date_to) rows from the dimension's time series.
   *
   * @param {object} timeDimension Object whose `timeSeries()` yields [from, to] pairs.
   * @returns {string} SELECT statement.
   */
  seriesSql(timeDimension) {
    const values = timeDimension.timeSeries().map(
      ([from, to]) => `select '${from}' f, '${to}' t`
    ).join(' UNION ALL ');
    return `SELECT TIMESTAMP(dates.f) date_from, TIMESTAMP(dates.t) date_to FROM (${values}) AS dates`;
  }

  /**
   * @param {string[]} strings SQL expressions to concatenate.
   * @returns {string} CONCAT expression.
   */
  concatStringsSql(strings) {
    return `CONCAT(${strings.join(', ')})`;
  }

  /**
   * @param {string} sql Segment condition.
   * @returns {string} Expression yielding 1 when the condition holds, else 0.
   */
  wrapSegmentForDimensionSelect(sql) {
    return `IF(${sql}, 1, 0)`;
  }

  /**
   * Resolves the pre-aggregation table name and rejects names over 64 characters.
   *
   * @param {string} cube Cube name.
   * @param {string} preAggregationName Pre-aggregation name.
   * @param {boolean} skipSchema Forwarded to the base implementation.
   * @returns {string} Table name.
   * @throws {Error} When the resulting name is longer than 64 characters.
   */
  preAggregationTableName(cube, preAggregationName, skipSchema) {
    const name = super.preAggregationTableName(cube, preAggregationName, skipSchema);
    if (name.length > 64) {
      // `UserError` is not imported in this file, so referencing it would raise
      // a ReferenceError and hide this message; a plain Error is thrown instead.
      throw new Error(`MySQL can not work with table names that longer than 64 symbols. Consider using the 'sqlAlias' attribute in your cube and in your pre-aggregation definition for ${name}.`);
    }
    return name;
  }
}
module.exports = MysqlQuery
- 代码集成使用
配置cube.js
// Cube.js configuration options: https://cube.dev/docs/config
const {MyDriver,MyQuery} = require("dalong-cubejs-driver");
module.exports = {
devServer: true,
dbType: ({ dataSource } = {}) => {
return 'dalong';
},
dialectFactory: (dataSource) => {
return MyQuery
},
scheduledRefreshTimer: true,
telemetry: false,
apiSecret: "8ddfaa8939c2edc6fda42c7b4c4e5ee68ca0c8e39189a10ab1919f71de1afeecea1c0bd5efea5a51bb0d85498ada0fe2ffe8e8467580a27ef5eb44c6414c5065",
driverFactory: ({ dataSource } = {}) => {
return new MyDriver({
user:"root",
database: "gitbase",
port: 3306,
host: "127.0.0.1",
readOnly: true
});
}
};
说明
以上是一个简单的使用示例,核心还是参考了mysql的实现,同时遵循了cube.js driver 的一些规范,可以方便地进行扩展和集成使用
对于具体自己开发的driver 需要配置方言的原因可以查看代码,同时我们也会发现官方文档对于如何进行driver的开发并不是很
清楚,文档还是需要完善的
packages/cubejs-schema-compiler/src/adapter/QueryBuilder.js
/**
 * Instantiates the query (dialect) class for a database type.
 *
 * @param {*} compilers Passed through to the query class constructor.
 * @param {string} dbType Key looked up in `ADAPTERS` when no `dialectClass` is given.
 * @param {object} queryOptions May carry `dialectClass`, `externalDialectClass`
 *   and `externalDbType`.
 * @returns {object|null} Query instance, or null when neither `dialectClass`
 *   nor an adapter for `dbType` exists.
 * @throws {Error} When `externalDbType` is set without `externalDialectClass`
 *   and has no registered adapter.
 */
export const createQuery = (compilers, dbType, queryOptions) => {
  const QueryClass = queryOptions.dialectClass || ADAPTERS[dbType];
  if (!QueryClass) {
    return null;
  }

  // An explicit external dialect class wins; otherwise fall back to the
  // adapter registered for the external database type, if one was requested.
  const resolveExternalQueryClass = () => {
    const explicitClass = queryOptions.externalDialectClass;
    if (explicitClass || !queryOptions.externalDbType) {
      return explicitClass;
    }
    const adapter = ADAPTERS[queryOptions.externalDbType];
    if (!adapter) {
      throw new Error(`Dialect for '${queryOptions.externalDbType}' is not found`);
    }
    return adapter;
  };

  const externalQueryClass = resolveExternalQueryClass();
  return new QueryClass(compilers, { externalQueryClass });
};
参考资料
https://github.com/cube-js/cube.js/blob/master/CONTRIBUTING.md#implementing-sql-dialect