  • Analysis of the spark-submit script

    Submitting a job

    ./spark-submit \
    --class cn.com.dtmobile.spark.DebugTest \
    --master yarn \
    --deploy-mode client \
    --num-executors 3 \
    --executor-cores 2 \
    --executor-memory 1G \
    /home/etluser/kong/debugTest/pucchSinr.jar

     The ${SPARK_HOME}/bin/spark-submit script

    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home  
    fi
    
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

    $@ expands to all of the arguments the script received:

    $@=

    --class cn.com.dtmobile.spark.DebugTest --master yarn --deploy-mode client --num-executors 3 --executor-cores 2 --executor-memory 1G /home/etluser/kong/debugTest/pucchSinr.jar
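    A quick aside (not from the Spark scripts themselves): the quoted form "$@" forwards the received arguments with their original word boundaries intact, which is why spark-submit can hand them to spark-class unchanged.

    # illustrative sketch of "$@" preserving argument boundaries
    forward() { printf '<%s>\n' "$@"; }
    forward --class cn.com.dtmobile.spark.DebugTest --executor-memory 1G
    # <--class>
    # <cn.com.dtmobile.spark.DebugTest>
    # <--executor-memory>
    # <1G>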

    What exec ultimately runs:
    exec=

    /home/etluser/kong/spark/spark-2.3.4-bin/spark-2.3.4-bin-hadoop2.6/bin/spark-class org.apache.spark.deploy.SparkSubmit --class cn.com.dtmobile.spark.DebugTest --master yarn --deploy-mode client --num-executors 3 --executor-cores 2 --executor-memory 1G /home/etluser/kong/debugTest/pucchSinr.jar

     So the arguments passed to spark-class are:

     org.apache.spark.deploy.SparkSubmit --class cn.com.dtmobile.spark.DebugTest --master yarn --deploy-mode client --num-executors 3 --executor-cores 2 --executor-memory 1G /home/etluser/kong/debugTest/pucchSinr.jar

    The ${SPARK_HOME}/bin/spark-class script

    if [ -z "${SPARK_HOME}" ]; then # if ${SPARK_HOME} is empty (zero length)
      source "$(dirname "$0")"/find-spark-home
    fi
    
    . "${SPARK_HOME}"/bin/load-spark-env.sh
    
    # Find the java binary
    if [ -n "${JAVA_HOME}" ]; then  # if ${JAVA_HOME} is non-empty
      RUNNER="${JAVA_HOME}/bin/java"
    else
      if [ "$(command -v java)" ]; then
        RUNNER="java"
      else
        echo "JAVA_HOME is not set" >&2
        exit 1
      fi
    fi
    
    # Find Spark jars.
    if [ -d "${SPARK_HOME}/jars" ]; then # if the ${SPARK_HOME}/jars directory exists
      SPARK_JARS_DIR="${SPARK_HOME}/jars"
    else
      SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars" # otherwise look under the assembly build directory
    fi
    
    if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then # if SPARK_JARS_DIR does not exist and the test-related variables are unset
      echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
      echo "You need to build Spark with the target \"package\" before running this program." 1>&2
      exit 1
    else
      LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*" # the launch classpath is every jar under SPARK_JARS_DIR
    fi
    
    # Add the launcher build dir to the classpath if requested.
    if [ -n "$SPARK_PREPEND_CLASSES" ]; then
      LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
    fi
    
    # For tests
    if [[ -n "$SPARK_TESTING" ]]; then
      unset YARN_CONF_DIR
      unset HADOOP_CONF_DIR
    fi
    
    # The launcher library will print arguments separated by a NULL character, to allow arguments with
    # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
    # an array that will be used to exec the final command.
    #
    # The exit code of the launcher is appended to the output, so the parent shell removes it from the
    # command array and checks the value to see if the launcher succeeded.
    build_command() {
      # $RUNNER is java; invoke org.apache.spark.launcher.Main on the launch classpath with all of the
      # arguments spark-submit received. The launcher builds the JVM command shown below.
      "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      printf "%d\0" $?
    }
    
    # Turn off posix mode since it does not allow process substitution
    set +o posix
    CMD=()
    while IFS= read -d '' -r ARG; do
      CMD+=("$ARG")
    done < <(build_command "$@")
    
    COUNT=${#CMD[@]}
    LAST=$((COUNT - 1))
    LAUNCHER_EXIT_CODE=${CMD[$LAST]}
    
    # Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
    # the code that parses the output of the launcher to get confused. In those cases, check if the
    # exit code is an integer, and if it's not, handle it as a special error case.
    if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
      echo "${CMD[@]}" | head -n-1 1>&2
      exit 1
    fi
    
    if [ $LAUNCHER_EXIT_CODE != 0 ]; then
      exit $LAUNCHER_EXIT_CODE
    fi
    
    CMD=("${CMD[@]:0:$LAST}")
    exec "${CMD[@]}"

     The final CMD that exec runs:

    ${CMD[@]}=

    /usr/lib/java/jdk1.8.0_144/bin/java -cp 
    /home/etluser/kong/spark/spark-2.3.4-bin/spark-2.3.4-bin-hadoop2.6/conf/:/home/etluser/kong/spark/spark-2.3.4-bin/spark-2.3.4-bin-hadoop2.6/jars/* 
    -Xmx1g 
    org.apache.spark.deploy.SparkSubmit 
    --master yarn 
    --deploy-mode client 
    --class cn.com.dtmobile.spark.DebugTest 
    --num-executors 3 
    --executor-cores 2 
    --executor-memory 1G 
    /home/etluser/kong/debugTest/pucchSinr.jar

    That is, the command generated by the launcher described above.
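    If you want to see this command without tracing the scripts, the launcher honors the SPARK_PRINT_LAUNCH_COMMAND environment variable (it is checked in org.apache.spark.launcher.Main below); a sketch using the same example job:

    # prints "Spark Command: /usr/lib/java/.../java -cp ... org.apache.spark.deploy.SparkSubmit ..."
    # to stderr before the application is actually launched
    SPARK_PRINT_LAUNCH_COMMAND=1 ./spark-submit \
    --class cn.com.dtmobile.spark.DebugTest \
    --master yarn \
    --deploy-mode client \
    --num-executors 3 \
    --executor-cores 2 \
    --executor-memory 1G \
    /home/etluser/kong/debugTest/pucchSinr.jar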


    The org.apache.spark.launcher.Main class

      public static void main(String[] argsArray) throws Exception {
        checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
        // argsArray holds the arguments that spark-submit passed to spark-class
        List<String> args = new ArrayList<>(Arrays.asList(argsArray));
        String className = args.remove(0); // here className is org.apache.spark.deploy.SparkSubmit
    
        boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
        AbstractCommandBuilder builder;
        if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
          try {
            builder = new SparkSubmitCommandBuilder(args); // build the command via SparkSubmitCommandBuilder
          } catch (IllegalArgumentException e) {
            printLaunchCommand = false;
            System.err.println("Error: " + e.getMessage());
            System.err.println();
    
            MainClassOptionParser parser = new MainClassOptionParser();
            try {
              parser.parse(args);
            } catch (Exception ignored) {
              // Ignore parsing exceptions.
            }
    
            List<String> help = new ArrayList<>();
            if (parser.className != null) {
              help.add(parser.CLASS);
              help.add(parser.className);
            }
            help.add(parser.USAGE_ERROR);
            builder = new SparkSubmitCommandBuilder(help);
          }
        } else {
          builder = new SparkClassCommandBuilder(className, args);
        }
    
        Map<String, String> env = new HashMap<>();
        List<String> cmd = builder.buildCommand(env); // buildCommand produces the final command
        if (printLaunchCommand) {
          System.err.println("Spark Command: " + join(" ", cmd));
          System.err.println("========================================");
        }
    
        if (isWindows()) {
          System.out.println(prepareWindowsCommand(cmd, env));
        } else {
          // In bash, use NULL as the arg separator since it cannot be used in an argument.
          List<String> bashCmd = prepareBashCommand(cmd, env);
          for (String c : bashCmd) {
            System.out.print(c);
            System.out.print('\0');
          }
        }
      }
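    For reference, this is essentially what build_command in spark-class does; a hedged sketch of invoking the launcher by hand, piping through tr to make the NUL separators visible (paths match the example environment above and may differ on your machine):

    "${JAVA_HOME}/bin/java" -Xmx128m -cp "${SPARK_HOME}/jars/*" org.apache.spark.launcher.Main \
      org.apache.spark.deploy.SparkSubmit \
      --class cn.com.dtmobile.spark.DebugTest --master yarn --deploy-mode client \
      /home/etluser/kong/debugTest/pucchSinr.jar | tr '\0' '\n'
    # each argument of the generated command appears on its own line;
    # spark-class itself additionally appends the launcher's exit code with printf "%d\0"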

    SparkSubmitCommandBuilder is then used to build the spark-submit command, as follows:

     org.apache.spark.launcher.SparkSubmitCommandBuilder

    // the constructor invoked when the SparkSubmitCommandBuilder object is created
    SparkSubmitCommandBuilder(List<String> args) {
      this.allowsMixedArguments = false;
      this.sparkArgs = new ArrayList<>();
      boolean isExample = false;
      List<String> submitArgs = args;
      if (args.size() > 0) {
        switch (args.get(0)) {
          case PYSPARK_SHELL: // "pyspark-shell-main"
            this.allowsMixedArguments = true;
            appResource = PYSPARK_SHELL;
            submitArgs = args.subList(1, args.size());
            break;
          case SPARKR_SHELL: // "sparkr-shell-main"
            this.allowsMixedArguments = true;
            appResource = SPARKR_SHELL;
            submitArgs = args.subList(1, args.size());
            break;
          case RUN_EXAMPLE:
            isExample = true;
            submitArgs = args.subList(1, args.size());
        }
        this.isExample = isExample;
        OptionParser parser = new OptionParser(); // inner class of SparkSubmitCommandBuilder that extends SparkSubmitOptionParser
        parser.parse(submitArgs); // parse() is inherited from SparkSubmitOptionParser and parses the spark-submit command line arguments
        this.isAppResourceReq = parser.isAppResourceReq;
      } else {
        this.isExample = isExample;
        this.isAppResourceReq = false;
      }
    }

    The SparkSubmitOptionParser class

    org.apache.spark.launcher.SparkSubmitOptionParser

    package org.apache.spark.launcher;
    
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    /**
     * Parser for spark-submit command line options.
     * <p>
     * This class encapsulates the parsing code for spark-submit command line options, so that there
     * is a single list of options that needs to be maintained (well, sort of, but it makes it harder
     * to break things).
     */
    class SparkSubmitOptionParser {
    
      // The following constants define the "main" name for the available options. They're defined
      // to avoid copy & paste of the raw strings where they're needed.
      //
      // The fields are not static so that they're exposed to Scala code that uses this class. See
      // SparkSubmitArguments.scala. That is also why this class is not abstract - to allow code to
      // easily use these constants without having to create dummy implementations of this class.
      protected final String CLASS = "--class";
      protected final String CONF = "--conf";
      protected final String DEPLOY_MODE = "--deploy-mode";
      protected final String DRIVER_CLASS_PATH = "--driver-class-path";
      protected final String DRIVER_CORES = "--driver-cores";
      protected final String DRIVER_JAVA_OPTIONS =  "--driver-java-options";
      protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
      protected final String DRIVER_MEMORY = "--driver-memory";
      protected final String EXECUTOR_MEMORY = "--executor-memory";
      protected final String FILES = "--files";
      protected final String JARS = "--jars";
      protected final String KILL_SUBMISSION = "--kill";
      protected final String MASTER = "--master";
      protected final String NAME = "--name";
      protected final String PACKAGES = "--packages";
      protected final String PACKAGES_EXCLUDE = "--exclude-packages";
      protected final String PROPERTIES_FILE = "--properties-file";
      protected final String PROXY_USER = "--proxy-user";
      protected final String PY_FILES = "--py-files";
      protected final String REPOSITORIES = "--repositories";
      protected final String STATUS = "--status";
      protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
    
      // Options that do not take arguments.
      protected final String HELP = "--help";
      protected final String SUPERVISE = "--supervise";
      protected final String USAGE_ERROR = "--usage-error";
      protected final String VERBOSE = "--verbose";
      protected final String VERSION = "--version";
    
      // Standalone-only options.
    
      // YARN-only options.
      protected final String ARCHIVES = "--archives";
      protected final String EXECUTOR_CORES = "--executor-cores";
      protected final String KEYTAB = "--keytab";
      protected final String NUM_EXECUTORS = "--num-executors";
      protected final String PRINCIPAL = "--principal";
      protected final String QUEUE = "--queue";
    
      /**
       * This is the canonical list of spark-submit options. Each entry in the array contains the
       * different aliases for the same option; the first element of each entry is the "official"
       * name of the option, passed to {@link #handle(String, String)}.
       * <p>
       * Options not listed here nor in the "switch" list below will result in a call to
       * {@link #handleUnknown(String)}.
       * <p>
       * These two arrays are visible for tests.
       */
      final String[][] opts = {
        { ARCHIVES },
        { CLASS },
        { CONF, "-c" },
        { DEPLOY_MODE },
        { DRIVER_CLASS_PATH },
        { DRIVER_CORES },
        { DRIVER_JAVA_OPTIONS },
        { DRIVER_LIBRARY_PATH },
        { DRIVER_MEMORY },
        { EXECUTOR_CORES },
        { EXECUTOR_MEMORY },
        { FILES },
        { JARS },
        { KEYTAB },
        { KILL_SUBMISSION },
        { MASTER },
        { NAME },
        { NUM_EXECUTORS },
        { PACKAGES },
        { PACKAGES_EXCLUDE },
        { PRINCIPAL },
        { PROPERTIES_FILE },
        { PROXY_USER },
        { PY_FILES },
        { QUEUE },
        { REPOSITORIES },
        { STATUS },
        { TOTAL_EXECUTOR_CORES },
      };
    
      /**
       * List of switches (command line options that do not take parameters) recognized by spark-submit.
       */
      final String[][] switches = {
        { HELP, "-h" },
        { SUPERVISE },
        { USAGE_ERROR },
        { VERBOSE, "-v" },
        { VERSION },
      };
    
      /**
       * Parse a list of spark-submit command line options.
       * <p>
       * See SparkSubmitArguments.scala for a more formal description of available options.
       *
       * @throws IllegalArgumentException If an error is found during parsing.
       */
      protected final void parse(List<String> args) {
        Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
    
        int idx = 0;
        for (idx = 0; idx < args.size(); idx++) {
          String arg = args.get(idx);
          String value = null;
    
          Matcher m = eqSeparatedOpt.matcher(arg);
          if (m.matches()) {
            arg = m.group(1);
            value = m.group(2);
          }
    
          // Look for options with a value. This handles the options defined in opts, e.g. --class, --deploy-mode, --num-executors, and so on.
          String name = findCliOption(arg, opts);
          if (name != null) {
            if (value == null) {
              if (idx == args.size() - 1) {
                throw new IllegalArgumentException(
                    String.format("Missing argument for option '%s'.", arg));
              }
              idx++;
              value = args.get(idx);
            }
            if (!handle(name, value)) { // handle() dispatches to the OptionParser override, which assigns each spark-submit argument's value to the corresponding Spark field
              break;
            }
            continue;
          }
    
          // Look for a switch. This handles the no-argument options in switches, e.g. --help, --verbose, and the like.
          name = findCliOption(arg, switches);
          if (name != null) {
            if (!handle(name, null)) {
              break;
            }
            continue;
          }
    
          if (!handleUnknown(arg)) {
            break;
          }
        }
    
        if (idx < args.size()) {
          idx++;
        }
        handleExtraArgs(args.subList(idx, args.size()));
      }
    
      /**
       * Callback for when an option with an argument is parsed.
       *
       * @param opt The long name of the cli option (might differ from actual command line).
       * @param value The value. This will be <i>null</i> if the option does not take a value.
       * @return Whether to continue parsing the argument list.
       */
      protected boolean handle(String opt, String value) {
        throw new UnsupportedOperationException();
      }
    
      /**
       * Callback for when an unrecognized option is parsed.
       *
       * @param opt Unrecognized option from the command line.
       * @return Whether to continue parsing the argument list.
       */
      protected boolean handleUnknown(String opt) {
        throw new UnsupportedOperationException();
      }
    
      /**
       * Callback for remaining command line arguments after either {@link #handle(String, String)} or
       * {@link #handleUnknown(String)} return "false". This will be called at the end of parsing even
       * when there are no remaining arguments.
       *
       * @param extra List of remaining arguments.
       */
      protected void handleExtraArgs(List<String> extra) {
        throw new UnsupportedOperationException();
      }
     
      private String findCliOption(String name, String[][] available) {
        for (String[] candidates : available) {
          for (String candidate : candidates) {
            if (candidate.equals(name)) {
              return candidates[0];
            }
          }
        }
        return null;
      }
    
    }
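    One practical consequence of the eqSeparatedOpt pattern (--[^=]+)=(.+) in parse() above is that both argument styles below are accepted and end up in the same handle() call:

    ./spark-submit --master yarn --deploy-mode client --class cn.com.dtmobile.spark.DebugTest /home/etluser/kong/debugTest/pucchSinr.jar
    ./spark-submit --master=yarn --deploy-mode=client --class=cn.com.dtmobile.spark.DebugTest /home/etluser/kong/debugTest/pucchSinr.jar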

     Calling the buildCommand method:

    org.apache.spark.launcher.SparkSubmitCommandBuilder#buildCommand

      public List<String> buildCommand(Map<String, String> env)
          throws IOException, IllegalArgumentException {
        if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
          return buildPySparkShellCommand(env);
        } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
          return buildSparkRCommand(env);
        } else {
          return buildSparkSubmitCommand(env); // this is the path taken for a normal spark-submit
        }
      }

     org.apache.spark.launcher.SparkSubmitCommandBuilder#buildSparkSubmitCommand

      private List<String> buildSparkSubmitCommand(Map<String, String> env)
          throws IOException, IllegalArgumentException {
        // Load the properties file and check whether spark-submit will be running the app's driver
        // or just launching a cluster app. When running the driver, the JVM's argument will be
        // modified to cover the driver's configuration.
        Map<String, String> config = getEffectiveConfig();
        boolean isClientMode = isClientMode(config);
        String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
    
        List<String> cmd = buildJavaCommand(extraClassPath);
        // Take Thrift Server as daemon
        if (isThriftServer(mainClass)) {
          addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
        }
        addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
    
        // We don't want the client to specify Xmx. These have to be set by their corresponding
        // memory flag --driver-memory or configuration entry spark.driver.memory
        String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
        if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
          String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
                       "java options (was %s). Use the corresponding --driver-memory or " +
                       "spark.driver.memory configuration instead.", driverExtraJavaOptions);
          throw new IllegalArgumentException(msg);
        }
    
        if (isClientMode) {
          // Figuring out where the memory value come from is a little tricky due to precedence.
          // Precedence is observed in the following order:
          // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
          // - properties file.
          // - SPARK_DRIVER_MEMORY env variable
          // - SPARK_MEM env variable
          // - default value (1g)
          // Take Thrift Server as daemon
          String tsMemory =
            isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
          String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
            System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
          cmd.add("-Xmx" + memory);
          addOptionString(cmd, driverExtraJavaOptions);
          mergeEnvPathList(env, getLibPathEnvName(),
            config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
        }
    
        cmd.add("org.apache.spark.deploy.SparkSubmit");
        cmd.addAll(buildSparkSubmitArgs());
        return cmd;
      }
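    This is where the -Xmx1g in the final CMD above comes from: the example did not pass --driver-memory, so (assuming spark.driver.memory and the SPARK_DRIVER_MEMORY/SPARK_MEM environment variables were also unset) firstNonEmpty falls through to DEFAULT_MEM, which is 1g. A rough shell rendering of that fallback chain (the function name here is illustrative, not part of the scripts):

    # mirror of firstNonEmpty(tsMemory, config driver memory, SPARK_DRIVER_MEMORY, SPARK_MEM, "1g")
    first_non_empty() {
      for v in "$@"; do
        [ -n "$v" ] && { echo "$v"; return; }
      done
    }
    MEMORY=$(first_non_empty "" "" "$SPARK_DRIVER_MEMORY" "$SPARK_MEM" "1g")
    echo "-Xmx$MEMORY"   # -> -Xmx1g when none of the earlier values are set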

     Shell features used in these scripts:

    1.set -o posix

    set is a builtin of the shell interpreter. It sets interpreter options and thereby controls aspects of the shell's behavior.

    With set, prefixing an option with - turns it on, and prefixing it with + turns it off.

    POSIX (Portable Operating System Interface) is a design standard for UNIX systems; many UNIX-like systems, such as Linux, aim to be compatible with it, and software written against the standard is portable across platforms. That is also why Windows supports it: plenty of good open-source software targets POSIX, so it can be built for Windows as well. set -o posix turns on bash's POSIX mode.
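    This is exactly why spark-class runs set +o posix: its own comment notes that POSIX mode does not allow process substitution, and the while-read loop consumes < <(build_command "$@"). A minimal sketch of the pattern:

    set +o posix                                   # make sure process substitution is available
    while IFS= read -r line; do
      echo "got: $line"
    done < <(printf '%s\n' one two three)          # process substitution feeds the loop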

    2.command -v java

    command [-pVv] command [arg ...]

    Using command bypasses the normal shell function lookup: only shell builtins and executables found on PATH will be run.

    "-p"选项,搜寻命令的方式是用PATH来找。"-V"或"-v"选项,会显示出该命令的一些简约描述。


    3.read -d

    -d sets the delimiter. Normally read splits its input on IFS, but with -d it keeps reading until the given character appears. For example, read -d m value stops at the first m: for the input "hello m" the value read is "hello" (the space before the m is dropped). Any character can serve as the terminator in this way, e.g. ".". In spark-class the delimiter is the empty string, read -d '', which bash treats as the NUL byte, matching the NUL-separated output produced by the launcher.

    Other read options: -p (prompt string), -n (number of characters to read), -t (timeout), -s (do not echo input).
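    A self-contained sketch of the pattern spark-class builds on (the produce function below is a stand-in for build_command):

    produce() {
      # emit fields separated by NUL bytes, like the launcher does
      printf '%s\0' "--class" "my.Main" "an arg with spaces"
    }

    ARGS=()
    while IFS= read -d '' -r ARG; do   # read one NUL-terminated field at a time
      ARGS+=("$ARG")
    done < <(produce)

    printf '%d args\n' "${#ARGS[@]}"   # 3 args
    printf '[%s]\n' "${ARGS[@]}"       # each field intact, spaces preserved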
