zoukankan      html  css  js  c++  java
  • Spark2.2 saveAsTable 函数使用 overWrite 设置 Partition 会造成全覆盖的问题

    在使用 CDH 6.0.X 的版本还是自带的是 Spark2.2 的版本,2.2 版本的 Spark 使用 saveAsTable 如果使用overWrite PartitionBy 的功能会有和 hive 行为不一致的地方。

    比如我们目前有两个分区 2019-03-22 和 2019-03-23 两个分区,现在我们使用 saveAsTable 想覆盖其中一天的分区,结果却是将整个所有分区遮盖了。重建了整个目录,这明显不是我们想要的到的结果。

    好在 spark 在 2.3 版本中已经修复了这个问题,如果遇到的同学直接升级 cdh 的版本到 6.1.x 那么将会获得 spark2.4 ,就可以解决这个问题。但是由于升级集群需要牵扯到的精力的确还是太多,成本太高。所以我还是选择另外一个办法来解决这个问题,使用 hive 的语法来 overwrite 分区。

    Hive 的分区有两种情况:

    静态分区 - 我们提供一个分区列表,由 Hive 根据这个列表值进行分区

    动态分区 - 我们提供一个列,让其值变成分区的值,比如上面提到的日期。

    来看个例子

    DROP TABLE IF EXISTS stats;
    CREATE EXTERNAL TABLE stats (
        ad              STRING,
        impressions     INT,
        clicks          INT
    ) PARTITIONED BY (country STRING, year INT, month INT, day INT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '	' 
    LINES TERMINATED BY '
    ';
    MSCK REPAIR TABLE stats;
    -- Specify static partitions INSERT OVERWRITE TABLE stats PARTITION(country = 'US', year = 2017, month = 3, day = 1) SELECT ad, SUM(impressions), SUM(clicks) FROM impression_logs WHERE log_day = 1 GROUP BY ad;
    -- Load data into partitions dynamically SET hive.exec.dynamic.partition = true; SET hive.exec.dynamic.partition.mode = nonstrict;
    INSERT OVERWRITE TABLE stats PARTITION(country
    = 'US', year = 2017, month = 3, day) SELECT ad, SUM(impressions), SUM(clicks), log_day as day FROM impression_logs GROUP BY ad;

    第二个插入操作指定使用 log_day 来作为动态 partition 的一部分。可以实现无数个分区,而第一种插入只能被归类为一种分区。

    最后我们可以让 spark 来直接使用 sql 将数据写入到表中以达到我们的目的。

    static partitions
    
    self.ss.sql("""
                            INSERT OVERWRITE TABLE analytics_db.alpha_md_day_dump_users
                            PARTITION(the_day='{}')
                            SELECT *
                            FROM _md_day_dump_users
                        """.format(st))
    
    ---------------------------------------------------------------
    
    dynamic partitions
    
    self.ss.sql("""
                            INSERT OVERWRITE TABLE analytics_db.alpha_md_day_dump_users
                            PARTITION(the_day=the_day)
                            SELECT the_day, xx, xx, xx
                            FROM _md_day_dump_users
                        """)

    如果生成小文件过多我们可以在写入之前操纵 df进行一次 repartitions。

    Reference:

    https://medium.com/a-muggles-pensieve/writing-into-dynamic-partitions-using-spark-2e2b818a007a   Writing Into Dynamic Partitions Using Spark
    https://issues.apache.org/jira/browse/SPARK-20236   Overwrite a partitioned data source table should only overwrite related partitions

     

  • 相关阅读:
    单独下载克隆clone github中master 分支的文件夹
    caffe makefile.config anaconda2 python3 所有问题一种解决方式
    问题解决
    找不到cannot find -lpython3.5m caffe anaconda python3 ubuntu16.04
    bash./ autogen 没有这个文件 ubuntu16.04git安装glog报错
    cumulative match score
    【Python网络编程】复习
    【Python网络编程】爬取百度贴吧、小说内容、豆瓣小说、Ajax爬微博、多线程爬淘宝
    【Python网络编程】UDP聊天、TCP文件下载、多线程UDP聊天器、多进程拷贝文件
    【前端性能】网站性能优化
  • 原文地址:https://www.cnblogs.com/piperck/p/10578042.html
Copyright © 2011-2022 走看看