zoukankan      html  css  js  c++  java
  • 使用AWS Glue进行 ETL 工作

    数据湖

    数据湖的产生是为了存储各种各样原始数据的大型仓库。这些数据根据需求,进行存取、处理、分析等。对于存储部分来说,开源版本常见的就是 hdfs。而各大云厂商也提供了各自的存储服务,如 Amazon S3,Azure Blob 等。

    而由于数据湖中存储的数据全部为原始数据,一般需要对数据做ETL(Extract-Transform-Load)。对于大型数据集,常用的框架是 Spark、pyspark。在数据做完 ETL 后,再次将清洗后的数据存储到存储系统中(如hdfs、s3)。基于这部分清洗后的数据,数据分析师或是机器学习工程师等,可可以基于这些数据进行数据分析或是训练模型。在这些过程中,还有非常重要的一点是:如何对数据进行元数据管理?

    在 AWS 中,Glue 服务不仅提供了 ETL 服务,还提供的元数据的管理。下面我们会使用 S3+Glue +EMR 来展示一个数据湖+ETL+数据分析的一个简单过程。

    准备数据

    此次使用的是GDELT数据,地址为:

    https://registry.opendata.aws/gdelt/

    此数据集中,每个文件名均显示了此文件的日期。作为原始数据,我们首先将2015年的数据放在一个year=2015 的s3目录下:

    aws s3 cp s3://xxx/data/20151231.export.csv s3://xxxx/gdelt/year=2015/20151231.export.csv

    使用Glue爬取数据定义

    通过glue 创建一个爬网程序,爬取此文件中的数据格式,指定的数据源路径为s3://xxxx/gdelt/ 。

    此部分功能及具体介绍可参考aws 官方文档:

    https://docs.aws.amazon.com/zh_cn/glue/latest/dg/console-crawlers.html

    爬网程序结束后,在Glue 的数据目录中,即可看到新创建的 gdelt 表:

    原数据为csv格式,由于没有header,所以列名分别为col0、col1…、col57。其中由于s3下的目录结构为year=2015,所以爬网程序自动将year 识别为分区列。

    至此,这部分原数据的元数据即保存在了Glue。在做ETL 之前,我们可以使用AWS EMR 先验证一下它对元数据的管理。

    AWS EMR

    AWS EMR 是 AWS 提供的大数据集群,可以一键启动带Hive、HBase、Presto、Spark 等常用框架的集群。

    启动AWS EMR,勾选 Hive、Spark,并使用Glue作为它们表的元数据。EMR 启动后,登录到主节点,启动Hive:

    > show tables;

    gdelt

    Time taken: 0.154 seconds, Fetched: 1 row(s)

    可以看到在 hive 中已经可以看到此表,执行查询:

    > select * from gdelt where year=2015 limit 3;
    
    OK
    
    498318487       20060102        200601  2006    2006.0055       CVL     COMMUNITY                                               CVL                                                                                                   1       53      53      5       1       3.8     3       1       3       -2.42718446601942       1       United States   US      US      38.0    -97.0   US      0                               NULL  NULL            1       United States   US      US      38.0    -97.0   US      20151231        http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896       2015
    
    498318488       20060102        200601  2006    2006.0055       CVL     COMMUNITY                                               CVL                     USA     UNITED STATES   USA                                                           1       51      51      5       1       3.4     3       1       3       -2.42718446601942       1       United States   US      US      38.0    -97.0   US      1       United States   US    US      38.0    -97.0   US      1       United States   US      US      38.0    -97.0   US      20151231        http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896       2015
    
    498318489       20060102        200601  2006    2006.0055       CVL     COMMUNITY                                               CVL                     USA     UNITED STATES   USA                                                           1       53      53      5       1       3.8     3       1       3       -2.42718446601942       1       United States   US      US      38.0    -97.0   US      1       United States   US    US      38.0    -97.0   US      1       United States   US      US      38.0    -97.0   US      20151231        http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896       2015

    可以看到原始数据的列非常多,假设我们所需要的仅有4列:事件ID、国家代码、日期、以及网址,并基于这些数据做分析。那我们下一步就是做ETL。

    GLUE ETL

    Glue 服务也提供了 ETL 的工具,可以编写基于spark 或是 python 的脚本,提交给 glue etl 执行。在这个例子中,我们会抽取col0、col52、col56、col57、以及year这些列,并给它们重命名。然后从中抽取仅包含“UK”的记录,最终以date=current_day 的格式写入到最终s3 目录,存储格式为parquet。可以通过 python 或是 scala 语言调用 GLUE 编程接口,在本文中使用的是 scala:

    import com.amazonaws.services.glue.ChoiceOption
    import com.amazonaws.services.glue.GlueContext
    import com.amazonaws.services.glue.DynamicFrame
    import com.amazonaws.services.glue.MappingSpec
    import com.amazonaws.services.glue.ResolveSpec
    import com.amazonaws.services.glue.errors.CallSite
    import com.amazonaws.services.glue.util.GlueArgParser
    import com.amazonaws.services.glue.util.Job
    import com.amazonaws.services.glue.util.JsonOptions
    import org.apache.spark.SparkContext
    import scala.collection.JavaConverters._
    import java.text.SimpleDateFormat
    import java.util.Date
    
    object Gdelt_etl {
      def main(sysArgs: Array[String]) {
          
        val sc: SparkContext = new SparkContext()
        val glueContext: GlueContext = new GlueContext(sc)
        val spark = glueContext.getSparkSession
        
        // @params: [JOB_NAME]
        val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
        Job.init(args("JOB_NAME"), glueContext, args.asJava)
        
        // db and table
        val dbName = "default"
        val tblName = "gdelt"
    
        // s3 location for output
        val format = new SimpleDateFormat("yyyy-MM-dd")
        val curdate = format.format(new Date())
        val outputDir = "s3://xxx-xxx-xxx/cleaned-gdelt/cur_date=" + curdate + "/"
    
        // Read data into DynamicFrame
        val raw_data = glueContext.getCatalogSource(database=dbName, tableName=tblName).getDynamicFrame()
    
        // Re-Mapping Data
        val cleanedDyF = raw_data.applyMapping(Seq(("col0", "long", "EventID", "string"), ("col52", "string", "CountryCode", "string"), ("col56", "long", "Date", "String"), ("col57", "string", "url", "string"), ("year", "string", "year", "string")))
    
        // Spark SQL on a Spark DataFrame
        val cleanedDF = cleanedDyF.toDF()
        cleanedDF.createOrReplaceTempView("gdlttable")
        
        // Get Only UK data
        val only_uk_sqlDF = spark.sql("select * from gdlttable where CountryCode = 'UK'")
        
        val cleanedSQLDyF = DynamicFrame(only_uk_sqlDF, glueContext).withName("only_uk_sqlDF")        
        
        // Write it out in Parquet
        glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> outputDir)), format = "parquet").writeDynamicFrame(cleanedSQLDyF)    
        
        Job.commit()
      }
    }

    将此脚本保存为gdelt.scala 文件,并提交给 GLUE ETL作业执行。等待执行完毕后,我们可以在s3看到生成了输出文件:

    > aws s3 ls s3://xxxx-xxx-xxx/cleaned-gdelt/ date=2020-04-12/

    part-00000-d25201b8-2d9c-49a0-95c8-f5e8cbb52b5b-c000.snappy.parquet

    然后我们再对此/cleaned-gdelt/目录执行一个新的 GLUE 网爬程序:

    执行完成后,可以在GLUE 看到生成了新表,此表结构为:

    可以看到输入输出格式均为parquet,分区键为cur_date,且仅包含了我们所需的列。

    再次进入到 EMR Hive 中,可以看到新表已出现:

    hive> describe cleaned_gdelt;
    OK
    eventid                 string
    countrycode             string
    date                    string
    url                     string
    year                    string
    date                    string
    
    # Partition Information
    # col_name              data_type               comment
    cur_date                    string

    查询此表:

    hive> select * from cleaned_gdelt limit 10;
    OK
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    498318821       UK      20151231        http://wmpoweruser.com/microsoft-denies-lumia-950-xl-withdrawn-due-issues-says-stock-due-strong-demand/ 2015
    498319466       UK      20151231        http://www.princegeorgecitizen.com/news/police-say-woman-man-mauled-by-2-dogs-in-home-in-british-columbia-1.2142296     2015
    498319777       UK      20151231        http://www.catchnews.com/life-society-news/happy-women-do-not-live-any-longer-than-sad-women-1451420391.html    2015
    498319915       UK      20151231        http://www.nationalinterest.org/feature/the-perils-eu-army-14770        2015
    …
    Time taken: 0.394 seconds, Fetched: 10 row(s)

    可以看到出现的结果均的 CountryCode 均为 UK,达到我们的目标。

    自动化

    下面是将 GLUE 网爬 + ETL 进行自动化。在GLUE ETL 的工作流程中,创建一个工作流,创建完后如下所示:

    如图所示,此工作流的过程为:

    1. 每晚11点40开始触发工作流
    2. 触发 gdelt 的网爬作业,爬取原始数据的元数据
    3. 触发gdelt的ETL作业
    4. 触发gdelt-cleaned 网爬程序,爬取清洗后的数据的元数据

    下面我们添加一个新文件到原始文件目录,此新数据为 year=2016 的数据:

    aws s3 cp s3://xxx-xxxx/data/20160101.export.csv s3://xxx-xxx-xxx/gdelt/year=2016/20160101.export.csv

    然后执行此工作流。

    期间我们可以看到ETL job 在raw_crawler_done 之后,被正常触发:

    作业完成后,在Hive 中即可查询到 2016 年的数据:

    select * from cleaned_gdelt where year=2016 limit 10;
    OK
    498554334       UK      20160101        http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/    2016
    498554336       UK      20160101        http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/    2016
    cur_date
  • 相关阅读:
    根据当前日期转目的国地区时间戳
    时间戳转换作用域问题
    字符串拼接问题
    input全选和取消全选
    循环遍历渲染模块
    jQuery实现获取选中复选框的值
    React组件
    underscore.js依赖库函数分析二(查找)
    underscore.js依赖库函数分析一(遍历)
    React入门
  • 原文地址:https://www.cnblogs.com/zackstang/p/12688892.html
Copyright © 2011-2022 走看看