    Passing a Spark job's dependency files via spark.files: a source code analysis

    Version: Spark 2.3

    Relevant source: org.apache.spark.SparkContext

    When creating a Spark job we often need to ship some dependency files with it; usually this is done by passing --files /path/to/file to the spark-submit script.

    In our product, however, Spark jobs are submitted through Livy, which is essentially a wrapper around spark-submit, so how to specify dependency files ultimately comes down to Spark itself. Since we cannot pass --files on the command line, how do we specify the files programmatically? And how do the tasks running on each node get hold of them?

    Reading the spark-submit argument-passing code shows that --files is received by the configuration key "spark.files", so the same value can be set on SparkConf in code.

    For example:

    SparkConf conf = new SparkConf();
    conf.set("spark.files", "/path/to/file");
    // If the file lives on HDFS, use conf.set("spark.files", "hdfs:/path/to/file").
    // Only the hdfs scheme is needed; no ip:port has to be given.
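
    To answer the second half of the question above, here is a minimal Scala sketch (the file name and HDFS path are made up for illustration): once the file has been shipped, each task resolves its local copy by file name through SparkFiles.get.

    import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

    object SparkFilesDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("spark-files-demo")
          // Hypothetical path; same effect as --files on spark-submit.
          .set("spark.files", "hdfs:/path/to/lookup.properties")
        val sc = new SparkContext(conf)

        sc.parallelize(1 to 4).foreach { _ =>
          // On every executor the file has already been downloaded into the task's
          // working directory; SparkFiles.get resolves its local path by file name only.
          val localPath = SparkFiles.get("lookup.properties")
          println(s"dependency file is at: $localPath")
        }

        sc.stop()
      }
    }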

    The official Spark documentation describes the parameter as follows:

    spark.files  Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed.
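
    So several files and glob patterns can be combined into one comma-separated value; a hypothetical example (placeholder paths):

    // Two explicit files plus a glob over a directory on HDFS.
    conf.set("spark.files", "hdfs:/etc/app/app.conf,hdfs:/etc/app/dicts/*.txt,/local/path/ip.db")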

    The code that actually reads the user-specified files lives in SparkContext.scala, shown below (jar dependencies specified with --jars are handled the same way):

    def jars: Seq[String] = _jars
    def files: Seq[String] = _files
    ...
    
    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
    
    ...
    
    // Add each JAR given through the constructor
    if (jars != null) {
      jars.foreach(addJar)
    }
    
    if (files != null) {
      files.foreach(addFile)
    }
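
    Since the loop above simply calls addFile once per entry, files can equally be added after the SparkContext has been created by calling addFile directly. A short sketch with made-up paths:

    // Equivalent to listing the file in spark.files: it ends up in the same addedFiles map.
    sc.addFile("hdfs:/path/to/extra.dict")

    // A directory needs recursive = true and must live on a Hadoop-supported filesystem.
    sc.addFile("hdfs:/path/to/conf-dir", recursive = true)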

    The implementation of addFile is as follows:

    /**
     * Add a file to be downloaded with this Spark job on every node.
     *
     * If a file is added during execution, it will not be available until the next TaskSet starts.
     *
     * @param path can be either a local file, a file in HDFS (or other Hadoop-supported
     * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
     * use `SparkFiles.get(fileName)` to find its download location.
     * @param recursive if true, a directory can be given in `path`. Currently directories are
     * only supported for Hadoop-supported filesystems.
     *
     * In short:
     *    1. The file is downloaded to every node.
     *    2. A file added while the job is running only becomes available when the next TaskSet starts.
     *    3. The path can be a local file, a file on HDFS (or another Hadoop-supported filesystem),
     *       or an HTTP, HTTPS or FTP URI; inside Spark jobs the file is accessed with
     *       SparkFiles.get(fileName).
     *    4. To add a directory, pass its path with recursive = true; this only works for
     *       Hadoop-supported filesystems.
     */
    def addFile(path: String, recursive: Boolean): Unit = {
      val uri = new Path(path).toUri
      val schemeCorrectedPath = uri.getScheme match {
        // If the path carries no scheme, getScheme returns null.
        // On the command line, --files /home/kong/log4j.properties is equivalent to
        // --files local:/home/kong/log4j.properties.
        case null | "local" => new File(path).getCanonicalFile.toURI.toString
        case _ => path
      }

      val hadoopPath = new Path(schemeCorrectedPath)
      val scheme = new URI(schemeCorrectedPath).getScheme
      if (!Array("http", "https", "ftp").contains(scheme)) {
        val fs = hadoopPath.getFileSystem(hadoopConfiguration)
        val isDir = fs.getFileStatus(hadoopPath).isDirectory
        if (!isLocal && scheme == "file" && isDir) {
          throw new SparkException(s"addFile does not support local directories when not running " +
            "local mode.")
        }
        if (!recursive && isDir) {
          throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
            "turned on.")
        }
      } else {
        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
        Utils.validateURL(uri)
      }

      // The key under which the file is registered: a local file in non-local mode is served
      // through the driver's RPC file server; otherwise the scheme-corrected path itself is the key.
      val key = if (!isLocal && scheme == "file") {
        env.rpcEnv.fileServer.addFile(new File(uri.getPath))
      } else {
        schemeCorrectedPath
      }
      val timestamp = System.currentTimeMillis
      if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
        logInfo(s"Added file $path at $key with timestamp $timestamp")
        // Fetch the file locally so that closures which are run on the driver can still use the
        // SparkFiles API to access files.
        Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
          env.securityManager, hadoopConfiguration, timestamp, useCache = false)
        postEnvironmentUpdate()
      }
    }
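
    To make the scheme handling above concrete, here is a small standalone sketch (plain Scala, no Spark needed; the helper name schemeCorrected is mine, and it mirrors the match on uri.getScheme using java.net.URI instead of Hadoop's Path, which behaves the same for these simple inputs):

    import java.io.File
    import java.net.URI

    // A path without a scheme (getScheme == null) or with the "local" scheme is turned into
    // a canonical file: URI; anything else (hdfs, http, ...) is passed through unchanged.
    def schemeCorrected(path: String): String = new URI(path).getScheme match {
      case null | "local" => new File(path).getCanonicalFile.toURI.toString
      case _              => path
    }

    schemeCorrected("/home/kong/log4j.properties")   // e.g. "file:/home/kong/log4j.properties"
    schemeCorrected("hdfs:/path/to/file")            // "hdfs:/path/to/file", unchanged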

    Both addJar and addFile call postEnvironmentUpdate at the end, and postEnvironmentUpdate is also
    called at the very end of SparkContext initialization. The code is as follows:

      /** Post the environment update event once the task scheduler is ready */
      private def postEnvironmentUpdate() {
        if (taskScheduler != null) {
          val schedulingMode = getSchedulingMode.toString
          val addedJarPaths = addedJars.keys.toSeq
          val addedFilePaths = addedFiles.keys.toSeq
          // SparkEnv.environmentDetails assembles the JVM settings, Spark properties,
          // system properties, classpath entries, etc. into the environment details.
          val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
            addedFilePaths)
          // Build a SparkListenerEnvironmentUpdate event and post it to the event bus.
          val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
          listenerBus.post(environmentUpdate)
        }
      }
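
    These events can also be observed from user code. The sketch below (not part of the original post) registers a SparkListener whose onEnvironmentUpdate callback receives exactly the map built by environmentDetails, shown next.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

    // Prints the jars/files the user has added every time an environment-update event
    // reaches the listener bus.
    class EnvUpdatePrinter extends SparkListener {
      override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
        val classpath = event.environmentDetails.getOrElse("Classpath Entries", Seq.empty)
        val addedByUser = classpath.collect { case (path, "Added By User") => path }
        println(s"User-added jars/files: ${addedByUser.mkString(", ")}")
      }
    }

    // Registration on the driver, e.g.: sc.addSparkListener(new EnvUpdatePrinter)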

    The environmentDetails method:

      /**
       * Return a map representation of jvm information, Spark properties, system properties, and
       * class paths. Map keys define the category, and map values represent the corresponding
       * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
       */
      private[spark]
      def environmentDetails(
          conf: SparkConf,
          schedulingMode: String,
          addedJars: Seq[String],
          addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
    
        import Properties._
        val jvmInformation = Seq(
          ("Java Version", s"$javaVersion ($javaVendor)"),
          ("Java Home", javaHome),
          ("Scala Version", versionString)
        ).sorted
    
        // Spark properties
        // This includes the scheduling mode whether or not it is configured (used by SparkUI)
        val schedulerMode =
          if (!conf.contains("spark.scheduler.mode")) {
            Seq(("spark.scheduler.mode", schedulingMode))
          } else {
            Seq.empty[(String, String)]
          }
        val sparkProperties = (conf.getAll ++ schedulerMode).sorted
    
        // System properties that are not java classpaths
        val systemProperties = Utils.getSystemProperties.toSeq
        val otherProperties = systemProperties.filter { case (k, _) =>
          k != "java.class.path" && !k.startsWith("spark.")
        }.sorted
    
        // Class paths including all added jars and files
        val classPathEntries = javaClassPath
          .split(File.pathSeparator)
          .filterNot(_.isEmpty)
          .map((_, "System Classpath"))
        val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
        val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
    
        Map[String, Seq[(String, String)]](
          "JVM Information" -> jvmInformation,
          "Spark Properties" -> sparkProperties,
          "System Properties" -> otherProperties,
          "Classpath Entries" -> classPaths)
      }
    Original post: https://www.cnblogs.com/zz-ksw/p/11556901.html