zoukankan      html  css  js  c++  java
  • Apache Hudi:CDC的黄金搭档

    1. 介绍

    Apache Hudi是一个开源的数据湖框架,旨在简化增量数据处理和数据管道开发。借助Hudi可以在Amazon S3、Aliyun OSS数据湖中进行记录级别管理插入/更新/删除。AWS EMR集群已支持Hudi组件,并且可以与AWS Glue Data Catalog无缝集成。此特性可使得直接在Athena或Redshift Spectrum查询Hudi数据集。

    对于企业使用AWS云的一种常见数据流如图1所示,即将数据实时复制到S3。

    本篇文章将介绍如何使用Oracle GoldenGate来捕获变更事件并利用Hudi格式写入S3数据湖。

    Oracle GG可以使用多个处理程序和格式输出,请查看此处获取更多信息。

    本篇文章中不关心处理程序,我们假设使用Avro Operation格式,这种格式较为冗长,但有着广泛应用,因为其平衡了数据完整性和性能。如图2所示,此格式包含每个记录的beforeafter版本。

    即使完整且易于生成,此格式也不适合用Athena或Spectrum进行分析,从使用角度也无法替代源数据。此外你可能需要对历史数据进行分区处理以便快速检索。

    本文我们将介绍如何利用Apache Hudi框架做到这一点,以构建易于分析的目标数据集。

    2. 系统架构

    我们不详细介绍如何将avro格式文件放入Replica S3桶中,整个数据体系结构如下所示

    Hudi代码运行在EMR集群中,从Replica S3桶中读取avro数据,并将目标数据集存储到Target S3桶中。

    EMR软件配置如下

    硬件配置如下

    由于插入/更新始终保留最后一条记录,因此Hudi作业非常具有弹性, 因此可以利用Spot Instance(抢占式实例)大大降低成本。

    除此之外,还需要设置

    • 源bucket(如 my-s3-sourceBucket)
    • 目标bucket (如 my-s4-targetBucket)
    • Glue数据库(如 sales-db)

    配置完后需要确保EMR集群有读写权限。

    如果你需要一些样例数据,可以点击此处获取。当设置好桶后,启动EMR集群并将这些样例数据导入Replica桶。

    3. 关于分区的注意事项

    为构建按时间划分的数据集,必须确定不可变的日期类型字段。参照示例数据集(销售订单),我们假设订单日期永远不会改变,因此我们将DAT_ORDER字段作为写入Hudi数据集的分区字段。

    分区方式是YYYY/MM/DD,通过该方式,所有数据将被组织在嵌套的子文件夹中。Hudi框架将提供此分区信息,并将一个特定字段添加到关联的Hive/Glue表中。当查询时,该字段上的过滤条件将转换为超高效的分区修剪扫描条件。

    实际上这是我们必须对数据集做的唯一强假设,所有其他信息都在avro文件中(字段名称,字段类型,PK等)。

    除此元数据外,GoldenGate通常还会添加一些其他信息,例如表名称,操作时间戳,操作类型(插入/更新/删除)和自定义标记。你可以利用这些字段来构造通用逻辑并构建灵活的迁移平台。

    4. 步骤

    启动spark-shell

    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
    

    启动后可以运行如下代码:

    val ggDeltaFiles = "s3://" + sourceBucket + "/" + sourceSubFolder + "/" + sourceSystem + "/" + inputTableName + "/";
    val rootDataframe:DataFrame = spark.read.format("avro").load(ggDeltaFiles);
    
    // extract PK fields name from first line
    val pkFields: Seq[String] = rootDataframe.select("primary_keys").limit(1).collect()(0).getSeq(0);
    
    // take into account the "after." fields only
    val columnsPre:Array[String] = rootDataframe.select("after.*").columns;
    
    // exclude "_isMissing" fields added by Oracle GoldenGate
    // The second part of the expression will safely preserve all native "**_isMissing" fields
    val columnsPost:Array[String] = columnsPre.filter { x => (!x.endsWith("_isMissing")) || (!x.endsWith("_isMissing_isMissing") && (columnsPre.filter(y => (y.equals(x + "_isMissing")) ).nonEmpty))};
    val columnsFinal:ArrayBuffer[String] = new ArrayBuffer[String]();
    
    columnsFinal += "op_ts";
    columnsFinal += "pos";
    
    // add the "after." prefix
    columnsPost.foreach(x => (columnsFinal += "after." + x));
    
    // prepare the target dataframe with the partition additional column
    val preparedDataframe = rootDataframe.select("opTypeFieldName", columnsFinal.toArray:_*).
      withColumn("HUDI_PART_DATE", date_format(to_date(col("DAT_ORDER"), "yyyy-MM-dd"),"yyyy/MM/dd")).
      filter(col(opTypeFieldName).isin(admittedValues.toList: _*));
    
    // write data
    preparedDataframe.write.format("org.apache.hudi").
      options(hudiOptions).
      option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, pkFields.mkString(",")).
      mode(SaveMode.Append).
      save(hudiTablePath);
    

    上述简化了部分代码,可以在此处找到完整的代码。

    5. 结果

    输出的S3对象结果如下所示

    同时Glue数据目录将使该表可用于通过外部模式在Athena或Spectrum中进行查询分析,外部表具有我们用于分区的hudi_part_date附加字段。

    PS:如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”,将会是我不竭的动力!
    作者:leesf    掌控之中,才会成功;掌控之外,注定失败。
    出处:http://www.cnblogs.com/leesf456/
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
    如果觉得本文对您有帮助,您可以请我喝杯咖啡!

  • 相关阅读:
    K近邻(K Nearest Neighbor-KNN)原理讲解及实现
    Bisecting KMeans (二分K均值)算法讲解及实现
    KMeans (K均值)算法讲解及实现
    NodeJs使用async让代码按顺序串行执行
    NodeJs递归删除非空文件夹
    NodeJs之配置文件管理
    NodeJs针对Express框架配置Mysql进行数据库操作
    在Express中使用Multiparty进行文件上传及POST、GET参数获取
    Linux操作命令
    SftpUtil FTP文件上传
  • 原文地址:https://www.cnblogs.com/leesf456/p/14620479.html
Copyright © 2011-2022 走看看