zoukankan      html  css  js  c++  java
  • Spark 内存管理模型详解

    一、Spark 内存介绍

    在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种JVM进程。

    Driver 程序主要负责:

    • 创建 Spark上下文;
    • 提交 Spark作业(Job)并将 Job 转化为计算任务(Task)交给 Executor 计算;
    • 协调各个 Executor 进程间任务调度。

    Executor 程序主要负责:

    • 在工作节点上执行具体的计算任务(Task),并将计算结果返回给 Driver;
    • 为需要持久化的 RDD 提供存储功能。

    由于 Driver 的内存管理相对来说较为简单,本文主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。

    二、Executor 内存管理

    Executor 进程作为一个 JVM 进程,其内存管理建立在 JVM 的内存管理之上,整个大致包含两种方式:堆内内存和堆外内存。

    堆内和堆外内存示意图
    一个 Executor 当中的所有 Task 是共享堆内内存的。一个 Work 中的多个 Executor 中的多个 Task 是共享堆外内存的。
    注意:默认的 Spark 是开启堆内内存的,配置参数:--executor-memory
    但是默认的堆外内存是不开启的:
    spark.memory.offHeap.enabled=true  # 开启堆外内存
    spark.memory.offHeap.size =1024    # 分配堆外内存的大小

    2.1 堆内内存

    堆内内存大小,是指 JVM 堆的内存大小,由 Spark 程序启动时的 -executor-memory 或 spark.executor.memory 参数配置。那 Spark 是如何管理堆内内存呢?Spark 对堆内内存的管理是一种逻辑上的“规划式”的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,我们来看看申请内存和释放内存的具体流程:
    申请内存:
    1. Spark 在代码中 new 一个实例对象。
    2. JVM 从堆内内存分配空间,创建对象并返回对象饮用。
    3. Spark 保存该对象的引用,记录该对象占用的内存。
    释放内存:
    1. Spark 记录该对象释放的内存,删除该对象的引用。
    2. 等待 JVM 的垃圾回收机制释放该对象占用的堆内内存。
     
    Spark 内存管理分为静态内存管理和统一内存管理,在 Spark 1.6 之前是采用的静态内存,之后的版本都是采用统一内存管理,与静态内存管理的区别在于 Storeage 内存和 Execution 内存共享统一块空间,可以动态占用对方的空闲区域。
     
    本文主要介绍统一内存管理。如图,可以看出堆内内存大致可以分为以下四个模块:
    • Storage 内存:主要用于存储 Spark 的 cache 数据,例如 RDD 的 cache,Broadcast 变量,Unroll 数据等。需要主要的是,unrolled 的数据如果内存不够,会存储在 driver 端。
    • Execution 内存:用于存储 Spark task 执行过程中需要的对象,如 Shuffle、Join、Sort、Aggregation等计算过程中的临时数据。
    • User 内存:分配 Spark Memory 剩余的内存,用户可以根据需要使用,可以存储 RDD transformations 需要的数据结构。
    • Reserved 内存:这部分内存是预留给系统用的,固定不变。
    统一内存管理图示—堆内

     2.2 堆外内存

    在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同如下图所示(以统一内存管理机制为例),所有运行中的并发任务共享存储内存和执行内存。
    统一内存管理图示—堆外

    三、统一内存管理机制

    Spark 1.6 之后引入的统一内存管理机制,存储内存(Storeage Memory)和执行内存(Execution Memory)可以动态占用对方的空闲区域(如下图),在设定基本的的存储内存和执行内存区域(由 spark.storage.memoryFraction 参数控制)后,便确定了双方各自拥有的空间容量。统一内存管理的核心在内存区域的动态占用机制,其占用规则如下:
    • 双方空间都不足时,则存储到硬盘;如己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)。
    • 执行内存的空间被对方占用后,可让对方将占用的部分存储转存到硬盘,然后“归还”借用的空间。
    • 存储内存的空间被对方占用后,无法让对方“归还”,因为需要考虑到 Shuffle 过程中很多因素,实现起来较为复杂。
    动态占用机制示意图 

    四、Spark 内存参数

    4.1 统一内存管理参数

    参数:spark.memory.fraction
    含义:用于执行和存储的部分(堆空间-300MB)。值越低,溢出和缓存数据逐出的频率就越高。此配置的目的是为稀疏的,异常大的记录留出用于内部元数据,用户数据结构和大小估计不精确的内存。建议将其保留为默认值。
    默认值:0.6 (Spark 1.6 默认为 0.75,Spark 2.0+ 默认为 0.6)
    
    参数:spark.memory.storageFraction
    含义:可以逐出的存储内存量,以spark.memory.fraction预留的区域大小的一部分表示。数值越高,则可用于执行的工作内存就越少,任务可能会更频繁地溢出到磁盘上. 建议将其保留为默认值。
    默认值:0.5
    
    参数:spark.memory.offHeap.enabled
    含义:如果为true,Spark将尝试将堆外内存用于某些操作。如果启用了堆外内存使用,则spark.memory.offHeap.size必须为正。
    默认值:false
    
    参数:spark.memory.offHeap.size
    含义:可用于堆外分配的绝对内存量(以字节为单位)。此设置对堆内存使用没有影响,因此,如果执行者的总内存消耗必须在某个硬限制内,那么请确保相应地缩小JVM堆大小。当spark.memory.offHeap.enabled=true时,必须将此值设置为正值.
    默认值:0
    
    参数:spark.memory.useLegacyMode 
    含义:是否启用Spark 1.5及之前版本中使用的旧版内存管理模式。传统模式将堆空间严格划分为固定大小的区域,如果未调整应用程序,则可能导致过多的溢出. 除非已启用,否则不会读取以下不推荐使用的内存分数配置: spark.shuffle.memoryFraction、spark.storage.memoryFraction、spark.storage.unrollFraction
    默认值:false
    
    参数:spark.shuffle.memoryFraction
    含义:(不建议使用)仅在启用spark.memory.useLegacyMode读。在 shuffle 期间用于聚合和协同分组的Java堆的分数。在任何给定时间,用于随机播放的所有内存映射的集合大小都受到此限制的限制,超出此限制,内容将开始溢出到磁盘。如果经常发生泄漏,请考虑以spark.storage.memoryFraction为代价增加此值.
    默认值:0.2
    
    参数:spark.storage.memoryFraction
    含义:(不建议使用)仅在启用spark.memory.useLegacyMode读。用于Spark的内存缓存的Java堆的分数。这不应大于JVM中对象的""代,默认情况下,该对象的堆大小为0.6,但是如果您配置自己的旧代大小,则可以增加它。
    默认值:0.6
    
    参数:spark.storage.unrollFraction
    含义:(不建议使用)仅在启用spark.memory.useLegacyMode读。spark.storage.memoryFraction分数,用于展开内存中的块。当没有足够的可用存储空间来完全展开新块时,通过删除现有块来动态分配该块。
    默认值:0.2

    4.2 内存参数示例

    为了方便理解堆内内存,可以通过一个简单的例子来看看。假设我们提交的 Spark 作业内存设置为 --executor-memory 18g。
    spark-submit 
    --master yarn 
    --deploy-mode client 
    --executor-memory 18g 
    --queue root.exquery 
    --class org.apache.spark.examples.SparkPi 
    /opt/cloudera/parcels/CDH/lib/spark/examples/lib/spark-examples-1.6.0-cdh5.14.4-hadoop2.6.0-cdh5.14.4.jar 
    100  

    Spark UI 页面显示的内存情况:

    可以看出,Storage Memory 的可用内存是 10 GB,这个数咋来的呢?根据前面的内存规则,可以得出以下计算:

    systemMemory = spark.executor.memory
    reservedMemory = 300MB
    usableMemory = systemMemory - reservedMemory
    StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction

    代入数据,可以得出以下结果:

    systemMemory = 18Gb = 19327352832 字节
    reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
    usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032
    StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
                 = 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375GB

    结果和 Spark UI 界面展示的 10GB 差不多有一般的差距,这是因为 Spark UI 上显示的 Storage Memory 可用内存其实等于 Storage 内存和 Execution 内存之和,也就是 usableMemory * spark.memory.fraction。

    StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
                 = 19012780032 * 0.6 / (1024 * 1024 * 1024) = 10.62421 GB

    这个结果和 Spark UI 上看到的结果还是有点出入,为什么呢?这是因为虽然我们设置了 --executor-memory 18g,但是 Spark 的 Executor 端通过 Runtime.getRuntime.maxMemory 拿到的内存其实没这么大,只有 17179869184 字节,所以 systemMemory = 17179869184,然后计算的数据如下:

    systemMemory = 17179869184 字节
    reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
    usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
    StorageMemory= usableMemory * spark.memory.fraction
                 = 16865296384 * 0.6 /(1024 * 1024 * 1024)  = 9.42421 GB

    我们通过将上面的 16865296384 * 0.6 字节除于 1024 * 1024 * 1024 转换成 9.42421875 GB,和 UI 上显示的还是对不上,这是因为 Spark UI 是通过除于 1000 * 1000 * 1000 将字节转换成 GB,如下:

    systemMemory = 17179869184 字节
    reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
    usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
    StorageMemory= usableMemory * spark.memory.fraction
                 = 16865296384 * 0.6 字节 =  16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1GB

    systemMemory 内存之所以为 17179869184(可以通过方法 Runtime.getRuntime.maxMemory( ) 获得该值),是因为内存分配池的堆部分分为 Eden,Survivor 和 Tenured 三部分空间,而这里面一共包含了两个 Survivor 区域,而这两个 Survivor 区域在任何时候我们只能用到其中一个,所以我们可以使用下面的公式进行描述:

    ExecutorMemory = Eden + 2 * Survivor + Tenured
    Runtime.getRuntime.maxMemory =  Eden + Survivor + Tenured

    上面的 17179869184 字节可能因为 GC 配置不一样得到的数据不一样,但是上面的计算公式是一样的。

     
    【参考链接】
  • 相关阅读:
    鸿蒙的js开发模式19:鸿蒙手机下载python服务器端文件的实现
    【资源下载】Linux下的Hi3861一站式鸿蒙开发烧录(附工具)
    并发编程大扫盲:带你了解何为线程上下文切换
    内存溢出 MAT 排查工具,它真香香香
    面试官问:如何排除GC引起的CPU飙高?我脱口而出5个步骤
    小学妹问我:如何利用可视化工具排查问题?
    你不得不知的6个JDK自带JVM调优工具
    那个小白还没搞懂内存溢出,只能用案例说给他听了
    听说同学你搞不懂Spring Boot集成Mybatis的玩法,田哥来教你
    手把手教你设置JVM调优参数
  • 原文地址:https://www.cnblogs.com/lemonu/p/13433388.html
Copyright © 2011-2022 走看看