[Original] Troubleshooting Notes (12): Why Spark writes text-format files (text, csv, json, etc.) to HDFS in compressed form

Problem reproduction

    df.repartition(1).write.csv(outPath)

After the job finishes, the output files turn out to be compressed.
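A minimal sketch of the reproduction (the SparkSession named spark, the sample data, and outPath are illustrative assumptions); on a cluster whose Hadoop configuration enables output compression with the default codec, the part file is written with a .deflate suffix:

    // Minimal repro sketch; assumes an existing SparkSession named `spark`
    // and a cluster whose mapred-site.xml enables output compression.
    import spark.implicits._

    val outPath = "/tmp/csv-out"                    // hypothetical path
    val df = Seq(("a", 1), ("b", 2)).toDF("k", "v")
    df.repartition(1).write.csv(outPath)
    // Expected listing when DefaultCodec is configured:
    //   /tmp/csv-out/part-00000-....csv.deflate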

During the write, Spark first builds a hadoopConf and then reads from it whether the output should be compressed and with which codec:

    org.apache.spark.sql.execution.datasources.DataSource

      def write(

    org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

        val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)

    org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            // SEPERATOR (sic) is Hadoop's own constant name
            String keyValueSeparator = conf.get(SEPERATOR, "\t");
            CompressionCodec codec = null;
            String extension = "";
            if (isCompressed) {
                // GzipCodec is the fallback when no codec class is configured
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
                extension = codec.getDefaultExtension();
            }

isCompressed comes from mapreduce.output.fileoutputformat.compress, and codecClass from mapreduce.output.fileoutputformat.compress.codec.
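Those two keys can be inspected directly from any Hadoop Configuration; a small sketch (the property names are the real Hadoop keys, the rest is illustrative):

    import org.apache.hadoop.conf.Configuration

    // Picks up core-site.xml, mapred-site.xml, etc. from the classpath,
    // the same files the Spark-built hadoopConf starts from.
    val conf = new Configuration()
    val isCompressed = conf.getBoolean("mapreduce.output.fileoutputformat.compress", false)
    val codecClass = conf.get("mapreduce.output.fileoutputformat.compress.codec")
    println(s"compress=$isCompressed, codec=$codecClass")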

The hadoopConf is initialized as follows:

    org.apache.spark.sql.internal.SessionState

      def newHadoopConf(): Configuration = {
    
        val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)

    org.apache.spark.SparkContext

      _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

    org.apache.spark.deploy.SparkHadoopUtil

      def newConfiguration(conf: SparkConf): Configuration = {
        val hadoopConf = new Configuration()
        appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
        hadoopConf
      }

      def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
        ...
        // every spark.hadoop.* entry in the SparkConf is copied into the
        // Hadoop configuration with the "spark.hadoop." prefix stripped
        conf.getAll.foreach { case (key, value) =>
          if (key.startsWith("spark.hadoop.")) {
            hadoopConf.set(key.substring("spark.hadoop.".length), value)
          }
        }
        ...
    
     
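The net effect: any spark.hadoop.-prefixed key set on the SparkConf shows up, minus the prefix, in the Hadoop configuration. A quick sketch (the key foo.bar is made up for illustration):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("prefix-demo")
      .set("spark.hadoop.foo.bar", "baz")   // hypothetical key

    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    // the spark.hadoop. prefix has been stripped off
    println(spark.sparkContext.hadoopConfiguration.get("foo.bar"))   // prints: baz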

By default, hadoopConf loads all Hadoop configuration files found on the classpath, which is easy to verify in spark-shell:

    scala> val hc = spark.sparkContext.hadoopConfiguration

    hc: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml

    scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))

    true

    scala> println(hc.get("mapreduce.output.fileoutputformat.compress.codec"))

    org.apache.hadoop.io.compress.DefaultCodec
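DefaultCodec is also what produces the .deflate suffix on the output files; its default extension can be checked in the same session:

    scala> val codec = new org.apache.hadoop.io.compress.DefaultCodec()
    scala> println(codec.getDefaultExtension)
    .deflate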

In summary, to disable compression it is enough to set spark.hadoop.mapreduce.output.fileoutputformat.compress=false when creating the SparkConf:

    val sparkConf = new SparkConf().set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
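The same setting can also be applied at submit time, with no code change:

    spark-submit --conf spark.hadoop.mapreduce.output.fileoutputformat.compress=false ...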

Alternatively, compression can be controlled per write through an option:

    df.repartition(1).write.option("compression", "none").csv(outPath)
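For the CSV writer, the compression option also accepts codec names such as gzip, bzip2, deflate, lz4, and snappy; "none" (or "uncompressed") disables compression for that write only, taking precedence over the hadoopConf defaults.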
Original post: https://www.cnblogs.com/barneywill/p/10109568.html