zoukankan      html  css  js  c++  java
  • Spark执行流程【资源分配、资源调度、任务调度】

    一.资源分配策略

      1.静态分配

        Spark程序启动时即一次性分配所有的资源,运行过程中固定不变,直至程序退出。这是一种最简单可靠的分配策略,强烈建议使用这种策略,除非非常确定这种方式无法满足需求。需要注意的是,目前所有模式下都没有在不同Spark程序之间提供内存共享的能力。如果你想使用这种方式来共享数据,建议运行一个单独的服务程序来响应不同的情况去查询同一个RDD。在Spark1.6及以上版本中,可以使用in-memory存储系统【例如:Tachyon】会提供另外的方式在不同的程序之间共享RDD。

      2.动态分配

        Spark1.2引入了基于负载情况的集群动态资源分配,运行过程中可以不断调整分配的资源,可以按需增加或减少。这意味着程序可能会在不使用资源的时候把资源还给集群,在需要的时候再向集群申请,这个特性在多个程序共享集群资源的时候特别有用。这比静态分配复杂的多,需要在实践中不断调试才能达到最优。

        这个特性的所有配置项都是类似spark.dynamicAllocation.*形式命名。要想开启这个特性,必须设置spark.dynamicAllocation.enabled=true。此外,Spark程序还必须使用一个外部shuffle服务,这个服务的目的是保存执行器输出的shuffle文件,这样执行器可以安全移除。开启这个服务需设置spark.shuffle.service.enabled=true。在YARN中,这个外部的shuffle服务是集群中各个NodeManager的org.apache.spark.yarn.network.YarnShuffleService实现的。

    二.资源请求策略【仅限动态资源分配】

      从最上面看,Spark应该在执行器不使用的时候扔掉它,然后在需要的时候再获取。由于没有准确的方式来预计一个即将被删除的执行器是否会稍后马上运行一个Task,或者一个马上要添加的新执行器实际上是空闲的,因此需要一些启发性的方法来判断是否应该新增或删除执行器。

      1.请求策略

        开启动态资源分配之后,当Spark程序有Task处于排队待调度时,会请求额外的执行器资源,这种状况说明现有的执行器不足以让所有尚未执行完的Task的并发达到饱和。Spark会循环请求执行器资源。每spark.dynamicAllocation.schedulerBacklogTimeout触发执行一次。每一轮请求的执行器的数量会成指数增长,初始值为1。

      2.移除策略

        当Spark程序的某个执行器处于空闲状态的时间超过 spark.dynamicAllocation.executorIdleTimeout秒之后会被移除。注意,在大部分情况下,移除执行器与请求执行器的条件是互斥的,即当有Task在等待执行时,此时是没有空闲执行器的。

    三.执行器安全退出

      在出现动态分配之前,一个Spark执行器的退出要么是程序执行失败,要么是程序执行成功。不管哪一种情况,执行器的所有附属状态信息都不在需要,可以安全地直接丢弃。然而有了动态分配之后,执行器退出时进程可能还在运行。如果程序尝试去访问执行器的数据,就会触发重新计算。因此,Spark需要一种在执行器退出之前保存执行器状态的机制。

      这个需求对于shuffle特别重要。在shuffle期间,Spark执行器首先将自己的map结果写入本地磁盘,然后作为一个服务来运行,其它执行器请求访问这些数据时进行响应。但是对于那些执行时间远超同伴的task任务来说,动态分配可能会在shuffle结束之前移除这个执行器,这样执行器写的shuffle文件就需要进行无谓的重新计算。

      保存shuffle文件的方法是使用一个外部的shuffle服务,这个服务在Spark1.2中已经引入。这个服务指向一个长期运行的进程,在集群的每个节点上都会运行,而且独立于Spark程序和它们的执行器。如果这个服务开启,Spark执行器会从这些服务那里获取shuffle文件,而不是其他执行器。这意味着执行器写的任何shuffle状态在执行器退出之后还可以保留。

      除了写shuffle文件外,执行器还会在磁盘和内存中缓存数据。当执行器被移除后,所有的缓存数据都不能被访问。在Spark1.2中还没有被解决。在以后的版本中,缓存的数据可以通过一个堆外的存储系统【Tachyon】被保留下来,思路跟shuffle文件被一个外部的shuffle服务保存起来一样。

    四.资源调度&任务调度

      1.启动集群后,Worker节点会周期性的【心跳】向Master节点汇报资源情况,Master掌握集群资源情况。

      2.当Spark提交一个Application后,根据RDD之间的依赖关系将Application构建成一个DAG有向无环图。

      3.任务提交后,Spark会在Driver端创建两个对象:DAGScheduler和TaskScheduler。

      4.DAGScheduler是任务调度的高层调度器,是一个对象。DAGScheduler的主要作用就是将DAG根据RDD之间的宽窄依赖关系划分为一个个Stage,然后将这些Stage以TaskSet的形式提交给TaskScheduler【TaskScheduler是任务调度的底层调度器,这里TaskSet其实就是一个集合,里面封装的就是一个个的task任务,也就是stage中并行的task任务】。

      5.TaskScheduler会遍历TaskSet集合,拿到对应的task后会将task发送到计算节点Executor上去执行【就是发送到Executor上的线程池ThreadPool上执行】。

      6.Task在Executor线程池中的运行情况会向TaskScheduler反馈,当task运行失败时,则由TaskScheduler负责重试,将task重新发送到Executor去执行,默认重试3次。如果重试3次依然失败,那么这个task所在的Stage就失败了。Stage失败则由DAGScheduler负责重试,重新发送TaskSet到TaskScheduler,Stage默认会重试4次。如果4次以后依然失败,那么这个job就失败了,对应的这个application也失败。

      备注:TaskScheduler不仅负责重试task,还负责重试straggling【执行相比其它任务缓慢的task】task。TaskScheduler会重新启动一个新的task来运行这个缓慢的task执行的处理逻辑。两个task那个先执行完,就以那个task的执行结果为准。这就是spark的推测执行机制。在spark中推测执行默认是关闭的。推测执行可以通过配置spark.speculation属性来配置。

    五.图解调度流程

      

    六.注意

      1.对于ETL类型要入数据库的业务要关闭推测执行机制,这样就不会又重复的数据入库。

      2.如果遇到数据倾斜的情况,开启推测执行则会有可能导致一直会有task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。

  • 相关阅读:
    MyEclipse中的Tomcat跑大项目时内存溢出:permgen space
    MyEclipse新建工作空间后的配置详细步骤
    解决eclipse复制粘贴js代码卡死的问题
    eclipse复制工作空间配置
    maven项目检出后报错(包括编译报错和运行报错)的常见检查处理方式
    MyEclipse中引用的maven配置文件只访问私服的配置
    图标网站
    afasf
    一个权限管理模块的设计(转载)
    奇艺下载
  • 原文地址:https://www.cnblogs.com/yszd/p/10656699.html
Copyright © 2011-2022 走看看