我们知道,spark中每个分片都代表着一部分数据,那么分片数量如何被确认的呢?
首先我们使用最常见的HDFS+Spark,sparkDeploy的方式来讨论,spark读取HDFS数据使用的是sparkcontext.textfile(Path, minPartitions):
1 def textFile( 2 path: String, 3 minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { 4 assertNotStopped() 5 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 6 minPartitions).map(pair => pair._2.toString) 7 }
在用户指定minPartitions时,便会使用用户指定的分片数量来划分,否则使用defaultMinPartitions。那么defaultMinPartitions是怎么来的?
def defaultMinPartitions: Int = math.min(defaultParallelism, 2) ... def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism } ... override def defaultParallelism(): Int = backend.defaultParallelism() ... override def defaultParallelism(): Int = { conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) }
可以看到这个参数是通过SparkConf中的spark.default.parallelism指定的。如果两边都没指定,那么分片数就为2。
在内存小,分片数少而数据量较大的情况下,会产生GC error,因为内存占用过大,java的垃圾回收无法完成,所以在出现内存错误的时候不妨试试将默认的分片数量加大,或者干脆在textfile中指定。这样有助于数据的处理完成。