zoukankan      html  css  js  c++  java
  • Apache Hudi集成Spark SQL抢先体验

    Apache Hudi集成Spark SQL抢先体验

    1. 摘要

    社区小伙伴一直期待的Hudi整合Spark SQL的PR正在积极Review中并已经快接近尾声,Hudi集成Spark SQL预计会在下个版本正式发布,在集成Spark SQL后,会极大方便用户对Hudi表的DDL/DML操作,下面就来看看如何使用Spark SQL操作Hudi表。

    2. 环境准备

    首先需要将PR拉取到本地打包,生成SPARK_BUNDLE_JAR(hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar)

    2.1 启动spark-sql

    在配置完spark环境后可通过如下命令启动spark-sql

    spark-sql --jars $PATH_TO_SPARK_BUNDLE_JAR  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    

    2.2 设置并发度

    由于Hudi默认upsert/insert/delete的并发度是1500,对于演示的小规模数据集可设置更小的并发度。

    set hoodie.upsert.shuffle.parallelism = 1;
    set hoodie.insert.shuffle.parallelism = 1;
    set hoodie.delete.shuffle.parallelism = 1;
    

    同时设置不同步Hudi表元数据

    set hoodie.datasource.meta.sync.enable=false;
    

    3. Create Table

    使用如下SQL创建表

    create table test_hudi_table (
      id int,
      name string,
      price double,
      ts long,
      dt string
    ) using hudi
     partitioned by (dt)
     options (
      primaryKey = 'id',
      type = 'mor'
     )
     location 'file:///tmp/test_hudi_table'
    

    说明:表类型为MOR,主键为id,分区字段为dt,合并字段默认为ts。

    创建Hudi表后查看创建的Hudi表

    show create table test_hudi_table
    

    4. Insert Into

    4.1 Insert

    使用如下SQL插入一条记录

     insert into test_hudi_table select 1 as id, 'hudi' as name, 10 as price, 1000 as ts, '2021-05-05' as dt
    

    insert完成后查看Hudi表本地目录结构,生成的元数据、分区和数据与Spark Datasource写入均相同。

    4.2 Select

    使用如下SQL查询Hudi表数据

    select * from test_hudi_table
    

    查询结果如下

    5. Update

    5.1 Update

    使用如下SQL将id为1的price字段值变更为20

    update test_hudi_table set price = 20.0 where id = 1
    

    5.2 Select

    再次查询Hudi表数据

    select * from test_hudi_table
    

    查询结果如下,可以看到price已经变成了20.0

    查看Hudi表的本地目录结构如下,可以看到在update之后又生成了一个deltacommit,同时生成了一个增量log文件。

    6. Delete

    6.1 Delete

    使用如下SQL将id=1的记录删除

    delete from test_hudi_table where id = 1
    

    查看Hudi表的本地目录结构如下,可以看到delete之后又生成了一个deltacommit,同时生成了一个增量log文件。

    6.2 Select

    再次查询Hudi表

    select * from test_hudi_table;
    

    查询结果如下,可以看到已经查询不到任何数据了,表明Hudi表中已经不存在任何记录了。

    7. Merge Into

    7.1 Merge Into Insert

    使用如下SQL向test_hudi_table插入数据

     merge into test_hudi_table as t0
     using (
      select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
     ) as s0
     on t0.id = s0.id
     when not matched and s0.id % 2 = 1 then insert *
    

    7.2 Select

    查询Hudi表数据

    select * from test_hudi_table
    

    查询结果如下,可以看到Hudi表中存在一条记录

    7.4 Merge Into Update

    使用如下SQL更新数据

     merge into test_hudi_table as t0
     using (
      select 1 as id, 'a1' as name, 12 as price, 1001 as ts, '2021-03-21' as dt
     ) as s0
     on t0.id = s0.id
     when matched and s0.id % 2 = 1 then update set *
    

    7.5 Select

    查询Hudi表

    select * from test_hudi_table
    

    查询结果如下,可以看到Hudi表中的分区已经更新了

    7.6 Merge Into Delete

    使用如下SQL删除数据

    merge into test_hudi_table t0
     using (
      select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
     ) s0
     on t0.id = s0.s_id
     when matched and s_ts = 1001 then delete
    

    查询结果如下,可以看到Hudi表中已经没有数据了

    8. 删除表

    使用如下命令删除Hudi表

    drop table test_hudi_table;
    

    使用show tables查看表是否存在

    show tables;
    

    可以看到已经没有表了

    9. 总结

    通过上面示例简单展示了通过Spark SQL Insert/Update/Delete Hudi表数据,通过SQL方式可以非常方便地操作Hudi表,降低了使用Hudi的门槛。另外Hudi集成Spark SQL工作将继续完善语法,尽量对标Snowflake和BigQuery的语法,如插入多张表(INSERT ALL WHEN condition1 INTO t1 WHEN condition2 into t2),变更Schema以及CALL Cleaner、CALL Clustering等Hudi表服务。

    PS:如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”,将会是我不竭的动力!
    作者:leesf    掌控之中,才会成功;掌控之外,注定失败。
    出处:http://www.cnblogs.com/leesf456/
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
    如果觉得本文对您有帮助,您可以请我喝杯咖啡!

  • 相关阅读:
    FAL_CLIENT和FAL_SERVER参数详解
    Goldengate OGG常见问题与错误列表
    Goldengate:ERROR 180 encountered commit SCN that is not greater than the highest SCN already processed
    OGG-01028 Incompatible Record解决办法
    goldengate–使用filter+@GETENV在线重新初始化指定的table
    RAC环境中threads变更后如何确保goldengate继续正常复制
    default listener is not configured in grid infrastructure home
    11gr2 RAC安装INS-35354问题一例
    为11.2.0.2 Grid Infrastructure添加节点
    修改/dev/shm的大小
  • 原文地址:https://www.cnblogs.com/leesf456/p/14802281.html
Copyright © 2011-2022 走看看