zoukankan      html  css  js  c++  java
  • 13、SparkContext详解

    一、SparkContext原理

    1、图解

    image

    1、当driver启动后会去运行我们的application,在运行application的时候,所有spark程序的第一行都是先创建SparkContext,在创建SparkContext的时候,它的内部创建
    两个非常重要的东西DAGSchedule和TaskSchedule,TaskSchedule在创建的时候就会向spark集群的master进行注册。
    
    2、spark最核心的内部会创建3个东西,首先是会createTaskScheduler(),createTaskScheduler()里面会创建三个东西,首先是TaskSchedulerImpl(它其实就是TaskScheduler),
    然后创建SparkDeploySchedulerBackend(它在底层会受TaskSchedulerImp的控制,实际上负责与Master的注册,Executor的反注册,Task发送到Executor等操作),然后调用
    TaskSchedulerImpl的init()方法,创建SchedulerPool调度池 ,它有不同的优先策略,比如FIFO先进先出。
    
    3、在创建完TaskSchedulerImpl和SparkDeploySchedulerBackend之后,是执行TaskSchedulerImpl的start()方法,这个方法内部实际上会调用SparkDeploySchedulerBackend的
    start()方法,在这个start()方法里会创建AppClient,AppClient里会启动一个线程,也就是ClientActor,ClientActor会调用两个方法,registerWithMaster(),会去调用
    tryRegisterAllMaster()。这两个方法会向master发送一个东西叫做RegisterApplication(case class,里面封装了application的信息),就会发送到spark集群的Master上面去,
    后面回去找worker,然后启动executor,然后executor启动后会反向注册到SparkDeploySchedulerBackend上面去。这就是TaskScheduler的初始化机制。TaskSchedulerImpl底层
    主要基于SparkDeploySchedulerBackend工作。
    
    4、DAGScheduler创建的时候有一个非常重要的东西,DAGSchedulerEvenProcessActor,DAGScheduler底层基于该组件进行通讯(线程)
    
    5、SparkUI。4040端口,线上application运行的状态,启动一个jetty服务器,来提供web服务,从而显示网页。

    二、SparkContext源码

    1、TaskScheduler创建

    ###SparkContext.scala
    
    // Create and start the scheduler
      private[spark] var (schedulerBackend, taskScheduler) =
        SparkContext.createTaskScheduler(this, master)
    
    
    
    //不同的提交模式,会创建不同的TaskScheduler
    //standalone模式
          case SPARK_REGEX(sparkUrl) =>
         //TaskSchedulerImpl()底层通过操作一个SchedulerBackend,针对不同的种类的cluster(standalone、yarn和mesos),调度task。
        
         //他也可以通过使用一个LocalBackend,并且将isLocal参数设置为true,来在本地模式下工作。
       
         //它负责处理一些通用的逻辑,比如决定多个job的调度顺序,启动检查任务执行
        
         //客户端首先应用调度initialize()方法和start()方法,然后通过runTasks()方法提交task sets
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            (backend, scheduler)
    
    
    
    
    
    ###TaskSchedulerImpl.scala
    
      def initialize(backend: SchedulerBackend) {
        this.backend = backend
        // temporarily set rootPool name to empty
        rootPool = new Pool("", schedulingMode, 0, 0)
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, conf)
          }
        }
        schedulableBuilder.buildPools()
      }

    TaskScheduler启动:

    ###TaskSchedulerImpl.scala
    
      override def start() {
    //重点是调用了SparkDeploySchedulerBackend类的start
        backend.start()
    
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          import sc.env.actorSystem.dispatcher
          sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
                SPECULATION_INTERVAL milliseconds) {
            Utils.tryOrExit { checkSpeculatableTasks() }
          }
        }
      }
    
    
    
    
    
    ###SparkDeploySchedulerBackend.scala
    
      override def start() {
        super.start()
    
        // The endpoint for executors to talk to us
        val driverUrl = AkkaUtils.address(
          AkkaUtils.protocol(actorSystem),
          SparkEnv.driverActorSystemName,
          conf.get("spark.driver.host"),
          conf.get("spark.driver.port"),
          CoarseGrainedSchedulerBackend.ACTOR_NAME)
        val args = Seq(
          "--driver-url", driverUrl,
          "--executor-id", "{{EXECUTOR_ID}}",
          "--hostname", "{{HOSTNAME}}",
          "--cores", "{{CORES}}",
          "--app-id", "{{APP_ID}}",
          "--worker-url", "{{WORKER_URL}}")
        val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
        val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    
        // When testing, expose the parent class path to the child. This is processed by
        // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
        // when the assembly is built with the "*-provided" profiles enabled.
        val testingClassPath =
          if (sys.props.contains("spark.testing")) {
            sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
          } else {
            Nil
          }
    
        // Start executors with a few necessary configs for registering with the scheduler
        val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
        val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    
        //ApplicationDescription非常重要,它代表了当前的这个
        //application的一切情况
        //包括application最大需要多少CPU core,每个slave上需要多少内存
    
        val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
          appUIAddress, sc.eventLogDir, sc.eventLogCodec)
    
        //创建APPClient
        //APPClient是一个接口,它负责为application与Spark集群进行通信。  
        //它会接收一个Spark Master的URL,以及一个application,和 
        //一个集群事件的监听器,以及各种事件发生时监听器的回调函数
    
        client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
        client.start()
    
        waitForRegistration()
      }

    2、DAGScheduler创建

    ###SparkContext.scala
    
     @volatile private[spark] var dagScheduler: DAGScheduler = _
      try {
      //DAGScheduler类实现了面向stage的调度机制的高层次的调度层,他会为每个job计算一个stage的DAG(有向无环图),
    //追踪RDD和stage的输出是否被物化了(物化就是说,写入了磁盘或者内存等地方),并且寻找一个最少
    //消耗(最优、最小)调度机制来运行job,它会将stage作为tasksets提交到底层的TaskSchedulerImple上,
    //来在集群上运行它们(task)
    //除了处理stage的DAG,它还负责决定运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提交底层的
    //TaskSchedulerImpl。此外,它会处理理由于shuffle输出文件丢失导致的失败,在这种情况下,旧的stage可能就会
    //被重新提交,一个stage内部的失败,如果不是由于shuffle文件丢失所导致的,会被TAskSchedule处理,它会多次重试
    //每一个task,直到最后,实在是不行了,才会去取消整个stage
        dagScheduler = new DAGScheduler(this)
      } catch {
        case e: Exception => {
          try {
            stop()
          } finally {
            throw new SparkException("Error while constructing DAGScheduler", e)
          }
        }
      }

    3、SparkUI的创建

    ###SparkContext.scala
    
    // Initialize the Spark UI
      private[spark] val ui: Option[SparkUI] =
        if (conf.getBoolean("spark.ui.enabled", true)) {
          Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
            env.securityManager,appName))
        } else {
          // For tests, do not enable the UI
          None
        }
    
    
    
    
    ###SparkUI.scall
    
    //默认端口
    val DEFAULT_PORT = 4040
    def createLiveUI(
          sc: SparkContext,
          conf: SparkConf,
          listenerBus: SparkListenerBus,
          jobProgressListener: JobProgressListener,
          securityManager: SecurityManager,
          appName: String): SparkUI =  {
        create(Some(sc), conf, listenerBus, securityManager, appName,
          jobProgressListener = Some(jobProgressListener))
      }
  • 相关阅读:
    Python该怎么学?
    Python招聘需求
    最短路合集
    最小生成树prim算法
    最小生成树kruskal算法
    React-redux原理探索
    Redux原理探索
    头条前端面试题汇总,会持续更新
    阿里前面面试题(最全),持续更新中
    ASP.Net MVC3/4中Model验证错误信息的本地化
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11201183.html
Copyright © 2011-2022 走看看