zoukankan      html  css  js  c++  java
  • ALINK(十):数据导入与导出 (三)Catalog读入 (CatalogSourceBatchOp)

    Java 类名:com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp

    Python 类名:CatalogSourceBatchOp

    功能介绍

    Catalog描述了数据库的属性和数据库的位置, 支持Mysql, Derby, Sqlite, Hive.

    在使用时,需要先下载插件,详情请看https://www.yuque.com/pinshu/alink_guide/czg4cx

    定义分成三步:

    第一步,定义Catalog

    数据库

    Java 接口

    Derby

    DerbyCatalog(String catalogName, String defaultDatabase, String derbyVersion, String derbyPath)

    MySql

    MySqlCatalog(String catalogName, String defaultDatabase, String mysqlVersion,String mysqlUrl, String port, String userName, String password)

    Sqlite

    SqliteCatalog(String catalogName, String defaultDatabase, String sqliteVersion, String dbUrl)

    Hive

    HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, String hiveConfDir)

    HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, FilePath hiveConfDir)

    HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, String hiveConfDir,String kerberosPrincipal, String kerberosKeytab)

    示例:
        derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder+'/'+DERBY_DB)
    各插件提供的版本:
        Hive:2.3.4
        MySQL: 5.1.27
        Derby: 10.6.1.0
        SQLite: 3.19.3
        odps: 0.36.4-public

    第二步, 定义CatalogObject

    dbName = "sqlite_db"
    tableName = "table"
    # 第一个参数是Catalog, 第二个参数是DB/Project
    catalogObject = CatalogObject(derby, ObjectPath(dbName, tableName))

    第三步,定义Source和Sink

    参数说明

    名称

    中文名称

    描述

    类型

    是否必须?

    默认值

    catalogObject

    catalog object

    catalog object

    String

     

    代码示例

    以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!

    Python 代码

    Derby

    python

    derbyFolder = "*"
    DERBY_SCHEMA = "derby_schema"
    DERBY_DB = "derby_db"
    derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder+'/'+DERBY_DB)
    catalogObject = CatalogObject(derby, ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"))
    catalogSinkBatchOp = CatalogSinkBatchOp()
        .setCatalogObject(catalogObject)
         
    source.link(catalogSinkBatchOp)
    BatchOperator.execute()
    catalogSourceBatchOp = CatalogSourceBatchOp()
        .setCatalogObject(catalogObject)
    catalogSourceBatchOp.print()

    Java 代码

    String derbyFolder = "*";
    String DERBY_SCHEMA = "derby_schema";
    String DERBY_DB = "derby_db";
    DerbyCatalog derby = new DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0",
      derbyFolder + '/' + DERBY_DB);
    CatalogObject catalogObject = new CatalogObject(derby,
      new ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"));
    catalogSinkBatchOp catalogSinkStreamOp = new catalogSinkBatchOp()
      .setCatalogObject(catalogObject);
    source.link(catalogSinkStreamOp);
    StreamOperator.execute();
    CatalogSourceBatchOp catalogSourceStreamOp = new CatalogSourceBatchOp()
      .setCatalogObject(catalogObject);
    catalogSourceStreamOp.print();
    StreamOperator.execute();

    Sqlite

    Python 代码

    sqliteFolder = "*"
    SQLITE_SCHEMA = "sqlite_schema"
    SQLITE_DB = "sqlite_db"
    sqlite = SqliteCatalog("sqlite_test_catalog", None, "3.19.3",  [sqliteFolder+'/'+SQLITE_DB])
    catalogObject = CatalogObject(sqlite, ObjectPath(SQLITE_DB, "test_catalog_source_sink3"))
    catalogSinkBatchOp = CatalogSinkBatchOp()
        .setCatalogObject(catalogObject)
         
    source.link(catalogSinkBatchOp)
    BatchOperator.execute()
    catalogSourceBatchOp = CatalogSourceBatchOp()
        .setCatalogObject(catalogObject)
    catalogSourceBatchOp.print()

    Java代码

    String sqliteFolder = "*";
    String SQLITE_SCHEMA = "sqlite_schema";
    String SQLITE_DB = "sqlite_db";
    SqliteCatalog sqlite = new SqliteCatalog("sqlite_test_catalog", null, "3.19.3", sqliteFolder + '/' +
      SQLITE_DB);
    CatalogObject catalogObject = new CatalogObject(sqlite, new ObjectPath(SQLITE_DB,
      "test_catalog_source_sink3"));
    CatalogSinkBatchOp catalogSinkStreamOp = CatalogSinkBatchOp()
      .setCatalogObject(catalogObject);
    source.link(catalogSinkBatchOp);
    StreamOperator.execute();
    CatalogSourceBatchOp catalogSourceStreamOp = new CatalogSourceBatchOp()
      .setCatalogObject(catalogObject);
    catalogSourceStreamOp.print();
    StreamOperator.execute();
  • 相关阅读:
    .net core 大文件分片上传
    Python 运算符
    CF1398G Running Competition FFT
    Luogu「StOI-2」简单的树 树链剖分+线段树+倍增
    LOJ#3145. 「APIO2019」桥梁 分块+可撤销并查集
    【UNR #4】序列妙妙值 分块+DP
    LuoguP5008 [yLOI2018] 锦鲤抄 tarjan+贪心
    windows提权
    基于 Laravel 框架的内容管理系统
    趣谈、浅析CRLF和LF
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14887400.html
Copyright © 2011-2022 走看看