  • Memory allocation and management for a Spark Job

    Maximum memory that can be requested

    Before launching a Spark Job, Spark first checks whether the requested memory is too large. The check lives in

    // resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
    
      private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
        val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
        logInfo("Verifying our application has not requested more than the maximum " +
          s"memory capability of the cluster ($maxMem MB per container)")
        val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
        if (executorMem > maxMem) {
          throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
            s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
            s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
            s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
        }
        val amMem = amMemory + amMemoryOverhead
        if (amMem > maxMem) {
          throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
            s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
            "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
            "'yarn.nodemanager.resource.memory-mb'.")
        }
        logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
          amMem,
          amMemoryOverhead))
    
        // We could add checks to make sure the entire cluster has enough resources but that involves
        // getting all the node reports and computing ourselves.
      }
    

    executorMemory is the executor memory specified with spark-submit, i.e. the JVM's -Xmx value
    executorMemoryOverhead is the extra off-heap (overhead) memory reserved for the container; it can be set with spark-submit and defaults to executorMemory * 0.1 or 384 MB, whichever is larger
    pysparkWorkerMemory is the memory reserved for the PySpark worker processes that run outside the executor JVM; it is usually left unset and defaults to 0

      val MEMORY_OVERHEAD_FACTOR = 0.10
      val MEMORY_OVERHEAD_MIN = 384L
    
      private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
    
      private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
        math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
    
      private val pysparkWorkerMemory: Int = if (isPython) {
        sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
      } else {
        0
      }
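
    As a quick check on that default, here is a small standalone Scala sketch (not Spark source; the sizes are hypothetical) showing when the 384 MB floor applies:

      object OverheadExample {
        val MemoryOverheadFactor = 0.10
        val MemoryOverheadMinMb  = 384L

        // Same default formula as above: max(0.1 * executorMemory, 384 MB)
        def defaultOverheadMb(executorMemoryMb: Long): Long =
          math.max((MemoryOverheadFactor * executorMemoryMb).toLong, MemoryOverheadMinMb)

        def main(args: Array[String]): Unit = {
          println(defaultOverheadMb(2048))   // 2 GB executor -> 384 MB (the floor wins)
          println(defaultOverheadMb(8192))   // 8 GB executor -> 819 MB (10% wins)
        }
      }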
    

    amMemory is the driver memory specified with spark-submit (in cluster mode the ApplicationMaster hosts the driver), i.e. the JVM's -Xmx value
    amMemoryOverhead is the driver's off-heap (overhead) memory, defaulting to amMemory * 0.1 or 384 MB, whichever is larger

    maxMem is the largest amount of memory YARN can allocate to a single container, which mainly depends on two settings

    yarn.scheduler.maximum-allocation-mb     # the maximum memory YARN may allocate to a single container
    yarn.nodemanager.resource.memory-mb      # the total memory YARN may use on a NodeManager node
    

    The requested memory must satisfy

    (executor on-heap memory + executor overhead memory + executor PySpark memory) <= (maximum container memory YARN can allocate)
    (driver on-heap memory + driver overhead memory) <= (maximum container memory YARN can allocate)
    

    This is the maximum memory a Spark Job can request.
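
    For a concrete (hypothetical) example: with --executor-memory 8g and --driver-memory 4g in cluster mode, on a cluster whose yarn.scheduler.maximum-allocation-mb is 8192, the check works out as follows (a plain Scala sketch, not Spark source):

      object ClusterResourceCheckExample {
        def main(args: Array[String]): Unit = {
          val maxMemMb      = 8192L          // yarn.scheduler.maximum-allocation-mb
          val executorMemMb = 8192L + 819L   // 8 GB heap + max(0.1 * 8192, 384) MB overhead = 9011 MB
          val amMemMb       = 4096L + 409L   // 4 GB heap + max(0.1 * 4096, 384) MB overhead = 4505 MB

          // 9011 MB > 8192 MB, so verifyClusterResources would reject the executor request;
          // the 4505 MB AM container would be accepted.
          println(s"executor: $executorMemMb MB, AM: $amMemMb MB, per-container limit: $maxMemMb MB")
        }
      }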

    Spark's own off-heap memory (distinct from the container memoryOverhead above) is disabled by default; it can be enabled with the spark.memory.offHeap.enabled setting
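
    A minimal sketch of enabling it programmatically (spark.memory.offHeap.size must also be set to a positive value; 2g is just an example figure):

      import org.apache.spark.SparkConf

      // Enable the unified memory manager's off-heap mode and give it an explicit size.
      val conf = new SparkConf()
        .set("spark.memory.offHeap.enabled", "true")
        .set("spark.memory.offHeap.size", "2g")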

    Minimum memory that can be requested

    // core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
    
      private def getMaxMemory(conf: SparkConf): Long = {
        val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
        val reservedMemory = conf.getLong("spark.testing.reservedMemory",
          if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
        val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
        if (systemMemory < minSystemMemory) {
          throw new IllegalArgumentException(s"System memory $systemMemory must " +
            s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
            s"option or spark.driver.memory in Spark configuration.")
        }
        // SPARK-12759 Check executor memory to fail fast if memory is insufficient
        if (conf.contains("spark.executor.memory")) {
          val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
          if (executorMemory < minSystemMemory) {
            throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
              s"$minSystemMemory. Please increase executor memory using the " +
              s"--executor-memory option or spark.executor.memory in Spark configuration.")
          }
        }
        val usableMemory = systemMemory - reservedMemory
        val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
        (usableMemory * memoryFraction).toLong
      }
    
    // Defined in UnifiedMemoryManager's companion object: memory set aside for Spark's internal objects
    private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024   // 300 MB
    

    systemMemory is the maximum heap of the current JVM (Runtime.getRuntime.maxMemory), i.e. the specified driver memory when this runs on the driver
    executorMemory is the specified executor memory

    Neither value may be smaller than reservedMemory * 1.5 = 450 MB
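
    For example, with a hypothetical 1 GB heap the same arithmetic gives (a standalone Scala sketch mirroring getMaxMemory, not Spark source):

      object MinMemoryExample {
        val ReservedMb = 300L   // RESERVED_SYSTEM_MEMORY_BYTES expressed in MB

        def main(args: Array[String]): Unit = {
          val systemMemoryMb    = 1024L                              // e.g. --driver-memory 1g
          val minSystemMemoryMb = (ReservedMb * 1.5).ceil.toLong     // 450 MB lower bound
          require(systemMemoryMb >= minSystemMemoryMb,
            s"$systemMemoryMb MB is below the $minSystemMemoryMb MB minimum")

          val usableMb  = systemMemoryMb - ReservedMb                // 1024 - 300 = 724 MB
          val unifiedMb = (usableMb * 0.6).toLong                    // spark.memory.fraction = 0.6 -> 434 MB
          println(s"unified (storage + execution) memory: $unifiedMb MB")
        }
      }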

    How on-heap memory is divided

    The specified on-heap memory is divided into several regions

    • Reserved memory: stores Spark's internal objects; fixed at 300 MB
    • User memory: stores data needed by RDD transformations, such as RDD dependency information
    • Storage memory: mainly stores Spark cache data, such as cached RDDs and unroll data
    • Execution memory: mainly stores temporary data produced during shuffle, join, sort, and aggregation

    Their sizes are

    • Reserved memory = 300 MB
    • User memory = (on-heap memory - reserved memory) * (1 - spark.memory.fraction)
    • Storage memory = (on-heap memory - reserved memory) * spark.memory.fraction * spark.memory.storageFraction
    • Execution memory = (on-heap memory - reserved memory) * spark.memory.fraction * (1 - spark.memory.storageFraction)

    where
    spark.memory.fraction defaults to 0.6
    spark.memory.storageFraction defaults to 0.5
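
    Plugging these defaults into the formulas above for a hypothetical 4 GB executor heap (a plain Scala sketch, not Spark source):

      object HeapLayoutExample {
        def main(args: Array[String]): Unit = {
          val heapMb          = 4096.0   // --executor-memory 4g
          val reservedMb      = 300.0    // fixed reserved memory
          val memoryFraction  = 0.6      // spark.memory.fraction default
          val storageFraction = 0.5      // spark.memory.storageFraction default

          val usableMb    = heapMb - reservedMb                               // 3796 MB
          val userMb      = usableMb * (1 - memoryFraction)                   // ~1518 MB
          val storageMb   = usableMb * memoryFraction * storageFraction       // ~1139 MB
          val executionMb = usableMb * memoryFraction * (1 - storageFraction) // ~1139 MB

          println(f"user $userMb%.0f MB, storage $storageMb%.0f MB, execution $executionMb%.0f MB")
        }
      }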

    Storage memory and execution memory can borrow from each other: when one runs out of space and the other still has spare capacity, it may occupy the other's space

    As you can see, the memory specified with spark-submit must be larger than the data itself, since not all of it is used to store data


