zoukankan      html  css  js  c++  java
  • Spark-源码-SparkContext的初始化

    Spark版本 1.3
    SparkContext初始化流程

    1.0 在我们的主类 main() 方法中经常会这么写
      val conf = new SparkConf().setAppName("name").setMaster("local")
      val sc = new SparkContext(conf)
      conf 中保存的是Spark的参数
      sc 是我们的Spark上下文...好无聊...
      conf不再去看(里边都是对于参数的操作, 现阶段不看)
      sc 从 SparkContext(config: SparkConf) 开始~

    1.1  * 很重要:SparkContext是Spark提交任务到集群的入口
      * 我们看一下SparkContext的主构造器
      * 1.调用 createSparkEnv 方法创建 SparkEnv, 里面有一个非常重要的对象 ActorSystem
      * 2.创建 TaskScheduler -> 根据提交任务的URL进行匹配 -> TaskSchedulerImpl -> SparkDeploySchedulerBackend(里面有两个Actor)
      * 3.创建 DAGScheduler
      * 4.taskScheduler.start()


    1.2	private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
      class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
    	// 调用 def createSparkEnv() 方法, 转到:1.4
    	private[spark] def createSparkEnv(
    		  	conf: SparkConf,
    		  	isLocal: Boolean,
    			listenerBus: LiveListenerBus): SparkEnv = {
    		SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
    	}
    
    	//创建一个后端调度器(schedulerBackend) 和 一个任务调度器(taskScheduler), 转到:1.5
    	private[spark] var (schedulerBackend, taskScheduler) =
        	SparkContext.createTaskScheduler(this, master)
    	
    	// 通过 ActorSystem 创建了一个Actor,这个心跳是 Executors 和 DriverActor 的心跳
    	private val heartbeatReceiver = env.actorSystem.actorOf(
        	Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
    
    	// 创建了一个DAGScheduler,以后用来把DAG切分成Stage
    	@volatile private[spark] var dagScheduler: DAGScheduler = _
    	try{
    		dagScheduler = new DAGScheduler(this)
    	}catch{...}
    
    	// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
    	// 在DAG构造函数中为每个TaskScheduler设置DAGScheduler后, 启动taskScheduler(DAG源码分析, 详见后续文章) 转到:1.6
    	taskScheduler.start()
    	...
    }
    

      

    1.4
    // SparkContext.createSparkEnv中调用了 SparkEnv.createDriverEnv
    private[spark] def createDriverEnv(
    	conf: SparkConf,
    	isLocal: Boolean,
    	listenerBus: LiveListenerBus,
    	mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    	assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
    	assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    	val hostname = conf.get("spark.driver.host")
    	val port = conf.get("spark.driver.port").toInt
    	//调用 create 方法 并传入一坨参数
    	create(
    		conf,
    		SparkContext.DRIVER_IDENTIFIER,
    		hostname,
    		port,
    		isDriver = true,
    		isLocal = isLocal,
    		listenerBus = listenerBus,
    		mockOutputCommitCoordinator = mockOutputCommitCoordinator
    	)
    }
    
    private def create(
    	conf: SparkConf,
    	executorId: String,
    	hostname: String,
    	port: Int,
    	isDriver: Boolean,
    	isLocal: Boolean,
    	listenerBus: LiveListenerBus = null,
    	numUsableCores: Int = 0,
    	mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    
    	...
    
    	// Create the ActorSystem for Akka and get the port it binds to.
    	val (actorSystem, boundPort) = {
    		val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    		// 利用AkkaUtils这个工具类创建ActorSystem
    		AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
    	}
    
    	...
    
    	// 最终将创建好的ActorSystem返回给SparkEnv
    	// 回调步骤 new Spark() -> create() -> SparkEnv.createDriverEnv -> SparkContext.createSparkEnv()
    	new SparkEnv(
    		executorId,
    		actorSystem,
    		serializer,
    		closureSerializer,
    		cacheManager,
    		mapOutputTracker,
    		shuffleManager,
    		broadcastManager,
    		blockTransferService,
    		blockManager,
    		securityManager,
    		httpFileServer,
    		sparkFilesDir,
    		metricsSystem,
    		shuffleMemoryManager,
    		outputCommitCoordinator,
    		conf)
    }
    

      

    1.5 
    //SparkContext.createSparkEnv 中调用了 (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
    /**
       * Create a task scheduler based on a given master URL.
       * Return a 2-tuple of the scheduler backend and the task scheduler.
       */
    // 根据提交任务时指定的URL创建相应的TaskScheduler 关于TaskScheduler 转到:1.7
    private def createTaskScheduler(sc: SparkContext,
        master: String): (SchedulerBackend, TaskScheduler) = {
    	//模式匹配
    	master match {
    		// spark的StandAlone模式
        	case SPARK_REGEX(sparkUrl) =>
    	        // 创建了一个TaskSchedulerImpl. 注: TaskScheduler是一个特质
    	        val scheduler = new TaskSchedulerImpl(sc)
    	        val masterUrls = sparkUrl.split(",").map("spark://" + _)
    	        // 创建了一个SparkDeploySchedulerBackend(Spark后端部署调度器)
    	        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    	        // 调用initialize, 使用Spark后端部署调度器 初始化调度器
    	        scheduler.initialize(backend)
    	        (backend, scheduler)
    	    ... // 其他模式
    	}
    }
    

      





    1.6
    DAGScheduler 简介

    实现面向阶段调度的高级调度层。它计算每个作业的阶段DAG,跟踪哪些RDD和阶段输出具体化,并找到运行作业的最小计划。
    然后,它将阶段作为TaskSets提交给在集群上运行它们的底层TaskScheduler实现。

    除了提供阶段的DAG之外,此类还根据当前缓存状态确定运行每个任务的首选位置,并将这些位置传递给低级TaskScheduler。
    此外,它处理由于shuffle输出文件丢失而导致的故障,在这种情况下可能需要重新提交旧阶段。在一个不是由随机文件丢失引
    起的阶段内的故障由TaskScheduler处理,它将在取消整个阶段之前重试每个任务很多次。

    以下是制作或查看此课程更改时使用的核对清单:
    添加新数据结构时,请更新 `DAGSchedulerSuite.assertDataStructuresEmpty`以包含新结构。这将有助于捕获内存泄漏。



    1.7
    TaskScheduler简介

    低级任务调度程序接口,目前由TaskSchedulerImpl专门实现。

    该接口允许插入不同的任务调度程序。 每个TaskScheduler都为单个SparkContext调度任务。

    这些调度程序从DAGScheduler为每个阶段获取提交给它们的任务集,并负责将任务发送到集群,
    运行它们,如果存在故障则重试,以及减轻落后者。 他们将事件返回给DAGScheduler。

  • 相关阅读:
    转:MongoDB · 引擎特性 · journal 与 oplog,究竟谁先写入?
    Oracle 11g R2(11.2.0.4) RAC 数据文件路径错误解决--ORA-01157 ORA-01110: 数据文件
    Oracle 高可用作业测试
    Oracle 常见进程
    Mycat-server-1.6.5 常见分片方式
    转:三思!大规模MySQL运维陷阱之基于MyCat的伪分布式架构
    RabbitMQ 安装
    cmd 切换目录和配置环境变量和Curl批量执行Url
    sql中表变量
    Core 项目下使用SQl语句
  • 原文地址:https://www.cnblogs.com/chinashenkai/p/9982588.html
Copyright © 2011-2022 走看看