Maximum memory that can be requested
Before a Spark job is launched, Spark first checks whether the requested memory exceeds what the cluster can provide. The code for this check is in
// resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
  val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
  logInfo("Verifying our application has not requested more than the maximum " +
    s"memory capability of the cluster ($maxMem MB per container)")
  val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
  if (executorMem > maxMem) {
    throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
      s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
      s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
      s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
  }
  val amMem = amMemory + amMemoryOverhead
  if (amMem > maxMem) {
    throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
      s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
      "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
      "'yarn.nodemanager.resource.memory-mb'.")
  }
  logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
    amMem,
    amMemoryOverhead))

  // We could add checks to make sure the entire cluster has enough resources but that involves
  // getting all the node reports and computing ourselves.
}
executorMemory is the executor heap size specified via spark-submit, i.e. the JVM -Xmx value.
executorMemoryOverhead is the off-heap overhead memory; it can also be set via spark-submit and defaults to the larger of executorMemory * 0.1 and 384 MB.
pysparkWorkerMemory is the memory reserved for the Python worker processes of a PySpark executor; it is usually left unset and defaults to 0.
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384L
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
private val pysparkWorkerMemory: Int = if (isPython) {
  sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
} else {
  0
}
amMemory is the ApplicationMaster heap size; in cluster mode this is the driver memory specified via spark-submit, i.e. the JVM -Xmx value.
amMemoryOverhead is the ApplicationMaster's off-heap overhead, defaulting to the larger of amMemory * 0.1 and 384 MB.
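As a rough worked example (the numbers below are assumed, not taken from the Spark source): with --executor-memory 8g and no explicit overhead setting, the overhead defaults to max(8192 * 0.1, 384) = 819 MB, so the container request comes to 8192 + 819 = 9011 MB. A minimal Scala sketch of that default rule:
// Sketch only: reproduces the default overhead rule with assumed values.
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384L

def containerRequestMb(executorMemoryMb: Long, pysparkMemoryMb: Long = 0L): Long = {
  val overhead = math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toLong, MEMORY_OVERHEAD_MIN)
  executorMemoryMb + overhead + pysparkMemoryMb
}

containerRequestMb(8192) // 8192 + 819 = 9011 MB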
maxMem is the maximum memory YARN can allocate to a single container, which mainly depends on two settings:
yarn.scheduler.maximum-allocation-mb # the largest memory YARN will allocate to a single container
yarn.nodemanager.resource.memory-mb # the total memory on a node that YARN may use
The requested memory must satisfy:
(executor heap memory + executor off-heap overhead + executor PySpark memory) <= (maximum container memory YARN can allocate)
(driver heap memory + driver off-heap overhead) <= (maximum container memory YARN can allocate)
This is the maximum memory a Spark job can request.
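For example (assumed numbers): if yarn.scheduler.maximum-allocation-mb is 8192 MB, an 8 GB executor fails the check, because 8192 + max(819, 384) = 9011 MB exceeds the 8192 MB container limit; either lower --executor-memory to roughly 7 GB or raise the YARN limits.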
Note that spark.memory.offHeap.enabled controls Spark's own off-heap execution/storage memory, which is disabled by default; it is separate from the YARN memory overhead above.
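A minimal sketch of enabling it via SparkConf (the size below is an assumed example; when this flag is on, spark.memory.offHeap.size must be set to a positive value, and on YARN that amount typically also has to be covered by the executor memory overhead):
import org.apache.spark.SparkConf

// Sketch only: turn on Spark-managed off-heap execution/storage memory.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g") // assumed example size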
Minimum memory that can be requested
core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
private def getMaxMemory(conf: SparkConf): Long = {
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // SPARK-12759 Check executor memory to fail fast if memory is insufficient
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  (usableMemory * memoryFraction).toLong
}
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
systemMemory equals the specified driver memory (the current JVM's maximum heap).
executorMemory equals the specified executor memory.
Neither of these may be smaller than reservedMemory * 1.5 = 450 MB.
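A minimal sketch of the same arithmetic with an assumed 1 GB (1024 MB) heap, just to make the numbers concrete:
// Illustrative only: 1024 MB passes the 450 MB minimum check, and the
// unified (storage + execution) pool works out to (1024 - 300) * 0.6 = 434 MB.
val reservedMb = 300L
val systemMb = 1024L
require(systemMb >= (reservedMb * 1.5).ceil.toLong) // 450 MB minimum

val usableMb = systemMb - reservedMb        // 724 MB
val unifiedPoolMb = (usableMb * 0.6).toLong // 434 MB for storage + execution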
How on-heap memory is divided
The specified on-heap memory is divided into several regions:
- Reserved memory: holds Spark's internal objects; fixed at 300 MB
- User memory: holds data needed by user code in RDD transformations, such as RDD dependency information
- Storage memory: mainly holds Spark's cached data, such as RDD cache blocks and unroll data
- Execution memory: mainly holds temporary data produced during shuffles, joins, sorts, and aggregations
Their sizes are:
- Reserved memory = 300 MB
- User memory = (heap memory - reserved memory) * (1 - spark.memory.fraction)
- Storage memory = (heap memory - reserved memory) * spark.memory.fraction * spark.memory.storageFraction
- Execution memory = (heap memory - reserved memory) * spark.memory.fraction * (1 - spark.memory.storageFraction)
where spark.memory.fraction defaults to 0.6 and spark.memory.storageFraction defaults to 0.5.
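A small sketch with assumed numbers (a 4096 MB executor heap and the default fractions) shows how the split works out:
// Illustrative only: on-heap region sizes for an assumed 4096 MB heap.
val heapMb = 4096L
val reservedMb = 300L
val memoryFraction = 0.6  // spark.memory.fraction
val storageFraction = 0.5 // spark.memory.storageFraction

val usableMb = heapMb - reservedMb                                            // 3796 MB
val userMb = (usableMb * (1 - memoryFraction)).toLong                         // 1518 MB
val storageMb = (usableMb * memoryFraction * storageFraction).toLong          // 1138 MB
val executionMb = (usableMb * memoryFraction * (1 - storageFraction)).toLong  // 1138 MB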
Storage memory and execution memory can borrow from each other: when one side runs out of space while the other still has free space, it may occupy the other's region.
As you can see, the memory specified via spark-submit must be larger than the data itself, because only part of it is used for storing data.