  • Spark On YARN Startup Process Source Code Analysis (Part 1)

    Main reference:

    a. https://www.cnblogs.com/yy3b2007com/p/10934090.html

    0. Notes

    a. This Spark source-code series will be updated and supplemented from time to time.

    b. Earlier posts in the series will likewise be revised, extended, and polished occasionally.

    c. The Spark version analyzed is 2.4.1.

    1. Introduction

    This part analyzes the code path from executing the spark-submit.sh script to handing the job off to YARN. The Spark version is 2.4.1.

    (1) The spark-submit entry point

    A Spark job is normally submitted with spark-submit:

    # Run on a Spark standalone cluster
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://207.184.161.138:7077 \
      --executor-memory 20G \
      --total-executor-cores 100 \
      /path/to/examples.jar \
      1000

    This example targets a standalone cluster. The spark-submit script itself lives in the bin directory of the Spark 2.4 installation:

    #!/usr/bin/env bash
    
    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

    Two things follow from the script above:

    a. When spark-submit submits a job, it ultimately invokes the SparkSubmit class.

    b. It calls the spark-class script in the bin directory, which in turn execs a java process.
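    As an aside, the same spark-submit -> spark-class -> java chain can be driven programmatically through Spark's launcher library. A minimal sketch (the jar path is a placeholder, and it assumes SPARK_HOME is set in the environment):

    import org.apache.spark.launcher.SparkLauncher

    object LaunchPi {
      def main(args: Array[String]): Unit = {
        // Forks bin/spark-submit, which execs spark-class and finally the
        // java process running org.apache.spark.deploy.SparkSubmit.
        val proc = new SparkLauncher()
          .setAppResource("/path/to/examples.jar")           // placeholder path
          .setMainClass("org.apache.spark.examples.SparkPi")
          .setMaster("spark://207.184.161.138:7077")
          .setConf(SparkLauncher.EXECUTOR_MEMORY, "20g")
          .addAppArgs("1000")
          .launch()
        proc.waitFor()
      }
    }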

    The entry-point main function lives on the SparkSubmit companion object:

    object SparkSubmit extends CommandLineUtils with Logging {
    
      // Cluster managers -- the cluster-manager types Spark supports
      private val YARN = 1
      private val STANDALONE = 2
      private val MESOS = 4
      private val LOCAL = 8
      private val KUBERNETES = 16
      private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
    
      // Deploy modes
      private val CLIENT = 1
      private val CLUSTER = 2
      private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
    
      // Special primary resource names that represent shells rather than application jars.
      private val SPARK_SHELL = "spark-shell"
      private val PYSPARK_SHELL = "pyspark-shell"
      private val SPARKR_SHELL = "sparkr-shell"
      private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
      private val R_PACKAGE_ARCHIVE = "rpkg.zip"
    
      private val CLASS_NOT_FOUND_EXIT_STATUS = 101
    
      // Following constants are visible for testing.
      private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
        "org.apache.spark.deploy.yarn.YarnClusterApplication"  // submit class for YARN cluster mode
      private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() // submit class for REST-based cluster-mode submission
      private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName() // submit class for standalone cluster mode
      private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
        "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"  // submit class for Kubernetes cluster mode
    
      override def main(args: Array[String]): Unit = {
        // Build the SparkSubmit instance
        val submit = new SparkSubmit() {
          self =>
    
          // Override SparkSubmit's argument-parsing method
          override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
            // Build the SparkSubmitArguments object
            new SparkSubmitArguments(args) {
              // Override logInfo and logWarning to delegate to the two methods defined below
              override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
    
              override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
            }
          }
    
          override protected def logInfo(msg: => String): Unit = printMessage(msg)
    
          override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
    
          override def doSubmit(args: Array[String]): Unit = {
            try {
              super.doSubmit(args)
            } catch {
              case e: SparkUserAppException =>
                exitFn(e.exitCode)
            }
          }
    
        }
    
        // Run the submission
        submit.doSubmit(args)
      }
    
      /**
       * Return whether the given primary resource represents a user jar.
       */
      private[deploy] def isUserJar(res: String): Boolean = {
        !isShell(res) && !isPython(res) && !isInternal(res) && !isR(res)
      }
    
      /**
       * Return whether the given primary resource represents a shell.
       */
      private[deploy] def isShell(res: String): Boolean = {
        (res == SPARK_SHELL || res == PYSPARK_SHELL || res == SPARKR_SHELL)
      }
    
      /**
       * Return whether the given main class represents a sql shell.
       */
      private[deploy] def isSqlShell(mainClass: String): Boolean = {
        mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
      }
    
      /**
       * Return whether the given main class represents a thrift server.
       */
      private def isThriftServer(mainClass: String): Boolean = {
        mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
      }
    
      /**
       * Return whether the given primary resource requires running python.
       */
      private[deploy] def isPython(res: String): Boolean = {
        res != null && res.endsWith(".py") || res == PYSPARK_SHELL
      }
    
      /**
       * Return whether the given primary resource requires running R.
       */
      private[deploy] def isR(res: String): Boolean = {
        res != null && res.endsWith(".R") || res == SPARKR_SHELL
      }
    
      private[deploy] def isInternal(res: String): Boolean = {
        res == SparkLauncher.NO_RESOURCE
      }
    
    }

    The doSubmit implementation in SparkSubmit is quite simple:

      def doSubmit(args: Array[String]): Unit = {
        // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
        // be reset before the application starts.
        val uninitLog = initializeLogIfNecessary(true, silent = true)
    
        // Parse the argument info
        val appArgs = parseArguments(args)
        if (appArgs.verbose) {
          logInfo(appArgs.toString)
        }
        appArgs.action match {
          case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) // submit the application
          case SparkSubmitAction.KILL => kill(appArgs) // kill the application (standalone and Mesos clusters only)
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) // query the application status (standalone and Mesos clusters only)
          case SparkSubmitAction.PRINT_VERSION => printVersion()  // print version information
        }
      }

    This is plainly a dispatcher: based on the action received, it routes to the matching handler:

    a. case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) --- submit the Spark job

    b. case SparkSubmitAction.KILL => kill(appArgs) --- kill the Spark job

    c. case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) --- query the job status

    d. case SparkSubmitAction.PRINT_VERSION => printVersion() --- print version information

    To find the concrete class that implements the submission, step into the submit function:

       /**
       * Submit the application using the provided parameters.
       *
       * This runs in two steps. First, we prepare the launch environment by setting up
       * the appropriate classpath, system properties, and application arguments for
       * running the child main class based on the cluster manager and the deploy mode.
       * Second, we use this launch environment to invoke the main method of the child
       * main class.
       */
      @tailrec
      private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
        // Prepare the launch environment: set up the appropriate classpath, system properties
        // and application arguments for running the child main class, based on the cluster
        // manager and the deploy mode.
        val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    
        def doRunMain(): Unit = {
          if (args.proxyUser != null) {
            val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
              UserGroupInformation.getCurrentUser())
            try {
              proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                override def run(): Unit = {
                  runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
                }
              })
            } catch {
              case e: Exception =>
                // Hadoop's AuthorizationException suppresses the exception's stack trace, which
                // makes the message printed to the output by the JVM not very helpful. Instead,
                // detect exceptions with empty stack traces here, and treat them differently.
                if (e.getStackTrace().length == 0) {
                  error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
                } else {
                  throw e
                }
            }
          } else {
            runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
          }
        }
    
        // Let the main class re-initialize the logging system once it starts.
        if (uninitLog) {
          Logging.uninitialize()
        }
    
        // In standalone cluster mode, there are two submission gateways:
        //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
        //   (2) The new REST-based gateway introduced in Spark 1.3
        // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
        // to use the legacy gateway if the master endpoint turns out to be not a REST server.
        if (args.isStandaloneCluster && args.useRest) {
          try {
            logInfo("Running Spark using the REST application submission protocol.")
            doRunMain()
          } catch {
            // Fail over to use the legacy submission gateway
            case e: SubmitRestConnectionException =>
              logWarning(s"Master endpoint ${args.master} was not a REST server. " +
                "Falling back to legacy submission gateway instead.")
              args.useRest = false
              submit(args, false)
          }
        // In all other modes, just run the main class as prepared
        } else {
          doRunMain()
        }
      }

    Here:

    prepareSubmitEnvironment builds the launch environment -- classpath, system properties, and application arguments -- for running the child main class under the chosen cluster manager and deploy mode.

    The last line of submit(...) calls its locally defined doRunMain(), which branches on args.proxyUser:

    a. with a proxy user, runMain() is invoked wrapped in proxyUser.doAs(...);

    b. otherwise, runMain() is called directly.

    (2) Preparing the launch environment

    prepareSubmitEnvironment sets up the classpath, system properties, and application arguments needed to run the child main class under the given cluster manager and deploy mode:

    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    /**
       * Prepare the environment for submitting an application.
       *
       * @param args the parsed SparkSubmitArguments used for environment preparation.
       * @param conf the Hadoop Configuration, this argument will only be set in unit test.
       * @return a 4-tuple (childArgs, childClasspath, sparkConf, childMainClass):
       *        (1) the arguments for the child process,
       *        (2) a list of classpath entries for the child,
       *        (3) a map of system properties, and
       *        (4) the main class for the child
       *
       * Exposed for testing.
       */
      private[deploy] def prepareSubmitEnvironment(
          args: SparkSubmitArguments,
          conf: Option[HadoopConfiguration] = None)
          : (Seq[String], Seq[String], SparkConf, String) = {
        // Return values
        val childArgs = new ArrayBuffer[String]()
        val childClasspath = new ArrayBuffer[String]()
        val sparkConf = new SparkConf()
        var childMainClass = ""
    
        // Set the cluster manager.
        // From this list we can see which cluster managers Spark currently supports:
        // YARN, STANDALONE, MESOS, KUBERNETES and LOCAL, chosen via spark-submit's --master argument.
        val clusterManager: Int = args.master match {
          case "yarn" => YARN
          case "yarn-client" | "yarn-cluster" =>
            // Before Spark 2.0, "yarn-client" and "yarn-cluster" were accepted as --master values;
            // since 2.0 they are deprecated and automatically normalized to "yarn" here, with a warning.
            logWarning(s"Master ${args.master} is deprecated since 2.0." +
              " Please use master \"yarn\" with specified deploy mode instead.")
            YARN
          case m if m.startsWith("spark") => STANDALONE
          case m if m.startsWith("mesos") => MESOS
          case m if m.startsWith("k8s") => KUBERNETES
          case m if m.startsWith("local") => LOCAL
          case _ =>
            error("Master must either be yarn or start with spark, mesos, k8s, or local")
            -1
        }
    
        // Set the deploy mode (--deploy-mode); default is client mode
        var deployMode: Int = args.deployMode match {
          case "client" | null => CLIENT
          case "cluster" => CLUSTER
          case _ =>
            error("Deploy mode must be either client or cluster")
            -1
        }
    
        // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
        // the master and deploy mode, we have some logic to infer the master and deploy mode
        // from each other if only one is specified, or exit early if they are at odds.
        if (clusterManager == YARN) {
          (args.master, args.deployMode) match {
            case ("yarn-cluster", null) =>
              deployMode = CLUSTER
              args.master = "yarn"
            case ("yarn-cluster", "client") =>
              error("Client deploy mode is not compatible with master "yarn-cluster"")
            case ("yarn-client", "cluster") =>
              error("Cluster deploy mode is not compatible with master "yarn-client"")
            case (_, mode) =>
              args.master = "yarn"
          }
    
          // Make sure YARN is included in our build if we're trying to use it
          if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
            error(
              "Could not load YARN classes. " +
              "This copy of Spark may not have been compiled with YARN support.")
          }
        }
    
        if (clusterManager == KUBERNETES) {
          args.master = Utils.checkAndGetK8sMasterUrl(args.master)
          // Make sure KUBERNETES is included in our build if we're trying to use it
          if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
            error(
              "Could not load KUBERNETES classes. " +
                "This copy of Spark may not have been compiled with KUBERNETES support.")
          }
        }
    
        // Fail fast, the following modes are not supported or applicable
        (clusterManager, deployMode) match {
          case (STANDALONE, CLUSTER) if args.isPython =>
            error("Cluster deploy mode is currently not supported for python " +
              "applications on standalone clusters.")
          case (STANDALONE, CLUSTER) if args.isR =>
            error("Cluster deploy mode is currently not supported for R " +
              "applications on standalone clusters.")
          case (LOCAL, CLUSTER) =>
            error("Cluster deploy mode is not compatible with master "local"")
          case (_, CLUSTER) if isShell(args.primaryResource) =>
            error("Cluster deploy mode is not applicable to Spark shells.")
          case (_, CLUSTER) if isSqlShell(args.mainClass) =>
            error("Cluster deploy mode is not applicable to Spark SQL shell.")
          case (_, CLUSTER) if isThriftServer(args.mainClass) =>
            error("Cluster deploy mode is not applicable to Spark Thrift server.")
          case _ =>
        }
    
        // Update args.deployMode if it is null. It will be passed down as a Spark property later.
        (args.deployMode, deployMode) match {
          case (null, CLIENT) => args.deployMode = "client"
          case (null, CLUSTER) => args.deployMode = "cluster"
          case _ =>
        }
    
        // Derive the special run-mode flags from the cluster manager and deploy mode.
        val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
        val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
        val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
        val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
        val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
    
        if (!isMesosCluster && !isStandAloneCluster) {
          // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
          // too for packages that include Python code
          val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
            args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
            args.ivySettingsPath)
    
          if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
            args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
            if (args.isPython || isInternal(args.primaryResource)) {
              args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
            }
          }
    
          // install any R packages that may have been passed through --jars or --packages.
          // Spark Packages may contain R source code inside the jar.
          if (args.isR && !StringUtils.isBlank(args.jars)) {
            RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
          }
        }
    
        args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
        val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
        val targetDir = Utils.createTempDir()
    
        // assure a keytab is available from any place in a JVM
        if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
          if (args.principal != null) {
            if (args.keytab != null) {
              require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
              // Add keytab and principal configurations in sysProps to make them available
              // for later use; e.g. in spark sql, the isolated class loader used to talk
              // to HiveMetastore will use these settings. They will be set as Java system
              // properties and then loaded by SparkConf
              sparkConf.set(KEYTAB, args.keytab)
              sparkConf.set(PRINCIPAL, args.principal)
              UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
            }
          }
        }
    
        // Resolve glob path for different resources.
        args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
    
        lazy val secMgr = new SecurityManager(sparkConf)
    
        // In client mode, download remote files.
        var localPrimaryResource: String = null
        var localJars: String = null
        var localPyFiles: String = null
        if (deployMode == CLIENT) {
          localPrimaryResource = Option(args.primaryResource).map {
            downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
          }.orNull
          localJars = Option(args.jars).map {
            downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
          }.orNull
          localPyFiles = Option(args.pyFiles).map {
            downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
          }.orNull
        }
    
        // When running in YARN, for some remote resources with scheme:
        //   1. Hadoop FileSystem doesn't support them.
        //   2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
        // We will download them to local disk prior to add to YARN's distributed cache.
        // For yarn client mode, since we already download them with above code, so we only need to
        // figure out the local path and replace the remote one.
        if (clusterManager == YARN) {
          val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
    
          def shouldDownload(scheme: String): Boolean = {
            forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
              Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
          }
    
          def downloadResource(resource: String): String = {
            val uri = Utils.resolveURI(resource)
            uri.getScheme match {
              case "local" | "file" => resource
              case e if shouldDownload(e) =>
                val file = new File(targetDir, new Path(uri).getName)
                if (file.exists()) {
                  file.toURI.toString
                } else {
                  downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
                }
              case _ => uri.toString
            }
          }
    
          args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
          args.files = Option(args.files).map { files =>
            Utils.stringToSeq(files).map(downloadResource).mkString(",")
          }.orNull
          args.pyFiles = Option(args.pyFiles).map { pyFiles =>
            Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
          }.orNull
          args.jars = Option(args.jars).map { jars =>
            Utils.stringToSeq(jars).map(downloadResource).mkString(",")
          }.orNull
          args.archives = Option(args.archives).map { archives =>
            Utils.stringToSeq(archives).map(downloadResource).mkString(",")
          }.orNull
        }
    
        // If we're running a python app, set the main class to our specific python runner
        if (args.isPython && deployMode == CLIENT) {
          if (args.primaryResource == PYSPARK_SHELL) {
            args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
          } else {
            // If a python file is provided, add it to the child arguments and list of files to deploy.
            // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
            args.mainClass = "org.apache.spark.deploy.PythonRunner"
            args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
          }
          if (clusterManager != YARN) {
            // The YARN backend handles python files differently, so don't merge the lists.
            args.files = mergeFileLists(args.files, args.pyFiles)
          }
        }
    
        if (localPyFiles != null) {
          sparkConf.set("spark.submit.pyFiles", localPyFiles)
        }
    
        // In YARN mode for an R app, add the SparkR package archive and the R package
        // archive containing all of the built R libraries to archives so that they can
        // be distributed with the job
        if (args.isR && clusterManager == YARN) {
          val sparkRPackagePath = RUtils.localSparkRPackagePath
          if (sparkRPackagePath.isEmpty) {
            error("SPARK_HOME does not exist for R application in YARN mode.")
          }
          val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
          if (!sparkRPackageFile.exists()) {
            error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
          }
          val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
    
          // Distribute the SparkR package.
          // Assigns a symbol link name "sparkr" to the shipped package.
          args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
    
          // Distribute the R package archive containing all the built R packages.
          if (!RUtils.rPackages.isEmpty) {
            val rPackageFile =
              RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
            if (!rPackageFile.exists()) {
              error("Failed to zip all the built R packages.")
            }
    
            val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
            // Assigns a symbol link name "rpkg" to the shipped package.
            args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
          }
        }
    
        // TODO: Support distributing R packages with standalone cluster
        if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
          error("Distributing R packages with standalone cluster is not supported.")
        }
    
        // TODO: Support distributing R packages with mesos cluster
        if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
          error("Distributing R packages with mesos cluster is not supported.")
        }
    
        // If we're running an R app, set the main class to our specific R runner
        if (args.isR && deployMode == CLIENT) {
          if (args.primaryResource == SPARKR_SHELL) {
            args.mainClass = "org.apache.spark.api.r.RBackend"
          } else {
            // If an R file is provided, add it to the child arguments and list of files to deploy.
            // Usage: RRunner <main R file> [app arguments]
            args.mainClass = "org.apache.spark.deploy.RRunner"
            args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
            args.files = mergeFileLists(args.files, args.primaryResource)
          }
        }
    
        if (isYarnCluster && args.isR) {
          // In yarn-cluster mode for an R app, add primary resource to files
          // that can be distributed with the job
          args.files = mergeFileLists(args.files, args.primaryResource)
        }
    
        // Special flag to avoid deprecation warnings at the client
        sys.props("SPARK_SUBMIT") = "true"
    
        // A list of rules to map each argument to system properties or command-line options in
        // each deploy mode; we iterate through these below
        val options = List[OptionAssigner](
    
          // All cluster managers
          OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
          OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
            confKey = "spark.submit.deployMode"),
          OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
          OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
          OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
            confKey = "spark.driver.memory"),
          OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
            confKey = "spark.driver.extraClassPath"),
          OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
            confKey = "spark.driver.extraJavaOptions"),
          OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
            confKey = "spark.driver.extraLibraryPath"),
    
          // Propagate attributes for dependency resolution at the driver side
          OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
          OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
            confKey = "spark.jars.repositories"),
          OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
          OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
            CLUSTER, confKey = "spark.jars.excludes"),
    
          // Yarn only
          OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
          OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
            confKey = "spark.executor.instances"),
          OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
          OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
          OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
          OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
          OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
          OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
    
          // Other options
          OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
            confKey = "spark.executor.cores"),
          OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
            confKey = "spark.executor.memory"),
          OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
            confKey = "spark.cores.max"),
          OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
            confKey = "spark.files"),
          OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
          OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
            confKey = "spark.jars"),
          OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
            confKey = "spark.driver.memory"),
          OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
            confKey = "spark.driver.cores"),
          OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
            confKey = "spark.driver.supervise"),
          OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
    
          // An internal option used only for spark-shell to add user jars to repl's classloader,
          // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
          // remote jars, so adding a new option to only specify local jars for spark-shell internally.
          OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
        )
    
        // In client mode, launch the application main class directly
        // In addition, add the main application jar and any added jars (if any) to the classpath
        if (deployMode == CLIENT) {
          childMainClass = args.mainClass
          if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
            childClasspath += localPrimaryResource
          }
          if (localJars != null) { childClasspath ++= localJars.split(",") }
        }
        // Add the main application jar and any added jars to classpath in case YARN client
        // requires these jars.
        // This assumes both primaryResource and user jars are local jars, or already downloaded
        // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
        // added to the classpath of YARN client.
        if (isYarnCluster) {
          if (isUserJar(args.primaryResource)) {
            childClasspath += args.primaryResource
          }
          if (args.jars != null) { childClasspath ++= args.jars.split(",") }
        }
    
        if (deployMode == CLIENT) {
          if (args.childArgs != null) { childArgs ++= args.childArgs }
        }
    
        // Map all arguments to command-line options or system properties for our chosen mode
        for (opt <- options) {
          if (opt.value != null &&
              (deployMode & opt.deployMode) != 0 &&
              (clusterManager & opt.clusterManager) != 0) {
            if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
            if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
          }
        }
    
        // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
        if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
          sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
        }
    
        // Add the application jar automatically so the user doesn't have to call sc.addJar
        // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
        // For python and R files, the primary resource is already distributed as a regular file
        if (!isYarnCluster && !args.isPython && !args.isR) {
          var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
          if (isUserJar(args.primaryResource)) {
            jars = jars ++ Seq(args.primaryResource)
          }
          sparkConf.set("spark.jars", jars.mkString(","))
        }
    
        // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
        // All Spark parameters are expected to be passed to the client through system properties.
        if (args.isStandaloneCluster) {
          if (args.useRest) {
            childMainClass = REST_CLUSTER_SUBMIT_CLASS
            childArgs += (args.primaryResource, args.mainClass)
          } else {
            // In legacy standalone cluster mode, use Client as a wrapper around the user class
            childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
            if (args.supervise) { childArgs += "--supervise" }
            Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
            Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
            childArgs += "launch"
            childArgs += (args.master, args.primaryResource, args.mainClass)
          }
          if (args.childArgs != null) {
            childArgs ++= args.childArgs
          }
        }
    
        // Let YARN know it's a pyspark app, so it distributes needed libraries.
        if (clusterManager == YARN) {
          if (args.isPython) {
            sparkConf.set("spark.yarn.isPython", "true")
          }
        }
    
        if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
          setRMPrincipal(sparkConf)
        }
    
        // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
        if (isYarnCluster) {
          childMainClass = YARN_CLUSTER_SUBMIT_CLASS
          if (args.isPython) {
            childArgs += ("--primary-py-file", args.primaryResource)
            childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
          } else if (args.isR) {
            val mainFile = new Path(args.primaryResource).getName
            childArgs += ("--primary-r-file", mainFile)
            childArgs += ("--class", "org.apache.spark.deploy.RRunner")
          } else {
            if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
              childArgs += ("--jar", args.primaryResource)
            }
            childArgs += ("--class", args.mainClass)
          }
          if (args.childArgs != null) {
            args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
          }
        }
    
        if (isMesosCluster) {
          assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
          childMainClass = REST_CLUSTER_SUBMIT_CLASS
          if (args.isPython) {
            // Second argument is main class
            childArgs += (args.primaryResource, "")
            if (args.pyFiles != null) {
              sparkConf.set("spark.submit.pyFiles", args.pyFiles)
            }
          } else if (args.isR) {
            // Second argument is main class
            childArgs += (args.primaryResource, "")
          } else {
            childArgs += (args.primaryResource, args.mainClass)
          }
          if (args.childArgs != null) {
            childArgs ++= args.childArgs
          }
        }
    
        if (isKubernetesCluster) {
          childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
          if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
            if (args.isPython) {
              childArgs ++= Array("--primary-py-file", args.primaryResource)
              childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
              if (args.pyFiles != null) {
                childArgs ++= Array("--other-py-files", args.pyFiles)
              }
            } else if (args.isR) {
              childArgs ++= Array("--primary-r-file", args.primaryResource)
              childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
            }
            else {
              childArgs ++= Array("--primary-java-resource", args.primaryResource)
              childArgs ++= Array("--main-class", args.mainClass)
            }
          } else {
            childArgs ++= Array("--main-class", args.mainClass)
          }
          if (args.childArgs != null) {
            args.childArgs.foreach { arg =>
              childArgs += ("--arg", arg)
            }
          }
        }
    
        // Load any properties specified through --conf and the default properties file
        for ((k, v) <- args.sparkProperties) {
          sparkConf.setIfMissing(k, v)
        }
    
        // Ignore invalid spark.driver.host in cluster modes.
        if (deployMode == CLUSTER) {
          sparkConf.remove("spark.driver.host")
        }
    
        // Resolve paths in certain spark properties
        val pathConfigs = Seq(
          "spark.jars",
          "spark.files",
          "spark.yarn.dist.files",
          "spark.yarn.dist.archives",
          "spark.yarn.dist.jars")
        pathConfigs.foreach { config =>
          // Replace old URIs with resolved URIs, if they exist
          sparkConf.getOption(config).foreach { oldValue =>
            sparkConf.set(config, Utils.resolveURIs(oldValue))
          }
        }
    
        // Resolve and format python file paths properly before adding them to the PYTHONPATH.
        // The resolving part is redundant in the case of --py-files, but necessary if the user
        // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
        sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
          val resolvedPyFiles = Utils.resolveURIs(pyFiles)
          val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
            PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
          } else {
            // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
            // support dealing with remote python files, they could distribute and add python files
            // locally.
            resolvedPyFiles
          }
          sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
        }
    
        (childArgs, childClasspath, sparkConf, childMainClass)
      }
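    A side note on the OptionAssigner loop near the end of the method: cluster managers and deploy modes are bit flags (YARN = 1, STANDALONE = 2, ... and CLIENT = 1, CLUSTER = 2), so a single assigner can target several combinations and is matched with a bitwise AND. A self-contained sketch of that matching, with names abridged from the code above:

    object BitFlagDemo {
      // Bit flags, mirroring the constants on the SparkSubmit object
      val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16
      val CLIENT = 1; val CLUSTER = 2
      val ALL_DEPLOY_MODES = CLIENT | CLUSTER

      case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int, confKey: String)

      def main(args: Array[String]): Unit = {
        val opt = OptionAssigner("20g", STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
          confKey = "spark.driver.memory")
        val (clusterManager, deployMode) = (YARN, CLUSTER)

        // Same test as the loop in prepareSubmitEnvironment: the assigner applies
        // when both the manager bit and the mode bit are set in its masks.
        val applies = opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0
        println(s"assign ${opt.confKey}? $applies") // true
      }
    }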

    From this method:

    a. When the deploy mode is client, the child main class is the user class submitted via spark-submit, i.e. childMainClass = args.mainClass.

    b. When the master is YARN and the deploy mode is cluster, the child main class is org.apache.spark.deploy.yarn.YarnClusterApplication, with the user jar, class and arguments passed along in childArgs as --jar/--class/--arg pairs.
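    How childMainClass is then consumed: at the end of SparkSubmit#runMain (abridged below; classpath setup and error handling omitted), the class is loaded and either instantiated directly when it implements SparkApplication, as YarnClusterApplication does, or wrapped in a JavaMainApplication for a plain user class:

    val mainClass = Utils.classForName(childMainClass)

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      // e.g. org.apache.spark.deploy.yarn.YarnClusterApplication
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      // a plain user class: wrap it so that its static main() is invoked via reflection
      new JavaMainApplication(mainClass)
    }

    app.start(childArgs.toArray, sparkConf)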

    1) Preparing the execution class for YARN (the cluster manager):

    Launching with spark-submit actually executes exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@".

    Within SparkSubmit, the prepareSubmitEnvironment method sets up everything the submission needs:

    private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments,conf: Option[HadoopConfiguration] = None): (Seq[String], Seq[String], SparkConf, String)

    Inside that method, when the cluster manager is YARN:

    a. With --deploy-mode cluster

    YarnClusterApplication performs the submission. YarnClusterApplication is a class defined inside org.apache.spark.deploy.yarn.Client's source file; its start method news up a Client and calls its run method:

    override def start(args: Array[String], conf: SparkConf): Unit = {
        // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
        // so remove them from sparkConf here for yarn mode.
        conf.remove("spark.jars")
        conf.remove("spark.files")
    
        new Client(new ClientArguments(args), conf).run()
      }

    b. With --deploy-mode client

    the main function of application-jar.jar itself is invoked, through JavaMainApplication:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.spark.deploy
    
    import java.lang.reflect.Modifier
    
    import org.apache.spark.SparkConf
    
    /**
     * Entry point for a Spark application. Implementations must provide a no-argument constructor.
     */
    private[spark] trait SparkApplication {
    
      def start(args: Array[String], conf: SparkConf): Unit
    
    }
    
    /**
     * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
     *
     * Configuration is propagated to the application via system properties, so running multiple
     * of these in the same JVM may lead to undefined behavior due to configuration leaks.
     */
    private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
    
      override def start(args: Array[String], conf: SparkConf): Unit = {
        val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          throw new IllegalStateException("The main method in the given main class must be static")
        }
    
        val sysProps = conf.getAll.toMap
        sysProps.foreach { case (k, v) =>
          sys.props(k) = v
        }
    
        mainMethod.invoke(null, args)
      }
    
    }

    As the implementation shows, JavaMainApplication's start method does nothing more than invoke the main function of application-jar.jar via reflection.

    (3) Submitting to YARN

    1) The yarn-cluster flow:

    In yarn-cluster mode, YarnClusterApplication runs Client's run method, and Client#run() implements the submission flow:

    /**
       * Submit an application to the ResourceManager.
       * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
       * reporting the application's status until the application has exited for any reason.
       * Otherwise, the client process will exit after submission.
       * If the application finishes with a failed, killed, or undefined status,
       * throw an appropriate SparkException.
       */
      def run(): Unit = {
        this.appId = submitApplication()
        if (!launcherBackend.isConnected() && fireAndForget) {
          val report = getApplicationReport(appId)
          val state = report.getYarnApplicationState
          logInfo(s"Application report for $appId (state: $state)")
          logInfo(formatReportDetails(report))
          if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
            throw new SparkException(s"Application $appId finished with status: $state")
          }
        } else {
          val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
          if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
            diags.foreach { err =>
              logError(s"Application diagnostics message: $err")
            }
            throw new SparkException(s"Application $appId finished with failed status")
          }
          if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
            throw new SparkException(s"Application $appId is killed")
          }
          if (finalState == FinalApplicationStatus.UNDEFINED) {
            throw new SparkException(s"The final status of application $appId is undefined")
          }
        }
      }

    Client#run() calls submitApplication(), implemented as follows:

    /**
       * Submit an application running our ApplicationMaster to the ResourceManager.
       *
       * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
       * creating applications and setting up the application submission context. This was not
       * available in the alpha API.
       */
      def submitApplication(): ApplicationId = {
        var appId: ApplicationId = null
        try {
          launcherBackend.connect()
          yarnClient.init(hadoopConf)
          yarnClient.start()
    
          logInfo("Requesting a new application from cluster with %d NodeManagers"
            .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
    
          // Get a new application from our RM
          val newApp = yarnClient.createApplication()
          val newAppResponse = newApp.getNewApplicationResponse()
          appId = newAppResponse.getApplicationId()
    
          new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
            Option(appId.toString)).setCurrentContext()
    
          // Verify whether the cluster has enough resources for our AM
          verifyClusterResources(newAppResponse)
    
          // Set up the appropriate contexts to launch our AM
          val containerContext = createContainerLaunchContext(newAppResponse)
          val appContext = createApplicationSubmissionContext(newApp, containerContext)
    
          // Finally, submit and monitor the application
          logInfo(s"Submitting application $appId to ResourceManager")
          yarnClient.submitApplication(appContext)
          launcherBackend.setAppId(appId.toString)
          reportLauncherState(SparkAppHandle.State.SUBMITTED)
    
          appId
        } catch {
          case e: Throwable =>
            if (appId != null) {
              cleanupStagingDir(appId)
            }
            throw e
        }
      }

    run() submits the job to YARN's ResourceManager (RM from here on) and launches our ApplicationMaster (AM from here on).

    The stable YARN API provides a convenience method (YarnClient#createApplication) for creating an application and setting up its submission context.

    submitApplication() performs these steps:

    • Initialize and start the YarnClient; the yarnClient APIs are used throughout what follows.

    • Call yarnClient#createApplication() to obtain a new application (newApp) from the RM; this application will host the AM. newApp#getNewApplicationResponse() returns its resource requirements (newAppResponse).

    • Use newAppResponse to verify that the cluster has enough resources to run the AM.

    • Set up the appropriate contexts for launching the AM.

    • Call yarnClient#submitApplication(appContext) to ask YARN to launch the job, then monitor the application.
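    To make those steps concrete outside of Spark, here is a minimal, self-contained sketch of the same YarnClient flow against the plain Hadoop YARN API. The AM command, application name, and resource numbers are placeholders; a real launch context also needs local resources, environment setup, and the step-3 resource check:

    import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.util.Records
    import scala.collection.JavaConverters._

    object MiniYarnSubmit {
      def main(args: Array[String]): Unit = {
        val yarnClient = YarnClient.createYarnClient()
        yarnClient.init(new YarnConfiguration()) // step 1: init and start the client
        yarnClient.start()

        val newApp = yarnClient.createApplication() // step 2: get a new application from the RM
        val appId = newApp.getNewApplicationResponse.getApplicationId

        // step 4: describe how to launch the AM container (placeholder command)
        val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
        amContainer.setCommands(List("echo placeholder-am-command").asJava)

        val appContext = newApp.getApplicationSubmissionContext
        appContext.setApplicationName("mini-demo")
        appContext.setAMContainerSpec(amContainer)
        appContext.setResource(Resource.newInstance(1024, 1)) // 1 GB, 1 vcore for the AM

        yarnClient.submitApplication(appContext) // step 5: submit; then poll reports to monitor
        println(s"submitted $appId")
      }
    }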

    2) The yarn-client flow:

    • In client deploy mode, SparkSubmit's main invokes the application's main method via reflection
    • The application's main method creates a SparkContext instance
    • While the SparkContext instance is being constructed, the scheduler and backend instances are created by the following code
      private var _schedulerBackend: SchedulerBackend = _
      private var _taskScheduler: TaskScheduler = _
      
      private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
    
      private[spark] def taskScheduler: TaskScheduler = _taskScheduler
      private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
        _taskScheduler = ts
      }
      
      // initialized and assigned in the constructor
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts

    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala

    The SparkContext initialization process

    Under YARN, where the SparkContext is initialized depends on --deploy-mode:

    yarn-cluster: the client first requests a container from the RM (YARN ResourceManager) to start the AM (ApplicationMaster) process, and the SparkContext runs inside that AM process;

    yarn-client: the SparkContext is initialized on the submitting node, invoked from the client-side class (JavaMainApplication).

    /**
       * Create a task scheduler based on a given master URL.
       * Return a 2-tuple of the scheduler backend and the task scheduler.
       */
      private def createTaskScheduler(...): (SchedulerBackend, TaskScheduler) = {
        ...
        master match {
          case "local" =>
            ...
          case LOCAL_N_REGEX(threads) =>
            ...
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
            ...
          case SPARK_REGEX(sparkUrl) =>
            ...
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
            ...
          case masterUrl =>
            val cm = getClusterManager(masterUrl) match {
              case Some(clusterMgr) => clusterMgr
              case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
              val scheduler = cm.createTaskScheduler(sc, masterUrl)
              val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
              cm.initialize(scheduler, backend)
              (backend, scheduler)
            } catch {
              case se: SparkException => throw se
              case NonFatal(e) =>
                throw new SparkException("External scheduler cannot be instantiated", e)
            }
        }
      }
    
      private def getClusterManager(url: String): Option[ExternalClusterManager] = {
        val loader = Utils.getContextOrSparkClassLoader
        val serviceLoaders =
          ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
        if (serviceLoaders.size > 1) {
          throw new SparkException(
            s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
        }
        serviceLoaders.headOption
      }  

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/SparkContext.scala

    1) SparkContext#createTaskScheduler(...)

    Creates the TaskScheduler and SchedulerBackend appropriate for the given cluster manager.

      1.1) The SchedulerBackend talks to the cluster manager to obtain the resources allocated to the application.

      1.2) The TaskScheduler schedules across jobs, receives the allocated resources, and in turn hands resources to each individual Task.

    2) SparkContext#createTaskScheduler(...)

    The last match case handles the remaining cluster managers: everything other than local and standalone (spark://) masters, i.e. the external cluster managers mesos, yarn, and kubernetes.

    Under SparkContext#createTaskScheduler(...)'s master match, the case masterUrl branch calls getClusterManager(masterUrl); for YARN this returns a YarnClusterManager object, a class implementing the ExternalClusterManager interface. (Discovery goes through java.util.ServiceLoader: the yarn module registers YarnClusterManager in its META-INF/services/org.apache.spark.scheduler.ExternalClusterManager file, and canCreate(masterURL) selects the matching manager.)

    Note: other classes implementing the ExternalClusterManager interface include:

    MesosClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala)

    KubernetesClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala)

    The ExternalClusterManager interface is defined as:

    private[spark] trait ExternalClusterManager {
      def canCreate(masterURL: String): Boolean
    
      def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
      
      def createSchedulerBackend(sc: SparkContext,
          masterURL: String,
          scheduler: TaskScheduler): SchedulerBackend
          
      def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
    }

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala

    The ExternalClusterManager interface exposes four methods:

    - canCreate(masterURL: String): Boolean -- checks whether this cluster manager instance can create scheduler components for the given masterURL.

    - createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler -- creates a TaskScheduler instance for the given SparkContext.

    - createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend -- creates a SchedulerBackend for the given SparkContext and scheduler; called after the TaskScheduler has been created via ExternalClusterManager.createTaskScheduler().

    - initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit -- initializes the TaskScheduler and SchedulerBackend; called after the scheduler components have been created.

    The YarnClusterManager class is defined as:

    private[spark] class YarnClusterManager extends ExternalClusterManager {
    
      override def canCreate(masterURL: String): Boolean = {
        masterURL == "yarn"
      }
    
      override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
        sc.deployMode match {
          case "cluster" => new YarnClusterScheduler(sc)
          case "client" => new YarnScheduler(sc)
          case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
      }
    
      override def createSchedulerBackend(sc: SparkContext,
          masterURL: String,
          scheduler: TaskScheduler): SchedulerBackend = {
        sc.deployMode match {
          case "cluster" =>
            new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
          case "client" =>
            new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
          case  _ =>
            throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
      }
    
      override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
      }
    }

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala

    YarnClusterManager#createTaskScheduler(...)

    This method branches on the SparkContext's deployMode property:

    client: returns a YarnScheduler (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala) instance;

    cluster: returns a YarnClusterScheduler (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala) instance.
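    YarnClusterScheduler itself is tiny; essentially all it adds over YarnScheduler is the postStartHook below (essentially as it appears in branch-2.4, comments added), which matters later: it is how the ApplicationMaster learns that the SparkContext has finished initializing:

    private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

      logInfo("Created YarnClusterScheduler")

      override def postStartHook() {
        // Unblock the AM thread that is waiting for the SparkContext before
        // registering with the ResourceManager.
        ApplicationMaster.sparkContextInitialized(sc)
        super.postStartHook()
        logInfo("YarnClusterScheduler.postStartHook done")
      }
    }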

    YarnClusterManager#createSchedulerBackend(...)

    This method likewise branches on the SparkContext's deployMode property:

    client: returns a YarnClientSchedulerBackend (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala) instance;

    cluster: returns a YarnClusterSchedulerBackend (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala) instance.
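    In client mode it is this YarnClientSchedulerBackend, rather than SparkSubmit, that ultimately drives the same Client#submitApplication() path once the SparkContext starts the scheduler. Abridged (and lightly simplified) from its start() in the 2.4 sources; UI wiring, logging, and monitoring are omitted:

    override def start() {
      val driverHost = conf.get(config.DRIVER_HOST_ADDRESS)
      val driverPort = conf.get("spark.driver.port")
      val hostport = driverHost + ":" + driverPort

      // The AM launched for client mode (ExecutorLauncher) receives the driver's
      // address as a plain "--arg", so it can connect back to this process.
      val argsArrayBuf = new ArrayBuffer[String]()
      argsArrayBuf += ("--arg", hostport)

      val args = new ClientArguments(argsArrayBuf.toArray)
      client = new Client(args, conf)
      bindToYarn(client.submitApplication(), None)

      super.start()
      waitForApplication() // block until YARN reports the application RUNNING
    }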

    How a job runs on YARN, architecturally:

    1. Spark on YARN in Cluster mode: the full lifecycle from job submission to job completion.

    Client-side steps

    •   1. Initialize yarnClient from yarnConf and start it;
    •   2. Create the client-side Application and obtain its Application ID, then check whether the cluster has enough resources for the executors and the ApplicationMaster; if not, an IllegalArgumentException is thrown;
    •   3. Set up resources and environment variables: the Application's staging directory, local resources (jar files, log4j.properties), the Application's environment variables, the Container launch context, and so on;
    •   4. Set up the Application submission context: application name, queue, the Container requested for the AM, and the job type marked as Spark;
    •   5. Request memory and finally submit the Application to the ResourceManager via yarnClient.submitApplication.

      Once the job has been submitted to YARN, the client is done; even killing the client process in the terminal is harmless, because the whole job runs on the YARN cluster, and its results go to HDFS or the logs.

    After submission to the YARN cluster, on the YARN side

    •   1. Run the ApplicationMaster's run method;
    •   2. Set up the relevant environment variables;
    •   3. Create amClient and start it;
    •   4. Install the Spark UI's AmIpFilter before the Spark UI starts;
    •   5. In the startUserClass function, a dedicated thread (named "Driver") is started to run the user-submitted Application -- that is, the Driver is started, and the SparkContext is initialized inside it;
    •   6. Wait for the SparkContext to finish initializing, for at most spark.yarn.applicationMaster.waitTries attempts (default 10); if the configured limit is exceeded the program exits, otherwise the SparkContext is used to initialize the yarnAllocator;

      How does the AM know the SparkContext has finished initializing?
      Starting the Application in step 5 initializes the SparkContext, which creates a YarnClusterScheduler (shown earlier). When SparkContext initialization completes, it calls YarnClusterScheduler's postStartHook, and that method notifies the ApplicationMaster that the SparkContext is ready.

    •   7. Once the SparkContext and the Driver have initialized, register the ApplicationMaster with the ResourceManager through amClient;
    •   8. Allocate and start the Executors. Before starting them, first obtain numExecutors Containers through the yarnAllocator, then start the Executors inside those Containers. If Executor startup fails maxNumExecutorFailures times, where maxNumExecutorFailures is computed as:

    // Default to numExecutors * 2, with a minimum of 3
    private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
      sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

      then the Application fails: its status is marked FAILED and the SparkContext is shut down. Executors are in fact started through ExecutorRunnable, which internally launches CoarseGrainedExecutorBackend.

    •   9. Finally, Tasks run inside CoarseGrainedExecutorBackend and report their status back to CoarseGrainedScheduler over the RPC layer (Akka in older Spark versions; Spark's own RPC since 1.6) until the job completes.

    2. Spark on YARN client mode: the full run analyzed

    Spark on YARN has two modes, yarn-cluster and yarn-client. Both run the job on YARN, but their mechanics differ considerably; this section walks through a yarn-client job from submission to execution.
      As in yarn-cluster mode, the whole program is submitted through the spark-submit script. But a yarn-client job does not need the Client class to wrap its startup; the job's main function is invoked directly via reflection. Step by step:

    •   1. SparkSubmit's launch function calls the job's main function directly (via reflection); in cluster mode it would call Client's main function instead.
    •   2. The application's main function always creates a SparkContext and initializes it;
    •   3. SparkContext initialization does the following in turn: sets the configuration; registers MapOutputTracker, BlockManagerMaster, and BlockManager; and creates the taskScheduler and dagScheduler, of which the last two matter most. When creating the taskScheduler, the Scheduler and SchedulerBackend are chosen according to the master we passed in. Since we chose yarn-client mode, the program picks YarnClientClusterScheduler and YarnClientSchedulerBackend, initializing the former with an instance of the latter; both instances are obtained via reflection. YarnClientSchedulerBackend subclasses CoarseGrainedSchedulerBackend, and YarnClientClusterScheduler subclasses TaskSchedulerImpl, overriding only its getRackForHost method.
    •   4. After the taskScheduler is initialized, the dagScheduler is created, and taskScheduler.start() starts the taskScheduler, which in turn calls the SchedulerBackend's start method. The SchedulerBackend's start initializes some parameters, wraps them in ClientArguments, passes them into the Client class, and obtains the Application ID via client.runApp().
    •   5. client.runApp does much the same as the client-side steps described earlier, except that what it launches is ExecutorLauncher (yarn-cluster mode launches ApplicationMaster).
    •   6. ExecutorLauncher initializes and starts amClient, then registers the Application with the ResourceManager. It then waits for the driver to start; once the driver is up, a MonitorActor object is created to communicate with CoarseGrainedSchedulerBackend (they exchange only the AddWebUIFilter event; Task status does not flow between them through it), and addAmIpFilter is configured. When the job completes, ExecutorLauncher sets the Application's status to FinalApplicationStatus.SUCCEEDED via amClient.
    •   7. Executors are allocated with logic similar to the yarn-cluster case, so it is not repeated here.
    •   8. Finally, Tasks run inside CoarseGrainedExecutorBackend and report their status back to CoarseGrainedScheduler until the job completes.
    •   9. While the job runs, YarnClientSchedulerBackend polls the job's status through the client every second and prints the corresponding information; once the Application's status is one of FINISHED, FAILED, or KILLED, the program stops waiting.
    •   10. A final thread re-checks the Application's status; once it is FINISHED, FAILED, or KILLED, the run is complete and the SparkContext is stopped. That ends the whole process.

    How YARN-Cluster works architecturally

    In YARN-Cluster mode, after a user submits an application to YARN, YARN runs it in two phases:

    • 1. Phase one starts Spark's Driver, as an ApplicationMaster, on the YARN cluster;
    • 2. Phase two has the ApplicationMaster create the application, request resources from the ResourceManager on its behalf, and start Executors to run Tasks, monitoring the whole run until it completes

    In detail:

    • The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command that starts the ApplicationMaster, the program to run in the Executors, and so on;
    • On receiving the request, the ResourceManager picks a NodeManager in the cluster and allocates the first Container for the application, instructing the NodeManager to start the application's ApplicationMaster in that Container; the ApplicationMaster then performs initialization such as creating the SparkContext;
    • The ApplicationMaster registers with the ResourceManager, so the user can check the application's status directly through the ResourceManager; it then requests resources for its tasks over RPC by polling, and monitors them until the run ends;
    • Once the ApplicationMaster obtains resources (Containers), it contacts the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in those Containers. Each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests Tasks. This mirrors Standalone mode, except that when the SparkContext is initialized in the Spark Application, it schedules tasks using CoarseGrainedSchedulerBackend paired with YarnClusterScheduler, where YarnClusterScheduler is just a thin wrapper over TaskSchedulerImpl that adds logic such as waiting for Executors;
    • The SparkContext in the ApplicationMaster assigns Tasks to the CoarseGrainedExecutorBackends, which run them and report status and progress back to the ApplicationMaster, letting it track each task at all times and restart a task when it fails;
    • When the application finishes running, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down;

    Tracing the CoarseGrainedExecutorBackend launch: the executor container command is assembled by ExecutorRunnable, which starts org.apache.spark.executor.CoarseGrainedExecutorBackend with flags such as --driver-url, --executor-id, --hostname, --cores, and --app-id.


    How YARN-Client works architecturally

    In detail:

    • The Spark Yarn Client asks YARN's ResourceManager to start an Application Master. Meanwhile, SparkContext initialization creates the DAGScheduler, TaskScheduler, and so on; since we chose Yarn-Client mode, the program picks YarnScheduler and YarnClientSchedulerBackend;
    • On receiving the request, the ResourceManager picks a NodeManager in the cluster and allocates the first Container for the application, instructing it to start the application's ApplicationMaster in that Container. The difference from YARN-Cluster is that this ApplicationMaster does not run a SparkContext; it only liaises with the SparkContext for resource allocation;
    • Once the client-side SparkContext finishes initializing, it establishes communication with the ApplicationMaster, registers with the ResourceManager, and requests resources (Containers) from the ResourceManager according to the task requirements;
    • Once the ApplicationMaster obtains resources (Containers), it contacts the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in those Containers; each CoarseGrainedExecutorBackend registers with the client-side SparkContext and requests Tasks;
    • The client-side SparkContext assigns Tasks to the CoarseGrainedExecutorBackends, which run them and report status and progress back to the Driver, letting the client track each task at all times and restart a task when it fails;
    • When the application finishes running, the client's SparkContext asks the ResourceManager to deregister it and shuts itself down.

    Client mode vs. Cluster mode

      • Before digging into the deeper difference between YARN-Client and YARN-Cluster, one concept must be clear: the Application Master. In YARN, every Application instance has an ApplicationMaster process, which runs in the first container started for the Application. It negotiates with the ResourceManager for resources and, once they are granted, tells the NodeManagers to start Containers on its behalf. At bottom, the difference between YARN-Cluster and YARN-Client is the difference in their ApplicationMaster processes;
      • In YARN-Cluster mode, the Driver runs inside the AM (Application Master), which both requests resources from YARN and supervises the job's execution. After the user submits a job, the Client can be closed and the job keeps running on YARN, which makes YARN-Cluster unsuitable for interactive jobs;
      • In YARN-Client mode, the Application Master only requests Executors from YARN; the Client schedules work by communicating with the requested Containers itself, which means the Client cannot go away.
  • Original post: https://www.cnblogs.com/mengrennwpu/p/11754341.html