[Original] Troubleshooting notes (12): why does Spark save text-type files (text, csv, json, etc.) to HDFS in compressed format?

    Reproducing the problem

    df.repartition(1).write.csv(outPath)

    After the write completes, the output files turn out to be compressed.

    During write, Spark first obtains a hadoopConf and then reads from it whether to compress the output and which compression codec to use:

    org.apache.spark.sql.execution.datasources.DataSource

      def write(

    org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

        val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)

    org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
    
            Configuration conf = job.getConfiguration();
    
            boolean isCompressed = getCompressOutput(job);
    
            String keyValueSeparator = conf.get(SEPERATOR, "\t");
    
            CompressionCodec codec = null;
    
            String extension = "";
    
            if (isCompressed) {
    
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
    
                codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
    
                extension = codec.getDefaultExtension();
    
            }

    isCompressed is read from mapreduce.output.fileoutputformat.compress, and codecClass from mapreduce.output.fileoutputformat.compress.codec.
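
    To make these two lookups concrete, here is a minimal Scala sketch of what TextOutputFormat effectively does (the Configuration below stands in for the job configuration; the key names mirror the excerpt above):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.compress.GzipCodec

    val conf = new Configuration()
    // Corresponds to getCompressOutput(job): falls back to false when the key is unset.
    val isCompressed = conf.getBoolean("mapreduce.output.fileoutputformat.compress", false)
    // Corresponds to getOutputCompressorClass(job, classOf[GzipCodec]):
    // falls back to GzipCodec when no codec is configured.
    val codecClassName = conf.get(
      "mapreduce.output.fileoutputformat.compress.codec",
      classOf[GzipCodec].getName)
    println(s"compress=$isCompressed, codec=$codecClassName")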

    The hadoopConf is initialized as follows:

    org.apache.spark.sql.internal.SessionState

      def newHadoopConf(): Configuration = {
    
        val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)

    org.apache.spark.SparkContext

      _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    
     
    
      def newConfiguration(conf: SparkConf): Configuration = {
    
        val hadoopConf = new Configuration()
    
        appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
    
        hadoopConf
    
      }
    
     
    
      def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
    
      ...
    
          conf.getAll.foreach { case (key, value) =>
    
            if (key.startsWith("spark.hadoop.")) {
    
              hadoopConf.set(key.substring("spark.hadoop.".length), value)
    
            }
    
          }
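
    The effect of this prefix stripping can be shown with a small self-contained sketch (not Spark's actual code path, just the same copy loop applied by hand):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
      .set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
    val hadoopConf = new Configuration()
    // Every "spark.hadoop.*" key is copied into the Hadoop conf with the prefix removed.
    sparkConf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      }
    }
    // Prints "false": the Hadoop-side key no longer carries the prefix.
    println(hadoopConf.get("mapreduce.output.fileoutputformat.compress"))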
    
     

    By default, hadoopConf loads all Hadoop configuration files found on the classpath; this is easy to verify from spark-shell:

    scala> val hc = spark.sparkContext.hadoopConfiguration

    hc: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml

    scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))

    true

    scala> println(hc.get("mapreduce.output.fileoutputformat.compress.codec"))

    org.apache.hadoop.io.compress.DefaultCodec
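
    If you want to trace which classpath resource supplied these values, Hadoop's Configuration also exposes getPropertySources (available in Hadoop 2.x and later); in the same spark-shell session:

    scala> hc.getPropertySources("mapreduce.output.fileoutputformat.compress")

    This returns the names of the configuration resources (typically a site file such as mapred-site.xml) that set the key.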

    In summary, setting spark.hadoop.mapreduce.output.fileoutputformat.compress=false when creating the SparkConf is enough to disable compression:

    val sparkConf = new SparkConf().set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
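
    Put end to end, a minimal sketch looks like the following (the output path and the DataFrame are placeholders, not from the original post):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val sparkConf = new SparkConf()
      .set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    val outPath = "/tmp/uncompressed-csv"  // placeholder output directory
    val df = spark.range(10).toDF("id")    // placeholder data
    // Part files are now written as plain .csv instead of e.g. .csv.deflate
    df.repartition(1).write.csv(outPath)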

    Compression can also be controlled per write through an option:

    df.repartition(1).write.option("compression", "none").csv(outPath)
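
    The same option also accepts codec names (for text-based sources Spark understands values such as none, gzip, bzip2, deflate, snappy and lz4), so compression can be switched on or off per write regardless of the cluster-wide default; a hypothetical example:

    // Compress this particular JSON output with gzip (df and the path are placeholders)
    df.repartition(1).write.option("compression", "gzip").json("/tmp/compressed-json")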
    Original post: https://www.cnblogs.com/barneywill/p/10109568.html