zoukankan      html  css  js  c++  java
  • sparkContext之一:sparkContext的初始化分析

     Spark源码学习:sparkContext的初始化分析

          spark可以运行在本地模式local下,可以运行在yarn和standalone模式下,但是本地程序是通过什么渠道和这些集群交互的呢?那就是sparkContext,他在spark生态系统中的作用不言而喻,绝对是最重要的,整体架构如图所示:

            这里我们简单的来剖析一下,sparkContext在初始化最重要的流程和大致框架。spark代码第一句都是先创建sparkConf之后作为参数传递给sparkContext在进行创建sc,之后的一切操作都离不开sparkContext。

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    sc=new SparkContext(conf)

            在debug模式下进入源代码。

           

          可以看到这里的sparkContext使用sparkConf作为参数来进行传递参数。变量allowMultipleContext是用来决定是否在spark中指运行一个任务,markPartiallyContructed来确保实例的唯一性。

          上面是对sparkConf进行复制之后,然后对配置信息进行校验。上面的代码显示spark运行必须指定master和name,否则就会抛出异常,结束初始化过程,在后面我们会看到master是用来设置部署模式,name用来指定程序名称,相对简单。

          下面重要的是sparkEnv的初始化,sparkEnv是spark的执行环境对象,包括很多与Executor执行相关的对象。在local模式下Driver会创建Executor,local-cluster部署模式或者standalone部署模式下worker另起CoarseGrainedExecutorBackend进程中创建executor,继而创建taskRunner方法,运行runtask运行任务,这个方法会有自己的具体实现类,shufflemaptask和resulttask有具体的实现。所以sparkEnv存在于driver或者CoarseGrainedExecutorBackend进程中。代码如下:

         我们进入createDriverEnv方法,它隶属于SparkEnv的方法。主要保存spark运行时环境变量。这里阐述最重要的变量初始化,至于什么页面监控之类的就不看了。

           (1) Akka的分布式消息系统actorSystem的初始化。

           (2) 创建map任务输出跟踪器mapoutputTracker,主要就是跟踪map任务把数据结果写到哪里去了,reduce也可以去取数据map,reduce,shuffle都有自己所对应的ID,着重介绍一下MapOutputTrackerMaxter,它内部使用mapStatuses来维护跟中map任务的输出状态,这个数据结构是一个map,其中key对应shufleID,value存储各个map任务对应的状态信息mapStatus。由于mapStatus是由blockmanagerid(key)和bytesSize(value)组成,key是表示这些计算的中间结果存放在那个blockManager,value表示不同的reduceID要读取的数据大小这时reduce就知道从哪里fetch数据,并且判断数据的大小(和0比较来确保确实获得了数据)。

          driver和executor处理mapOutputtTrackermMaster的方式不同:

          Driver:创建mapOutputtTrackermMaster,然后创建mapOutputtTrackermMasterActor,并且注册到ActorSystem.

          Executor:创建mapOutputtTrackerm,并从ActorSystem中找到mapOutputtTrackermMasterActor。

          有那么多的executor,当然就有跟多的mapTask,那driver是怎样知道各个mapTask的执行任务信息呢?那就靠我们上面的mapOutputtTrackermMasterActor啦,哈哈。map任务的状态就是有Executor向持有mapOutputtTracker-MasterActor来发送消息,把map任务状态同步到mapOutputtTracker的mapstatuses上去。问题又来了,executor怎样找到mapOutputtTrackermMasterActor呢?那就靠registe'OrLookup函数啦,它后台使用ActorSystem提供的分布式消息机制实现的。

          那就来看看具体的代码吧:

         

           (3)实例化ShuffleManager。它主要是负责管理本地和远程的block的shuffle操作。ShuffleManager通过反射机制来生成默认的SortShuffleManager。可以通过修改spark.shuffle.manager设置为hash来显示的控制使用HashShuffle-Manager。这里的sort主要是指在shuffle中的key-value,key是默认排序好的。具体代码如下:

         (4)块儿传输服务BlockTransferServervice。这里默认的是NettyBlockTransferServervice。可以通过配置属性spark.shuffle.blockTransferService使用NioBlockTransferService。NettyBlockTransferServervice使用Netty提供的异步事件驱动网络应用框架,提供web服务及客户端,获取远程节点上的Block集合。代码如下:

          (5)创建BlockManagerMaster。它是负责block的管理和协调,具体的操作是依赖于BlockManagerMaster-Actor,因为它需要与Executor上的BlockManager通过Actor进行通信。(这里的Endpoint相当于Actor)

          (6) 创建BlockManager。他主要运行在worker节点上。虽然这里创建了,但是只有在他的init初始化函数之后才是有效的。

         (7)创建广播管理器BroadcastManager。主要是负责把序列化时候的RDD,job以及shuffleDependence等,以及配置信息存储在本地,有时候还会存储到其他的节点以保持可靠性。

        (8) 创建存储管理器CacheManager。他主要用于缓存RDD某个分区计算的中间结果,缓存计算结果在迭代计算的时候发生。他很有用,可以减少磁盘IO,加快执行速度。

        (9) HTTP文件服务器httpFileServer。退工对jar以及其他文件的http访问。例如jar包的上传等等。他的端口号由spark.fileserver.port来配置,默认情况下是0,表示随机生成端口号。

        (10)最后创建测量系统MetricsSystem,是spark的测量系统,不用深究,这里我们不管他。

        上面的10条主要说的是sparkEnv的创建,执行环境也就刚刚初始化完毕,核心的核心还没开始呢。

       具体的变量创建等等下篇我们再来介绍。

    spark的sparkContext初始化中的sparkEnv相关的概念以及重要的组成部分在上一部分简单的介绍 ,这篇简单的介绍一下和DAGScheduler,taskSetManager,SchedulerBackend,taskScheduler等等一些和sparkContext初始化相关的概念,这里讨论的是重中之重,下面就来大体的来看看。首先看一下最重要最核心的一个片段:

            在上面的一片代码中我们可以看到,首先是SparkContext.createTaskScheduler来创建SchedulerBackend以及TaskScheduler。理解清楚这两个概念极为重要。首先来看看SchedulerBackend,从源码的注释上可以知道Scheduler-Backend仅仅是后台调度系统的一个抽象接口,目前在1.5.2接口有四个实现类。TaskScheduler主要是底层人物调度接口,仅仅是有一个实现类TaskSchedulerImpl。TaskSchedulerImpl就是通过schedulerBackend来在不同的cluster上调度任务,例如可以设置变量isLocal为true来使用lLocalBackend。

           createTaskScheduler方法的两个参数一个sc一个master,master主要就是用来匹配对应的运行模型,例如:

            上面的这个model代表的是standalone模式,SparkDeploySchedulerBackend就是standalone的运行模式。接下来进入initlalize方法看看:

            上面的这个初始化方法就是在taskSchedulerImpl下初始化的,会发现在任务的调度模式中选择了两种调度方式,先进先出和公平调度方法。

            继而开始了taskscheduler的start方法,进入taskSchedluer的run方法,是一个抽象接口,抽象接口的实现task-SchedluerImpl的run方法来看看。backend就是schedulerBackend后台,那么这个start方法是有4个实现的start方法。

            那么就加入loacl模式下的LocalBackend代码:

    [java] view plain copy
     
     print?在CODE上查看代码片派生到我的代码片
    1. private[spark] class LocalBackend(  
    2.     conf: SparkConf,  
    3.     scheduler: TaskSchedulerImpl,  
    4.     val totalCores: Int)  
    5.   extends SchedulerBackend with ExecutorBackend with Logging {  
    6.   
    7.   private val appId = "local-" + System.currentTimeMillis  
    8.   var localEndpoint: RpcEndpointRef = null  
    9.   
    10.   override def start()   
    11.   
    12.   override def stop()   
    13.   
    14.   override def reviveOffers()   
    15.   
    16.   override def defaultParallelism()  
    17.   
    18.   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean)   
    19.   
    20.   override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)   
    21.   
    22.   override def applicationId(): String = appId  

          这里还有一部分中最关键的DAGScheduler的创建相关的代码,下篇进行价绍。

  • 相关阅读:
    在C#代码中应用Log4Net(二)典型的使用方式
    在C#代码中应用Log4Net(一)简单使用Log4Net
    Windows Azure Active Directory (2) Windows Azure AD基础
    Windows Azure Virtual Network (6) 设置Azure Virtual Machine固定公网IP (Virtual IP Address, VIP) (1)
    Windows Azure Active Directory (1) 前言
    Azure China (6) SAP 应用在华登陆 Windows Azure 公有云
    Microsoft Azure News(3) Azure新的基本实例上线 (Basic Virtual Machine)
    Microsoft Azure News(2) 在Microsoft Azure上运行SAP应用程序
    Microsoft Azure News(1) 新的数据中心Japan East, Japan West and Brazil South
    Windows Azure HandBook (2) Azure China提供的服务
  • 原文地址:https://www.cnblogs.com/duanxz/p/4423295.html
Copyright © 2011-2022 走看看