  • Source-code analysis: passing dependency files to a Spark job via spark.files

    Version: Spark 2.3

    Relevant source: org.apache.spark.SparkContext

    When creating a Spark job, we often need to ship some dependency files along with it. Normally this is done by passing --files /path/to/file to the spark-submit script.

    In our product, however, Spark jobs are submitted through Livy, which is essentially a wrapper around spark-submit, so the question of how to specify dependency files still comes down to Spark itself. Since we cannot pass --files on the command line, how do we specify the files programmatically? And how do the tasks running on each node get hold of those files?

    Reading the spark-submit argument-passing code shows that --files is actually carried by the configuration property "spark.files", so it can simply be set on a SparkConf in code.

    For example:

    SparkConf conf = new SparkConf();
    conf.set("spark.files", "/path/to/file");
    // If the file lives on HDFS, use conf.set("spark.files", "hdfs:/path/to/file").
    // Only the hdfs scheme is needed; no namenode host/port is required.

    The official Spark documentation describes the property as follows:

    spark.files  Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed.
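
    Putting the two pieces together, here is a minimal Scala sketch of the driver side. The file names and paths are only placeholders; the point is that everything listed in spark.files is shipped to each executor's working directory and can be resolved by bare file name with SparkFiles.get inside a task.

    import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

    object SparkFilesDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("spark-files-demo")
          // placeholder paths; any comma-separated mix of local, hdfs:, http: ... URIs is accepted
          .set("spark.files", "/path/to/lookup.csv,hdfs:/data/conf/rules.json")
        val sc = new SparkContext(conf)

        sc.parallelize(1 to 3).foreach { _ =>
          // on an executor the files sit in the task's working directory;
          // SparkFiles.get resolves the absolute local path from the bare file name
          println("lookup.csv fetched to: " + SparkFiles.get("lookup.csv"))
        }

        sc.stop()
      }
    }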

    The code that reads the user-specified files lives in SparkContext.scala, shown below (jars passed with --jars are handled the same way):

    def jars: Seq[String] = _jars
    def files: Seq[String] = _files
    ...
    
    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
    
    ...
    
    // Add each JAR given through the constructor
    if (jars != null) {
      jars.foreach(addJar)
    }
    
    if (files != null) {
      files.foreach(addFile)
    }
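
    Note that the same registration can also be driven at runtime: since _files ultimately just feeds addFile, calling addFile on a live SparkContext is equivalent to listing the path in spark.files up front. A small sketch (paths are illustrative):

    // equivalent to having listed the path in spark.files before the context was created
    sc.addFile("/path/to/log4j.properties")               // a single local file
    sc.addFile("hdfs:/data/conf/dict", recursive = true)  // a directory; Hadoop-supported filesystems only

    // the driver-side registries can be inspected afterwards
    sc.listFiles().foreach(println)
    sc.listJars().foreach(println)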

    addFile is implemented as follows:

    /**
    * Add a file to be downloaded with this Spark job on every node.
    *
    * If a file is added during execution, it will not be available until the next TaskSet starts.
    *
    * @param path can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
    * use `SparkFiles.get(fileName)` to find its download location.
    * @param recursive if true, a directory can be given in `path`. Currently directories are
    * only supported for Hadoop-supported filesystems.
    */
    def addFile(path: String, recursive: Boolean): Unit = {
      val uri = new Path(path).toUri
      val schemeCorrectedPath = uri.getScheme match {
        // If the path carries no scheme (i.e. the scheme is null) or uses "local", it is resolved
        // as a local file. On the command line, --files /home/kong/log4j.properties is therefore
        // equivalent to --files local:/home/kong/log4j.properties
        case null | "local" => new File(path).getCanonicalFile.toURI.toString
        case _ => path
      }

      val hadoopPath = new Path(schemeCorrectedPath)
      val scheme = new URI(schemeCorrectedPath).getScheme
      if (!Array("http", "https", "ftp").contains(scheme)) {
        val fs = hadoopPath.getFileSystem(hadoopConfiguration)
        val isDir = fs.getFileStatus(hadoopPath).isDirectory
        if (!isLocal && scheme == "file" && isDir) {
          throw new SparkException(s"addFile does not support local directories when not running " +
            "local mode.")
        }
        if (!recursive && isDir) {
          throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
            "turned on.")
        }
      } else {
        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
        Utils.validateURL(uri)
      }

      val key = if (!isLocal && scheme == "file") {
        env.rpcEnv.fileServer.addFile(new File(uri.getPath))
      } else {
        schemeCorrectedPath
      }
      val timestamp = System.currentTimeMillis
      if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
        logInfo(s"Added file $path at $key with timestamp $timestamp")
        // Fetch the file locally so that closures which are run on the driver can still use the
        // SparkFiles API to access files.
        Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
          env.securityManager, hadoopConfiguration, timestamp, useCache = false)
        postEnvironmentUpdate()
      }
    }
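
    The scheme correction at the top of addFile is worth spelling out. A quick spark-shell style sketch of what the Path/URI calls return for the different kinds of input (paths are illustrative):

    import java.io.File
    import org.apache.hadoop.fs.Path

    new Path("/home/kong/log4j.properties").toUri.getScheme        // null    -> treated as local
    new Path("local:/home/kong/log4j.properties").toUri.getScheme  // "local" -> treated as local
    new Path("hdfs:/data/conf/rules.json").toUri.getScheme         // "hdfs"  -> kept as-is

    // null / "local" paths are canonicalised to a file: URI so that downstream code
    // always sees a concrete scheme, e.g. "file:/home/kong/log4j.properties"
    new File("/home/kong/log4j.properties").getCanonicalFile.toURI.toString

    So a path without a scheme (or with local:) ends up being served by the driver's rpcEnv file server (the key is typically a spark:// URL), while hdfs/http/https/ftp paths are kept as-is and fetched directly by the executors.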

    Both addJar and addFile call postEnvironmentUpdate at the end, and postEnvironmentUpdate is also invoked at the end of SparkContext initialization. The code is as follows:

      /** Post the environment update event once the task scheduler is ready */
      private def postEnvironmentUpdate() {
        if (taskScheduler != null) {
          val schedulingMode = getSchedulingMode.toString
          val addedJarPaths = addedJars.keys.toSeq
          val addedFilePaths = addedFiles.keys.toSeq
          // SparkEnv.environmentDetails packs the JVM information, Spark properties,
          // system properties, classpath, etc. into the environment details.
          val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
            addedFilePaths)
          // Build a SparkListenerEnvironmentUpdate event and post it to the event bus.
          val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
          listenerBus.post(environmentUpdate)
        }
      }
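
    Any listener registered on the bus can therefore observe each addFile/addJar. As a purely hypothetical illustration (the class EnvUpdateLogger is made up here, not part of Spark), one could log user-added dependencies like this:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

    class EnvUpdateLogger extends SparkListener {
      override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
        val classpath = event.environmentDetails.getOrElse("Classpath Entries", Seq.empty)
        classpath.filter(_._2 == "Added By User").foreach { case (path, _) =>
          println(s"user-added dependency: $path")
        }
      }
    }

    // register it on the driver before calling addFile/addJar:
    // sc.addSparkListener(new EnvUpdateLogger)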

    The environmentDetails method:

      /**
       * Return a map representation of jvm information, Spark properties, system properties, and
       * class paths. Map keys define the category, and map values represent the corresponding
       * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
       */
      private[spark]
      def environmentDetails(
          conf: SparkConf,
          schedulingMode: String,
          addedJars: Seq[String],
          addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
    
        import Properties._
        val jvmInformation = Seq(
          ("Java Version", s"$javaVersion ($javaVendor)"),
          ("Java Home", javaHome),
          ("Scala Version", versionString)
        ).sorted
    
        // Spark properties
        // This includes the scheduling mode whether or not it is configured (used by SparkUI)
        val schedulerMode =
          if (!conf.contains("spark.scheduler.mode")) {
            Seq(("spark.scheduler.mode", schedulingMode))
          } else {
            Seq.empty[(String, String)]
          }
        val sparkProperties = (conf.getAll ++ schedulerMode).sorted
    
        // System properties that are not java classpaths
        val systemProperties = Utils.getSystemProperties.toSeq
        val otherProperties = systemProperties.filter { case (k, _) =>
          k != "java.class.path" && !k.startsWith("spark.")
        }.sorted
    
        // Class paths including all added jars and files
        val classPathEntries = javaClassPath
          .split(File.pathSeparator)
          .filterNot(_.isEmpty)
          .map((_, "System Classpath"))
        val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
        val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
    
        Map[String, Seq[(String, String)]](
          "JVM Information" -> jvmInformation,
          "Spark Properties" -> sparkProperties,
          "System Properties" -> otherProperties,
          "Classpath Entries" -> classPaths)
      }
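
    For reference, the returned map always carries the four categories shown below; it is the same structure that backs the Environment tab of the Spark UI, and a user-added file shows up under "Classpath Entries" tagged "Added By User". The concrete values here are only illustrative:

    val details: Map[String, Seq[(String, String)]] = Map(
      "JVM Information"   -> Seq(("Java Version", "1.8.0_181 (Oracle Corporation)")),
      "Spark Properties"  -> Seq(("spark.files", "/home/kong/log4j.properties")),
      "System Properties" -> Seq(("os.name", "Linux")),
      "Classpath Entries" -> Seq(("/home/kong/log4j.properties", "Added By User"))
    )
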
  • Original post: https://www.cnblogs.com/zz-ksw/p/11556901.html