zoukankan      html  css  js  c++  java
  • Spark作业调度

    概述

    研究Spark作业调度,是为了合理使用集群的资源。更具体一点,是看看是否提供了可以个性化配置的点,然后根据应用的具体情况制定配置或者使用方案。

    本文参考官网作业调度文档

    spark的作业调度分为两个场景:跨应用的调度和应用内部的调度,下面分别介绍。

    声明:文中配图是自己的理解,并不敢保证其准确性。

    跨应用调度

    跨应用的调度是由底层的集群管理器负责的,有两种资源分配策略。

    一种是静态资源分隔,即一个应用一开始就申请所有的资源,并在程序运行期间使用持有这些资源。

    一种是动态资源分配,应用根据自己的负载情况动态请求或释放资源。这种策略默认是不开启的。

    静态资源分隔

    所有的集群管理器都支持静态资源分隔,只是具体的配置策略不同:

    Standalone mode

    提交到Standalone mode集群的应用会以FIFO的顺序运行,每一个正在运行的应用都会尝试占用所有的可用资源。使用下面的配置项可以限制每个应用申请的资源:

    spark.cores.max

    应用可以申请的最大数量的CPU核的数量,如果没有设置,取spark.deploy.defaultCores的值。

    spark.executor.memory

    分配给每个executor进程的内存资源。

    Mesos

    为了使用静态资源隔离,需要设置spark.mesos.coarse为true,这称为粗粒度的Mesos模式。

    另外,spark.cores.maxspark.executor.memory在Mesos模式下同样有效。

    YARN

    --num-executors

    在使用spark-submit提交作业时,可以使用--num-executors选项请求指定的executor个数。

    在程序内部,可以通过设置spark.executor.instances属性达到同样的目的。

    --executor-memory

    在使用spark-submit提交作业时,可以使用--executor-memory选项设置每个executor申请的内存。

    在程序内部,可以通过设置spark.executor.memory属性达到同样的目的。

    --executor-cores

    在使用spark-submit提交作业时,可以使用--executor-cores选项设置每个executor申请的CPU核。

    在程序内部,可以通过设置spark.executor.cores属性达到同样的目的。

    动态资源分配

    spark的运行模型是基于executor的,executor是资源的实际持有者。所以动态资源分配,是通过动态的申请executor和释放executor来实现的。

    动态资源分配涉及到两个方面,如何在需要的时候动态申请资源,以及如何在空闲的时候动态释放资源。

    动态请求策略:如果一个应用有tasks在等待,超过一定的时间(spark.dynamicAllocation.schedulerBacklogTimeout秒)就会申请1个executor。此后每隔一定的时间(spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒)就检测应用是否有tasks在等待,有就继续申请executor。

    动态请求资源的数量是指数级的,第一次申请1个,第二次申请2个,接着是4, 8 ...这种考虑是为了在谨慎申请资源的同时,又可以在允许的时间范围内获得真正需要的资源量。

    动态释放资源:是通过检查应用占据的executor是否超过了指定的时间(spark.dynamicAllocation.executorIdleTimeout秒)来决定的,超过了就释放。

    释放资源的条件和请求资源的条件是互斥的,即如果一个应用有tasks在排队,就不应该会有空闲的executor。

    how to do

    为了使用动态资源分配,需要做两件事:

    1. 设置spark.dynamicAllocation.enabled值为true
    2. 在每一个工作节点启动external shuffle service,并设置spark.shuffle.service.enabled为true

    external shuffle service的作用在后面会介绍,不同集群模式下启动external shuffle service的方式不同:

    • Standalone模式,不需要额外的工作来启动external shuffle service,只需要设置spark.shuffle.service.enabled为true即可。
    • Mesos粗粒度模式,在每一个slave nodes运行脚本$SPARK_HOME/sbin/start-mesos-shuffle-service.sh来启动external shuffle service
    • YARN模式,参考Configuring the External Shuffle Service On Yarn

    动态移除executor面对的问题

    动态释放资源需要额外的支持,因为executor可能会产生中间结果并输出到本地,在需要的时候需要通过这个executor获取它的中间结果。冒然移除executor会丢失它计算的中间结果,导致在真正需要的时候又要重新计算。

    比如在map阶段executor输出map结果,在shuffle阶段这些map结果又需要通过executor读出来传送到负责reduce的executor。

    spark通过external shuffle service来解决这个问题。external shuffle service是指在每一个node都运行的一个长期进程,这个进程独立于应用和executor,负责提供executor的输出数据的获取服务。原来executor之间相互请求来获取对方的输出结果,变成了统一从external shuffle service获取结果。

    即使executor已经被移除了,它所输出的数据依然可以通过external shuffle service来获取。

    另外,executor还可能会把中间结果缓存到内存,目前的策略是不移除此类的executor。未来可能采取将缓存持久化的方式,进而释放executor。

    应用内部调度

    一个spark应用可以支持多个不同线程的job同时提交,这常见于spark应用提供网络服务的场景。

    spark默认的调度策略是FIFO,如果队列头部的job比较大,占用了集群的所有资源,后面的小任务将迟迟得不到运行的机会。

    另外,spark还支持配置FAIR调度,spark循环调度每个job的task。这样即使有大job在运行,刚提交的小job也可以及时获得资源,而不需要等到大job结束。

    通过设置属性spark.scheduler.mode来启用公平调度:

    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)
    

    公平调度池

    spark支持公平调度池的概念,每个线程可以指定将jobs提交到哪个池子,最细粒度的场景下是每个线程对应一个池,也可以多个线程使用同一个池。

    每个线程默认使用的池是default,也可以通过设置参数来明确指定池。

    // Assuming sc is your SparkContext variable
    sc.setLocalProperty("spark.scheduler.pool", "pool1")
    

    如果想重置当前线程绑定的池子,调用sc.setLocalProperty("spark.scheduler.pool", null)

    可以通过配置文件将资源按照一定的比重分配到池,配置文件的模板:conf/fairscheduler.xml.template

    通过conf.set("spark.scheduler.allocation.file", "/path/to/file")指定配置文件。

    每个池可支持的参数有三个:

    • schedulingMode:FIFO 或 FAIR,FIFO是默认的策略。
    • weight:每个池子分配资源的权重,默认情况下所有的权重为1。
    • minShare:最小资源,CPU核的数量,默认为0。在进行资源分配时,总是最先满足所有池的minShare,再根据weight分配剩下的资源。

    配置文件示例:

    <?xml version="1.0"?>
    <allocations>
      <pool name="production">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>2</minShare>
      </pool>
      <pool name="test">
        <schedulingMode>FIFO</schedulingMode>
        <weight>2</weight>
        <minShare>3</minShare>
      </pool>
    </allocations>
    

    没有出现在配置文件中的池,所有参数取默认值(schedulingMode=FIFO,weight=1,minShare=0)。

    概念澄清

    executor到底指什么?和容器、JVM的关系是怎样的?

    executor是负责一定职责的程序组件,可以在已有的JVM中运行(比如local mode),也可以在新的JVM中运行。使用YARN时,executor是在YARN容器中运行的。

    spark的job - stage - task的划分是怎么样的?

    spark的job可以划分为多个stage,这些stage构成了DAG。每一个stage又可以划分为多个tasks。stage的划分是根据shuffle map task来的,这一类的task相当于MapReduce中shuffle的map端,负责在本地RDD分区进行计算,并将结果输出到新的分区,供后续的使用。在划分stage时,shuffle map任务作为阶段的结束的边界。

    Mesos的粗粒度和细粒度

    Mesos可以启用CPU核的共享,即同一个节点executor在不使用核的情况下可以让给另一个executor来使用。

    不启用CPU核共享称为粗粒度,启用则称为细粒度,相关的配置项为spark.mesos.coarse,值为true表示粗粒度。

  • 相关阅读:
    字典树Trie
    转载一个不错的LRU cache
    git和github基础入门
    git基础之常用操作
    python矩阵和向量的转置问题
    梯度下降法注意要点
    python 浮点数问题
    Python数据分析基础——读写CSV文件2
    Python数据分析基础——读写CSV文件
    读书笔记----javascript函数编程
  • 原文地址:https://www.cnblogs.com/ywjy/p/7792639.html
Copyright © 2011-2022 走看看