    spark2.11 - Source Code Analysis of Job Submission (spark-submit)

    The first step in analyzing Spark is spark-submit, whose entry point is the SparkSubmit object below.

    SparkSubmit

    object SparkSubmit {
    
      // Cluster managers
      private val YARN = 1
      private val STANDALONE = 2
      private val MESOS = 4
      private val LOCAL = 8
      private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
    
      // Deploy modes
      private val CLIENT = 1
      private val CLUSTER = 2
      private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
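      // These constants are powers of two: several of them can be OR'd into a bit mask
      // (ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES), and a resolved value can later be tested
      // against such a mask with a bitwise AND when deciding which options apply to the
      // chosen cluster manager and deploy mode.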
    
      // Special primary resource names that represent shells rather than application jars.
      private val SPARK_SHELL = "spark-shell"
      private val PYSPARK_SHELL = "pyspark-shell"
      private val SPARKR_SHELL = "sparkr-shell"
      private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
      private val R_PACKAGE_ARCHIVE = "rpkg.zip"
    
      private val CLASS_NOT_FOUND_EXIT_STATUS = 101
    
      // scalastyle:off println
      // Exposed for testing
      private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
      private[spark] var printStream: PrintStream = System.err
      private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
      private[spark] def printErrorAndExit(str: String): Unit = {
        printStream.println("Error: " + str)
        printStream.println("Run with --help for usage help or --verbose for debug output")
        exitFn(1)
      }
      private[spark] def printVersionAndExit(): Unit = {
        printStream.println("""Welcome to
          ____              __
         / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
          /_/
                            """.format(SPARK_VERSION))
        printStream.println("Using Scala %s, %s, %s".format(
          Properties.versionString, Properties.javaVmName, Properties.javaVersion))
        printStream.println("Branch %s".format(SPARK_BRANCH))
        printStream.println("Compiled by user %s on %s".format(SPARK_BUILD_USER, SPARK_BUILD_DATE))
        printStream.println("Revision %s".format(SPARK_REVISION))
        printStream.println("Url %s".format(SPARK_REPO_URL))
        printStream.println("Type --help for more information.")
        exitFn(0)
      }
      // scalastyle:on println
      /**
       * Program entry point.
       */
      def main(args: Array[String]): Unit = {
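        // SparkSubmitArguments parses the command line, merges in values from
        // spark-defaults.conf and the environment (see loadEnvironmentArguments
        // below), and decides which action to perform.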
        val appArgs = new SparkSubmitArguments(args)
        if (appArgs.verbose) {
          // scalastyle:off println
          printStream.println(appArgs)
          // scalastyle:on println
        }
        appArgs.action match {
          case SparkSubmitAction.SUBMIT => submit(appArgs)
          case SparkSubmitAction.KILL => kill(appArgs)
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
        }
      }
    
      /**
       * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
       */
      private def kill(args: SparkSubmitArguments): Unit = {
        new RestSubmissionClient(args.master)
          .killSubmission(args.submissionToKill)
      }
    
      /**
       * Request the status of an existing submission using the REST protocol.
       * Standalone and Mesos cluster mode only.
       */
      private def requestStatus(args: SparkSubmitArguments): Unit = {
        new RestSubmissionClient(args.master)
          .requestSubmissionStatus(args.submissionToRequestStatusFor)
      }
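      // Both kill and requestStatus go through the standalone/Mesos master's REST
      // submission endpoint (spark.master.rest.port, 6066 by default) rather than
      // the legacy RPC gateway.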
    
      /**
       * Submit the application using the provided parameters.
       *
       * This runs in two steps. First, we prepare the launch environment by setting up
       * the appropriate classpath, system properties, and application arguments for
       * running the child main class based on the cluster manager and the deploy mode.
       * Second, we use this launch environment to invoke the main method of the child
       * main class.
       */
      @tailrec
      private def submit(args: SparkSubmitArguments): Unit = {
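        // prepareSubmitEnvironment resolves the cluster manager and deploy mode and
        // returns (child arguments, child classpath, system properties, child main class).
        // In client mode the child main class is the user's own class; in cluster mode it
        // is a cluster-manager specific wrapper (e.g. org.apache.spark.deploy.yarn.Client).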
        val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
    
        def doRunMain(): Unit = {
          if (args.proxyUser != null) {
            val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
              UserGroupInformation.getCurrentUser())
            try {
              proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                override def run(): Unit = {
                  runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
                }
              })
            } catch {
              case e: Exception =>
                // Hadoop's AuthorizationException suppresses the exception's stack trace, which
                // makes the message printed to the output by the JVM not very helpful. Instead,
                // detect exceptions with empty stack traces here, and treat them differently.
                if (e.getStackTrace().length == 0) {
                  // scalastyle:off println
                  printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
                  // scalastyle:on println
                  exitFn(1)
                } else {
                  throw e
                }
            }
          } else {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        }
    
         // In standalone cluster mode, there are two submission gateways:
         //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
         //   (2) The new REST-based gateway introduced in Spark 1.3
         // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
         // to use the legacy gateway if the master endpoint turns out to be not a REST server.
        if (args.isStandaloneCluster && args.useRest) {
          try {
            // scalastyle:off println
            printStream.println("Running Spark using the REST application submission protocol.")
            // scalastyle:on println
            doRunMain()
          } catch {
            // Fail over to use the legacy submission gateway
            case e: SubmitRestConnectionException =>
              printWarning(s"Master endpoint ${args.master} was not a REST server. " +
                "Falling back to legacy submission gateway instead.")
              args.useRest = false
              submit(args)
          }
        // In all other modes, just run the main class as prepared
        } else {
          doRunMain()
        }
      }
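
    runMain is not shown above; it is the step that actually starts the child main class inside the current JVM: it builds a class loader over childClasspath, applies sysProps, and then calls the class's static main method by reflection. The following is a minimal standalone sketch of that idea; invokeChildMain and its exact signature are illustrative, not the real Spark method.

    import java.lang.reflect.Modifier
    import java.net.URLClassLoader

    // Simplified, hypothetical sketch of what runMain does -- not the actual implementation.
    def invokeChildMain(
        childArgs: Seq[String],
        childClasspath: Seq[String],
        sysProps: Map[String, String],
        childMainClass: String): Unit = {
      // Make the child/user jars visible on the thread's context class loader.
      val loader = new URLClassLoader(
        childClasspath.map(p => new java.io.File(p).toURI.toURL).toArray,
        Thread.currentThread.getContextClassLoader)
      Thread.currentThread.setContextClassLoader(loader)

      // Propagate the prepared configuration as JVM system properties.
      sysProps.foreach { case (k, v) => sys.props(k) = v }

      // Reflectively look up and call childMainClass.main(childArgs).
      val clazz = Class.forName(childMainClass, true, loader)
      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
      require(Modifier.isStatic(mainMethod.getModifiers),
        s"$childMainClass.main must be static")
      mainMethod.invoke(null, childArgs.toArray)
    }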
    

    SparkSubmitArguments (some field definitions and helper methods are omitted)

     /**
       * Load arguments from environment variables, Spark properties etc.
       */
      private def loadEnvironmentArguments(): Unit = {
        master = Option(master)
          .orElse(sparkProperties.get("spark.master"))
          .orElse(env.get("MASTER"))
          .orNull
        driverExtraClassPath = Option(driverExtraClassPath)
          .orElse(sparkProperties.get("spark.driver.extraClassPath"))
          .orNull
        driverExtraJavaOptions = Option(driverExtraJavaOptions)
          .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
          .orNull
        driverExtraLibraryPath = Option(driverExtraLibraryPath)
          .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
          .orNull
        driverMemory = Option(driverMemory)
          .orElse(sparkProperties.get("spark.driver.memory"))
          .orElse(env.get("SPARK_DRIVER_MEMORY"))
          .orNull
        driverCores = Option(driverCores)
          .orElse(sparkProperties.get("spark.driver.cores"))
          .orNull
        executorMemory = Option(executorMemory)
          .orElse(sparkProperties.get("spark.executor.memory"))
          .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
          .orNull
        executorCores = Option(executorCores)
          .orElse(sparkProperties.get("spark.executor.cores"))
          .orElse(env.get("SPARK_EXECUTOR_CORES"))
          .orNull
        totalExecutorCores = Option(totalExecutorCores)
          .orElse(sparkProperties.get("spark.cores.max"))
          .orNull
        name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
        jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
        files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
        ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
        packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
        packagesExclusions = Option(packagesExclusions)
          .orElse(sparkProperties.get("spark.jars.excludes")).orNull
        deployMode = Option(deployMode)
          .orElse(sparkProperties.get("spark.submit.deployMode"))
          .orElse(env.get("DEPLOY_MODE"))
          .orNull
        numExecutors = Option(numExecutors)
          .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
        keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
        principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
    
        // Try to set main class from JAR if no --class argument is given
        if (mainClass == null && !isPython && !isR && primaryResource != null) {
          val uri = new URI(primaryResource)
          val uriScheme = uri.getScheme()
    
          uriScheme match {
            case "file" =>
              try {
                val jar = new JarFile(uri.getPath)
                // Note that this might still return null if no main-class is set; we catch that later
                mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
              } catch {
                case e: Exception =>
                  SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
              }
            case _ =>
              SparkSubmit.printErrorAndExit(
                s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
                "Please specify a class through --class.")
          }
        }
    
        // Global defaults. These should be kept to a minimum to avoid confusing behavior.
        master = Option(master).getOrElse("local[*]")
    
        // In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
        if (master.startsWith("yarn")) {
          name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
        }
    
        // Set name from main class if not given
        name = Option(name).orElse(Option(mainClass)).orNull
        if (name == null && primaryResource != null) {
          name = Utils.stripDirectory(primaryResource)
        }
    
        // Action should be SUBMIT unless otherwise specified
        action = Option(action).getOrElse(SUBMIT)
      }
      /** Ensure that required fields exist. Call this only once all defaults are loaded. */
      private def validateArguments(): Unit = {
        action match {
          case SUBMIT => validateSubmitArguments()
          case KILL => validateKillArguments()
          case REQUEST_STATUS => validateStatusRequestArguments()
        }
      }
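
    The pattern repeated throughout loadEnvironmentArguments is a precedence chain: an explicit command-line value wins, then a value from the Spark properties (spark-defaults.conf or --conf), then an environment variable, and only then null, which validateArguments later rejects if the field is mandatory. Below is a small self-contained sketch of that chain; the resolve helper is illustrative and not part of the Spark code.

    // Illustrative helper: the first non-null/defined value wins, mirroring the
    // Option(x).orElse(sparkProperties.get(...)).orElse(env.get(...)).orNull chains above.
    def resolve(
        cliValue: String,
        sparkProperties: Map[String, String],
        env: Map[String, String],
        propKey: String,
        envKey: String): String = {
      Option(cliValue)                          // 1. explicit command-line flag
        .orElse(sparkProperties.get(propKey))   // 2. spark-defaults.conf / --conf
        .orElse(env.get(envKey))                // 3. environment variable
        .orNull                                 // 4. may stay null; validated later
    }

    // e.g. driverMemory = resolve(driverMemory, sparkProperties, sys.env,
    //        "spark.driver.memory", "SPARK_DRIVER_MEMORY")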
    
Original article: https://www.cnblogs.com/star521/p/9796723.html