zoukankan      html  css  js  c++  java
  • Apache Hudi + AWS S3 + Athena实战

    Apache Hudi在阿里巴巴集团、EMIS Health,LinkNovate,Tathastu.AI,腾讯,Uber内使用,并且由Amazon AWS EMR和Google云平台支持,最近Amazon Athena支持了在Amazon S3上查询Apache Hudi数据集的能力,本博客将测试Athena查询S3上Hudi格式数据集。

    1. 准备-Spark环境,S3 Bucket

    需要使用Spark写入Hudi数据,登陆Amazon EMR并启动spark-shell:

    $ export SCALA_VERSION=2.12
    $ export SPARK_VERSION=2.4.4
    $ spark-shell 
    --packages org.apache.hudi:hudi-spark-bundle_${SCALA_VERSION}:0.5.3,org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
    ...
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _ / _ / _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_   version 2.4.4
          /_/
             
    Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

    接着使用如下scala代码设置表名,基础路径以及数据生成器来生成数据。这里设置basepaths3://hudi_athena_test/hudi_trips,以便后面进行查询

    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    val tableName = "hudi_trips"
    val basePath = "s3://hudi_athena_test/hudi_trips"
    val dataGen = new DataGenerator
    

    2. 插入数据

    生成新的行程数据,导入DataFrame,并将其写入Hudi表

    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(basePath)
    

    3. 创建Athena数据库/表

    Hudi内置表分区支持,所以在创建表后需要添加分区,安装athenareader工具,其提供Athena多个查询和其他有用的特性。

    go get -u github.com/uber/athenadriver/athenareader
    

    接着创建hudi_athena_test.sql文件,内容如下

    DROP DATABASE IF EXISTS hudi_athena_test CASCADE;
    create database hudi_athena_test;
    CREATE EXTERNAL TABLE `trips`(
      `begin_lat` double,
      `begin_lon` double,
      `driver` string,
      `end_lat` double,
      `end_lon` double,
      `fare` double,
      `rider` string,
      `ts` double,
      `uuid` string
    ) PARTITIONED BY (`partitionpath` string) 
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
    LOCATION 's3://hudi_athena_test/hudi_trips'
    ALTER TABLE trips ADD
      PARTITION (partitionpath = 'americas/united_states/san_francisco') LOCATION 's3://hudi_athena_test/hudi_trips/americas/united_states/san_francisco'
      PARTITION (partitionpath = 'americas/brazil/sao_paulo') LOCATION 's3://hudi_athena_test/hudi_trips/americas/brazil/sao_paulo'
      PARTITION (partitionpath = 'asia/india/chennai') LOCATION 's3://hudi_athena_test/hudi_trips/asia/india/chennai'
    

    使用如下命令运行SQL语句

    $ athenareader -q hudi_athena_test.sql
    

    4. 使用Athena查询Hudi

    如果没有错误,那么说明库和表在Athena中都已创建好,因此可以在Athena中查询Hudi数据集,使用athenareader查询结果如下

    athenareader -q "select * from trips" -o markdown
    

    也可以带条件进行查询

    athenareader -q "select fare,rider from trips where fare>20" -o markdown
    

    5. 更新Hudi表再次查询

    Hudi支持S3中的数据,回到spark-shell并使用如下命令更新部分数据

    val updates = convertToStringList(dataGen.generateUpdates(10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)
    

    运行完成后,使用athenareader再次查询

    athenareader -q "select * from trips" -o markdown
    

    可以看到数据已经更新了

    6. 限制

    Athena不支持查询快照或增量查询,Hive/SparkSQL支持,为进行验证,通过spark-shell创建一个快照

    spark.
      read.
      format("hudi").
      load(basePath + "/*/*/*/*").
      createOrReplaceTempView("hudi_trips_snapshot")
    

    使用如下代码查询

    val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
    val beginTime = commits(commits.length - 2)
    

    使用Athena查询将会失败,因为没有物化

    $ athenareader -q "select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime"
    SYNTAX_ERROR: line 1:57: Table awsdatacatalog.hudi_athena_test.hudi_trips_snapshot does not exist
    

    根据官方文档,Athena支持查询Hudi数据集的Read-Optimized视图,同时,我们可以通过Athena来创建视图并进行查询,使用Athena在Hudi表上创建一个视图

    $ athenareader -q "create view fare_greater_than_40 as select * from trips where fare>40" -a
    

    查询视图

    $ athenareader -q "select fare,rider from fare_greater_than_40"
     FARE                RIDER     
     43.4923811219014    rider-213 
     63.72504913279929   rider-284 
     90.25710109008239   rider-284 
     93.56018115236618   rider-213 
     49.527694252432056  rider-284 
     90.9053809533154    rider-284 
     98.3428192817987    rider-284
    
  • 相关阅读:
    Compiling LIBFFM On OSX 10.9
    Linux shell 脚本入门教程+实例
    Understanding the Bias-Variance Tradeoff
    Learning How To Code Neural Networks
    MXNet设计和实现简介
    数据需求统计常用awk命令
    Deal with relational data using libFM with blocks
    MATLAB 在同一个m文件中写多个独立的功能函数
    Debug 路漫漫-06
    MATLAB 求两个矩阵的 欧氏距离
  • 原文地址:https://www.cnblogs.com/leesf456/p/13428190.html
Copyright © 2011-2022 走看看