  • The spark-submit execution flow, a quick walkthrough

    Abstract: This article walks through the Spark source code to understand the spark-submit flow.

    1. Submitting the job command

    When we submit a Spark job, we use a command of the form "spark-submit --class .....". That command is a shell script shipped in the Spark distribution's bin directory; all it does is locate SPARK_HOME and delegate to the spark-class script.

    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

    spark-class is then executed with org.apache.spark.deploy.SparkSubmit as its argument, which hands the job over to Spark. The spark-class shell script mainly performs the following steps:

    (1) Load the Spark environment parameters, obtained from conf

    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    
    . "${SPARK_HOME}"/bin/load-spark-env.sh
    
    # Find the java binary
    if [ -n "${JAVA_HOME}" ]; then
      RUNNER="${JAVA_HOME}/bin/java"
    else
      if [ "$(command -v java)" ]; then
        RUNNER="java"
      else
        echo "JAVA_HOME is not set" >&2
        exit 1
      fi
    fi

    (2) Locate the Spark jars

    # Find Spark jars.
    if [ -d "${SPARK_HOME}/jars" ]; then
      SPARK_JARS_DIR="${SPARK_HOME}/jars"
    else
      SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
    fi

    (3) Invoke Main in org.apache.spark.launcher to inject the arguments and build the command

    build_command() {
      "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      printf "%d\0" $?
    }

    (4) The shell script inspects the launcher's exit code to decide whether the command was built successfully or the job should abort, and then executes the resulting command

    if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
      echo "${CMD[@]}" | head -n-1 1>&2
      exit 1
    fi
    
    if [ $LAUNCHER_EXIT_CODE != 0 ]; then
      exit $LAUNCHER_EXIT_CODE
    fi
    
    CMD=("${CMD[@]:0:$LAST}")
    exec "${CMD[@]}"

    2. Checking the arguments and submitting the job to Spark

    org.apache.spark.launcher.Main detects the launch mode (class or submit) and builds the cmd. In submit mode the arguments are checked (SparkSubmitOptionParser), the command line is assembled and printed back to spark-class, and finally exec runs that command line to submit the job (a simplified sketch of this parse-and-assemble step follows the example command below). The assembled cmd looks like this:

    /usr/local/java/jdk1.8.0_91/bin/java -cp
    /data/spark-1.6.0-bin-hadoop2.6/conf/:/data/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/hadoop-2.6.5/etc/hadoop/
    -Xms1g -Xmx1g -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1234
    org.apache.spark.deploy.SparkSubmit
    --class org.apache.spark.repl.Main
    --name Spark shell
    --master spark://localhost:7077
    --verbose /tool/jarDir/maven_scala-1.0-SNAPSHOT.jar
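    To make that parse-and-assemble step concrete, here is a minimal, self-contained Scala sketch of the idea. It is not Spark's actual launcher code: the option pairing that SparkSubmitOptionParser performs is simplified, and the JVM and classpath settings are hypothetical placeholders.

    // Simplified sketch of the launcher's job: pair up "--key value" options,
    // treat everything else as the primary resource and application arguments,
    // and prepend the JVM invocation that runs SparkSubmit.
    // The real parser also knows which options are boolean flags (e.g. --verbose);
    // the classpath and memory settings below are placeholders, not real values.
    object BuildCommandSketch {
      def main(args: Array[String]): Unit = {
        val opts = scala.collection.mutable.LinkedHashMap[String, String]()
        val rest = scala.collection.mutable.ArrayBuffer[String]()

        var i = 0
        while (i < args.length) {
          if (args(i).startsWith("--") && i + 1 < args.length) {
            opts(args(i)) = args(i + 1)
            i += 2
          } else {
            rest += args(i)
            i += 1
          }
        }

        val cmd =
          Seq("java", "-cp", "<spark jars>", "-Xmx1g", "org.apache.spark.deploy.SparkSubmit") ++
            opts.flatMap { case (k, v) => Seq(k, v) } ++ rest

        // spark-class reads the printed command back and exec's it.
        println(cmd.mkString(" "))
      }
    }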

    3. Execution of SparkSubmit

    (1) After the job is submitted, the main method of SparkSubmit is executed

     def main(args: Array[String]): Unit = {
        val submit = new SparkSubmit()
        submit.doSubmit(args)
      }

    (2) doSubmit() initializes logging, parses the Spark job arguments, and runs the job according to the requested action type:

     def doSubmit(args: Array[String]): Unit = {
        // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
        // be reset before the application starts.
        val uninitLog = initializeLogIfNecessary(true, silent = true)
    
        val appArgs = parseArguments(args)
        if (appArgs.verbose) {
          logInfo(appArgs.toString)
        }
        appArgs.action match {
          case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
          case SparkSubmitAction.KILL => kill(appArgs)
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
          case SparkSubmitAction.PRINT_VERSION => printVersion()
        }
      }

    SUBMIT: submit the application using the provided parameters.

    KILL (standalone and Mesos cluster mode only): terminate the job via the REST protocol.

    REQUEST_STATUS (standalone and Mesos cluster mode only): request the status of an already submitted job via the REST protocol.

    PRINT_VERSION: write the version information to the log.
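    How the action is chosen from the command-line flags can be pictured with the following hypothetical Scala sketch; the real decision lives in SparkSubmitArguments (which also validates the arguments), so the names and structure here are illustrative only.

    // Hypothetical sketch only: map the presence of --kill / --status / --version
    // style flags to an action, defaulting to SUBMIT.
    // Not the actual SparkSubmitArguments implementation.
    object ActionSketch {
      sealed trait Action
      case object Submit extends Action
      case object Kill extends Action
      case object RequestStatus extends Action
      case object PrintVersion extends Action

      def chooseAction(killId: Option[String],
                       statusId: Option[String],
                       printVersion: Boolean): Action =
        if (printVersion) PrintVersion
        else if (killId.isDefined) Kill
        else if (statusId.isDefined) RequestStatus
        else Submit

      def main(args: Array[String]): Unit =
        println(chooseAction(killId = Some("driver-0001"), statusId = None, printVersion = false)) // prints Kill
    }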

    (3) The submit function is then called; its nested doRunMain does the actual work:

    def doRunMain(): Unit = {
          if (args.proxyUser != null) {
            val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
              UserGroupInformation.getCurrentUser())
            try {
              proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                override def run(): Unit = {
                  runMain(args, uninitLog)
                }
              })
            } catch {
              case e: Exception =>
                // Hadoop's AuthorizationException suppresses the exception's stack trace, which
                // makes the message printed to the output by the JVM not very helpful. Instead,
                // detect exceptions with empty stack traces here, and treat them differently.
                if (e.getStackTrace().length == 0) {
                  error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
                } else {
                  throw e
                }
            }
          } else {
            runMain(args, uninitLog)
          }
        }

    doRunMain prepares to run the child main class for the cluster (running it as the proxy user when one is set), and then calls runMain(), which loads the child main class and invokes its main method.
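    The heart of runMain is resolving the child main class by name and invoking it reflectively. Below is a minimal, self-contained Scala sketch of that pattern; it is not the actual runMain code, which additionally sets up the classloader, adds the jars returned by prepareSubmitEnvironment to it, and handles classes that implement SparkApplication.

    // Minimal sketch of the reflective invocation behind runMain: resolve the child
    // main class by name, look up its static main(Array[String]), and call it.
    // HelloApp is a stand-in for the class passed via --class (same default package).
    object HelloApp {
      def main(args: Array[String]): Unit =
        println(s"child application started with args: ${args.mkString(" ")}")
    }

    object InvokeMainSketch {
      def main(args: Array[String]): Unit = {
        val childMainClass = "HelloApp"                   // in reality taken from --class
        val childArgs      = Array("--input", "/tmp/in")  // in reality the remaining app arguments

        val loader    = Thread.currentThread().getContextClassLoader
        val mainClass = Class.forName(childMainClass, true, loader)

        // Scala objects expose a static main forwarder, so the receiver is null here;
        // the whole Array[String] is passed as the single argument of main.
        val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
        mainMethod.invoke(null, childArgs)
      }
    }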

    4. Summary

    Spark job submission accepts many different parameters and modes, and a different branch is taken depending on those parameters; in the end, runMain passes the required arguments on to the function that actually runs the application.

    This article is shared from the Huawei Cloud community post "Spark内核解析之Spark-submit" ("Spark internals: spark-submit"); original author: 笨熊爱喝cola.


  • Original article: https://www.cnblogs.com/huaweiyun/p/14120613.html