zoukankan      html  css  js  c++  java
  • 【总结】Spark任务的core,executor,memory资源配置方法

    执行Spark任务,资源分配是很重要的一方面。如果配置不准确,Spark任务将耗费整个集群的机缘导致其他应用程序得不到资源。

    怎么去配置Spark任务的executors,cores,memory,有如下几个因素需要考虑:

    • 数据量
    • 任务完成时间点
    • 静态或者动态的资源分配
    • 上下游应用

    Spark应用当中术语的基本定义:

    • Partitions : 分区是大型分布式数据集的一小部分。 Spark使用分区来管理数据,这些分区有助于并行化数据处理,并且使executor之间的数据交换最小化
    • Task:任务是一个工作单元,可以在分布式数据集的分区上运行,并在单个Excutor上执行。并行执行的单位是任务级别。单个Stage中的Tasks可以并行执行
    • Executor:在一个worker节点上为应用程序创建的JVM,Executor将巡行task的数据保存在内存或者磁盘中。每个应用都有自己的一些executors,单个节点可以运行多个Executor,并且一个应用可以跨多节点。Executor始终伴随Spark应用执行过程,并且以多线程方式运行任务。spark应用的executor个数可以通过SparkConf或者命令行 –num-executor进行配置
    • Cores:CPU最基本的计算单元,一个CPU可以有一个或者多个core执行task任务,更多的core带来更高的计算效率,Spark中,cores决定了一个executor中并行task的个数
    • Cluster Manager:cluster manager负责从集群中请求资源

    cluster模式执行的Spark任务包含了如下步骤:

    1. driver端,SparkContext连接cluster manager(Standalone/Mesos/Yarn)
    2. Cluster Manager在其他应用之间定位资源,只要executor执行并且能够相互通信,可以使用任何Cluster Manager
    3. Spark获取集群中节点的Executor,每个应用都能够有自己的executor处理进程
    4. 发送应用程序代码到executor中
    5. SparkContext将Tasks发送到executors

    以上步骤可以清晰看到executors个数和内存设置在spark中的重要作用。

    以下将尝试理解优化spark任务的最佳方式:

    1. 静态分配:配置值从spark-submit中体现
    2. 动态分配:从数据量和计算需求上衡量资源需求,并在使用后释放掉,这样可以让其他应用重复利用资源

    静态分配

    以下按不同例子讨论验证不同参数和配置组合

    例子1
    硬件资源: 6 节点,每个节点16 cores, 64 GB 内存
    每个节点在计算资源时候,给操作系统和Hadoop的进程预留1core,1GB,所以每个节点剩下15个core和63GB
    内存。
    core的个数,决定一个executor能够并发任务的个数。所以通常认为,一个executor越多的并发任务能够得到更好的性能,但有研究显示一个应用并发任务超过5,导致更差的性能。所以core的个数暂设置为5个。
    5个core是表明executor并发任务的能力,并不是说一个系统有多少个core,即使我们一个CPU有32个core,也设置5个core不变。
    executor个数,接下来,一个executor分配 5 core,一个node有15 core,从而我们计算一个node上会有3 executor(15 / 5),然后通过每个node的executor个数得到整个任务可以分配的executors个数。
    我们有6个节点,每个节点3个executor,6 × 3 = 18个executors,额外预留1个executor给AM,最终要配置17个executors。
    最后spark-submit启动脚本中配置 –num-executors = 17
    memory,配置每个executor的内存,一个node,3 executor, 63G内存可用,所以每个executor可配置内存为63 / 3 = 21G
    从Spark的内存模型角度,Executor占用的内存分为两部分:ExecutorMemory和MemoryOverhead,预留出MemoryOverhead的内存量之后,才是ExecutorMemory的内存。
    MemoryOverhead的计算公式: max(384M, 0.07 × spark.executor.memory)

    因此 MemoryOverhead值为0.07 × 21G = 1.47G > 384M

    最终executor的内存配置值为 21G – 1.47 ≈ 19 GB

    至此, Cores = 5, Executors= 17, Executor Memory = 19 GB


    例子2
    硬件资源:6 node,32 core / node ,64G RAM / node
    core个数:5,与例子1中描述具体原因相同
    每个node的executor个数是 32 / 5 = 6
    executor总个数: 6 node × 6 executor - 1 (AM) = 35
    executor memory:
    每个node的内存 = 63G(1G留给系统应用) / 6 ≈ 10
    MemoryOverhead = 0.07 × 10G = 700M 约等于 1G
    ExecutorMemory = 10 G - 1 G = 9 GB

    最终 Cores = 5, Executors = 5, Executor Memory = 9 GB


    例子3
    硬件资源: 6 节点,每个节点16 cores, 64 GB 内存
    例子1和2的场景,都是从固定的core数量,计算executor和memory的数量。
    基于例子1,假设memory不需要19G,而仅需要10G内存就足够数据计算,此时core, executor,memory按如下计算方式:
    core个数据:5
    每个node的executor个数:15 / 3 = 5
    按照例子1计算方式,这个阶段的内存使用21G,除去MemoryOverhead的内存占用,剩下19G。当我们10G内存就足够的场景下,不能使用 63G / 10G = 6个executor。因为 6 exector × 5 core = 30 core,每个node要有30core,而例子1硬件配置是16core。这样我们就需要修改每个executor的core配置。

    接下来,将core的个数从5改为3,每个node有5个executor,一共应该有 5 executor * 6 node - 1(AM占用的core) = 29 ,memory则 63G / 5 ≈ 12
    MemoryOverhead = 0.07 × 12G = 840M 约等于 1G
    ExecutorMemory = 12 G - 1 G = 11 GB

    最终 Core = 3,Executor = 29,Executor Memory = 11G

    动态分配

    如果采用动态分配策略,executer的个数上限是无穷大。这就意味着Spark任务在需要资源的情况下,会占用到集群左右资源,集群中会有其他应用也需要资源运行,所以我们需要在集群层面上对core进行分配控制。

    这就意味着我们在基于用户访问基础上,在基于YARN的任务中进行分配core,我们创建一个spark_user,分配min max的core个数,这些core从YARN中剥离,区别于其他基于YARN调度应用(例如Hadoop等)

    为了理解动态资源分配,首先需要了解一些属性配置项:

    spark.dynamicAllocation.enabled : 设置为true,意味着我们不关心executor的个数,考虑用到动态资源分配策略,在stage阶段会有一下区别:

    以多少个executor启动Spark任务?

    spark.dynamicAllocation.initialExecutors:初始化executor个数

    怎样动态控制executor的个数?
    我们通过spark-submit的方式初始化executor个数,根据负载情况,任务在min(
    spark.dynamicAllocation.minExecutors)和max(
    spark.dynamicAllocation.maxExecutors)之间决定executor个数。

    什么时候获取一个新的executor和放弃一个executor
    spark.dynamicAllocation.schedulerBacklogTimeout :依靠这个参数,决定我们什么时候获取一个新的executor,每轮任务executor个数较前一轮将逞指数增长,比如第一轮1个executor,后续添加2,4,8个executor的方式,在某些特定场景下,将使用max(spark.dynamicAllocation.maxExecutors)个executor。
    spark.dynamicAllocation.executorIdleTimeout :executor空闲的时间,超时之后,将executor移除

    转载于:https://blog.51cto.com/10120275/2364992

     
     
  • 相关阅读:
    dubbox 入门demo
    manjaro 安装后的基本配置
    ajax传递参数与controller接收参数映射关系
    如何等待ajax完成再执行相应操作
    Java之取余操作 "%"
    javascript基本属性访问对象的属性和方法
    jQuery之过滤选择器
    Spring Framework
    大佬帮忙看一下
    Python笔记
  • 原文地址:https://www.cnblogs.com/javalinux/p/15104362.html
Copyright © 2011-2022 走看看