zoukankan      html  css  js  c++  java
  • opendistro elasticsearch cube.js driver 开发说明

    driver 的开发参考了官方elasticsearch 驱动,但是因为默认sql plugin 对于sql 函数支持的问题,部分功能是不支持的(时间函数是一个问题)

    参考代码

    • 代码结构

    • driver 开发说明
      参考了官方的,同时集成了方言
     
      static dialectClass() {
        return OpendistroQuery;
      } 

    一些修改说明:
    因为opendistro elasticsearch 支持多种格式的,但是jdbc 比较方便(格式规范,方便集成使用)
    所以直接集成了jdbc 的,同时原有对于jdbc 的处理因为格式兼容问题,进行了一些调整

     
      async query(query, values) {
        try {
          const result = (await this.sqlClient.sql.query({ // TODO cursor
            format: this.config.queryFormat,
            body: {
              query: SqlString.format(query, values)
            }
          })).body;
     
          // INFO: cloud left in place for backward compatibility should use jdbc
          if (this.config.cloud || ['jdbc'].includes(this.config.queryFormat)) {
            const compiled = result.datarows.map(
              r =>  {
                // fix with alias should use jdbc 
                return result.schema.reduce((prev, cur, idx) => ({ ...prev, [cur.alias]: r[idx] }), {})
              }
            );
            return compiled;
          }
          return result && result.aggregations && this.traverseAggregations(result.aggregations);
        } catch (e) {
          if (e.body) {
            throw new Error(JSON.stringify(e.body, null, 2));
          }
     
          throw e;
        }
      }

    使用

    • opendistro elasticsearch 版本要去
      推荐使用新版本的(1.13以及以上)
     
    version: "3"
    services: 
      elasticsearch:
        image: amazon/opendistro-for-elasticsearch:1.13.1
        ports: 
        - "9200:9200"
        environment:
          - "discovery.type=single-node"
          - "http.host=0.0.0.0"
          - "opendistro_security.ssl.http.enabled=false"
          - "cluster.name=odfe-cluster"
          - "transport.host=0.0.0.0"
          - "network.host=0.0.0.0"
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    • 写入数据
    const { Client } = require('@elastic/elasticsearch')
    const client = new Client({ node: 'http://admin:admin@localhost:9200' })
    async function run () {
      // Let's start by indexing some data
      await client.index({
        index: 'game_of_thrones_demo',
        // type: '_doc', // uncomment this line if you are using Elasticsearch ≤ 6
        body: {
          character: 'Ned Stark',
          quote: 'Winter is coming.',
          userinfo:"appv1"
        }
      })
      await client.index({
        index: 'game_of_thrones_demo',
        // type: '_doc', // uncomment this line if you are using Elasticsearch ≤ 6
        body: {
          character: 'Daenerys Targaryen',
          quote: 'I am the blood of the dragon.',
          userinfo:"appv2"
        }
      })
      await client.index({
        index: 'game_of_thrones_demo',
        // type: '_doc', // uncomment this line if you are using Elasticsearch ≤ 6
        body: {
          character: 'Tyrion Lannister',
          quote: 'A mind needs books like a sword needs a whetstone.',
          userinfo:"appv2"
        }
      })
      // here we are forcing an index refresh, otherwise we will not
      // get any result in the consequent search
      await client.indices.refresh({ index: 'game_of_thrones_demo' })
      // Let's search!
      const { body } = await client.search({
        index: 'game_of_thrones_demo',
        // type: '_doc', // uncomment this line if you are using Elasticsearch ≤ 6
        body: {
          query: {
            match: { quote: 'winter' }
          }
        }
      })
      console.log(body.hits.hits)
    }
    run().catch(console.log)
    • 查询
      需要创建一个cube.js 项目,很简单参考官方文档就可以了
     
    {
      "name": "demoapp",
      "version": "0.0.1",
      "private": true,
      "scripts": {
        "dev": "./node_modules/.bin/cubejs-server"
      },
      "template": "docker",
      "templateVersion": "0.26.72",
      "devDependencies": {
        "@cubejs-backend/server": "^0.26.72",
        "@dalongrong/opendistro-driver": "^1.0.0"
      }
    }

    .env 文件

    CUBEJS_DB_URL=http://admin:admin@localhost:9200
    CUBEJS_DB_ELASTIC_QUERY_FORMAT=jdbc
    CUBEJS_DB_ELASTIC_OPENDISTRO=true
    CUBEJS_DEV_MODE=true
    CUBEJS_SCHEDULED_REFRESH_TIMER=false
    CUBEJS_API_SECRET=17e2fe65b5427f1a5038566f96792f07f1a08b24aa2e2f8a53121ab3fee3f8e8d26495888262da6476d9141c5d04aca5770fabc82961110703b6f06daf7fdde3

    cube.js

    // Cube.js configuration options: https://cube.dev/docs/config
    const {OpendistroDriver,OpendistroQuery} = require("@dalongrong/opendistro-driver")
    module.exports = {
        dialectFactory: (dataSource) => {
            // need config  datasource  for multitenant env
            return OpendistroQuery
        },
        dbType: ({ dataSource } = {}) => {
            return "opendistro"
        },
        driverFactory: ({ dataSource } = {}) => {
            return new OpendistroDriver({})
        }
    }; 

    生成schema (注意因为默认main 的问题,需要处理下,具体可以参考源码)

    cube(`GameOfThronesDemo`, {
      sql: `SELECT * FROM game_of_thrones_demo`,
     
      joins: {
     
      },
      measures: {
        count: {
          type: `count`,
          drillMembers: []
        }
      },
     
      dimensions: {
        character: {
          sql: `character`,
          type: `string`
        },
     
        quote: {
          sql: `quote`,
          type: `string`
        },
     
        userinfo: {
          sql: `userinfo`,
          type: `string`
        }
      },
     
      dataSource: `default`
    });
    • 效果

    说明

    以上是基于官方的进行了一些修改,具体可以直接查看源码

    参考资料

    https://github.com/rongfengliang/cubejs-opendistro-driver#readme

  • 相关阅读:
    shell脚本基础->
    1->小规模集群架构规划
    推荐系统读书笔记(一)好的推荐系统
    数据挖掘概念与技术读书笔记(二)认识数据
    Linux编辑器vi使用方法详细介绍
    用户不在sudoers文件中的解决方法
    机器学习实战读书笔记(三)决策树
    机器学习实战读书笔记(二)k-近邻算法
    机器学习实战读书笔记(一)机器学习基础
    R语言实战读书笔记(十三)广义线性模型
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/14604431.html
Copyright © 2011-2022 走看看