zoukankan      html  css  js  c++  java
  • 使用Data Lake Analytics从OSS清洗数据到AnalyticDB

    前提

    • 必须是同一阿里云region的Data Lake Analytics(DLA)到AnalyticDB的才能进行清洗操作;
    • 开通并初始化了该region的DLA服务;
    • 开通并购买了AnalyticDB的实例,实例规模和数据清洗速度强相关,与AnalyticDB的实例资源规模基本成线性比例关系。

    整体执行流程示意图:

    步骤 1:在AnalyticDB中为DLA开通一个VPC访问点

    DLA在上海region的VPC参数信息:

    • 可用区:cn-shanghai-d
    • VPC id: vpc-uf6wxkgst74es59wqareb
    • VSwitch id: vsw-uf6m7k4fcq3pgd0yjfdnm
    DLA Region可用区VPC idVSwitch id
    华东1(杭州) cn-hangzhou-g vpc-bp1g66t4f0onrvbht2et5 vsw-bp1nh5ri8di2q7tkof474
    华东2(上海) cn-shanghai-d vpc-uf6wxkgst74es59wqareb vsw-uf6m7k4fcq3pgd0yjfdnm
    华北2(北京) cn-beijing-g vpc-2zeawsrpzbelyjko7i0ir vsw-2zea8ct4hy4hwsrcpd52d
    华南1(深圳) cn-shenzhen-a vpc-wz9622zx341dy24ozifn3 vsw-wz91ov6gj2i4u2kenpe42
    华北3(张家口) cn-zhangjiakou-a vpc-8vbpi1t7c0devxwfe19sn vsw-8vbjl32xkft0ewggef6g9
    新加坡 ap-southeast-a vpc-t4n3sczhu5efvwo1gsupf vsw-t4npcrmzzk64r13e3nhhm
    英国(伦敦) eu-west-1a vpc-d7ovzdful8490upm8b413 vsw-d7opmgixr2h34r1975s8a

    在AnalyticDB中为DLA创建VPC的专有网络,注意,要使用MySQL命令行连接AnalyticDB的经典网络链接,执行:

    alter database txk_cldsj set zone_id='xxx' vpc_id='xxx' vswitch_id='xxx';
    

    其中,“zone_id”、“vpc_id”和“vswitch_id”分别填同region的DLA对应的VPC id和VSwitch id,见上表。

    命令执行成功后,刷新DMS for AnalyticDB控制台页面,应该能看到一个VPC的URL。

    步骤 2:在AnalyticDB中创建好目标的实时表

    具体AnalyticDB的建表文档请参考:https://help.aliyun.com/document_detail/26403.html

    -- 例如:
    
    -- 目标表为实时维度表:
    CREATE DIMENSION TABLE etl_ads_db.etl_ads_dimension_table (
      col1 INT, 
      col2 STRING, 
      col3 INT, 
      col4 STRING,
      primary key (col1)
    )
    options (updateType='realtime');
    
    -- 目标表为实时分区表:
    CREATE TABLE etl_ads_db.etl_ads_partition_table (
      col1 INT, 
      col2 INT, 
      col3 INT, 
      col4 INT, 
      col5 DOUBLE, 
      col6 DOUBLE, 
      col7 DOUBLE
      primary key (col1, col2, col3, col4)
    )
    PARTITION BY HASH KEY(col1)
    PARTITION NUM 32
    TABLEGROUP xxx_group
    options (updateType='realtime');
    
    

    步骤 3:在DLA中创建好与AnalyticDB目标表映射的表

    DLA中的表名、列名与AnalyticDB目标表对应同名

    这种情况下,建表语句会比较简单。
    其中,如下参数需要指明:

    -- 目标AnalyticDB
    LOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db'
    
    -- 目标AnalyticDB的访问用户名
    USER='xxx'
    
    -- 目标AnalyticDB的访问密码
    PASSWORD='xxx'
    
    CREATE SCHEMA `etl_dla_schema` WITH DBPROPERTIES 
    ( 
      CATALOG = 'ads', 
      LOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db',
      USER='xxx',
      PASSWORD='xxx'
    );
    
    USE etl_dla_schema;
    
    CREATE EXTERNAL TABLE etl_ads_dimension_table (
      col1 INT, 
      col2 VARCHAR(200), 
      col3 INT, 
      col4 VARCHAR(200),
      primary key (col1)
    );
    
    CREATE EXTERNAL TABLE etl_ads_partition_table (
      col1 INT, 
      col2 INT, 
      col3 INT, 
      col4 INT, 
      col5 DOUBLE, 
      col6 DOUBLE, 
      col7 DOUBLE
      primary key (col1, col2, col3, col4)
    )
    

    步骤 4:在DLA中创建表指向源OSS数据

    CREATE SCHEMA oss_data_schema with DBPROPERTIES(
      LOCATION = 'oss://my_bucket/',
      catalog='oss'
    );
    
    CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_1 (
        col_1 INT, 
        col_2 VARCHAR(200), 
        col_3 INT, 
        col_4 VARCHAR(200)
    ) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
    STORED AS TEXTFILE 
    LOCATION 'oss://my_bucket/oss_table_1';
    
    CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_2 (
      col_1 INT, 
      col_2 INT, 
      col_3 INT, 
      col_4 INT, 
      col_5 DOUBLE, 
      col_6 DOUBLE, 
      col_7 DOUBLE
    ) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
    STORED AS TEXTFILE 
    LOCATION 'oss://my_bucket/oss_table_2';
    
    

    步骤 5:在DLA中执行INSERT FROM SELECT语句

    INSERT FROM SELECT通常为长时运行任务,建议通过异步执行方式:
    注意:用MySQL命令行执行时,连接时,需要在命令行指定-c参数,用来识别MySQL语句前的hint:

    mysql -hxxx -Pxxx -uxxx -pxxx db_name -c
    

    示例:

    -- 执行OSS到AnalyticDB的全量数据插入
    /*+run-async=true*/
    INSERT INTO etl_dla_schema.etl_dla_dimension_table 
    SELECT * FROM oss_data_schema.dla_table_1;
    
    -- 执行OSS到AnalyticDB的数据插入,包含对OSS数据的筛选逻辑
    /*+run-async=true*/
    INSERT INTO etl_dla_schema.etl_dla_partition_table (col_1, col_2, col_3, col_7)
    SELECT col_1, col_2, col_3, col_7 
    FROM oss_data_schema.dla_table_2 
    WHERE col_1 > 1000 
    LIMIT 10000;
    

    注意:

    • 如果在INSERT INTO子句和SELECT子句中没有指定列信息,请确保源表和目标表的列定义顺序一致,且类型对应匹配;
    • 如果在INSERT INTO子句和SELECT子句中指定了列的信息,请确保两者中的列的顺序符合业务需要的匹配顺序,且类型对应匹配。

    如果在DMS for Data Lake Analytics控制台(https://datalakeanalytics.console.aliyun.com/))执行,请选择“异步执行”。

    然后可以从“执行历史” 中,点击“刷新”,查看任务的执行状态。
    异步执行INSERT FROM SELECT语句,会返回一个task id,通过这个task id,可以轮询任务执行情况,如果status为“SUCCESS”,则任务完成:

    SHOW query_task WHERE id = '26c6b18b_1532588796832'
    

    注意事项

    • AnalyticDB为主键覆盖逻辑,整个INSERT FROM SELECT的ETL任务失败,用户需要整体重试;
    • AnalyticDB消费数据有一定延时,在AnalyticDB端查询写入数据时,会有一定的延迟可见,具体延迟时间取决于AnalyticDB的资源规格;
    • 建议将ETL任务尽量切成小的单位批次执行,比如,OSS数据200GB,在业务允许的情况下,200GB的数据切成100个文件夹,每个文件夹2GB数据,对应DLA中建100张表,100张表分别做ETL,单个ETL任务失败,可以只重试单个ETL任务;
    • ETL任务结束后,视情况删除DLA中的表,包括映射AnalyticDB中的表、以及指向OSS数据的表。


    本文作者:julian.zhou

    原文链接

    本文为云栖社区原创内容,未经允许不得转载。

  • 相关阅读:
    Linux Shell编程(3)——运行shell脚本
    Linux Shell编程(2)——第一个shell程序
    Linux Shell编程(1)——shell编程简介
    做“程序员”,不做“码农”,有哪些好办法?
    玩转大数据,顺利渡过34岁裁退危机!
    Google IO 2017为我们带来了什么
    《经验之谈》想要做好SEO推广必知要事,峰任策划告诉您。
    5月17日云栖精选夜读:分布式大数据系统巧实现,全局数据调度管理不再难
    程序员转型发展:拆除这些墙,才会发现更蓝的天空
    敢问路在何方?程序员转行应该卖水果还是卖烧饼
  • 原文地址:https://www.cnblogs.com/zhaowei121/p/10697563.html
Copyright © 2011-2022 走看看