问题重现
rdd.repartition(1).write.csv(outPath)
写文件之后发现文件是压缩过的
write时首先会获取hadoopConf,然后从中获取是否压缩以及压缩格式
org.apache.spark.sql.execution.datasources.DataSource
def write(
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = conf.get(SEPERATOR, " "); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); }
isCompressed取的是mapreduce.output.fileoutputformat.compress,codecClass取的是mapreduce.output.fileoutputformat.compress.codec
hadoopConf初始化过程为
org.apache.spark.sql.internal.SessionState
def newHadoopConf(): Configuration = { val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
org.apache.spark.SparkContext
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) def newConfiguration(conf: SparkConf): Configuration = { val hadoopConf = new Configuration() appendS3AndSparkHadoopConfigurations(conf, hadoopConf) hadoopConf } def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = { ... conf.getAll.foreach { case (key, value) => if (key.startsWith("spark.hadoop.")) { hadoopConf.set(key.substring("spark.hadoop.".length), value) } }
hadoopConf默认会从classpath中加载所有的hadoop相关配置文件,可以通过spark-shell来简单测试:
scala> val hc = spark.sparkContext.hadoopConfiguration
hc: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))
true
scala> println(hc.get("mapreduce.output.fileoutputformat.compress.codec"))
org.apache.hadoop.io.compress.DefaultCodec
综上,只需要在创建SparkConf的时候设置spark.hadoop.mapreduce.output.fileoutputformat.compress=false即可不压缩,
val sparkConf = new SparkConf().set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
另外还可以通过option来控制
rdd.repartition(1).write.option("compression", "none").csv(outPath)