
    spark2.11 - Source Code Analysis of Job Submission (spark-submit)

    The first step in analyzing Spark: spark-submit.
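
    A typical invocation looks like ./bin/spark-submit --class <main-class> --master <master-url> <application-jar> [application-arguments]; the spark-submit script ultimately starts a JVM whose main class is org.apache.spark.deploy.SparkSubmit, which is where the walkthrough below begins.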

    SparkSubmit

    object SparkSubmit {
    
      // Cluster managers
      private val YARN = 1
      private val STANDALONE = 2
      private val MESOS = 4
      private val LOCAL = 8
      private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
    
      // Deploy modes
      private val CLIENT = 1
      private val CLUSTER = 2
      private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
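      // Note (added commentary, not part of the original source): the values above are bit
      // flags, so a set of allowed cluster managers or deploy modes can be OR-ed into a single
      // Int (as in ALL_CLUSTER_MGRS and ALL_DEPLOY_MODES) and later tested with a bitwise AND,
      // e.g. (clusterManager & ALL_CLUSTER_MGRS) != 0.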
    
      // Special primary resource names that represent shells rather than application jars.
      private val SPARK_SHELL = "spark-shell"
      private val PYSPARK_SHELL = "pyspark-shell"
      private val SPARKR_SHELL = "sparkr-shell"
      private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
      private val R_PACKAGE_ARCHIVE = "rpkg.zip"
    
      private val CLASS_NOT_FOUND_EXIT_STATUS = 101
    
      // scalastyle:off println
      // Exposed for testing
      private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
      private[spark] var printStream: PrintStream = System.err
      private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
      private[spark] def printErrorAndExit(str: String): Unit = {
        printStream.println("Error: " + str)
        printStream.println("Run with --help for usage help or --verbose for debug output")
        exitFn(1)
      }
      private[spark] def printVersionAndExit(): Unit = {
        printStream.println("""Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version %s
          /_/
                            """.format(SPARK_VERSION))
        printStream.println("Using Scala %s, %s, %s".format(
          Properties.versionString, Properties.javaVmName, Properties.javaVersion))
        printStream.println("Branch %s".format(SPARK_BRANCH))
        printStream.println("Compiled by user %s on %s".format(SPARK_BUILD_USER, SPARK_BUILD_DATE))
        printStream.println("Revision %s".format(SPARK_REVISION))
        printStream.println("Url %s".format(SPARK_REPO_URL))
        printStream.println("Type --help for more information.")
        exitFn(0)
      }
      // scalastyle:on println
      /**
       * Program entry point: parse the command-line arguments and dispatch on the requested action.
       */
      def main(args: Array[String]): Unit = {
        val appArgs = new SparkSubmitArguments(args)
        if (appArgs.verbose) {
          // scalastyle:off println
          printStream.println(appArgs)
          // scalastyle:on println
        }
        appArgs.action match {
          case SparkSubmitAction.SUBMIT => submit(appArgs)
          case SparkSubmitAction.KILL => kill(appArgs)
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
        }
      }
    
      /**
       * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
       */
      private def kill(args: SparkSubmitArguments): Unit = {
        new RestSubmissionClient(args.master)
          .killSubmission(args.submissionToKill)
      }
    
      /**
       * Request the status of an existing submission using the REST protocol.
       * Standalone and Mesos cluster mode only.
       */
      private def requestStatus(args: SparkSubmitArguments): Unit = {
        new RestSubmissionClient(args.master)
          .requestSubmissionStatus(args.submissionToRequestStatusFor)
      }
    
      /**
       * Submit the application using the provided parameters.
       *
       * This runs in two steps. First, we prepare the launch environment by setting up
       * the appropriate classpath, system properties, and application arguments for
       * running the child main class based on the cluster manager and the deploy mode.
       * Second, we use this launch environment to invoke the main method of the child
       * main class.
       * Submit the application.
       */
      @tailrec
      private def submit(args: SparkSubmitArguments): Unit = {
        val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
    
        def doRunMain(): Unit = {
          if (args.proxyUser != null) {
            val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
              UserGroupInformation.getCurrentUser())
            try {
              proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                override def run(): Unit = {
                  runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
                }
              })
            } catch {
              case e: Exception =>
                // Hadoop's AuthorizationException suppresses the exception's stack trace, which
                // makes the message printed to the output by the JVM not very helpful. Instead,
                // detect exceptions with empty stack traces here, and treat them differently.
                if (e.getStackTrace().length == 0) {
                  // scalastyle:off println
                  printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
                  // scalastyle:on println
                  exitFn(1)
                } else {
                  throw e
                }
            }
          } else {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        }
    
         // In standalone cluster mode, there are two submission gateways:
         //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
         //   (2) The new REST-based gateway introduced in Spark 1.3
         // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
         // to use the legacy gateway if the master endpoint turns out to be not a REST server.
        if (args.isStandaloneCluster && args.useRest) {
          try {
            // scalastyle:off println
            printStream.println("Running Spark using the REST application submission protocol.")
            // scalastyle:on println
            doRunMain()
          } catch {
            // Fail over to use the legacy submission gateway
            case e: SubmitRestConnectionException =>
              printWarning(s"Master endpoint ${args.master} was not a REST server. " +
                "Falling back to legacy submission gateway instead.")
              args.useRest = false
              submit(args)
          }
        // In all other modes, just run the main class as prepared
        } else {
          doRunMain()
        }
      }
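
    The rest of the object (prepareSubmitEnvironment, runMain, and the remaining helpers) is omitted from the excerpt above. In short, runMain puts the prepared jars on a classloader, loads childMainClass, and invokes its static main method reflectively with childArgs. The following is a minimal sketch of that reflective invocation; it is a simplified illustration with made-up names (ReflectiveMainRunner), not the actual Spark code:

    import java.lang.reflect.Modifier

    object ReflectiveMainRunner {
      // Simplified sketch: load a "child" main class and invoke its static main(Array[String]).
      def runMain(childArgs: Seq[String], childMainClass: String): Unit = {
        val loader = Thread.currentThread().getContextClassLoader
        val mainClass = Class.forName(childMainClass, true, loader)
        val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          throw new IllegalStateException(s"The main method of $childMainClass must be static")
        }
        // main is static, so the receiver is null; the argument array is passed as one parameter.
        mainMethod.invoke(null, childArgs.toArray)
      }
    }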
    

    SparkSubmitArguments (some field definitions and methods are omitted)
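
    SparkSubmitArguments is the object built by new SparkSubmitArguments(args) in main above. Its constructor (not reproduced here) parses the command-line options, merges defaults from spark-defaults.conf into sparkProperties, and then calls loadEnvironmentArguments() and validateArguments(), both shown below.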

     /**
       * Load arguments from environment variables, Spark properties etc.
       */
      private def loadEnvironmentArguments(): Unit = {
        master = Option(master)
          .orElse(sparkProperties.get("spark.master"))
          .orElse(env.get("MASTER"))
          .orNull
        driverExtraClassPath = Option(driverExtraClassPath)
          .orElse(sparkProperties.get("spark.driver.extraClassPath"))
          .orNull
        driverExtraJavaOptions = Option(driverExtraJavaOptions)
          .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
          .orNull
        driverExtraLibraryPath = Option(driverExtraLibraryPath)
          .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
          .orNull
        driverMemory = Option(driverMemory)
          .orElse(sparkProperties.get("spark.driver.memory"))
          .orElse(env.get("SPARK_DRIVER_MEMORY"))
          .orNull
        driverCores = Option(driverCores)
          .orElse(sparkProperties.get("spark.driver.cores"))
          .orNull
        executorMemory = Option(executorMemory)
          .orElse(sparkProperties.get("spark.executor.memory"))
          .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
          .orNull
        executorCores = Option(executorCores)
          .orElse(sparkProperties.get("spark.executor.cores"))
          .orElse(env.get("SPARK_EXECUTOR_CORES"))
          .orNull
        totalExecutorCores = Option(totalExecutorCores)
          .orElse(sparkProperties.get("spark.cores.max"))
          .orNull
        name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
        jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
        files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
        ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
        packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
        packagesExclusions = Option(packagesExclusions)
          .orElse(sparkProperties.get("spark.jars.excludes")).orNull
        deployMode = Option(deployMode)
          .orElse(sparkProperties.get("spark.submit.deployMode"))
          .orElse(env.get("DEPLOY_MODE"))
          .orNull
        numExecutors = Option(numExecutors)
          .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
        keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
        principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
    
        // Try to set main class from JAR if no --class argument is given
        if (mainClass == null && !isPython && !isR && primaryResource != null) {
          val uri = new URI(primaryResource)
          val uriScheme = uri.getScheme()
    
          uriScheme match {
            case "file" =>
              try {
                val jar = new JarFile(uri.getPath)
                // Note that this might still return null if no main-class is set; we catch that later
                mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
              } catch {
                case e: Exception =>
                  SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
              }
            case _ =>
              SparkSubmit.printErrorAndExit(
                s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
                "Please specify a class through --class.")
          }
        }
    
        // Global defaults. These should be kept to a minimum to avoid confusing behavior.
        master = Option(master).getOrElse("local[*]")
    
        // In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
        if (master.startsWith("yarn")) {
          name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
        }
    
        // Set name from main class if not given
        name = Option(name).orElse(Option(mainClass)).orNull
        if (name == null && primaryResource != null) {
          name = Utils.stripDirectory(primaryResource)
        }
    
        // Action should be SUBMIT unless otherwise specified
        action = Option(action).getOrElse(SUBMIT)
      }
      /** Ensure that required fields exist. Call this only once all defaults are loaded. */
      private def validateArguments(): Unit = {
        action match {
          case SUBMIT => validateSubmitArguments()
          case KILL => validateKillArguments()
          case REQUEST_STATUS => validateStatusRequestArguments()
        }
      }
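
    The pattern repeated throughout loadEnvironmentArguments is a precedence chain: a value set explicitly on the command line wins, then the matching entry in sparkProperties, then (for some options) an environment variable, and otherwise the field stays null until a global default such as local[*] is applied. Below is a minimal, self-contained sketch of that chain for the driver-memory case (illustrative only, not a quote from Spark):

    object PrecedenceSketch {
      // Resolution order used above: explicit value > Spark property > environment variable > null.
      def resolveDriverMemory(explicit: String,
                              sparkProperties: Map[String, String],
                              env: Map[String, String]): String =
        Option(explicit)
          .orElse(sparkProperties.get("spark.driver.memory"))
          .orElse(env.get("SPARK_DRIVER_MEMORY"))
          .orNull
    }

    For example, resolveDriverMemory(null, Map.empty, Map("SPARK_DRIVER_MEMORY" -> "4g")) returns "4g", while an explicit "2g" would override both the property and the environment variable.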
    