  • Spark (49): Spark On YARN Startup Flow Source Code Analysis (Part 1)

    Introduction:

    This article walks through the code path from running spark-submit.sh to the point where the job is handed off to YARN.

    The entry point of spark-submit

    A Spark job is normally submitted with spark-submit:

    # Run on a Spark standalone cluster
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://207.184.161.138:7077 \
      --executor-memory 20G \
      --total-executor-cores 100 \
      /path/to/examples.jar \
      1000

    This example submits to a standalone cluster. The spark-submit script itself is as follows:

    https://github.com/apache/spark/blob/branch-2.4/bin/spark-submit

    Alternatively, you can find spark-submit under the Spark 2.4 installation directory:

    [cp011@CDH-103 bin]$ 
    more /opt/cloudera/parcels/SPARK2-2.4.0.cloudera1-1.cdh5.13.3.p0.1007356/lib/spark2/bin/spark-submit
    
    #!/usr/bin/env bash
    
    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    
    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

    From the script above you can see that spark-submit ultimately delegates to the SparkSubmit class.

    The entry-point main function is defined on the SparkSubmit companion object:

    object SparkSubmit extends CommandLineUtils with Logging {
      // Cluster managers
      private val YARN = 1
      private val STANDALONE = 2
      private val MESOS = 4
      private val LOCAL = 8
      private val KUBERNETES = 16
      private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
    
      // Deploy modes
      private val CLIENT = 1
      private val CLUSTER = 2
      private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
    
      // Special primary resource names that represent shells rather than application jars.
      private val SPARK_SHELL = "spark-shell"
      private val PYSPARK_SHELL = "pyspark-shell"
      private val SPARKR_SHELL = "sparkr-shell"
      private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
      private val R_PACKAGE_ARCHIVE = "rpkg.zip"
    
      private val CLASS_NOT_FOUND_EXIT_STATUS = 101
    
      // Following constants are visible for testing.
      private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
        "org.apache.spark.deploy.yarn.YarnClusterApplication"
      private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
      private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
      private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
        "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
    
      override def main(args: Array[String]): Unit = {
        val submit = new SparkSubmit() {
          self =>
    
          override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
            new SparkSubmitArguments(args) {
              override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
    
              override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
            }
          }
    
          override protected def logInfo(msg: => String): Unit = printMessage(msg)
    
          override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
    
          override def doSubmit(args: Array[String]): Unit = {
            try {
              super.doSubmit(args)
            } catch {
              case e: SparkUserAppException =>
                exitFn(e.exitCode)
            }
          }
    
        }
    
        submit.doSubmit(args)
      }
      // ...
    }

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

    The doSubmit function in SparkSubmit is quite simple:

      def doSubmit(args: Array[String]): Unit = {
        // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
        // be reset before the application starts.
        val uninitLog = initializeLogIfNecessary(true, silent = true)
    
        val appArgs = parseArguments(args)
        if (appArgs.verbose) {
          logInfo(appArgs.toString)
        }
        appArgs.action match {
          case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
          case SparkSubmitAction.KILL => kill(appArgs)
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
          case SparkSubmitAction.PRINT_VERSION => printVersion()
        }
      }

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

    It is easy to see that this is a dispatcher: based on the parsed action, it calls the corresponding handler:

    • case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)---submit a Spark application

    • case SparkSubmitAction.KILL => kill(appArgs)---kill a Spark application

    • case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)---query an application's status

    • case SparkSubmitAction.PRINT_VERSION => printVersion()---print version information (see the sketch of SparkSubmitAction below)
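    The action itself is set from the parsed command-line flags (--kill and --status select KILL and REQUEST_STATUS; the default is SUBMIT). SparkSubmitAction is a plain Scala enumeration defined in the same file; roughly (see the linked source for the exact definition):

    private[deploy] object SparkSubmitAction extends Enumeration {
      type SparkSubmitAction = Value
      val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
    }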

    To understand which classes actually carry out the submission, we need to step into the submit function:

    /**
       * Submit the application using the provided parameters.
       *
       * This runs in two steps. First, we prepare the launch environment by setting up
       * the appropriate classpath, system properties, and application arguments for
       * running the child main class based on the cluster manager and the deploy mode.
       * Second, we use this launch environment to invoke the main method of the child
       * main class.
       */
      @tailrec
      private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // Prepare the launch environment: set up the appropriate classpath, system properties and application arguments for running the child main class, based on the cluster manager and deploy mode.
        val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    
        def doRunMain(): Unit = {
          if (args.proxyUser != null) {
            val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
              UserGroupInformation.getCurrentUser())
            try {
              proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                override def run(): Unit = {
                  runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
                }
              })
            } catch {
              case e: Exception =>
                // Hadoop's AuthorizationException suppresses the exception's stack trace, which
                // makes the message printed to the output by the JVM not very helpful. Instead,
                // detect exceptions with empty stack traces here, and treat them differently.
                if (e.getStackTrace().length == 0) {
                  error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
                } else {
                  throw e
                }
            }
          } else {
            runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
          }
        }
    
        // Let the main class re-initialize the logging system once it starts.
        if (uninitLog) {
          Logging.uninitialize()
    }
    
    // In standalone cluster mode, there are two submission gateways:
    //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
    //   (2) The new REST-based gateway introduced in Spark 1.3
    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
        if (args.isStandaloneCluster && args.useRest) {
          try {
            logInfo("Running Spark using the REST application submission protocol.")
            doRunMain()
          } catch {
            // Fail over to use the legacy submission gateway
            case e: SubmitRestConnectionException =>
              logWarning(s"Master endpoint ${args.master} was not a REST server. " +
                "Falling back to legacy submission gateway instead.")
              args.useRest = false
              submit(args, false)
          }
    // In all other modes, just run the main class as prepared
        } else {
          doRunMain()
        }
      }

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

    The last statement of submit(...) calls the locally defined doRunMain(), which branches on args.proxyUser:

    1)  If a proxy user is specified, runMain() is wrapped in a proxyUser.doAs(...) call;

    2)  Otherwise, runMain() is called directly.

    Preparing the launch environment

    The launch environment is prepared by setting up the appropriate classpath, system properties and application arguments for running the child main class, based on the cluster manager and deploy mode:

    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    /**
       * Prepare the environment for submitting an application.
       *
       * @param args the parsed SparkSubmitArguments used for environment preparation.
       * @param conf the Hadoop Configuration, this argument will only be set in unit test.
       * @return a 4-tuple:
       *        (1) the arguments for the child process,
       *        (2) a list of classpath entries for the child,
       *        (3) a map of system properties, and
       *        (4) the main class for the child
       *        i.e. (childArgs, childClasspath, sparkConf, childMainClass):
       *        childArgs: arguments for the child process
       *        childClasspath: classpath entries for the child
       *        sparkConf: the SparkConf / map of system properties
       *        childMainClass: the main class for the child
       *
       * Exposed for testing.
       */
      private[deploy] def prepareSubmitEnvironment(
          args: SparkSubmitArguments,
          conf: Option[HadoopConfiguration] = None)
          : (Seq[String], Seq[String], SparkConf, String) = {
        // Return values
        val childArgs = new ArrayBuffer[String]()
        val childClasspath = new ArrayBuffer[String]()
        val sparkConf = new SparkConf()
        var childMainClass = ""
    
    // Set the cluster manager.
    // From this list we can see which cluster managers Spark currently supports: YARN, STANDALONE, MESOS, KUBERNETES and LOCAL;
    // the choice is made via the --master argument of spark-submit.
        val clusterManager: Int = args.master match {
          case "yarn" => YARN
      case "yarn-client" | "yarn-cluster" =>
        // Before Spark 2.0, "yarn-client" and "yarn-cluster" were valid --master values; since 2.0 they
        // are deprecated, mapped to "yarn" here, and a warning is logged.
        logWarning(s"Master ${args.master} is deprecated since 2.0." +
          " Please use master \"yarn\" with specified deploy mode instead.")
        YARN
          case m if m.startsWith("spark") => STANDALONE
          case m if m.startsWith("mesos") => MESOS
          case m if m.startsWith("k8s") => KUBERNETES
          case m if m.startsWith("local") => LOCAL
          case _ =>
            error("Master must either be yarn or start with spark, mesos, k8s, or local")
            -1
        }
    
    // Set the deploy mode (--deploy-mode); the default is client mode
        var deployMode: Int = args.deployMode match {
          case "client" | null => CLIENT
          case "cluster" => CLUSTER
          case _ =>
            error("Deploy mode must be either client or cluster")
            -1
        }
        
    // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
    // the master and deploy mode, we have some logic to infer the master and deploy mode
    // from each other if only one is specified, or exit early if they are at odds.
        if (clusterManager == YARN) {
          (args.master, args.deployMode) match {
            case ("yarn-cluster", null) =>
              deployMode = CLUSTER
              args.master = "yarn"
        case ("yarn-cluster", "client") =>
          error("Client deploy mode is not compatible with master \"yarn-cluster\"")
        case ("yarn-client", "cluster") =>
          error("Cluster deploy mode is not compatible with master \"yarn-client\"")
            case (_, mode) =>
              args.master = "yarn"
          }
    
      // Make sure YARN is included in our build if we're trying to use it
          if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
            error(
              "Could not load YARN classes. " +
              "This copy of Spark may not have been compiled with YARN support.")
          }
        }
    
        if (clusterManager == KUBERNETES) {
          args.master = Utils.checkAndGetK8sMasterUrl(args.master)
          // Make sure KUBERNETES is included in our build if we're trying to use it
          if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
            error(
              "Could not load KUBERNETES classes. " +
                "This copy of Spark may not have been compiled with KUBERNETES support.")
          }
        }
    
    // Fail fast: the following modes are not supported or applicable
        (clusterManager, deployMode) match {
          case (STANDALONE, CLUSTER) if args.isPython =>
            error("Cluster deploy mode is currently not supported for python " +
              "applications on standalone clusters.")
          case (STANDALONE, CLUSTER) if args.isR =>
            error("Cluster deploy mode is currently not supported for R " +
              "applications on standalone clusters.")
          case (LOCAL, CLUSTER) =>
        error("Cluster deploy mode is not compatible with master \"local\"")
          case (_, CLUSTER) if isShell(args.primaryResource) =>
            error("Cluster deploy mode is not applicable to Spark shells.")
          case (_, CLUSTER) if isSqlShell(args.mainClass) =>
            error("Cluster deploy mode is not applicable to Spark SQL shell.")
          case (_, CLUSTER) if isThriftServer(args.mainClass) =>
            error("Cluster deploy mode is not applicable to Spark Thrift server.")
          case _ =>
        }
        
    // Update args.deployMode if it is null. It will be passed down as a Spark property later.
        (args.deployMode, deployMode) match {
          case (null, CLIENT) => args.deployMode = "client"
          case (null, CLUSTER) => args.deployMode = "cluster"
          case _ =>
        }
    // Derive a few special run modes from the cluster manager and deploy mode.
        val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
        val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
        val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
        val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
        val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
    
        if (!isMesosCluster && !isStandAloneCluster) {
          // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
          // too for packages that include Python code
          val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
            args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
            args.ivySettingsPath)
    
          if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
            args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
            if (args.isPython || isInternal(args.primaryResource)) {
              args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
            }
          }
    
          // install any R packages that may have been passed through --jars or --packages.
          // Spark Packages may contain R source code inside the jar.
          if (args.isR && !StringUtils.isBlank(args.jars)) {
            RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
          }
        }
    
        args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
        val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
        val targetDir = Utils.createTempDir()
    
        // assure a keytab is available from any place in a JVM
        if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
          if (args.principal != null) {
            if (args.keytab != null) {
              require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
              // Add keytab and principal configurations in sysProps to make them available
              // for later use; e.g. in spark sql, the isolated class loader used to talk
              // to HiveMetastore will use these settings. They will be set as Java system
              // properties and then loaded by SparkConf
              sparkConf.set(KEYTAB, args.keytab)
              sparkConf.set(PRINCIPAL, args.principal)
              UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
            }
          }
        }
    
        // Resolve glob path for different resources.
        args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
    
        lazy val secMgr = new SecurityManager(sparkConf)
    
        // In client mode, download remote files.
        var localPrimaryResource: String = null
        var localJars: String = null
        var localPyFiles: String = null
        if (deployMode == CLIENT) {
          localPrimaryResource = Option(args.primaryResource).map {
            downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
          }.orNull
          localJars = Option(args.jars).map {
            downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
          }.orNull
          localPyFiles = Option(args.pyFiles).map {
            downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
          }.orNull
        }
    
        // When running in YARN, for some remote resources with scheme:
        //   1. Hadoop FileSystem doesn't support them.
        //   2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
        // We will download them to local disk prior to add to YARN's distributed cache.
        // For yarn client mode, since we already download them with above code, so we only need to
        // figure out the local path and replace the remote one.
        if (clusterManager == YARN) {
          val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
    
          def shouldDownload(scheme: String): Boolean = {
            forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
              Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
          }
    
          def downloadResource(resource: String): String = {
            val uri = Utils.resolveURI(resource)
            uri.getScheme match {
              case "local" | "file" => resource
              case e if shouldDownload(e) =>
                val file = new File(targetDir, new Path(uri).getName)
                if (file.exists()) {
                  file.toURI.toString
                } else {
                  downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
                }
              case _ => uri.toString
            }
          }
    
          args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
          args.files = Option(args.files).map { files =>
            Utils.stringToSeq(files).map(downloadResource).mkString(",")
          }.orNull
          args.pyFiles = Option(args.pyFiles).map { pyFiles =>
            Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
          }.orNull
          args.jars = Option(args.jars).map { jars =>
            Utils.stringToSeq(jars).map(downloadResource).mkString(",")
          }.orNull
          args.archives = Option(args.archives).map { archives =>
            Utils.stringToSeq(archives).map(downloadResource).mkString(",")
          }.orNull
        }
    
    // If we're running a python app, set the main class to our specific python runner
    // ...
    // In YARN mode for an R app, add the SparkR package archive and the R package
    // archive containing all of the built R libraries to archives so that they can
    // be distributed with the job
    // ...
    // TODO: Support distributing R packages with standalone cluster
    // ...
    // TODO: Support distributing R packages with mesos cluster
    // ...
    // If we're running an R app, set the main class to our specific R runner
    // ...
    
        // Special flag to avoid deprecation warnings at the client
        sys.props("SPARK_SUBMIT") = "true"
    
        // In client mode, launch the application main class directly
        // In addition, add the main application jar and any added jars (if any) to the classpath
        if (deployMode == CLIENT) {
          childMainClass = args.mainClass
          if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
            childClasspath += localPrimaryResource
          }
          if (localJars != null) { childClasspath ++= localJars.split(",") }
        }
        // Add the main application jar and any added jars to classpath in case YARN client
        // requires these jars.
        // This assumes both primaryResource and user jars are local jars, or already downloaded
        // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
        // added to the classpath of YARN client.
        if (isYarnCluster) {
          if (isUserJar(args.primaryResource)) {
            childClasspath += args.primaryResource
          }
          if (args.jars != null) { childClasspath ++= args.jars.split(",") }
        }
    
        if (deployMode == CLIENT) {
          if (args.childArgs != null) { childArgs ++= args.childArgs }
        }
    
        // Map all arguments to command-line options or system properties for our chosen mode
        for (opt <- options) {
          if (opt.value != null &&
              (deployMode & opt.deployMode) != 0 &&
              (clusterManager & opt.clusterManager) != 0) {
            if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
            if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
          }
        }
    
        // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
        if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
          sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
        }
    
        // Let YARN know it's a pyspark app, so it distributes needed libraries.
        if (clusterManager == YARN) {
          if (args.isPython) {
            sparkConf.set("spark.yarn.isPython", "true")
          }
        }
    
        // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
        if (isYarnCluster) {
          childMainClass = YARN_CLUSTER_SUBMIT_CLASS
          if (args.isPython) {
            childArgs += ("--primary-py-file", args.primaryResource)
            childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
          } else if (args.isR) {
            val mainFile = new Path(args.primaryResource).getName
            childArgs += ("--primary-r-file", mainFile)
            childArgs += ("--class", "org.apache.spark.deploy.RRunner")
          } else {
            if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
              childArgs += ("--jar", args.primaryResource)
            }
            childArgs += ("--class", args.mainClass)
          }
          if (args.childArgs != null) {
            args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
          }
        }
    
        // Load any properties specified through --conf and the default properties file
        for ((k, v) <- args.sparkProperties) {
          sparkConf.setIfMissing(k, v)
        }
    
        // Ignore invalid spark.driver.host in cluster modes.
        if (deployMode == CLUSTER) {
          sparkConf.remove("spark.driver.host")
        }
    
        // Resolve paths in certain spark properties
        val pathConfigs = Seq(
          "spark.jars",
          "spark.files",
          "spark.yarn.dist.files",
          "spark.yarn.dist.archives",
          "spark.yarn.dist.jars")
        pathConfigs.foreach { config =>
          // Replace old URIs with resolved URIs, if they exist
          sparkConf.getOption(config).foreach { oldValue =>
            sparkConf.set(config, Utils.resolveURIs(oldValue))
          }
        }
    
        // Resolve and format python file paths properly before adding them to the PYTHONPATH.
        // The resolving part is redundant in the case of --py-files, but necessary if the user
        // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
        sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
          val resolvedPyFiles = Utils.resolveURIs(pyFiles)
          val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
            PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
          } else {
            // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
            // support dealing with remote python files, they could distribute and add python files
            // locally.
            resolvedPyFiles
          }
          sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
        }
    
        (childArgs, childClasspath, sparkConf, childMainClass)
      }

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
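    As a concrete (hypothetical) illustration of the 4-tuple this method returns: for a submission such as spark-submit --master yarn --deploy-mode cluster --class com.example.Main app.jar arg1, the yarn-cluster branch above would yield roughly the following values (the jar name, class name and argument are made up):

    // Hypothetical result of prepareSubmitEnvironment on the yarn-cluster path:
    val childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
    val childArgs      = Seq("--jar", "app.jar", "--class", "com.example.Main", "--arg", "arg1")
    val childClasspath = Seq("app.jar")   // the primary resource plus anything passed via --jars
    // sparkConf: the SparkConf with --conf and properties-file settings merged in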

    Preparing the YARN (cluster manager) execution class:

    When launched via spark-submit (https://github.com/apache/spark/blob/branch-2.4/bin/spark-submit), what actually runs is exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

    Inside SparkSubmit, the method

    private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments,conf: Option[HadoopConfiguration] = None): (Seq[String], Seq[String], SparkConf, String)

    prepares everything the submission needs, i.e. the launch environment.

    Inside this method, when the cluster manager is YARN:

    1) With --deploy-mode cluster,

    the submission goes through YarnClusterApplication.

    YarnClusterApplication is defined in the same file as org.apache.spark.deploy.yarn.Client; its start method creates a new Client object and calls its run method:

    private[spark] class YarnClusterApplication extends SparkApplication {
    
      override def start(args: Array[String], conf: SparkConf): Unit = {
        // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
        // so remove them from sparkConf here for yarn mode.
        conf.remove("spark.jars")
        conf.remove("spark.files")
    
        new Client(new ClientArguments(args), conf).run()
      }
    
    }

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

    2) With --deploy-mode client,

    the main function of the application jar itself is invoked, through JavaMainApplication:

    /**
     * Entry point for a Spark application. Implementations must provide a no-argument constructor.
     */
    private[spark] trait SparkApplication {
    
      def start(args: Array[String], conf: SparkConf): Unit
    
    }
    
    /**
     * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
     *
     * Configuration is propagated to the application via system properties, so running multiple
     * of these in the same JVM may lead to undefined behavior due to configuration leaks.
     */
    private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
    
      override def start(args: Array[String], conf: SparkConf): Unit = {
        val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          throw new IllegalStateException("The main method in the given main class must be static")
        }
    
        val sysProps = conf.getAll.toMap
        sysProps.foreach { case (k, v) =>
          sys.props(k) = v
        }
    
        mainMethod.invoke(null, args)
      }
    
    }

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala

    As the implementation shows, JavaMainApplication.start simply invokes the main function of the application jar via reflection.
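    A minimal, self-contained sketch of what that reflective call amounts to (DemoUserApp is a hypothetical stand-in for the class passed via --class; this is not Spark code):

    import java.lang.reflect.Modifier

    // Hypothetical stand-in for the user's application class.
    object DemoUserApp {
      def main(args: Array[String]): Unit =
        println(s"user main invoked with: ${args.mkString(", ")}")
    }

    object ReflectiveMainDemo {
      def main(args: Array[String]): Unit = {
        // The same mechanism JavaMainApplication uses: look up a static main(String[]) and invoke it.
        val klass = Class.forName("DemoUserApp")
        val mainMethod = klass.getMethod("main", classOf[Array[String]])
        require(Modifier.isStatic(mainMethod.getModifiers),
          "The main method in the given main class must be static")
        // The whole Array[String] is passed as the single argument of main(String[]).
        mainMethod.invoke(null, Array("arg1", "arg2"))
      }
    }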

    Submitting to YARN

    The yarn-cluster flow:

    In yarn-cluster mode, YarnClusterApplication runs Client's run method, and Client#run() implements the submission flow:

    /**
       * Submit an application to the ResourceManager.
       * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
       * reporting the application's status until the application has exited for any reason.
       * Otherwise, the client process will exit after submission.
       * If the application finishes with a failed, killed, or undefined status,
       * throw an appropriate SparkException.
       */
      def run(): Unit = {
        this.appId = submitApplication()
        if (!launcherBackend.isConnected() && fireAndForget) {
          val report = getApplicationReport(appId)
          val state = report.getYarnApplicationState
          logInfo(s"Application report for $appId (state: $state)")
          logInfo(formatReportDetails(report))
          if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
            throw new SparkException(s"Application $appId finished with status: $state")
          }
        } else {
          val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
          if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
            diags.foreach { err =>
              logError(s"Application diagnostics message: $err")
            }
            throw new SparkException(s"Application $appId finished with failed status")
          }
          if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
            throw new SparkException(s"Application $appId is killed")
          }
          if (finalState == FinalApplicationStatus.UNDEFINED) {
            throw new SparkException(s"The final status of application $appId is undefined")
          }
        }
      }

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

    Client.run() calls submitApplication(), which is implemented as follows:

      /**
       * Submit an application running our ApplicationMaster to the ResourceManager.
       *
       * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
       * creating applications and setting up the application submission context. This was not
       * available in the alpha API.
       */
      def submitApplication(): ApplicationId = {
        var appId: ApplicationId = null
        try {
          launcherBackend.connect()
          yarnClient.init(hadoopConf)
          yarnClient.start()
    
          logInfo("Requesting a new application from cluster with %d NodeManagers"
            .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
    
          // Get a new application from our RM
          val newApp = yarnClient.createApplication()
          val newAppResponse = newApp.getNewApplicationResponse()
          appId = newAppResponse.getApplicationId()
    
          new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
            Option(appId.toString)).setCurrentContext()
    
          // Verify whether the cluster has enough resources for our AM
          verifyClusterResources(newAppResponse)
    
          // Set up the appropriate contexts to launch our AM
          val containerContext = createContainerLaunchContext(newAppResponse)
          val appContext = createApplicationSubmissionContext(newApp, containerContext)
    
          // Finally, submit and monitor the application
          logInfo(s"Submitting application $appId to ResourceManager")
          yarnClient.submitApplication(appContext)
          launcherBackend.setAppId(appId.toString)
          reportLauncherState(SparkAppHandle.State.SUBMITTED)
    
          appId
        } catch {
          case e: Throwable =>
            if (appId != null) {
              cleanupStagingDir(appId)
            }
            throw e
        }
      }

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

    run() submits the job to YARN's ResourceManager (RM below) and runs our ApplicationMaster (AM below).

    The stable YARN API provides a convenience method (YarnClient#createApplication) for creating an application and setting up the application submission context.

    submitApplication() performs the following steps:

    • Initialize and start a YarnClient; the various YarnClient APIs are used from here on.

    • Call yarnClient#createApplication() to obtain a new application (newApp) from the RM; this application will host the AM. newApp#getNewApplicationResponse() returns the resource information for it (newAppResponse).

    • Use newAppResponse to verify that the cluster has enough resources to run the AM.

    • Set up the appropriate contexts for launching the AM.

    • Call yarnClient#submitApplication(appContext) to ask YARN to launch the application, then monitor it (see the sketch below).
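    A condensed, standalone sketch of that YarnClient sequence, using the Hadoop YARN client API directly (the application name, queue and resource sizes are made up, and the ContainerLaunchContext is left empty instead of carrying a real AM command, so this only illustrates the call order):

    import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.util.Records

    object YarnSubmitSketch {
      def main(args: Array[String]): Unit = {
        val conf = new YarnConfiguration()

        // 1. Initialize and start the YarnClient.
        val yarnClient = YarnClient.createYarnClient()
        yarnClient.init(conf)
        yarnClient.start()

        // 2. Ask the RM for a new application; the response describes the resources on offer.
        val newApp = yarnClient.createApplication()
        val newAppResponse = newApp.getNewApplicationResponse()
        val appId = newAppResponse.getApplicationId()

        // 3. A trivial stand-in for Spark's verifyClusterResources check.
        val maxMem = newAppResponse.getMaximumResourceCapability.getMemory
        require(maxMem >= 1024, s"Cluster cannot satisfy the 1024 MB requested for the AM (max: $maxMem)")

        // 4. Set up the submission context (Spark's createContainerLaunchContext would
        //    fill amContainer with the real ApplicationMaster launch command).
        val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
        val appContext = newApp.getApplicationSubmissionContext
        appContext.setApplicationName("sketch-app")            // made-up name
        appContext.setQueue("default")                         // made-up queue
        appContext.setAMContainerSpec(amContainer)
        appContext.setResource(Resource.newInstance(1024, 1))  // 1 GB, 1 vcore for the AM

        // 5. Submit and let the RM take over.
        yarnClient.submitApplication(appContext)
        println(s"Submitted $appId")
        yarnClient.stop()
      }
    }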

    The yarn-client flow:

    • When the deploy mode is client, SparkSubmit's main invokes the application's main method via reflection
    • The application's main method creates a SparkContext instance
    • During SparkContext construction, the scheduler and backend instances are created by the code below
      private var _schedulerBackend: SchedulerBackend = _
      private var _taskScheduler: TaskScheduler = _
      
      private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
    
      private[spark] def taskScheduler: TaskScheduler = _taskScheduler
      private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
        _taskScheduler = ts
      }
      
      // Assigned during construction:
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts

    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala

    The SparkContext initialization process

    In YARN mode, where SparkContext gets initialized depends on --deploy-mode:

    yarn-cluster: the client first asks the RM (YARN ResourceManager) for a container in which to start the AM (ApplicationMaster) process, and SparkContext runs inside that AM process;

    yarn-client: SparkContext is initialized on the submitting node, driven by the client-side class (JavaMainApplication).

    /**
       * Create a task scheduler based on a given master URL.
       * Return a 2-tuple of the scheduler backend and the task scheduler.
       */
      private def createTaskScheduler(...): (SchedulerBackend, TaskScheduler) = {
        // ...
        master match {
          case "local" =>
            // ...
          case LOCAL_N_REGEX(threads) =>
            // ...
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
            // ...
          case SPARK_REGEX(sparkUrl) =>
            // ...
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
            // ...
          case masterUrl =>
            val cm = getClusterManager(masterUrl) match {
              case Some(clusterMgr) => clusterMgr
              case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
              val scheduler = cm.createTaskScheduler(sc, masterUrl)
              val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
              cm.initialize(scheduler, backend)
              (backend, scheduler)
            } catch {
              case se: SparkException => throw se
              case NonFatal(e) =>
                throw new SparkException("External scheduler cannot be instantiated", e)
            }
        }
      }
    
      private def getClusterManager(url: String): Option[ExternalClusterManager] = {
        val loader = Utils.getContextOrSparkClassLoader
        val serviceLoaders =
          ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
        if (serviceLoaders.size > 1) {
          throw new SparkException(
            s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
        }
        serviceLoaders.headOption
      }  

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/SparkContext.scala

    1) SparkContext#createTaskScheduler(...)

    creates different TaskScheduler and SchedulerBackend implementations depending on the cluster manager:

      1.1) SchedulerBackend talks to the cluster manager to obtain the resources allocated to the application.

      1.2) TaskScheduler schedules across jobs, receives the allocated resources, and then assigns resources to each task.

    2) SparkContext#createTaskScheduler(...)

    The last match case handles the external cluster managers, i.e. everything other than local and standalone (spark://): Mesos, YARN and Kubernetes.

    Under case masterUrl, getClusterManager(masterUrl) is called; for YARN it returns an instance of YarnClusterManager, a class that implements the ExternalClusterManager interface.

    Note: other classes implementing the ExternalClusterManager interface include:

    MesosClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala)

    KubernetesClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala)

    The ExternalClusterManager interface is defined as:

    private[spark] trait ExternalClusterManager {
      def canCreate(masterURL: String): Boolean
    
      def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
      
      def createSchedulerBackend(sc: SparkContext,
          masterURL: String,
          scheduler: TaskScheduler): SchedulerBackend
          
      def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
    }

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala

    The ExternalClusterManager interface exposes four methods:

    - canCreate(masterURL: String): Boolean  checks whether this cluster manager instance can create scheduler components for the given masterURL.

    - createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler  creates a TaskScheduler instance for the given SparkContext.

    - createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend  creates a SchedulerBackend for the given SparkContext and scheduler; called after the TaskScheduler has been created with ExternalClusterManager.createTaskScheduler().

    - initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit  initializes the TaskScheduler and SchedulerBackend; called after the scheduler components have been created. A sketch of how such a manager is discovered via ServiceLoader follows.
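    getClusterManager above relies on java.util.ServiceLoader, so an implementation such as YarnClusterManager is discovered through a provider-configuration file on the classpath (the yarn module ships such a file). A minimal sketch of the lookup side; since ExternalClusterManager is private[spark], the sketch has to live inside the org.apache.spark package tree, and the object name here is invented:

    package org.apache.spark.scheduler

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    object ClusterManagerLookup {
      // Mirrors what SparkContext#getClusterManager does: load every registered
      // ExternalClusterManager and keep the one whose canCreate accepts the master URL.
      def find(masterUrl: String): Option[ExternalClusterManager] = {
        val loader = Thread.currentThread().getContextClassLoader
        ServiceLoader.load(classOf[ExternalClusterManager], loader)
          .asScala
          .find(_.canCreate(masterUrl))
      }
    }

    // Registration side: a provider-configuration file on the classpath at
    //   META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
    // whose content is the implementation's fully-qualified class name, e.g.
    //   org.apache.spark.scheduler.cluster.YarnClusterManager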

    The YarnClusterManager class is defined as:

    private[spark] class YarnClusterManager extends ExternalClusterManager {
    
      override def canCreate(masterURL: String): Boolean = {
        masterURL == "yarn"
      }
    
      override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
        sc.deployMode match {
          case "cluster" => new YarnClusterScheduler(sc)
          case "client" => new YarnScheduler(sc)
          case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
      }
    
      override def createSchedulerBackend(sc: SparkContext,
          masterURL: String,
          scheduler: TaskScheduler): SchedulerBackend = {
        sc.deployMode match {
          case "cluster" =>
            new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
          case "client" =>
            new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
          case  _ =>
            throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
      }
    
      override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
      }
    }

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala

    YarnClusterManager#createTaskScheduler(...)

    branches on the SparkContext's deployMode property:

    client: returns a YarnScheduler (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala) instance;

    cluster: returns a YarnClusterScheduler (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala) instance.

    YarnClusterManager#createSchedulerBackend(...)

    also branches on the SparkContext's deployMode property:

    client: returns a YarnClientSchedulerBackend (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala) instance;

    cluster: returns a YarnClusterSchedulerBackend (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala) instance.
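    To summarize the mapping, a small illustrative helper (not Spark code; it merely restates the two match expressions above):

    // (TaskScheduler, SchedulerBackend) class names chosen by YarnClusterManager for --master yarn.
    def yarnSchedulerClasses(deployMode: String): (String, String) = deployMode match {
      case "client"  => ("YarnScheduler",        "YarnClientSchedulerBackend")
      case "cluster" => ("YarnClusterScheduler", "YarnClusterSchedulerBackend")
      case other     => throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
    }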

    How jobs run on YARN: architecture analysis

    1. Spark on YARN cluster mode: the whole process from job submission to job completion.

    Client-side steps

    •   1. Initialize a yarnClient from yarnConf and start it;
    •   2. Create a client application and obtain its application ID, then check whether the cluster has enough resources for the executors and the ApplicationMaster; if not, an IllegalArgumentException is thrown;
    •   3. Set up resources and environment variables: the application's staging directory, local resources (jar files, log4j.properties), the application's environment variables, the container launch context, and so on;
    •   4. Set up the application submission context: the application name, the queue, the container requested for the AM, and the job type marked as Spark;
    •   5. Request memory and finally submit the application to the ResourceManager through yarnClient.submitApplication.

      Once the job has been submitted to YARN, the client is done; you can even kill the client process in the terminal, because the whole job runs on the YARN cluster and its results go to HDFS or to the logs.

    After submission to the YARN cluster: YARN-side steps

    •   1. Run ApplicationMaster's run method;
    •   2. Set up the required environment variables;
    •   3. Create the amClient and start it;
    •   4. Set the AmIpFilter for the Spark UI before the UI starts;
    •   5. In startUserClass, start a dedicated thread (named "Driver") that runs the user-submitted application, i.e. the Driver; SparkContext is initialized inside the Driver;
    •   6. Wait for SparkContext initialization to complete, for at most spark.yarn.applicationMaster.waitTries attempts (default 10); if the limit is exceeded the program exits, otherwise the SparkContext is used to initialize the yarnAllocator;

      How does the AM know that SparkContext initialization is complete?
      SparkContext is initialized while the application started in step 5 runs; during that initialization a YarnClusterScheduler is created, and when SparkContext finishes initializing it calls YarnClusterScheduler's postStartHook, which notifies the ApplicationMaster that the SparkContext is ready (see the sketch after this list).

    •   7. Once SparkContext and the Driver are initialized, register the ApplicationMaster with the ResourceManager through the amClient;
    •   8. Allocate and start the executors. Before starting them, numExecutors containers are obtained through the yarnAllocator, and executors are then launched inside those containers. If executor start-up failures reach maxNumExecutorFailures, computed as follows:
    // Default to numExecutors * 2, with minimum of 3
    private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
      sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

      then the application fails: its status is marked FAILED and the SparkContext is shut down. Executors are actually started through ExecutorRunnable, which internally launches CoarseGrainedExecutorBackend.

    •   9. Finally, tasks run inside CoarseGrainedExecutorBackend, and their status is reported back to CoarseGrainedScheduler over the RPC layer (Akka in older Spark versions) until the job completes.
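      The postStartHook mentioned in the note under step 6 is the handshake between the Driver-side SparkContext and the AM. A condensed paraphrase of YarnClusterScheduler from branch-2.4 (see the yarn resource-manager module for the exact code):

      private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

        logInfo("Created YarnClusterScheduler")

        override def postStartHook(): Unit = {
          // Tell the ApplicationMaster that the user code has created its SparkContext,
          // so the AM can register with the RM and start requesting executor containers.
          ApplicationMaster.sparkContextInitialized(sc)
          super.postStartHook()
          logInfo("YarnClusterScheduler.postStartHook done")
        }
      }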

    2. Spark on YARN client mode: the whole process from submission to running

    Spark on YARN has two modes, yarn-cluster and yarn-client. Both run on YARN, but they work quite differently; this part walks through how a yarn-client job goes from submission to running.
      As in yarn-cluster mode, the program is submitted through the spark-submit script. A yarn-client job, however, is not wrapped and launched through the Client class; instead the job's main function is invoked directly via reflection. The analysis:

    •   1. SparkSubmit's launch function invokes the job's main function directly via reflection; in cluster mode it would invoke Client's main function instead.
    •   2. The application's main function always creates a SparkContext and initializes it;
    •   3. During SparkContext initialization the following happens in order: the relevant configuration is set; MapOutputTracker, BlockManagerMaster and BlockManager are registered; and the taskScheduler and dagScheduler are created, the latter two being the important part. When the taskScheduler is created, the Scheduler and SchedulerBackend are chosen according to the master we passed in. Since we chose yarn-client mode, the program picks YarnClientClusterScheduler and YarnClientSchedulerBackend, and initializes YarnClientClusterScheduler with the YarnClientSchedulerBackend instance; both instances are obtained via reflection. YarnClientSchedulerBackend is a subclass of CoarseGrainedSchedulerBackend, and YarnClientClusterScheduler is a subclass of TaskSchedulerImpl that only overrides TaskSchedulerImpl's getRackForHost method.
    •   4. After the taskScheduler is initialized, the dagScheduler is created, and then taskScheduler.start() starts the taskScheduler, which in turn calls the SchedulerBackend's start method. While the SchedulerBackend starts, it initializes some parameters, wraps them in ClientArguments, passes them to the Client class, and calls client.runApp() to obtain the Application ID.
    •   5. client.runApp does much the same as the "Client-side steps" section above, except that it launches an ExecutorLauncher (yarn-cluster mode launches an ApplicationMaster).
    •   6. ExecutorLauncher initializes and starts the amClient and then registers the application (acting as the ApplicationMaster) with the ResourceManager. After registering, it waits for the driver to start; once the driver is up, it creates a MonitorActor to communicate with CoarseGrainedSchedulerBackend (they only exchange the AddWebUIFilter event; task status does not flow between them this way) and sets addAmIpFilter. When the job finishes, ExecutorLauncher sets the application status to FinalApplicationStatus.SUCCEEDED through the amClient.
    •   7. Executors are allocated with logic similar to the yarn-cluster case, so it is not repeated here.
    •   8. Finally, tasks run inside CoarseGrainedExecutorBackend, and their status is reported back to CoarseGrainedScheduler (over the RPC layer, Akka in older versions) until the job completes.
    •   9. While the job is running, YarnClientSchedulerBackend polls the client every second for the job status and prints it; once the application status is FINISHED, FAILED or KILLED, it stops waiting.
    •   10. Finally, another thread double-checks the application status; when it is FINISHED, FAILED or KILLED, the run is over and the SparkContext is stopped. That completes the whole process.

    How YARN-cluster mode works

    In YARN-cluster mode, after a user submits an application to YARN, YARN runs it in two stages:

    • 1. First, Spark's Driver is started inside the YARN cluster as part of an ApplicationMaster;
    • 2. Second, the ApplicationMaster creates the application, requests resources from the ResourceManager on its behalf, and starts executors to run the tasks, while monitoring the whole run until it completes.

    In more detail:

    • The Spark YARN client submits the application to YARN, including the ApplicationMaster program, the command to launch the ApplicationMaster, the program to run in the executors, and so on;
    • After receiving the request, the ResourceManager picks a NodeManager in the cluster and allocates the first container for the application, asking that NodeManager to launch the application's ApplicationMaster inside it; the ApplicationMaster performs the SparkContext initialization and related setup;
    • The ApplicationMaster registers with the ResourceManager, so that the user can check the application's running state directly through the ResourceManager; it then requests resources for the individual tasks over the RPC protocol in a polling fashion and monitors their state until the run ends;
    • Once the ApplicationMaster obtains resources (i.e. containers), it communicates with the corresponding NodeManagers and asks them to launch CoarseGrainedExecutorBackend in the acquired containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests tasks. This is the same as in standalone mode, except that when SparkContext is initialized in the Spark application, task scheduling is done by CoarseGrainedSchedulerBackend together with YarnClusterScheduler, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl that adds things such as waiting logic for executors;
    • The SparkContext inside the ApplicationMaster assigns tasks to the CoarseGrainedExecutorBackends, which run the tasks and report status and progress back to the ApplicationMaster, so the ApplicationMaster always knows how each task is doing and can restart a task when it fails;
    • When the application has finished running, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down;

    Tracing the CoarseGrainedExecutorBackend launch scripts:

      1 [root@CDH-143 bin]$ yarn applicationattempt -list application_1559203334026_0010
      2 19/05/31 09:36:10 INFO client.RMProxy: Connecting to ResourceManager at CDH-143/10.132.52.143:8032
      3 Total number of application attempts :1
      4          ApplicationAttempt-Id                 State                        AM-Container-Id                            Tracking-URL
      5 appattempt_1559203334026_0010_000001                 RUNNING    container_1559203334026_0010_01_000001  http://CDH-143:8088/proxy/application_1559203334026_0010/
      6 
      7 [root@CDH-143 bin]$ yarn container -list appattempt_1559203334026_0010_000001
      8 19/05/31 09:36:51 INFO client.RMProxy: Connecting to ResourceManager at CDH-143/10.132.52.143:8032
      9 Total number of containers :16
     10                   Container-Id            Start Time             Finish Time                   State                    Host                                LOG-URL
     11 container_1559203334026_0010_01_000015  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-146:8041    http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000015/dx
     12 container_1559203334026_0010_01_000016  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-146:8041    http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000016/dx
     13 container_1559203334026_0010_01_000003  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000003/dx
     14 container_1559203334026_0010_01_000004  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000004/dx
     15 container_1559203334026_0010_01_000005  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000005/dx
     16 container_1559203334026_0010_01_000006  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000006/dx
     17 container_1559203334026_0010_01_000001  Thu May 30 19:52:06 +0800 2019                   N/A                 RUNNING            CDH-142:8041    http://CDH-142:8042/node/containerlogs/container_1559203334026_0010_01_000001/dx
     18 container_1559203334026_0010_01_000002  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000002/dx
     19 container_1559203334026_0010_01_000011  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-146:8041    http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000011/dx
     20 container_1559203334026_0010_01_000012  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-146:8041    http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000012/dx
     21 container_1559203334026_0010_01_000013  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-146:8041    http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000013/dx
     22 container_1559203334026_0010_01_000014  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-146:8041    http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000014/dx
     23 container_1559203334026_0010_01_000007  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000007/dx
     24 container_1559203334026_0010_01_000008  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000008/dx
     25 container_1559203334026_0010_01_000009  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-141:8041    http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000009/dx
     26 container_1559203334026_0010_01_000010  Thu May 30 19:52:19 +0800 2019                   N/A                 RUNNING            CDH-146:8041    http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000010/dx
     27 
     28 [root@CDH-141 ~]$ ps axu | grep container_1559203334026_0010_01_000003
     29 yarn     30557  0.0  0.0 113144  1496 ?        S    May30   0:00 bash 
     30     /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor.sh
     31 yarn     30569  0.0  0.0 113280  1520 ?        Ss   May30   0:00 /bin/bash -c /usr/java/jdk1.8.0_171-amd64/bin/java 
     32     -server -Xmx6144m 
     33     -Djava.io.tmpdir=/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/tmp 
     34     '-Dspark.driver.port=50365' 
     35     '-Dspark.network.timeout=10000000' 
     36     '-Dspark.port.maxRetries=32' 
     37     -Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 
     38     -XX:OnOutOfMemoryError='kill %p'
     39     org.apache.spark.executor.CoarseGrainedExecutorBackend 
     40     --driver-url spark://CoarseGrainedScheduler@CDH-143:50365 
     41     --executor-id 2 
     42     --hostname CDH-141 
     43     --cores 2 
     44     --app-id application_1559203334026_0010 
     45     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/__app__.jar 
     46     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-domain-perf-3.0.0.jar    
     47     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-common-3.0.0.jar 
     48     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-sql-kafka-0-10_2.11-2.4.0.jar 
     49     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-avro_2.11-3.2.0.jar 
     50     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar 
     51     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/rocksdbjni-5.17.2.jar 
     52     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/kafka-clients-0.10.0.1.jar 
     53     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/elasticsearch-spark-20_2.11-6.4.1.jar 
     54     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar 
     55     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-core_2.11-0.9.5.jar 
     56     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-avro_2.11-0.9.5.jar 
     57     1>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stdout 
     58     2>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stderr
     59 yarn     30700  161  5.3 8738480 7032916 ?     Sl   May30 1392:01 /usr/java/jdk1.8.0_171-amd64/bin/java 
     60     -server -Xmx6144m 
     61     -Djava.io.tmpdir=/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/tmp 
     62     -Dspark.driver.port=50365 
     63     -Dspark.network.timeout=10000000 
     64     -Dspark.port.maxRetries=32 
     65     -Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 
     66     -XX:OnOutOfMemoryError=kill %p 
     67     org.apache.spark.executor.CoarseGrainedExecutorBackend 
     68     --driver-url spark://CoarseGrainedScheduler@CDH-143:50365 
     69     --executor-id 2 
     70     --hostname CDH-141 
     71     --cores 2 
     72     --app-id application_1559203334026_0010 
     73     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/__app__.jar 
     74     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-domain-perf-3.0.0.jar 
     75     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-common-3.0.0.jar 
     76     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-sql-kafka-0-10_2.11-2.4.0.jar 
     77     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-avro_2.11-3.2.0.jar 
     78     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar 
     79     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/rocksdbjni-5.17.2.jar 
     80     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/kafka-clients-0.10.0.1.jar 
     81     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/elasticsearch-spark-20_2.11-6.4.1.jar 
     82     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar 
     83     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-core_2.11-0.9.5.jar 
     84     --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-avro_2.11-0.9.5.jar
     85 dx     37775  0.0  0.0 112780   952 pts/1    S+   10:14   0:00 grep --color=auto container_1559203334026_0010_01_000003
     86 
     87 
     88 [root@CDH-141 dx]# more /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor.sh
     89 #!/bin/bash
     90 /bin/bash "/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor_session.sh"
     91 rc=$?
     92 echo $rc > "/data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.exitcode.tmp"
     93 /bin/mv -f "/data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.exitcode.tmp" 
     94 "/data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.exitcode"
     95 exit $rc
     96 
     97 [root@CDH-141 dx]# more /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor_session.sh
     98 #!/bin/bash
     99 
    100 echo $$ > /data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.tmp
    101 /bin/mv -f /data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.tmp 
    102 /data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid
    103 exec setsid /bin/bash "/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/launch_container.sh"
    104 
    105 
    106 [root@CDH-141 dx]# more /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/launch_container.sh
    107 #!/bin/bash
    108 
    109 export SPARK_YARN_STAGING_DIR="hdfs://CDH-143:8020/user/dx/.sparkStaging/application_1559203334026_0010"
    110 export HADOOP_CONF_DIR="/run/cloudera-scm-agent/process/2037-yarn-NODEMANAGER"
    111 export JAVA_HOME="/usr/java/jdk1.8.0_171-amd64"
    112 export SPARK_LOG_URL_STDOUT="http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000003/dx/stdout?start=-4096"
    113 export NM_HOST="CDH-141"
    114 export HADOOP_HDFS_HOME="/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs"
    115 export LOGNAME="dx"
    116 export JVM_PID="$$"
    117 export PWD="/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003"
    118 export HADOOP_COMMON_HOME="/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop"
    119 export LOCAL_DIRS="/data1/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/data2/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/data3/ya
    120 rn/nm/usercache/dx/appcache/application_1559203334026_0010,/data4/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/data5/yarn/nm/usercache/dx/appcach
    121 e/application_1559203334026_0010,/data6/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/opt/yarn/nm/usercache/dx/appcache/application_1559203334026_00
    122 10"
    123 export NM_HTTP_PORT="8042"
    124 export SPARK_DIST_CLASSPATH="/opt/cloudera/parcels/SPARK2-2.4.0.cloudera1-1.cdh5.13.3.p0.1007356/lib/spark2/kafka-0.10/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/jars/xmlenc-0.52
    125 .jar:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/jars/*.jar:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/LICENSE.txt:/op
    126 t/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/NOTICE.txt"
    127 export LOG_DIRS="/data1/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/data2/yarn/container-logs/application_1559203334026_0
    128 010/container_1559203334026_0010_01_000003,/data3/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/data4/yarn/container-logs/a
    129 pplication_1559203334026_0010/container_1559203334026_0010_01_000003,/data5/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/d
    130 ata6/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/opt/yarn/container-logs/application_1559203334026_0010/container_1559203
    131 334026_0010_01_000003"
    132 export NM_AUX_SERVICE_mapreduce_shuffle="AAA0+gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
    133 "
    134 export NM_PORT="8041"
    135 export USER="dx"
    136 export HADOOP_YARN_HOME="/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn"
    137 export CLASSPATH="$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:$HADOOP_CLIENT_CONF_DIR:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/*:$HADOOP_COMMON_HOME/lib/*:$HADOOP_HDFS_H
    138 OME/*:$HADOOP_HDFS_HOME/lib/*:$HADOOP_YARN_HOME/*:$HADOOP_YARN_HOME/lib/*:$HADOOP_MAPRED_HOME/*:$HADOOP_MAPRED_HOME/lib/*:$MR2_CLASSPATH:/opt/cloudera/parcels/SPARK2-2.
    139 4.0.cloudera1-1.cdh5.13.3.p0.1007356/lib/spark2/kafka-0.10/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/jars/*.jar
    140 OTICE.txt:$PWD/__spark_conf__/__hadoop_conf__"
    141 export HADOOP_TOKEN_FILE_LOCATION="/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_tokens"
    142 export NM_AUX_SERVICE_spark_shuffle=""
    143 export SPARK_USER="dx"
    144 export SPARK_LOG_URL_STDERR="http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000003/dx/stderr?start=-4096"
    145 export HOME="/home/"
    146 export CONTAINER_ID="container_1559203334026_0010_01_000003"
    147 export MALLOC_ARENA_MAX="4"
    148 ln -sf "/data5/yarn/nm/usercache/dx/filecache/1427931/kafka-clients-0.10.0.1.jar" "kafka-clients-0.10.0.1.jar"
    149 hadoop_shell_errorcode=$?
    150 if [ $hadoop_shell_errorcode -ne 0 ]
    151 then
    152   exit $hadoop_shell_errorcode
    153 fi
    154 ln -sf "/data6/yarn/nm/usercache/dx/filecache/1427932/elasticsearch-spark-20_2.11-6.4.1.jar" "elasticsearch-spark-20_2.11-6.4.1.jar"
    155 hadoop_shell_errorcode=$?
    156 if [ $hadoop_shell_errorcode -ne 0 ]
    157 then
    158   exit $hadoop_shell_errorcode
    159 fi
    160 ln -sf "/opt/yarn/nm/usercache/dx/filecache/1427933/__spark_libs__3031377885391114478.zip" "__spark_libs__"
    161 hadoop_shell_errorcode=$?
    162 if [ $hadoop_shell_errorcode -ne 0 ]
    163 then
    164   exit $hadoop_shell_errorcode
    165 fi
    166 ln -sf "/data6/yarn/nm/usercache/dx/filecache/1427925/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar" "dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar"
    167 hadoop_shell_errorcode=$?
    168 if [ $hadoop_shell_errorcode -ne 0 ]
    169 then
    170   exit $hadoop_shell_errorcode
    171 fi
    172 ln -sf "/data3/yarn/nm/usercache/dx/filecache/1427929/spark-sql-kafka-0-10_2.11-2.4.0.jar" "spark-sql-kafka-0-10_2.11-2.4.0.jar"
    173 hadoop_shell_errorcode=$?
    174 if [ $hadoop_shell_errorcode -ne 0 ]
    175 then
    176   exit $hadoop_shell_errorcode
    177 fi
    178 ln -sf "/data4/yarn/nm/usercache/dx/filecache/1427923/streaming-common-3.0.0.jar" "streaming-common-3.0.0.jar"
    179 hadoop_shell_errorcode=$?
    180 if [ $hadoop_shell_errorcode -ne 0 ]
    181 then
    182   exit $hadoop_shell_errorcode
    183 fi
    184 ln -sf "/data1/yarn/nm/usercache/dx/filecache/1427934/spark-avro_2.11-3.2.0.jar" "spark-avro_2.11-3.2.0.jar"
    185 hadoop_shell_errorcode=$?
    186 if [ $hadoop_shell_errorcode -ne 0 ]
    187 then
    188   exit $hadoop_shell_errorcode
    189 fi
    190 ln -sf "/data2/yarn/nm/usercache/dx/filecache/1427928/bijection-avro_2.11-0.9.5.jar" "bijection-avro_2.11-0.9.5.jar"
    191 hadoop_shell_errorcode=$?
    192 if [ $hadoop_shell_errorcode -ne 0 ]
    193 then
    194   exit $hadoop_shell_errorcode
    195 fi
    196 ln -sf "/data2/yarn/nm/usercache/dx/filecache/1427935/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar" "shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar"
    197 hadoop_shell_errorcode=$?
    198 if [ $hadoop_shell_errorcode -ne 0 ]
    199 then
    200   exit $hadoop_shell_errorcode
    201 fi
    202 ln -sf "/data1/yarn/nm/usercache/dx/filecache/1427927/bijection-core_2.11-0.9.5.jar" "bijection-core_2.11-0.9.5.jar"
    203 hadoop_shell_errorcode=$?
    204 if [ $hadoop_shell_errorcode -ne 0 ]
    205 then
    206   exit $hadoop_shell_errorcode
    207 fi
    208 ln -sf "/data5/yarn/nm/usercache/dx/filecache/1427924/rocksdbjni-5.17.2.jar" "rocksdbjni-5.17.2.jar"
    209 hadoop_shell_errorcode=$?
    210 if [ $hadoop_shell_errorcode -ne 0 ]
    211 then
    212   exit $hadoop_shell_errorcode
    213 fi
    214 ln -sf "/opt/yarn/nm/usercache/dx/filecache/1427926/__spark_conf__.zip" "__spark_conf__"
    215 hadoop_shell_errorcode=$?
    216 if [ $hadoop_shell_errorcode -ne 0 ]
    217 then
    218   exit $hadoop_shell_errorcode
    219 fi
    220 ln -sf "/data4/yarn/nm/usercache/dx/filecache/1427930/dx-domain-perf-3.0.0.jar" "dx-domain-perf-3.0.0.jar"
    221 hadoop_shell_errorcode=$?
    222 if [ $hadoop_shell_errorcode -ne 0 ]
    223 then
    224   exit $hadoop_shell_errorcode
    225 fi
    226 exec /bin/bash -c "$JAVA_HOME/bin/java -server -Xmx6144m -Djava.io.tmpdir=$PWD/tmp 
    227 '-Dspark.driver.port=50365' 
    228 '-Dspark.network.timeout=10000000' 
    229 '-Dspark.port.maxRetries=32' 
    230 -Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 
    231 -XX:OnOutOfMemoryError='kill %p' 
    232 org.apache.spark.executor.CoarseGrainedExecutorBackend 
    233 --driver-url spark://CoarseGrainedScheduler@CDH-143:50365 
    234 --executor-id 2 
    235 --hostname CDH-141 
    236 --cores 2 
    237 --app-id application_1559203334026_0010 
    238 --user-class-path file:$PWD/__app__.jar 
    239 --user-class-path file:$PWD/dx-domain-perf-3.0.0.jar 
    240 --user-class-path file:$PWD/streaming-common-3.0.0.jar 
    241 --user-class-path file:$PWD/spark-sql-kafka-0-10_2.11-2.4.0.jar 
    242 --user-class-path file:$PWD/spark-avro_2.11-3.2.0.jar 
    243 --user-class-path file:$PWD/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar 
    244 --user-class-path file:$PWD/rocksdbjni-5.17.2.jar 
    245 --user-class-path file:$PWD/kafka-clients-0.10.0.1.jar 
    246 --user-class-path file:$PWD/elasticsearch-spark-20_2.11-6.4.1.jar 
    247 --user-class-path file:$PWD/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar 
    248 --user-class-path file:$PWD/bijection-core_2.11-0.9.5.jar 
    249 --user-class-path file:$PWD/bijection-avro_2.11-0.9.5.jar 
    250 1>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stdout 
    251 2>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stderr"
    252 hadoop_shell_errorcode=$?
    253 if [ $hadoop_shell_errorcode -ne 0 ]
    254 then
    255   exit $hadoop_shell_errorcode
    256 fi
    257 [root@CDH-141 dx]# 

    YARN-Client execution architecture

    The flow is as follows (a minimal submission sketch in Scala follows this list):

    • The Spark YARN client asks YARN's ResourceManager to start the ApplicationMaster. In parallel, SparkContext initialization creates the DAGScheduler, TaskScheduler, and related components; because YARN-client mode is chosen here, the program uses YarnScheduler together with YarnClientSchedulerBackend;
    • After receiving the request, the ResourceManager selects a NodeManager in the cluster and allocates the application's first container on it, asking that NodeManager to start the application's ApplicationMaster inside the container. Unlike YARN-cluster mode, this ApplicationMaster does not run a SparkContext; it only keeps in contact with the client-side SparkContext to broker resource allocation;
    • Once the SparkContext in the client has finished initializing, it establishes communication with the ApplicationMaster, which registers with the ResourceManager and requests resources (containers) from it according to the job's requirements;
    • As soon as the ApplicationMaster obtains resources (i.e. containers), it talks to the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in the allocated containers; after starting, each CoarseGrainedExecutorBackend registers with the SparkContext in the client and asks it for tasks;
    • The SparkContext in the client assigns tasks to the CoarseGrainedExecutorBackend processes, which run the tasks and report status and progress back to the driver, so the client always knows the state of each task and can relaunch a task when it fails;
    • When the application finishes, the client's SparkContext unregisters with the ResourceManager and shuts itself down.
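
    To make the flow above concrete, here is a minimal, illustrative sketch of an application submitted in yarn-client mode: the SparkSession (and hence the SparkContext/driver) below lives in the submitting JVM, while the tasks of the sample job run inside CoarseGrainedExecutorBackend processes in YARN containers. The object and application names are placeholders, and a valid HADOOP_CONF_DIR/YARN_CONF_DIR is assumed.

    import org.apache.spark.sql.SparkSession

    object YarnClientModeDemo {
      def main(args: Array[String]): Unit = {
        // In yarn-client mode the driver runs right here, in the submitting JVM;
        // only the executors run in YARN containers.
        val spark = SparkSession.builder()
          .appName("yarn-client-demo")                  // illustrative application name
          .master("yarn")
          .config("spark.submit.deployMode", "client")  // normally passed via spark-submit
          .getOrCreate()

        // A trivial job: its tasks execute inside the CoarseGrainedExecutorBackend
        // processes started by the NodeManagers and report back to this driver.
        val sum = spark.sparkContext.parallelize(1 to 1000, 4).sum()
        println(s"sum = $sum")

        spark.stop()
      }
    }

    In practice the master and deploy mode are usually supplied on the spark-submit command line (--master yarn --deploy-mode client) rather than hard-coded, but the resulting control flow is the one described above.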

    Client mode vs. Cluster mode

    • Before digging into the deeper difference between YARN-client and YARN-cluster, it helps to be clear about one concept: the ApplicationMaster. In YARN, every application instance has an ApplicationMaster process, which runs in the first container started for that application. It is responsible for negotiating with the ResourceManager for resources and, once resources are granted, for telling the NodeManagers to start containers on its behalf. At the deepest level, the difference between YARN-cluster and YARN-client mode is simply a difference in the ApplicationMaster process;
    • In YARN-cluster mode, the driver runs inside the AM (ApplicationMaster), which requests resources from YARN and supervises the job as it runs. Once the user has submitted the job, the client can be closed and the job keeps running on YARN, which is why YARN-cluster mode is not suited to interactive jobs;
    • In YARN-client mode, the ApplicationMaster only asks YARN for executors; the client communicates with the requested containers to schedule their work, which means the client cannot go away (a programmatic way of choosing between the two modes is sketched after this list).
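
    Since the choice between the two modes is just a submission-time setting, it can also be made programmatically through the launcher API (org.apache.spark.launcher.SparkLauncher, which cooperates with the LauncherBackend listed below). The sketch is illustrative only: SPARK_HOME is assumed to be set, and the jar path and main class are placeholders.

    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    object LaunchOnYarn {
      def main(args: Array[String]): Unit = {
        // Submits an application to YARN without shelling out to spark-submit.
        // Changing "cluster" to "client" below is all it takes to move the driver
        // from the ApplicationMaster back into the launching process.
        val handle: SparkAppHandle = new SparkLauncher()
          .setAppResource("/path/to/examples.jar")           // placeholder jar
          .setMainClass("org.apache.spark.examples.SparkPi") // placeholder main class
          .setMaster("yarn")
          .setDeployMode("cluster")                          // or "client"
          .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
          .startApplication()                                // returns a SparkAppHandle

        // Wait until YARN reports a terminal state for the application.
        while (!handle.getState.isFinal) Thread.sleep(1000)
        println(s"final state: ${handle.getState}")
      }
    }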

    Key classes involved in the submission (a short YarnClient sketch follows this list):

    JavaMainApplication

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala

    StandaloneAppClient

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

    SparkSubmitArguments

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

    ApplicationMaster

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

    ClientApp

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/Client.scala

    LauncherBackend

    https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala

    YarnClient

    https://github.com/apache/hadoop/blob/branch-2.7.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java

    YarnClientImpl

    https://github.com/apache/hadoop/blob/branch-2.7.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

    ApplicationClientProtocol

    https://github.com/apache/hadoop/blob/branch-2.7.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java

    ApplicationClientProtocolPBClientImpl

    https://github.com/apache/hadoop/blob/branch-2.7.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java
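
    Of the classes above, YarnClient and YarnClientImpl are the Hadoop-side entry points that org.apache.spark.deploy.yarn.Client drives when it submits the ApplicationMaster. The minimal sketch below shows only the first few calls of that flow (obtaining an application id); the object name is illustrative, and a reachable ResourceManager configuration (yarn-site.xml on the classpath) is assumed.

    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object YarnClientSketch {
      def main(args: Array[String]): Unit = {
        // The same YarnClient calls that Spark's Client issues at the start of a submission.
        val yarnClient = YarnClient.createYarnClient()
        yarnClient.init(new YarnConfiguration())
        yarnClient.start()

        // createApplication() asks the ResourceManager for a new application id.
        // The real Client then fills an ApplicationSubmissionContext (including the
        // ContainerLaunchContext that starts the ApplicationMaster) and hands it to
        // yarnClient.submitApplication(...).
        val newApp = yarnClient.createApplication()
        val appId = newApp.getNewApplicationResponse.getApplicationId
        println(s"ResourceManager granted application id: $appId")

        yarnClient.stop()
      }
    }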

