  • [Original] Troubleshooting notes (12): why Spark writes text-format files (text, csv, json, etc.) to HDFS as compressed output

    Problem reproduction

    rdd.repartition(1).write.csv(outPath)

    After the write, the output files turn out to be compressed.

    During write, Spark first obtains a hadoopConf and then reads from it whether the output should be compressed and which codec to use:

    org.apache.spark.sql.execution.datasources.DataSource

      def write(

    org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

        val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)

    org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = conf.get(SEPERATOR, "\t");
            CompressionCodec codec = null;
            String extension = "";
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
                extension = codec.getDefaultExtension();
            }

    isCompressed is read from mapreduce.output.fileoutputformat.compress, and codecClass from mapreduce.output.fileoutputformat.compress.codec.
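
    For reference, the same lookup can be reproduced outside a Spark job. The sketch below is a minimal illustration, assuming hadoop-client on the classpath; the key names and the DefaultCodec class are the ones shown in this post, everything else is made up for the example. It resolves the codec from a plain Configuration and prints the extension it would append to output files:

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
      import org.apache.hadoop.util.ReflectionUtils

      val conf = new Configuration()
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.codec",
        "org.apache.hadoop.io.compress.DefaultCodec")

      // mirror the codec lookup done in TextOutputFormat.getRecordWriter above
      val codecClass = conf.getClass("mapreduce.output.fileoutputformat.compress.codec",
        classOf[GzipCodec], classOf[CompressionCodec])
      val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, conf)
      println(codec.getDefaultExtension())   // ".deflate" for DefaultCodec, ".gz" for GzipCodec

    This is also why the problem tends to surface as part files ending in .deflate: DefaultCodec is the codec configured here (see the spark-shell output below), and its default extension is appended to every output file.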

    The hadoopConf is initialized as follows:

    org.apache.spark.sql.internal.SessionState

      def newHadoopConf(): Configuration = {
        val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)

    org.apache.spark.SparkContext

      _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

    org.apache.spark.deploy.SparkHadoopUtil

      def newConfiguration(conf: SparkConf): Configuration = {
        val hadoopConf = new Configuration()
        appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
        hadoopConf
      }
    
      def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
        ...
        conf.getAll.foreach { case (key, value) =>
          if (key.startsWith("spark.hadoop.")) {
            hadoopConf.set(key.substring("spark.hadoop.".length), value)
          }
        }
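
    The loop above copies every SparkConf key prefixed with spark.hadoop. into the Hadoop Configuration with the prefix stripped. A minimal sketch to confirm the behaviour (the key demo.key and its value are made up for illustration):

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
        .master("local[1]")
        .config("spark.hadoop.demo.key", "demo-value")   // anything after "spark.hadoop." is forwarded
        .getOrCreate()

      // the prefix is stripped, so the Hadoop side sees just "demo.key"
      println(spark.sparkContext.hadoopConfiguration.get("demo.key"))   // demo-value
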
    By default, hadoopConf also loads all Hadoop-related configuration files found on the classpath, which is easy to verify in spark-shell:

    scala> val hc = spark.sparkContext.hadoopConfiguration

    hc: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml

    scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))

    true

    scala> println(hc.get("mapreduce.output.fileoutputformat.compress.codec"))

    org.apache.hadoop.io.compress.DefaultCodec
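
    If it is unclear which file on the classpath set the flag, Configuration.getPropertySources (available in Hadoop 2.x and later) can be queried in the same session; the result depends on the cluster's config files, typically mapred-site.xml for this key:

      scala> hc.getPropertySources("mapreduce.output.fileoutputformat.compress").foreach(println)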

    In summary, setting spark.hadoop.mapreduce.output.fileoutputformat.compress=false when creating the SparkConf is enough to turn compression off:

    val sparkConf = new SparkConf().set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")

    Compression can also be controlled per write through an option:

    rdd.repartition(1).write.option("compression", "none").csv(outPath)
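
    Putting the two approaches together, here is a minimal end-to-end sketch (the example DataFrame and the output path /tmp/demo_csv_out are made up for illustration):

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.SparkSession

      // approach 1: disable Hadoop output compression for the whole application
      val sparkConf = new SparkConf()
        .set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()

      val df = spark.range(10).toDF("id")

      // approach 2: override compression for a single write via the DataFrameWriter option
      df.repartition(1)
        .write
        .option("compression", "none")
        .csv("/tmp/demo_csv_out")

    The per-write option overrides the cluster default for that particular write, which is handy when only some outputs need to stay uncompressed.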
  • Original article: https://www.cnblogs.com/barneywill/p/10109568.html