zoukankan      html  css  js  c++  java
  • Spark 学习(七) Spark的运行流程

    一,Spark中的基本概念

    二,Spark的运行流程

    三,Spark在不同集群的运行架构

      3.1 Spark on Standalone运行流程

      3.2 Spark on YARN运行过程

     

     

    正文

    文章原文:https://www.cnblogs.com/qingyunzong/p/8945933.html

    一,Spark中的基本概念

      在进行Spark的运作流程分析前请看下图:

      

      在这对上面的这些名词进行解释:

      (1)Application:表示你的应用程序

      (2)Driver:表示main()函数,创建SparkContext。由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContext

      (3)Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。

      (4)Worker:集群中可以运行Application代码的节点。在Standalone模式中指的是通过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。

      (5)Task:在Executor进程中执行任务的工作单元,多个Task组成一个Stage

      (6)Job:包含多个Task组成的并行计算,是由Action行为触发的

      (7)Stage:每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage

      (8)DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系

      (9)TaskScheduler:将TaskSet提交给Worker(集群)运行,每个Executor运行什么Task就是在此处分配的。

      

    二,Spark的运行流程

      2.1 运行流程

      

      再次结合上面的图,对流程进行解析:

      (1)构建Spark Application的运行环境(启动Driver),Driver向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;

      (2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;

      (3)Driver构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task

      (4)Task Scheduler将Task发放给Executor运行同时Driver将应用程序代码发放给Executor。

      (5)Task在Executor上运行,运行完毕释放所有资源。

      详细图解:

      

      2.2 spark运行特点:

      (1)每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行tasks。这种Application隔离机制有其优势的,无论是从调度角度看(每个Driver调度它自己的任务),还是从运行角度看(来自不同Application的Task运行在不同的JVM中)。当然,这也意味着Spark Application不能跨应用程序共享数据,除非将数据写入到外部存储系统。

      (2)Spark与资源管理器无关,只要能够获取executor进程,并能保持相互通信就可以了。

      (3)提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,因为Spark Application运行过程中SparkContext和Executor之间有大量的信息交换;如果想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。

      (4)Task采用了数据本地性和推测执行的优化机制。

      2.3 DAGScheduler

      Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency

      面向stage的切分,切分依据为宽依赖

      维护waiting jobs和active jobs,维护waiting stages、active stages和failed stages,以及与jobs的映射关系

      主要职能:

      1、接收提交Job的主入口,submitJob(rdd, ...)runJob(rdd, ...)。在SparkContext里会调用这两个方法。  

      生成一个Stage并提交,接着判断Stage是否有父Stage未完成,若有,提交并等待父Stage,以此类推。结果是:DAGScheduler里增加了一些waiting stage和一个runningstage。

      running stage提交后,分析stage里Task的类型,生成一个Task描述,即TaskSet。

      调用TaskScheduler.submitTask(taskSet, ...)方法,把Task描述提交给TaskScheduler。TaskScheduler依据资源量和触发分配条件,会为这个TaskSet分配资源并触发执行DAGScheduler提交job后,异步返回JobWaiter对象,能够返回job运行状态,能够cancel job,执行成功后会处理并返回结果

      2、处理TaskCompletionEvent 

      如果task执行成功,对应的stage里减去这个task,做一些计数工作: 

      如果task是ResultTask,计数器Accumulator加一,在job里为该task置true,job finish总数加一。加完后如果finish数目与partition数目相等,说明这个stage完成了,标记stage完成,从running stages里减去这个stage,做一些stage移除的清理工作

      如果task是ShuffleMapTask,计数器Accumulator加一,在stage里加上一个output location,里面是一个MapStatus类。MapStatusShuffleMapTask执行完成的返回,包含location信息和block size(可以选择压缩或未压缩)。同时检查该stage完成,向MapOutputTracker注册本stage里的shuffleId和location信息。然后检查stage的output location里是否存在空,若存在空,说明一些task失败了,整个stage重新提交;否则,继续从waiting stages里提交下一个需要做的stage

      如果task是重提交,对应的stage里增加这个task

      如果task是fetch失败,马上标记对应的stage完成,从running stages里减去。如果不允许retry,abort整个stage;否则,重新提交整个stage。另外,把这个fetch相关的location和map任务信息,从stage里剔除,从MapOutputTracker注销掉。最后,如果这次fetch的blockManagerId对象不为空,做一次ExecutorLost处理,下次shuffle会换在另一个executor上去执行。

      其他task状态会由TaskScheduler处理,如Exception, TaskResultLost, commitDenied等。

      3、其他与job相关的操作还包括:cancel job, cancel stage, resubmit failed stage等

       2.4 TaskScheduler

      维护task和executor对应关系,executor和物理资源对应关系,在排队的task和正在跑的task。

      内部维护一个任务队列,根据FIFO或Fair策略,调度任务。

       TaskScheduler本身是个接口,spark里只实现了一个TaskSchedulerImpl,理论上任务调度可以定制。

    三,Spark在不同集群的运行架构

      Spark注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式。部署在单台机器上时,既可以用本地(Local)模式运行,也可以使用伪分布式模式来运行;当以分布式集群部署的时候,可以根据自己集群的实际情况选择Standalone模式(Spark自带的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各种运行模式虽然在启动方式、运行位置、调度策略上各有不同,但它们的目的基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要运行和管理Task。

      3.1 Spark on Standalone运行流程

      Standalone模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上中,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;当使用spark-submit工具提交Job或者在Eclips、IDEA等开发平台上使用”new SparkConf().setMaster(“spark://master:7077”)”方式运行Spark任务时,Driver是运行在本地Client端上的。

      运行过程文字说明:

      

    1、我们提交一个任务,任务就叫Application
    2、初始化程序的入口SparkContext, 
      2.1 初始化DAG Scheduler
      2.2 初始化Task Scheduler
    3、Task Scheduler向master去进行注册并申请资源(CPU Core和Memory)
    4、Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;顺便初
          始化好了一个线程池
    5、StandaloneExecutorBackend向Driver(SparkContext)注册,这样Driver就知道哪些Executor为他进行服务了。
       到这个时候其实我们的初始化过程基本完成了,我们开始执行transformation的代码,但是代码并不会真正的运行,直到我们遇到一个action操作。生产一个job任务,进行stage的划分
    6、SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage
    (当碰到Action操作 时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生)。
    7、将Stage(或者称为TaskSet)提交给Task Scheduler。Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行; 8、对task进行序列化,并根据task的分配算法,分配task 9、对接收过来的task进行反序列化,把task封装成一个线程 10、开始执行Task,并向SparkContext报告,直至Task完成。 11、资源注销

      运行过程图形说明:

      3.2 Spark on YARN运行过程

      YARN是一种统一资源管理机制,在其上面可以运行多套计算框架。目前的大数据技术世界,大多数公司除了使用Spark来进行数据计算,由于历史原因或者单方面业务处理的性能考虑而使用着其他的计算框架,比如MapReduce、Storm等计算框架。Spark基于此种情况开发了Spark on YARN的运行模式,由于借助了YARN良好的弹性资源管理机制,不仅部署Application更加方便,而且用户在YARN集群中运行的服务和Application的资源也完全隔离,更具实践应用价值的是YARN可以通过队列的方式,管理同时运行在集群中的多个服务。

    Spark on YARN模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另一种是YARN-Cluster(或称为YARN-Standalone模式)。

      任何框架与YARN的结合,都必须遵循YARN的开发模式。在分析Spark on YARN的实现细节之前,有必要先分析一下YARN框架的一些基本原理。

      参考:http://www.cnblogs.com/qingyunzong/p/8615096.html

      Yarn-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application和客户端进行交互,因为Driver在客户端,所以可以通过webUI访问Driver的状态,默认是http://hadoop1:4040访问,而YARN通过http:// hadoop1:8088访问。

    YARN-client的工作流程分为以下几个步骤:

      

    1.Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,
    由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend; 2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,
    与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派; 3.Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container); 4.一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,
    CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
    5.Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,
    以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
    6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。

      图例:

      

      

      在YARN-Cluster模式中,当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;第二个阶段是由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成。

    YARN-cluster的工作流程分为以下几个步骤:

      

    1.   Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
    
    2.   ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,
    其中ApplicationMaster进行SparkContext等的初始化;
    3. ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,
    并监控它们的运行状态直到运行结束;
    4. 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,
    CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在
    Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl
    的一个简单包装,增加了对Executor的等待逻辑等;
    5. ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster
    汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
    6. 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

      图例:

      YARN-Client 与 YARN-Cluster 区别

      理解YARN-Client和YARN-Cluster深层次的区别之前先清楚一个概念:Application Master。在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别。

    1、YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,
    作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业; 2、YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开。

      

      

  • 相关阅读:
    开源项目
    [Accessibility] Missing contentDescription attribute on image [可取行]失踪contentDescription属性图像
    Android 布局 中实现适应屏幕大小及组件滚动
    EF 错误记录
    EasyUI 加载时需要显示和隐藏 panel(面板)内容破版问题
    IE 报表缩放后页面破版
    VS 2017 引入nuget 问题
    SSRS 报表显示页面 asp net session丢失或者找不到 asp net session has expired or could not be found()
    log4net 配置
    网站
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10991367.html
Copyright © 2011-2022 走看看