Spark Job调度
1、概览
Spark有几种用于在计算之间调度资源的工具。首先,回想一下,如集群模式概述中所述,每个Spark应用程序(SparkContext的实例)都运行一组独立的executor进程。Spark运行的集群管理器提供了跨应用程序的调度工具。其次, 在每个Spark应用程序中,如果多个“job”(每个Spark action都是一个job)由不同的线程提交,则它们可以同时运行。如果您的应用程序通过网络提供请求,则这很常见。Spark包含一个公平的调度程序来调度每个SparkContext中的资源。
2、跨应用程序进行调度
在集群上运行时,每个Spark应用程序都会获得一组独立的executor JVM,它们只运行task并为该应用程序存储数据。如果多个用户需要共享您的群集,则可以使用不同的选项来管理分配,具体取决于群集管理器。
所有集群管理器上都可以使用的最简单选项是资源的静态分区。通过这种方法,每个应用程序都可以使用最大量的资源,并在整个持续时间内持有它们。这是Spark standalone、YARN以及粗粒度Mesos模式模式中使用的方法。可以根据群集类型配置资源分配如下:
-
独立模式
默认情况下,提交到独立模式群集的应用程序将以FIFO(先进先出)顺序运行,每个应用程序将尝试使用所有可用节点。您可以通过spark.cores.max在其中设置配置属性来限制应用程序使用的节点数,或者更改未设此条目的应用程序的默认值spark.deploy.defaultCores。最后,除了控制core外,每个应用程序的spark.executor.memory设置还控制其内存使用。
-
Mesos
要在Mesos上使用静态分区,请将spark.mesos.coarse配置属性设置为true,并可选择设置spark.cores.max为限制每个应用程序的资源共享,如独立模式。您还应该设置spark.executor.memory控制执行程序内存。
-
YARN
--num-executors* Spark YARN客户端的选项控制它将在集群上分配的执行程序数(spark.executor.instances作为配置属性),而--executor-memory (spark.executor.memory配置属性)和--executor-cores(spark.executor.cores配置属性)控制每个执行程序的资源。有关更多信息,请参阅 YARN Spark属性。
Mesos上的第二个选项是动态共享 CPU内核。在此模式下,每个Spark应用程序仍然具有固定且独立的内存分配(设置方式spark.executor.memory),但是当应用程序未在计算机上运行任务时,其他应用程序可能会在这些核心上运行任务。当您期望大量不过度活跃的应用程序(例如来自不同用户的shell会话)时,此模式非常有用。但是,它存在延迟可预测性较低的风险,因为应用程序可能需要一段时间才能在一个节点上获取内核。要使用此模式,只需使用 mesos://URL并设置spark.mesos.coarse为false。
注意,目前没有一种模式可以跨应用程序提供内存共享。如果您希望以这种方式共享数据,我们建议运行单个服务器应用程序,通过查询相同的RDD来提供多个请求。
2.1 动态资源分配
Spark提供了一种机制,可根据工作负载动态调整应用程序占用的资源。这意味着如果不再使用它们时会将资源返回给群集,并在有需求时再次请求它们。这一特性在多个应用程序间共享群集资源尤为有用。
默认情况下禁用此功能,并且可在所有粗粒度集群管理器上使用,即 独立模式,YARN模式和 Mesos粗粒度模式。
2.1.1 配置和设置
使用此功能有两个要求。首先,您的应用程序必须设置 spark.dynamicAllocation.enabled为true。其次,必须 在同一群集中的每个工作节点上设置外部shuffle服务,并spark.shuffle.service.enabled在应用程序中设置为true。外部shuffle服务的目的是允许删除执行程序而不删除由它们写入的shuffle文件(更详细描述 如下)。该属性配置在spark-default.conf文件中,不是spark-env.sh文件中。
#spark-default.conf
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
设置此服务的方式因集群管理器而异:
- 在独立模式下,只需启动spark.shuffle.service.enabled设置为的工作人员即可true。
- 在Mesos粗粒度模式下,$SPARK_HOME/sbin/start-mesos-shuffle-service.sh在spark.shuffle.service.enabled设置为的所有从属节点上运行true。例如,你可以通过Marathon这样做。
- 在YARN模式下,请按照此处的说明操作。
所有其他相关配置都是可选的,位于spark.dynamicAllocation.和 spark.shuffle.servic.称空间下。有关更多详细信息,请参阅 配置页面。
2.1.2 资源分配策略
在较高的层面上,Spark在不再使用executor时放弃它,并在需要时重新获取。由于没有确定的方法可以预测即将被删除的执行程序是否会在不久的将来运行任务,或者即将添加的新执行程序是否实际上是空闲的,我们需要一组启发式来确定何时删除并请求执行者。
a) 请求策略
启用动态分配的Spark应用程序在等待计划等待挂起的任务时请求其他执行程序。这种情况必然意味着现有的执行者集合不足以同时使已提交但尚未完成的所有任务饱和。
Spark会轮次请求执行程序。当有待处理的任务持续spark.dynamicAllocation.schedulerBacklogTimeout几秒钟时会触发实际请求,spark.dynamicAllocation.sustainedSchedulerBacklogTimeout如果待处理任务队列仍然存在,则会在此后每隔几秒再次触发。此外,每轮请求的执行者数量与上一轮相比呈指数增长。例如,一个应用程序将在第一轮中添加1个执行程序,然后在后续轮次中添加执行程序中的2,4,8等。
指数增长政策的动机是双重的。首先,应用程序应该在开始时谨慎地请求执行程序,以防只有少数额外的执行程序就足够了。这与TCP慢启动的理由相呼应。其次,应用程序应该能够及时提高其资源使用率,以防万一实际需要许多执行程序。
b) 删除策略
删除执行程序的策略要简单得多。当Spark应用程序空闲超过spark.dynamicAllocation.executorIdleTimeout几秒钟时,它会删除执行程序。请注意,在大多数情况下,此条件与请求条件互斥,因为如果仍有待安排的待处理任务,则执行程序不应处于空闲状态。
2.1.3 executor的优雅退役
在动态分配之前,Spark executor会在失败时或关联的应用程序退出时也会退出。在这两种情况下,不再需要与executor关联的所有状态,并且可以安全地丢弃。但是,通过动态分配,当显式删除executor时,应用程序仍在运行。如果应用程序尝试访问存储在执行程序中或由执行程序写入的状态,则必须执行重新计算状态。因此,Spark需要一种机制来通过在删除executor之前保留其状态来优雅地停用executor。
这一要求对于shuffle尤其重要。在shuffle期间,Spark executor首先将其自己的map输出写入本地磁盘,然后在其他executor尝试获取这些文件时充当这些文件的服务器。如果straggler是比同行运行更长时间的任务,动态分配可能会在shuffle完成之前删除executor,在这种情况下,必须重新计算由该executor写入的shuffle文件。
保留shuffle文件的解决方案是使用外部shuffle服务,也在Spark 1.2中引入。此服务指的是一个长期运行的进程,它独立于Spark应用程序及其执行程序在集群的每个节点上运行。如果启用该服务,Spark执行程序将从服务而不是相互之间获取随机文件。这意味着遗嘱执行人所写的任何shuffle状态可能会继续在遗嘱执行人的有效期内提供。
除了编写shuffle文件外,执行程序还会将数据缓存在磁盘或内存中。但是,删除执行程序后,将无法再访问所有缓存的数据。为了缓解这种情况,默认情况下,永远不会删除包含缓存数据的执行程序。您可以使用配置此行为 spark.dynamicAllocation.cachedExecutorIdleTimeout。在将来的版本中,缓存的数据可以通过堆外存储来保存,这类存储的精神与通过外部shuffle服务保存shuffle文件的方式类似。
3、应用程序内的调度
在给定的Spark应用程序(SparkContext实例)中,如果从不同线程提交多个并行job,则它们可以同时运行。Job这里指的是action,并在需要运行,以评估行动的任何任务。Spark的调度程序是完全线程安全的,并支持此用例,以支持提供多个请求的应用程序(例如,查询多个用户)。
默认情况下,Spark的调度程序以FIFO方式运行作业。每个Job分为“Stage”(例如map和reduce阶段),第一个job优先sh使用所有可用资源,而其阶段有任务要启动,然后第二个工作获得优先权等。如果工作在头部队列不需要使用整个集群,以后的作业可以立即开始运行,但如果队列头部的作业很大,则后续作业可能会显着延迟。
从Spark 0.8开始,还可以在作业之间配置公平共享。在公平共享下,Spark以“循环”方式在作业之间分配任务,以便所有作业获得大致相等的群集资源份额。这意味着在长时间工作运行时提交的短工作可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长时间工作完成。此模式最适合多用户设置。
要启用公平调度程序,只需在配置SparkContext时将spark.scheduler.mode属性设置为FAIR:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
公平调度池
公平调度程序还支持将作业分组到池中,并为每个池设置不同的调度选项(例如权重)。例如,这可以用于为更重要的作业创建“高优先级”池,或者将每个用户的作业组合在一起,并为用户提供相同的份额,而不管他们拥有多少并发作业,而不是给予作业相等的份额。此方法以Hadoop Fair Scheduler为模型 。
在没有任何干预的情况下,新提交的作业将进入默认池,但可以通过将spark.scheduler.pool“本地属性” 添加到提交它们的线程中的SparkContext 来设置作业池。这样做如下:
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
设置这个本地属性后,所有的工作(在此线程通过调用这个线程内提交RDD.save,count,collect,等)将使用此池名称。该设置是每个线程,以便让线程代表同一个用户运行多个作业变得容易。如果您想清除与线程关联的池,只需调用:
sc.setLocalProperty("spark.scheduler.pool", null)
池的默认行为
默认情况下,每个池获得的集群份额相等(默认池中的每个作业的份额也相等),但在每个池中,作业按FIFO顺序运行。例如,如果您为每个用户创建一个池,这意味着每个用户将获得相同的群集份额,并且每个用户的查询将按顺序运行,而不是以后的查询从该用户的早期查询中获取资源。
配置池属性
还可以通过配置文件修改特定池的属性。每个池支持三个属性:
- schedulingMode:这可以是FIFO或FAIR,用于控制池中的作业是否相互排队(默认)或公平地共享池的资源。
- weight:这将控制池相对于其他池的共享。默认情况下,所有池的权重均为1.例如,如果为特定池提供权重2,则其资源将比其他活动池多2倍。设置高权重(例如1000)也可以实现 池之间的优先级 - 实质上,只要有活动作业,weight-1000池就会始终首先启动任务。
- minShare:除了总体权重之外,每个池都可以获得管理员希望拥有的最小份额(作为许多CPU核心)。公平调度程序始终尝试满足所有活动池的最小份额,然后根据权重重新分配额外资源。minShare因此,该属性可以是另一种确保池可以总是快速获得一定数量的资源(例如10个核心)的方法,而不会为集群的其余部分赋予高优先级。默认情况下,每个池minShare为0。
可以通过创建XML文件来设置池属性,类似于conf/fairscheduler.xml.template,并且要么fairscheduler.xml在类路径中放置一个文件,要么spark.scheduler.allocation.file在SparkConf中设置属性 。
conf.set("spark.scheduler.allocation.file", "/path/to/file")
XML文件的格式只是
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
一个完整的例子也可用于conf/fairscheduler.xml.template。请注意,未在XML文件中配置的任何池将仅获取所有设置的默认值(调度模式FIFO,权重1和minShare 0)。