zoukankan      html  css  js  c++  java
  • 【原创】大数据基础之Spark(1)Spark Submit即Spark任务提交过程

    Spark2.1.1

    一 Spark Submit本地解析

    1.1 现象

    提交命令:

    spark-submit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

    进程:

    hadoop 225653 0.0 0.0 11256 364 ? S Aug24 0:00 bash /$spark-dir/bin/spark-class org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

    hadoop 225654 0.0 0.0 34424 2860 ? Sl Aug24 0:00 /$jdk_dir/bin/java -Xmx128m -cp /spark-dir/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

    1.2 执行过程

    1.2.1 脚本执行

    -bash-4.1$ cat bin/spark-submit
    #!/usr/bin/env bash

    if [ -z "${SPARK_HOME}" ]; then
    source "$(dirname "$0")"/find-spark-home
    fi

    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0

    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

    注释:这里执行了另一个脚本spark-class,具体如下:

    -bash-4.1$ cat bin/spark-class

    ...

    build_command() {
    "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
    printf "%d" $?
    }

    CMD=()
    while IFS= read -d '' -r ARG; do
    CMD+=("$ARG")
    done < <(build_command "$@")

    ...

    CMD=("${CMD[@]:0:$LAST}")
    exec "${CMD[@]}"

    注释:这里执行java class: org.apache.spark.launcher.Main,并传入参数,具体如下:

    1.2.2 代码执行

    org.apache.spark.launcher.Main
    ...
    
            builder = new SparkSubmitCommandBuilder(help);
    
    ...
    
        List<String> cmd = builder.buildCommand(env);
    
    ...
    
          List<String> bashCmd = prepareBashCommand(cmd, env);
    
          for (String c : bashCmd) {
    
            System.out.print(c);
    
            System.out.print('');
    
          }
    
    ...

    注释:其中会调用SparkSubmitCommandBuilder来生成Spark Submit命令,具体如下:

    org.apache.spark.launcher.SparkSubmitCommandBuilder
    ...
    
      private List<String> buildSparkSubmitCommand(Map<String, String> env)
    ...
        addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
        addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
    ...
        String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    ...
        if (isClientMode) {
    ...
          addOptionString(cmd, driverExtraJavaOptions);
    ...
        }
    ...
    
        addPermGenSizeOpt(cmd);
    
        cmd.add("org.apache.spark.deploy.SparkSubmit");
    
        cmd.addAll(buildSparkSubmitArgs());
    
        return cmd;
    
    ...

    注释:这里创建了本地命令,其中java class:org.apache.spark.deploy.SparkSubmit,同时会把各种JavaOptions放到启动命令里(比如SPARK_JAVA_OPTS,DRIVER_EXTRA_JAVA_OPTIONS等),具体如下:

    org.apache.spark.deploy.SparkSubmit
      def main(args: Array[String]): Unit = {
    
        val appArgs = new SparkSubmitArguments(args) //parse command line parameter
    
        if (appArgs.verbose) {
    
          // scalastyle:off println
    
          printStream.println(appArgs)
    
          // scalastyle:on println
    
        }
    
        appArgs.action match {
    
          case SparkSubmitAction.SUBMIT => submit(appArgs)
    
          case SparkSubmitAction.KILL => kill(appArgs)
    
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    
        }
    
      }
    
     
    
        private def submit(args: SparkSubmitArguments): Unit = {
    
        val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) //merge all parameters from: command line, properties file, system property, etc...
    
     
    
        def doRunMain(): Unit = {
    
          ...
    
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    
          ...
    
        }
    
             ...
    
     
    
      private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
    
          : (Seq[String], Seq[String], Map[String, String], String) = {
    
        if (deployMode == CLIENT || isYarnCluster) {
    
          childMainClass = args.mainClass
    
          ...
    
        if (isYarnCluster) {
    
          childMainClass = "org.apache.spark.deploy.yarn.Client"
    
          ...
    
     
    
      private def runMain(
    
          childArgs: Seq[String],
    
          childClasspath: Seq[String],
    
          sysProps: Map[String, String],
    
          childMainClass: String,
    
          verbose: Boolean): Unit = {
    
        // scalastyle:off println
    
        if (verbose) {
    
          printStream.println(s"Main class:
    $childMainClass")
    
          printStream.println(s"Arguments:
    ${childArgs.mkString("
    ")}")
    
          printStream.println(s"System properties:
    ${sysProps.mkString("
    ")}")
    
          printStream.println(s"Classpath elements:
    ${childClasspath.mkString("
    ")}")
    
          printStream.println("
    ")
    
        }
    
        // scalastyle:on println
    
     
    
        val loader =
    
          if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
    
            new ChildFirstURLClassLoader(new Array[URL](0),
    
              Thread.currentThread.getContextClassLoader)
    
          } else {
    
            new MutableURLClassLoader(new Array[URL](0),
    
              Thread.currentThread.getContextClassLoader)
    
          }
    
        Thread.currentThread.setContextClassLoader(loader)
    
     
    
        for (jar <- childClasspath) {
    
          addJarToClasspath(jar, loader)
    
        }
    
     
    
        for ((key, value) <- sysProps) {
    
          System.setProperty(key, value)
    
        }
    
     
    
        var mainClass: Class[_] = null
    
     
    
        try {
    
          mainClass = Utils.classForName(childMainClass)
    
        } catch {
    
        ...
    
        val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    
        ...
    
          mainMethod.invoke(null, childArgs.toArray)
    
          ...

    注释:这里首先会解析命令行参数,比如mainClass,准备运行环境包括System Property以及classpath等,然后使用一个新的classloader:ChildFirstURLClassLoader来加载用户的mainClass,然后反射调用mainClass的main方法,这样用户的app.package.AppClass的main方法就开始执行了。

    org.apache.spark.SparkConf
    class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
    
     
    
      import SparkConf._
    
     
    
      /** Create a SparkConf that loads defaults from system properties and the classpath */
    
      def this() = this(true)
    
    ...
    
      if (loadDefaults) {
    
        loadFromSystemProperties(false)
    
      }
    
     
    
      private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    
        // Load any spark.* system properties
    
        for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    
          set(key, value, silent)
    
        }
    
        this
    
      }

    注释:这里可以看到spark是怎样加载配置的

    1.2.3 --verbose

    spark-submit --master local[*] --class app.package.AppClass --jars /$other-dir/other.jar  --driver-memory 1g --verbose app-1.0.jar

    输出示例:

    Main class:
    app.package.AppClass
    Arguments:

    System properties:
    spark.executor.logs.rolling.maxSize -> 1073741824
    spark.driver.memory -> 1g
    spark.driver.extraLibraryPath -> /$hadoop-dir/lib/native
    spark.eventLog.enabled -> true
    spark.eventLog.compress -> true
    spark.executor.logs.rolling.time.interval -> daily
    SPARK_SUBMIT -> true
    spark.app.name -> app.package.AppClass
    spark.driver.extraJavaOptions -> -XX:+PrintGCDetails -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:-UseCompressedClassPointers -XX:CompressedClassSpaceSize=3G -XX:+PrintGCTimeStamps -Xloggc:/export/Logs/hadoop/g1gc.log
    spark.jars -> file:/$other-dir/other.jar
    spark.sql.adaptive.enabled -> true
    spark.submit.deployMode -> client
    spark.executor.logs.rolling.maxRetainedFiles -> 10
    spark.executor.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
    spark.eventLog.dir -> hdfs://myhdfs/spark/history
    spark.master -> local[*]
    spark.sql.crossJoin.enabled -> true
    spark.driver.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
    Classpath elements:
    file:/$other-dir/other.jar
    file:/app-1.0.jar

    启动时添加--verbose参数后,可以输出所有的运行时信息,有助于判断问题。

  • 相关阅读:
    地图校正方法心得
    投影的心得点滴
    android 打包 apk keystore
    scp命令详解
    ubuntu11.10真机调试nopermissions
    android adb server is out of date
    ubuntu删除默认jdk
    android 运行 错误 总结
    android file .apk is not a valid zip file adb install
    ubuntu系统目录结构
  • 原文地址:https://www.cnblogs.com/barneywill/p/9820684.html
Copyright © 2011-2022 走看看