zoukankan      html  css  js  c++  java
  • Spark定期合并Hive表小文件

    一、需求背景

    App端的埋点日志通过LogerServer收集到Kafka,再用Flink写入到HDFS,按天或天加小时分区,文件格式为text 或者Parquet,Checkpoint间隔为5分钟,Sink 并行度为10,每个小时产生600个小文件,由于数据量大,每天几十亿的数据,产生的小文件很多,Namenode压力大,影响Hive Sql & Spark Sql的查询性能。定期对HDFS小文件合并成为迫切的问题,也是数据治理的重点。开始尝试使用Hive Job定期合并小文件,带来的问题是占用资源多,执行时间长,后面改用Spark Job定期合并,效率有明显提升。

    二、Spark定期合并Hive表小文件Spark代码实现

    object MergeFile {
      def main(args: Array[String]): Unit = {
    
        val jobName = args(0)   // 任务名
        val tableName = args(1)  // hive表名
        val format = args(2).toInt   // 1 text格式 && 2 parquet格式
        val pa = args(3).toInt // 并发
        val dt_str = args(4)
        val dt = args(5)       // 分区天 开始dt
        val last = args(6)     // 截止dt
        val hour_str = args(7)
        val hour = args(8)     // 分区小时
    
        val spark = SparkSession
          .builder()
          .config("spark.seriailzer", "org.apache.spark.serializer.KryoSerializer")
          .appName(jobName + "_MergeFile" + dt)
          .master("yarn")
          .enableHiveSupport
          .getOrCreate
    
        val db = tableName.split("[.]")(0) + ".db"
        val orgTableName = tableName.split("[.]")(1)
    
        // 天+小时分区
        if (!hour_str.equals("null")) {
          // 原表导入到文件
          val df: DataFrame = spark.sql(s"select * from ${tableName} where ${dt_str}=${dt} and `${hour_str}`= ${hour} ")
          val origin_table_path = s"hdfs://emr-cluster/user/hive/warehouse/${db}/${orgTableName}/ ${dt_str}=$dt/hour=$hour"
    
          if (format == 1) {
            // text格式文件
            val text_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}/${hour_str}=${hour}"
            df.rdd.map(_.mkString("01")).coalesce(pa).saveAsTextFile(text_path)
    
            // 文件导入覆盖原表
            spark.read.textFile(text_path).write.mode(SaveMode.Overwrite).save(origin_table_path)
    
          } else {
            // parquet格式文件
            val parquet_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}/${hour_str}=${hour}"
            df.coalesce(pa).write.mode(SaveMode.Overwrite).parquet(parquet_path)
    
            // 文件导入覆盖原表
            spark.read.parquet(parquet_path).write.mode(SaveMode.Overwrite).save(origin_table_path)
          }
    
        } else {
          // 原表导入到文件
          val df: DataFrame = spark.sql(s"select * from ${tableName} where dt=${dt} ")
          val origin_table_path = s"hdfs://emr-cluster/user/hive/warehouse/${db}/${orgTableName}/${dt_str}=${dt}"
    
          if (format == 1) {
            // text格式文件
            val text_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}"
            df.rdd.map(_.mkString("01")).coalesce(pa).saveAsTextFile(text_path)
    
            // 文件导入覆盖原表
            spark.read.textFile(text_path).write.mode(SaveMode.Overwrite).save(origin_table_path)
    
          } else {
            // parquet格式文件
            val parquet_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}"
            df.coalesce(pa).write.mode(SaveMode.Overwrite).parquet(parquet_path)
    
            val aa = spark.read.parquet(parquet_path)
            aa.show(10)
    
            // 文件导入覆盖原表
            spark.read.parquet(parquet_path).write.mode(SaveMode.Overwrite).save(origin_table_path)
          }
        }
    
        spark.close()
      }
    }
    

      

    三、定期执行合并Job

    写个shell脚本传入所需参数,可设定任意的分区开始日期和结束日期,灵活合并Hive表的分区文件。

  • 相关阅读:
    对Spring的简单理解
    对Hibernate的简单认识
    对Struts的简单理解
    浅谈实体类
    xdebug配置
    hosts文件修改完无效的解决办法
    CentOS6.4 中文输入法
    python加密解密
    windows运行命令大全
    vm虚拟机centos文件共享目录设置
  • 原文地址:https://www.cnblogs.com/zfwwdz/p/13154995.html
Copyright © 2011-2022 走看看