  • A Brief Introduction to Utils, a Commonly Used Utility Class in Spark

    For the preface of 《深入理解Spark:核心思想与源代码分析》 (Understanding Spark in Depth: Core Ideas and Source Code Analysis), see the post announcing the book's publication.

    For Chapter 1 of the book, see "Chapter 1: Environment Preparation".

    For Chapter 2 of the book, see "Chapter 2: Spark's Design Philosophy and Basic Architecture".

    For the first part of Chapter 3, see "The Initialization of SparkContext (Part 1)".

    For the second part of Chapter 3, see "The Initialization of SparkContext (Part 2)".

    For the third part of Chapter 3, see "The Initialization of SparkContext (Part 3)".

    For the fourth part of Chapter 3, see "The Initialization of SparkContext (Part 4)".


    Utils is one of the most frequently used utility classes in Spark. If you do not care about its implementation, skipping it will not affect your understanding of Spark much. For people just starting out with Scala or Spark, though, working through the implementation of Utils is a good entry point.

    The commonly used methods provided by Utils are introduced one by one below.

    1.localHostName

    Description: gets the local machine's host name.


      def localHostName(): String = {
        customHostname.getOrElse(localIpAddressHostname)
      }

    2.getDefaultPropertiesFile

    Description: locates the default Spark properties file.

      def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
        env.get("SPARK_CONF_DIR")
          .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
          .map { t => new File(s"$t${File.separator}spark-defaults.conf") }
          .filter(_.isFile)
          .map(_.getAbsolutePath)
          .orNull
      }
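
    To see the lookup order in isolation, here is a minimal sketch using a hand-built environment map (the /opt/spark path is purely illustrative):

      import java.io.File

      val fakeEnv = Map("SPARK_HOME" -> "/opt/spark")  // hypothetical environment
      val propsFile = fakeEnv.get("SPARK_CONF_DIR")    // SPARK_CONF_DIR wins if set
        .orElse(fakeEnv.get("SPARK_HOME").map { home => s"$home${File.separator}conf" })
        .map { dir => new File(s"$dir${File.separator}spark-defaults.conf") }
        .filter(_.isFile)                              // dropped if no such file exists
        .map(_.getAbsolutePath)
        .orNull                                        // null when nothing is found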

    3.loadDefaultSparkProperties

    Description: loads Spark properties from the specified file. If no file is specified, it loads the properties from the default Spark properties file.

      def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
        val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
        Option(path).foreach { confFile =>
          getPropertiesFromFile(confFile).filter { case (k, v) =>
            k.startsWith("spark.")
          }.foreach { case (k, v) =>
            conf.setIfMissing(k, v)
            sys.props.getOrElseUpdate(k, v)
          }
        }
        path
      }
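
    A usage sketch (note that Utils is private[spark], so real callers live inside the org.apache.spark package; the file path below is hypothetical):

      import org.apache.spark.SparkConf
      import org.apache.spark.util.Utils

      val conf = new SparkConf()
      // Returns the path it loaded from; only keys starting with "spark." are applied,
      // and values already present in conf win because setIfMissing is used.
      val loadedFrom = Utils.loadDefaultSparkProperties(conf, "/tmp/my-spark-defaults.conf")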

    4.getCallSite

    Description: obtains the current call stack for the current SparkContext. The Spark or Scala core class nearest the bottom of the stack is pushed onto the top of callStack, and that class's method name is stored in lastSparkMethod; the user class nearest the top of the stack is appended to callStack, with its line number stored in firstUserLine and its file name in firstUserFile. The case class CallSite that is finally returned stores both the shortest stack and the longest stack, whose depth defaults to 20. For the JavaWordCount example, the data obtained is as follows:

    • Shortest stack: JavaSparkContext at JavaWordCount.java:44
    • Longest stack: org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61) org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:44)

      def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
        val trace = Thread.currentThread.getStackTrace().filterNot { ste: StackTraceElement =>
          ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")
        }
        var lastSparkMethod = "<unknown>"
        var firstUserFile = "<unknown>"
        var firstUserLine = 0
        var insideSpark = true
        var callStack = new ArrayBuffer[String]() :+ "<unknown>"

        for (el <- trace) {
          if (insideSpark) {
            if (skipClass(el.getClassName)) {
              lastSparkMethod = if (el.getMethodName == "<init>") {
                el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
              } else {
                el.getMethodName
              }
              callStack(0) = el.toString // Put last Spark method on top of the stack trace.
            } else {
              firstUserLine = el.getLineNumber
              firstUserFile = el.getFileName
              callStack += el.toString
              insideSpark = false
            }
          } else {
            callStack += el.toString
          }
        }
        val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
        CallSite(
          shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
          longForm = callStack.take(callStackDepth).mkString("\n"))
      }
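
    A sketch of a predicate in the spirit of coreExclusionFunction (the class-name prefixes here are assumptions, not Spark's exact exclusion list, and the call assumes code compiled inside the org.apache.spark package since Utils is private[spark]):

      def skipSparkAndScala(className: String): Boolean =
        className.startsWith("org.apache.spark.") || className.startsWith("scala.")

      val site = Utils.getCallSite(skipSparkAndScala)
      println(site.shortForm)  // e.g. "JavaSparkContext at JavaWordCount.java:44"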

    5.startServiceOnPort

    Description: in Scala, as in other functional languages, functions can be passed around as values. This method starts a service by calling back the startService function passed in, and finally returns the service created by startService together with the port it bound.

    If startup fails with a port-binding exception, the attempt is retried until the maximum number of attempts given by maxRetries is reached.

      def startServiceOnPort[T](
          startPort: Int,
          startService: Int => (T, Int),
          conf: SparkConf,
          serviceName: String = ""): (T, Int) = {
        require(startPort == 0 || (1024 <= startPort && startPort < 65536),
          "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
        val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
        val maxRetries = portMaxRetries(conf)
        for (offset <- 0 to maxRetries) {
          val tryPort = if (startPort == 0) {
            startPort
          } else {
            ((startPort + offset - 1024) % (65536 - 1024)) + 1024
          }
          try {
            val (service, port) = startService(tryPort)
            logInfo(s"Successfully started service$serviceString on port $port.")
            return (service, port)
          } catch {
            case e: Exception if isBindCollision(e) =>
              if (offset >= maxRetries) {
                val exceptionMessage =
                  s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
                val exception = new BindException(exceptionMessage)
                exception.setStackTrace(e.getStackTrace)
                throw exception
              }
              logWarning(s"Service$serviceString could not bind on port $tryPort. " +
                s"Attempting port ${tryPort + 1}.")
          }
        }
        throw new SparkException(s"Failed to start service$serviceString on port $startPort")
      }
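
    A usage sketch that starts a plain ServerSocket with this retry logic (the service name and starting port are illustrative, and the call assumes access to Utils):

      import java.net.ServerSocket
      import org.apache.spark.SparkConf

      def startEcho(port: Int): (ServerSocket, Int) = {
        val socket = new ServerSocket(port)  // throws BindException if the port is taken
        (socket, socket.getLocalPort)        // report the actual port (relevant when port == 0)
      }

      val (server, boundPort) =
        Utils.startServiceOnPort(9999, startEcho, new SparkConf(), "echo-service")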

    6.createDirectory

    Description: creates a temporary directory named with a "spark-" prefix plus a UUID under the given root. If creation fails it retries, up to 10 times (MAX_DIR_CREATION_ATTEMPTS).

      def createDirectory(root: String, namePrefix: String = "spark"): File = {
        var attempts = 0
        val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
        var dir: File = null
        while (dir == null) {
          attempts += 1
          if (attempts > maxAttempts) {
            throw new IOException("Failed to create a temp directory (under " + root + ") after " +
              maxAttempts + " attempts!")
          }
          try {
            dir = new File(root, "spark-" + UUID.randomUUID.toString)
            if (dir.exists() || !dir.mkdirs()) {
              dir = null
            }
          } catch { case e: SecurityException => dir = null }
        }

        dir
      }

    7.getOrCreateLocalRootDirs

    Description: based on the spark.local.dir configuration, determines the root directories for Spark's local files; each root must exist before first- and second-level directories can be created under it. createDirectory is then called to create a first-level directory under each root.

      private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
        if (isRunningInYarnContainer(conf)) {
          getYarnLocalDirs(conf).split(",")
        } else {
          Option(conf.getenv("SPARK_LOCAL_DIRS"))
            .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
            .split(",")
            .flatMap { root =>
              try {
                val rootDir = new File(root)
                if (rootDir.exists || rootDir.mkdirs()) {
                  val dir = createDirectory(root)
                  chmod700(dir)
                  Some(dir.getAbsolutePath)
                } else {
                  logError(s"Failed to create dir in $root. Ignoring this directory.")
                  None
                }
              } catch {
                case e: IOException =>
                  logError(s"Failed to create local root dir in $root. Ignoring this directory.")
                  None
              }
            }
            .toArray
        }
      }
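
    The roots are resolved in order: SPARK_LOCAL_DIRS, then spark.local.dir, then java.io.tmpdir (or the YARN-provided directories inside a YARN container). An illustrative configuration spreading local storage over two disks (the paths are hypothetical):

      val conf = new SparkConf().set("spark.local.dir", "/disk1/tmp,/disk2/tmp")
      // getOrCreateLocalRootDirs(conf) would create one "spark-<UUID>" directory
      // under each existing root and return their absolute paths.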

    8.getLocalDir

    Description: looks up Spark's first-level local directory (the first root returned by getOrCreateLocalRootDirs).

      def getLocalDir(conf: SparkConf): String = {
        getOrCreateLocalRootDirs(conf)(0)
      }

    9.createTempDir

    Description: creates a temporary directory under one of Spark's first-level directories, and registers it in shutdownDeletePaths: scala.collection.mutable.HashSet[String].

      def createTempDir(
          root: String = System.getProperty("java.io.tmpdir"),
          namePrefix: String = "spark"): File = {
        val dir = createDirectory(root, namePrefix)
        registerShutdownDeleteDir(dir)
        dir
      }

    10.registerShutdownDeleteDir

    Description: registers a directory in shutdownDeletePaths: scala.collection.mutable.HashSet[String] so that it will be deleted when the process exits.

      def registerShutdownDeleteDir(file: File) {
        val absolutePath = file.getAbsolutePath()
        shutdownDeletePaths.synchronized {
          shutdownDeletePaths += absolutePath
        }
      }

    11.hasRootAsShutdownDeleteDir

    Description: determines whether a file falls under one of the files and directories to be deleted on shutdown; shutdownDeletePaths: scala.collection.mutable.HashSet[String] stores the files and directories to delete when the process shuts down.

      def hasRootAsShutdownDeleteDir(file: File): Boolean = {
        val absolutePath = file.getAbsolutePath()
        val retval = shutdownDeletePaths.synchronized {
          shutdownDeletePaths.exists { path =>
            !absolutePath.equals(path) && absolutePath.startsWith(path)
          }
        }
        if (retval) {
          logInfo("path = " + file + ", already present as root for deletion.")
        }
        retval
      }
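
    The check is a strict-prefix test: a file matches when some registered path is a proper ancestor of it. A self-contained illustration with hypothetical paths:

      val registered = Set("/tmp/spark-1234")
      val candidate = "/tmp/spark-1234/blockmgr/shuffle_0"
      val covered = registered.exists { path =>
        candidate != path && candidate.startsWith(path)
      }
      // covered == true: the candidate is already under a directory scheduled for deletion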

    12.deleteRecursively

    Description: deletes a file, or a directory together with its subdirectories and files, and removes the path from shutdownDeletePaths: scala.collection.mutable.HashSet[String].

      def deleteRecursively(file: File) {
        if (file != null) {
          try {
            if (file.isDirectory && !isSymlink(file)) {
              var savedIOException: IOException = null
              for (child <- listFilesSafely(file)) {
                try {
                  deleteRecursively(child)
                } catch {
                  case ioe: IOException => savedIOException = ioe
                }
              }
              if (savedIOException != null) {
                throw savedIOException
              }
              shutdownDeletePaths.synchronized {
                shutdownDeletePaths.remove(file.getAbsolutePath)
              }
            }
          } finally {
            if (!file.delete()) {
              if (file.exists()) {
                throw new IOException("Failed to delete: " + file.getAbsolutePath)
              }
            }
          }
        }
      }
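
    A short sketch tying sections 9 and 12 together (assuming access to Utils, which is private[spark]):

      val scratch = Utils.createTempDir()  // registered for deletion at shutdown
      // ... write intermediate files under scratch ...
      Utils.deleteRecursively(scratch)     // delete now and unregister the path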

    13.getSparkClassLoader

    Description: gets the ClassLoader that loaded the current class.

    def getSparkClassLoader = getClass.getClassLoader

    14.getContextOrSparkClassLoader

    Description: gets the thread context ClassLoader; when none is set, falls back to the ClassLoader that loaded Spark.

     def getContextOrSparkClassLoader =
       Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)

    15.newDaemonCachedThreadPool

    Description: creates a cached thread pool of daemon threads using Executors.newCachedThreadPool.

      def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
        val threadFactory = namedThreadFactory(prefix)
        Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
      }
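
    namedThreadFactory is another Utils helper; here is a minimal sketch of what such a daemon-thread factory can look like (an assumption for illustration, not Spark's exact implementation):

      import java.util.concurrent.ThreadFactory
      import java.util.concurrent.atomic.AtomicInteger

      def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
        private val count = new AtomicInteger(0)
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
          t.setDaemon(true)  // daemon threads do not keep the JVM alive
          t
        }
      }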

    16.doFetchFile

    Description: downloads a file using URLConnection for http/https/ftp URLs; file URLs are copied locally, and any other scheme is read through the Hadoop FileSystem.

      private def doFetchFile(url: String, targetDir: File, filename: String, conf: SparkConf,
          securityMgr: SecurityManager, hadoopConf: Configuration) {
        val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
        val targetFile = new File(targetDir, filename)
        val uri = new URI(url)
        val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
        Option(uri.getScheme).getOrElse("file") match {
          case "http" | "https" | "ftp" =>
            logInfo("Fetching " + url + " to " + tempFile)
            var uc: URLConnection = null
            if (securityMgr.isAuthenticationEnabled()) {
              logDebug("fetchFile with security enabled")
              val newuri = constructURIForAuthentication(uri, securityMgr)
              uc = newuri.toURL().openConnection()
              uc.setAllowUserInteraction(false)
            } else {
              logDebug("fetchFile not using security")
              uc = new URL(url).openConnection()
            }
            val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
            uc.setConnectTimeout(timeout)
            uc.setReadTimeout(timeout)
            uc.connect()
            val in = uc.getInputStream()
            downloadFile(url, in, tempFile, targetFile, fileOverwrite)
          case "file" =>
            val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
            copyFile(url, sourceFile, targetFile, fileOverwrite)
          case _ =>
            val fs = getHadoopFileSystem(uri, hadoopConf)
            val in = fs.open(new Path(uri))
            downloadFile(url, in, tempFile, targetFile, fileOverwrite)
        }
      }

    17.fetchFile

    Description: if the file is cached locally, fetches it from the local cache; otherwise it is downloaded remotely. Finally, files in .tar, .tar.gz, and similar formats are unpacked, and the shell's chmod command is invoked to add a+x permission to the file.

      def fetchFile(
          url: String,
          targetDir: File,
          conf: SparkConf,
          securityMgr: SecurityManager,
          hadoopConf: Configuration,
          timestamp: Long,
          useCache: Boolean) {
        val fileName = url.split("/").last
        val targetFile = new File(targetDir, fileName)
        val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
        if (useCache && fetchCacheEnabled) {
          val cachedFileName = s"${url.hashCode}${timestamp}_cache"
          val lockFileName = s"${url.hashCode}${timestamp}_lock"
          val localDir = new File(getLocalDir(conf))
          val lockFile = new File(localDir, lockFileName)
          val raf = new RandomAccessFile(lockFile, "rw")
          val lock = raf.getChannel().lock()
          val cachedFile = new File(localDir, cachedFileName)
          try {
            if (!cachedFile.exists()) {
              doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
            }
          } finally {
            lock.release()
          }
          copyFile(
            url,
            cachedFile,
            targetFile,
            conf.getBoolean("spark.files.overwrite", false)
          )
        } else {
          doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
        }
        if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
          logInfo("Untarring " + fileName)
          Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
        } else if (fileName.endsWith(".tar")) {
          logInfo("Untarring " + fileName)
          Utils.execute(Seq("tar", "-xf", fileName), targetDir)
        }
        FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
      }
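
    The cache branch serializes concurrent downloads of the same file across processes with a FileLock on a dedicated lock file. A stripped-down sketch of that pattern (the paths are illustrative):

      import java.io.{File, RandomAccessFile}

      val lockFile = new File("/tmp", "example_lock")
      val raf = new RandomAccessFile(lockFile, "rw")
      val lock = raf.getChannel().lock()  // blocks until no other process holds the lock
      try {
        // critical section: check the cache and download only if the file is missing
      } finally {
        lock.release()
        raf.close()
      }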

    18.executeAndGetOutput

    Description: runs a command and captures its output. It calls join on stdoutThread so that the current thread waits until stdoutThread has finished reading the output.

      def executeAndGetOutput(
          command: Seq[String],
          workingDir: File = new File("."),
          extraEnvironment: Map[String, String] = Map.empty): String = {
        val builder = new ProcessBuilder(command: _*)
          .directory(workingDir)
        val environment = builder.environment()
        for ((key, value) <- extraEnvironment) {
          environment.put(key, value)
        }
        val process = builder.start()
        new Thread("read stderr for " + command(0)) {
          override def run() {
            for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
              System.err.println(line)
            }
          }
        }.start()
        val output = new StringBuffer
        val stdoutThread = new Thread("read stdout for " + command(0)) {
          override def run() {
            for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
              output.append(line)
            }
          }
        }
        stdoutThread.start()
        val exitCode = process.waitFor()
        stdoutThread.join()   // Wait for it to finish reading output
        if (exitCode != 0) {
          logError(s"Process $command exited with code $exitCode: $output")
          throw new SparkException(s"Process $command exited with code $exitCode")
        }
        output.toString
      }
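
    A usage sketch (assuming access to Utils; the command and paths are illustrative and Unix-only):

      import java.io.File

      val listing = Utils.executeAndGetOutput(
        Seq("ls", "-1"),
        workingDir = new File("/tmp"),
        extraEnvironment = Map("LC_ALL" -> "C"))
      // A non-zero exit code raises SparkException. Note that stdout lines are
      // concatenated without separators, since output.append(line) adds no newline.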

    19.memoryStringToMb

    Description: converts a memory-size string into an integer number of megabytes.

      def memoryStringToMb(str: String): Int = {
        val lower = str.toLowerCase
        if (lower.endsWith("k")) {
          (lower.substring(0, lower.length - 1).toLong / 1024).toInt
        } else if (lower.endsWith("m")) {
          lower.substring(0, lower.length - 1).toInt
        } else if (lower.endsWith("g")) {
          lower.substring(0, lower.length - 1).toInt * 1024
        } else if (lower.endsWith("t")) {
          lower.substring(0, lower.length - 1).toInt * 1024 * 1024
        } else { // no suffix, so it's just a number in bytes
          (lower.toLong / 1024 / 1024).toInt
        }
      }
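
    Worked examples that follow directly from the branches above (assuming access to the method):

      memoryStringToMb("512m")      // 512
      memoryStringToMb("2g")        // 2048  (2 * 1024)
      memoryStringToMb("1t")        // 1048576  (1024 * 1024)
      memoryStringToMb("1048576k")  // 1024  (kilobytes / 1024)
      memoryStringToMb("33554432")  // 32  (bytes / 1024 / 1024)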


  • Original post: https://www.cnblogs.com/yutingliuyl/p/7259412.html