
    # Writing to Hudi with Flink SQL

    I have recently been working on a data lake project, a Hudi-based lakehouse, with Flink and Spark as the compute engines.

    I had previously taken a brief look at Iceberg, one of the three main data lake table formats, assuming we might use it; reading around, I also found that Iceberg tends to be friendlier to Flink, while Hudi tends to be friendlier to Spark.

    I had long assumed we would pick Iceberg, but it turned out Iceberg still had quite a few features unimplemented; by comparison, Hudi was in much better shape.

    ## Versions

    * Hudi Flink bundle: 0.9.0-SNAPSHOT
    * Hive: 2.3
    * Hadoop: 3.1

    ## Building Hudi

    Since this is an unreleased SNAPSHOT version of Hudi, it has to be built from source:

    ```sh
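
    # -Pflink-bundle-shade-hive2 shades Hive 2.x dependencies into the flink bundle,
    # matching the Hive 2.3 deployment here; -Drat.skip=true skips the license (RAT) check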

    mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2

    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO]
    [INFO] Hudi ............................................... SUCCESS [ 1.948 s]
    [INFO] hudi-common ........................................ SUCCESS [ 6.494 s]
    [INFO] hudi-timeline-service .............................. SUCCESS [ 2.287 s]
    [INFO] hudi-client ........................................ SUCCESS [ 0.054 s]
    [INFO] hudi-client-common ................................. SUCCESS [ 2.930 s]
    [INFO] hudi-hadoop-mr ..................................... SUCCESS [ 2.892 s]
    [INFO] hudi-spark-client .................................. SUCCESS [ 6.267 s]
    [INFO] hudi-sync-common ................................... SUCCESS [ 0.502 s]
    [INFO] hudi-hive-sync ..................................... SUCCESS [ 2.651 s]
    [INFO] hudi-spark-datasource .............................. SUCCESS [ 0.089 s]
    [INFO] hudi-spark-common_2.11 ............................. SUCCESS [ 2.346 s]
    [INFO] hudi-spark2_2.11 ................................... SUCCESS [ 1.436 s]
    [INFO] hudi-spark_2.11 .................................... SUCCESS [ 9.377 s]
    [INFO] hudi-utilities_2.11 ................................ SUCCESS [ 4.049 s]
    [INFO] hudi-utilities-bundle_2.11 ......................... SUCCESS [ 12.717 s]
    [INFO] hudi-cli ........................................... SUCCESS [ 4.430 s]
    [INFO] hudi-java-client ................................... SUCCESS [ 0.902 s]
    [INFO] hudi-flink-client .................................. SUCCESS [ 1.406 s]
    [INFO] hudi-spark3_2.12 ................................... SUCCESS [ 2.199 s]
    [INFO] hudi-dla-sync ...................................... SUCCESS [ 1.347 s]
    [INFO] hudi-sync .......................................... SUCCESS [ 0.042 s]
    [INFO] hudi-hadoop-mr-bundle .............................. SUCCESS [ 4.292 s]
    [INFO] hudi-hive-sync-bundle .............................. SUCCESS [ 1.297 s]
    [INFO] hudi-spark-bundle_2.11 ............................. SUCCESS [ 9.176 s]
    [INFO] hudi-presto-bundle ................................. SUCCESS [ 4.972 s]
    [INFO] hudi-timeline-server-bundle ........................ SUCCESS [ 4.643 s]
    [INFO] hudi-hadoop-docker ................................. SUCCESS [ 0.445 s]
    [INFO] hudi-hadoop-base-docker ............................ SUCCESS [ 0.204 s]
    [INFO] hudi-hadoop-namenode-docker ........................ SUCCESS [ 0.053 s]
    [INFO] hudi-hadoop-datanode-docker ........................ SUCCESS [ 0.045 s]
    [INFO] hudi-hadoop-history-docker ......................... SUCCESS [ 0.096 s]
    [INFO] hudi-hadoop-hive-docker ............................ SUCCESS [ 0.278 s]
    [INFO] hudi-hadoop-sparkbase-docker ....................... SUCCESS [ 0.064 s]
    [INFO] hudi-hadoop-sparkmaster-docker ..................... SUCCESS [ 0.048 s]
    [INFO] hudi-hadoop-sparkworker-docker ..................... SUCCESS [ 0.048 s]
    [INFO] hudi-hadoop-sparkadhoc-docker ...................... SUCCESS [ 0.047 s]
    [INFO] hudi-hadoop-presto-docker .......................... SUCCESS [ 0.087 s]
    [INFO] hudi-integ-test .................................... SUCCESS [ 4.581 s]
    [INFO] hudi-integ-test-bundle ............................. SUCCESS [ 32.789 s]
    [INFO] hudi-examples ...................................... SUCCESS [ 1.140 s]
    [INFO] hudi-flink_2.11 .................................... SUCCESS [ 1.734 s]
    [INFO] hudi-flink-bundle_2.11 ............................. SUCCESS [ 21.285 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 02:34 min
    [INFO] Finished at: 2021-07-18T18:17:51+08:00
    [INFO] Final Memory: 232M/1644M
    [INFO] ------------------------------------------------------------------------
    [WARNING] The requested profile "include-flink-sql-connector-hive" could not be activated because it does not exist.

    ```

    ## Prerequisites

    Writing to Hudi from Flink is straightforward: drop "hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar" into Flink's lib directory, treat Hudi as just another Flink connector, and you can write to Hudi directly from the sql-client. On its own, however, this does not sync Hudi's metadata to Hive. If you specify the hive sync parameters when creating the Hudi table in Flink, the table's metadata is synced straight into Hive. Concretely:

    * configure Hive
    * copy hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar into Hive's lib directory, so Hive can read Hudi-format data (see the jar-copy sketch after the commands below)
    * start the Hive metastore
    * start HiveServer2

    ```sh

    nohup hive --service metastore &
    nohup hive --service hiveserver2 &

    ```
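
    The jar placement from the steps above, as a minimal sketch assuming the default Maven build output paths and FLINK_HOME/HIVE_HOME environment variables (adjust to your layout):

    ```sh
    # put the Flink bundle on Flink's classpath
    cp packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar $FLINK_HOME/lib/
    # let Hive read hudi-format data
    cp packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar $HIVE_HOME/lib/
    ```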

    ## Flink SQL

    Start a YARN session:

    ```sh
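    # -d: run the session detached; -nm: the name shown in the YARN UI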
    ./bin/yarn-session.sh -d -nm sql

    ```

    Start the sql-client:
    ```sh
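    # the client picks up the running YARN session from the .yarn-properties file
    # written by yarn-session.sh; -s is assumed here to just name the client session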
    ./bin/sql-client.sh embedded -s application_1626588183454_0001

    ```

    The Flink SQL — a Kafka source table, a Hudi sink table, and the streaming insert:
    ```sql

    create table kafka_ods_user_info (
    id int
    ,name string
    ,sex string
    ,age int
    ,birthday string
    ) with (
    'connector' = 'kafka',
    'topic' = 'test_topic_1',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'csv'
    );
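
    -- hudi sink table: MERGE_ON_READ, keyed on dl_uuid, partitioned by `partition`,
    -- with hive sync enabled so the table is registered in the Hive db 'ods'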

    create table ods_user_info_3(
    dl_uuid string
    ,id int
    ,name string
    ,sex string
    ,age int
    ,birthday string
    ,`etl_create_time` TIMESTAMP(3) COMMENT 'ETL creation time'
    ,`etl_update_time` TIMESTAMP(3) COMMENT 'ETL update time'
    ,`partition` string
    ) with (
    'connector' = 'hudi'
    ,'is_generic' = 'true'
    ,'path' = 'hdfs:///user/hive/warehouse/ods.db/ods_user_info_3'
    ,'hoodie.datasource.write.recordkey.field' = 'dl_uuid'
    ,'hoodie.datasource.write.partitionpath.field' = 'partition'
    ,'write.precombine.field' = 'etl_update_time'
    ,'write.tasks' = '1'
    ,'table.type' = 'MERGE_ON_READ'
    ,'compaction.tasks' = '1'
    ,'compaction.trigger.strategy' = 'num_or_time'
    ,'compaction.delta_commits' = '30'
    ,'compaction.delta_seconds' = '3600'
    ,'hive_sync.enable' = 'true'
    ,'hive_sync.db' = 'ods'
    ,'hive_sync.table' = 'ods_user_info'
    ,'hive_sync.file_format' = 'PARQUET'
    ,'hive_sync.support_timestamp' = 'true'
    ,'hive_sync.use_jdbc' = 'true'
    ,'hive_sync.jdbc_url' = 'jdbc:hive2://localhost:10000'
    ,'hive_sync.metastore.uris' = 'thrift://thinkpad:9083'
    ,'hoodie.datasource.hive_style_partition' = 'true'
    ,'hive_sync.partition_fields' = 'partition'
    ,'read.tasks' = '1'
    ,'read.streaming.enabled' = 'true'
    ,'hoodie.datasource.query.type' = 'snapshot'
    ,'read.streaming.start-commit' = '20210101000000'
    ,'read.streaming.check-interval' = '30'
    ,'hoodie.datasource.merge.type' = 'payload_combine'
    ,'read.utc-timezone' = 'false'
    );
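
    -- note: with compaction.trigger.strategy = 'num_or_time', a compaction plan is
    -- scheduled once 30 delta commits or 3600 seconds have accumulated, whichever
    -- comes first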

    insert into ods_user_info_3
    select cast(id as string) dl_uuid
    ,id
    ,name
    ,sex
    ,age
    ,birthday
    ,now() etl_create_time
    ,now() etl_update_time
    ,date_format(now(), 'yyyy/MM/dd') `partition` -- partition path, yyyy/MM/dd format
    from kafka_ods_user_info;


    ```

    ## Writing Test Data

    ```sh

    echo $message | /opt/kafka2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic_1

    ```
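
    The command above assumes a $message shell variable; a minimal generator sketch producing CSV rows that match the kafka_ods_user_info schema (id, name, sex, age, birthday) and the sample data shown below:

    ```sh
    # one producer invocation per row keeps the sketch simple, if slow
    for i in $(seq 1 5000); do
      message="${i},zhangsan_${i},male_${i},18,2020-01-01"
      echo $message | /opt/kafka2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic_1
    done
    ```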

    ## Viewing the Data on HDFS

    ```sh
    hadoop fs -ls /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/
    Found 4 items
    -rw-r--r-- 1 wuxu supergroup 115647 2021-07-18 18:09 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_20210718175258.log.1_0-1-0
    -rw-r--r-- 1 wuxu supergroup 93 2021-07-18 17:05 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.hoodie_partition_metadata
    -rw-r--r-- 1 wuxu supergroup 436892 2021-07-18 17:20 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/9a29cbb2-b78c-4f32-a71e-36f975617ed0_0-1-0_20210718171958.parquet
    -rw-r--r-- 1 wuxu supergroup 461463 2021-07-18 17:53 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718175258.parquet

    ```
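
    The `.log.*` file is the MERGE_ON_READ delta log holding recent writes; the `.parquet` files are base files produced by compaction, and `.hoodie_partition_metadata` marks the directory as a Hudi partition.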

    ## Querying the Data from Hive


    ```sh

    hive> select * from ods_user_info_rt limit 10;
    OK
    _hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name dl_uuid id name sex age birthday etl_create_time etl_update_time partition
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    20210718171958 20210718171958_0_1 4970 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4970 4970 zhangsan_4970 male_4970 18 2020-01-01 1626599814075 1626599814075 2021-07-18
    20210718171958 20210718171958_0_2 4850 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4850 4850 zhangsan_4850 male_4850 18 2020-01-01 1626599551180 1626599551180 2021-07-18
    20210718171958 20210718171958_0_3 4971 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4971 4971 zhangsan_4971 male_4971 18 2020-01-01 1626599816273 1626599816273 2021-07-18
    20210718171958 20210718171958_0_4 4727 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4727 4727 zhangsan_4727 male_4727 18 2020-01-01 1626599281780 1626599281780 2021-07-18
    20210718171958 20210718171958_0_5 4848 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4848 4848 zhangsan_4848 male_4848 18 2020-01-01 1626599546853 1626599546853 2021-07-18
    20210718171958 20210718171958_0_6 4969 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4969 4969 zhangsan_4969 male_4969 18 2020-01-01 1626599811859 1626599811859 2021-07-18
    20210718171958 20210718171958_0_7 4728 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4728 4728 zhangsan_4728 male_4728 18 2020-01-01 1626599284072 1626599284072 2021-07-18
    20210718171958 20210718171958_0_8 4849 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4849 4849 zhangsan_4849 male_4849 18 2020-01-01 1626599548969 1626599548969 2021-07-18
    20210718171958 20210718171958_0_9 4729 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4729 4729 zhangsan_4729 male_4729 18 2020-01-01 1626599286234 1626599286234 2021-07-18
    20210718171958 20210718171958_0_10 4840 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4840 4840 zhangsan_4840 male_4840 18 2020-01-01 1626599529532 1626599529532 2021-07-18
    Time taken: 1.46 seconds, Fetched: 10 row(s)

    ```
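
    For a MERGE_ON_READ table, hive sync registers two tables: ods_user_info_ro (read-optimized: compacted base files only) and ods_user_info_rt (real-time: base files merged with the delta logs at query time). A quick comparison, as a sketch:

    ```sh
    hive -e 'use ods; select count(*) from ods_user_info_ro;'
    hive -e 'use ods; select count(*) from ods_user_info_rt;'
    ```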

    References:

    Hudi official site

    HUDI FLINK 答疑解惑 (Hudi on Flink Q&A)
