zoukankan      html  css  js  c++  java
  • Apache Hudi集成Spark SQL抢先体验

    Apache Hudi集成Spark SQL抢先体验

    1. 摘要

    社区小伙伴一直期待的Hudi整合Spark SQL的PR正在积极Review中并已经快接近尾声,Hudi集成Spark SQL预计会在下个版本正式发布,在集成Spark SQL后,会极大方便用户对Hudi表的DDL/DML操作,下面就来看看如何使用Spark SQL操作Hudi表。

    2. 环境准备

    首先需要将PR拉取到本地打包,生成SPARK_BUNDLE_JAR(hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar)

    2.1 启动spark-sql

    在配置完spark环境后可通过如下命令启动spark-sql

    spark-sql --jars $PATH_TO_SPARK_BUNDLE_JAR  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    

    2.2 设置并发度

    由于Hudi默认upsert/insert/delete的并发度是1500,对于演示的小规模数据集可设置更小的并发度。

    set hoodie.upsert.shuffle.parallelism = 1;
    set hoodie.insert.shuffle.parallelism = 1;
    set hoodie.delete.shuffle.parallelism = 1;
    

    同时设置不同步Hudi表元数据

    set hoodie.datasource.meta.sync.enable=false;
    

    3. Create Table

    使用如下SQL创建表

    create table test_hudi_table (
      id int,
      name string,
      price double,
      ts long,
      dt string
    ) using hudi
     partitioned by (dt)
     options (
      primaryKey = 'id',
      type = 'mor'
     )
     location 'file:///tmp/test_hudi_table'
    

    说明:表类型为MOR,主键为id,分区字段为dt,合并字段默认为ts。

    创建Hudi表后查看创建的Hudi表

    show create table test_hudi_table
    

    4. Insert Into

    4.1 Insert

    使用如下SQL插入一条记录

     insert into test_hudi_table select 1 as id, 'hudi' as name, 10 as price, 1000 as ts, '2021-05-05' as dt
    

    insert完成后查看Hudi表本地目录结构,生成的元数据、分区和数据与Spark Datasource写入均相同。

    4.2 Select

    使用如下SQL查询Hudi表数据

    select * from test_hudi_table
    

    查询结果如下

    5. Update

    5.1 Update

    使用如下SQL将id为1的price字段值变更为20

    update test_hudi_table set price = 20.0 where id = 1
    

    5.2 Select

    再次查询Hudi表数据

    select * from test_hudi_table
    

    查询结果如下,可以看到price已经变成了20.0

    查看Hudi表的本地目录结构如下,可以看到在update之后又生成了一个deltacommit,同时生成了一个增量log文件。

    6. Delete

    6.1 Delete

    使用如下SQL将id=1的记录删除

    delete from test_hudi_table where id = 1
    

    查看Hudi表的本地目录结构如下,可以看到delete之后又生成了一个deltacommit,同时生成了一个增量log文件。

    6.2 Select

    再次查询Hudi表

    select * from test_hudi_table;
    

    查询结果如下,可以看到已经查询不到任何数据了,表明Hudi表中已经不存在任何记录了。

    7. Merge Into

    7.1 Merge Into Insert

    使用如下SQL向test_hudi_table插入数据

     merge into test_hudi_table as t0
     using (
      select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
     ) as s0
     on t0.id = s0.id
     when not matched and s0.id % 2 = 1 then insert *
    

    7.2 Select

    查询Hudi表数据

    select * from test_hudi_table
    

    查询结果如下,可以看到Hudi表中存在一条记录

    7.4 Merge Into Update

    使用如下SQL更新数据

     merge into test_hudi_table as t0
     using (
      select 1 as id, 'a1' as name, 12 as price, 1001 as ts, '2021-03-21' as dt
     ) as s0
     on t0.id = s0.id
     when matched and s0.id % 2 = 1 then update set *
    

    7.5 Select

    查询Hudi表

    select * from test_hudi_table
    

    查询结果如下,可以看到Hudi表中的分区已经更新了

    7.6 Merge Into Delete

    使用如下SQL删除数据

    merge into test_hudi_table t0
     using (
      select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
     ) s0
     on t0.id = s0.s_id
     when matched and s_ts = 1001 then delete
    

    查询结果如下,可以看到Hudi表中已经没有数据了

    8. 删除表

    使用如下命令删除Hudi表

    drop table test_hudi_table;
    

    使用show tables查看表是否存在

    show tables;
    

    可以看到已经没有表了

    9. 总结

    通过上面示例简单展示了通过Spark SQL Insert/Update/Delete Hudi表数据,通过SQL方式可以非常方便地操作Hudi表,降低了使用Hudi的门槛。另外Hudi集成Spark SQL工作将继续完善语法,尽量对标Snowflake和BigQuery的语法,如插入多张表(INSERT ALL WHEN condition1 INTO t1 WHEN condition2 into t2),变更Schema以及CALL Cleaner、CALL Clustering等Hudi表服务。

    PS:如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”,将会是我不竭的动力!
    作者:leesf    掌控之中,才会成功;掌控之外,注定失败。
    出处:http://www.cnblogs.com/leesf456/
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
    如果觉得本文对您有帮助,您可以请我喝杯咖啡!

  • 相关阅读:
    Begin Example with Override Encoded SOAP XML Serialization
    State Machine Terminology
    How to: Specify an Alternate Element Name for an XML Stream
    How to: Publish Metadata for a WCF Service.(What is the Metadata Exchange Endpoint purpose.)
    Beginning Guide With Controlling XML Serialization Using Attributes(XmlSerializaiton of Array)
    Workflow 4.0 Hosting Extensions
    What can we do in the CacheMetaData Method of Activity
    How and Why to use the System.servicemodel.MessageParameterAttribute in WCF
    How to: Begin Sample with Serialization and Deserialization an Object
    A Test WCF Service without anything of config.
  • 原文地址:https://www.cnblogs.com/leesf456/p/14802281.html
Copyright © 2011-2022 走看看