  • Hands-on: Integrating Apache Hudi with PySpark

    1. Setup

    Hudi works with Spark 2.x. Install Spark, then launch pyspark with the Hudi bundle:

    # pyspark
    export PYSPARK_PYTHON=$(which python3)
    spark-2.4.4-bin-hadoop2.7/bin/pyspark \
      --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
    
    • The spark-avro module must be specified explicitly via --packages
    • The spark-avro version must match the Spark version
    • Since this example depends on spark-avro_2.11, it uses the hudi-spark-bundle built against Scala 2.11; if you use spark-avro_2.12, use hudi-spark-bundle_2.12 instead

    Initialize a few variables up front:

    # pyspark
    tableName = "hudi_trips_cow"
    basePath = "file:///tmp/hudi_trips_cow"
    dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    

    The DataGenerator can generate sample inserts and updates based on the trip schema.
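
    For a quick look at what the generator produces, you can print a single generated record; the field names (uuid, ts, rider, driver, begin_lat, begin_lon, fare, partitionpath) reflect the quickstart trip schema and may vary slightly across Hudi versions. This generates one extra sample record that is never written anywhere.

    # pyspark
    # Optional peek at one generated record (a JSON string)
    sample = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(1))
    print(sample[0])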

    2. Insert data

    Generate some new trip data, load it into a DataFrame, and write the DataFrame to the Hudi table:

    # pyspark
    inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
    df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    
    hudi_options = {
      'hoodie.table.name': tableName,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': tableName,
      'hoodie.datasource.write.operation': 'insert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2, 
      'hoodie.insert.shuffle.parallelism': 2
    }
    
    df.write.format("hudi"). \
      options(**hudi_options). \
      mode("overwrite"). \
      save(basePath)
    

    mode(Overwrite) overwrites and recreates the table if it already exists. The example specifies a record key (uuid in the schema), a partition field (partitionpath, with values of the form region/country/city), and a precombine field (ts in the schema) to ensure trip records are unique within each partition.
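
    As a quick sanity check (a sketch that assumes the local basePath used above), you can look at what was written on disk: Hudi keeps its timeline metadata under a .hoodie directory at the base path, and the data files are laid out per partition.

    # pyspark
    import os
    # Hudi table metadata (commit timeline, etc.) lives under .hoodie
    print(os.listdir('/tmp/hudi_trips_cow/.hoodie'))
    # Data files are organized by partition (e.g. americas/..., asia/...)
    print(os.listdir('/tmp/hudi_trips_cow'))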

    3. Query data

    Load the data into a DataFrame:

    # pyspark
    tripsSnapshotDF = spark. \
      read. \
      format("hudi"). \
      load(basePath + "/*/*/*/*")
    
    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
    

    This query provides a read-optimized (snapshot) view of the data. Since our partition path format is region/country/city, three levels below the base path (basePath), we use load(basePath + "/*/*/*/*") to load the data.
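
    The same filter can also be expressed with the DataFrame API instead of SQL, if that reads better in your pipeline:

    # pyspark
    tripsSnapshotDF.filter("fare > 20.0").select("fare", "begin_lon", "begin_lat", "ts").show()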

    4. Update data

    As with inserting new data, use the DataGenerator to generate updates, then write them to the Hudi table through a DataFrame.

    # pyspark
    updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
    df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    df.write.format("hudi"). \
      options(**hudi_options). \
      mode("append"). \
      save(basePath)
    

    Note that the save mode is now append. In general, always use append mode unless you are creating the table for the first time. Each write operation produces a new commit identified by a timestamp.
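
    Each commit shows up in the _hoodie_commit_time metadata column, so a quick way to list the commits written so far (the same idea the incremental query below relies on) is:

    # pyspark
    spark.read.format("hudi").load(basePath + "/*/*/*/*") \
      .select("_hoodie_commit_time").distinct().orderBy("_hoodie_commit_time").show()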

    5. Incremental query

    Hudi supports incremental pulls, i.e. pulling only the changes made after a specified commit time; if no end time is given, the latest changes are pulled.

    # pyspark
    # reload data
    spark. \
      read. \
      format("hudi"). \
      load(basePath + "/*/*/*/*"). \
      createOrReplaceTempView("hudi_trips_snapshot")
    
    commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
    beginTime = commits[len(commits) - 2] # commit time we are interested in
    
    # incrementally query data
    incremental_read_options = {
      'hoodie.datasource.query.type': 'incremental',
      'hoodie.datasource.read.begin.instanttime': beginTime,
    }
    
    tripsIncrementalDF = spark.read.format("hudi"). \
      options(**incremental_read_options). \
      load(basePath)
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
    
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
    

    This queries all changes committed after the begin time. This incremental pull capability can be used to build streaming-style pipelines on top of batch data.
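
    A minimal sketch of how this could drive an incremental batch pipeline: remember the last commit you processed and, on each run, pull only the changes after it. The checkpoint handling here (the last_commit variable) is hypothetical and would normally be persisted between runs.

    # pyspark
    # Hypothetical incremental-pull step; last_commit would be stored in a file or
    # metastore between runs, here it is just a local variable for illustration.
    last_commit = beginTime
    changes = spark.read.format("hudi") \
      .option('hoodie.datasource.query.type', 'incremental') \
      .option('hoodie.datasource.read.begin.instanttime', last_commit) \
      .load(basePath)
    if changes.count() > 0:
        # process the new/changed rows here, then advance the checkpoint
        last_commit = changes.agg({'_hoodie_commit_time': 'max'}).collect()[0][0]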

    6. Point-in-time query

    To query the data as of a specific point in time, set the end time to a specific commit time and set the begin time to "000" (representing the earliest commit time).

    # pyspark
    beginTime = "000" # Represents all commits > this time.
    endTime = commits[len(commits) - 2]
    
    # query point in time data
    point_in_time_read_options = {
      'hoodie.datasource.query.type': 'incremental',
      'hoodie.datasource.read.end.instanttime': endTime,
      'hoodie.datasource.read.begin.instanttime': beginTime
    }
    
    tripsPointInTimeDF = spark.read.format("hudi"). \
      options(**point_in_time_read_options). \
      load(basePath)
    
    tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
    

    7. Delete data

    Delete the records matching the HoodieKeys passed in. Note: the delete operation only supports append save mode.

    # pyspark
    # fetch total records count
    spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
    # fetch two records to be deleted
    ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
    
    # issue deletes
    hudi_delete_options = {
      'hoodie.table.name': tableName,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': tableName,
      'hoodie.datasource.write.operation': 'delete',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2, 
      'hoodie.insert.shuffle.parallelism': 2
    }
    
    from pyspark.sql.functions import lit
    # build (uuid, partitionpath) pairs for the records to delete
    deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
    df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
    df.write.format("hudi"). \
      options(**hudi_delete_options). \
      mode("append"). \
      save(basePath)
    
    # run the same read query as above.
    roAfterDeleteViewDF = spark. \
      read. \
      format("hudi"). \
      load(basePath + "/*/*/*/*")
    roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
    # fetch should return (total - 2) records
    spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
    

    8. Summary

    This post showed how to insert, update, and delete data in a Hudi table using pyspark. If you need to use pyspark together with Hudi, give it a try!

  • Original post: https://www.cnblogs.com/leesf456/p/12863646.html