zoukankan      html  css  js  c++  java
  • spark源码学习-withScope

     withScope是最近的发行版中新增加的一个模块,它是用来做DAG可视化的(DAG visualization on SparkUI)

    以前的sparkUI中只有stage的执行情况,也就是说我们看不到从上一个RDD到下一个RDD的具体信息。于是,为了在

    sparkUI中展示更多的信息,Spark把所有创建RDD的方法都包裹起来,同时用RDDOperationScope记录RDD的操作历史和关联,这样就能达成目标。下面就是一张WordCount的DAG visualization on SparkUI

    记录关系的RDDOperationScope源码如下:

    /**
     * A named code region in which RDDs get instantiated.
     *
     * Each scope carries a `name`, an optional `parent` scope and a string `id`. Scopes can be
     * nested: a scope whose `parent` is defined sits inside that parent, and `getAllScopes`
     * walks the chain from the outermost ancestor down to this scope.
     *
     * Instances are serialized to JSON through the companion object's mapper; `null` fields are
     * omitted and properties are written in the order `id`, `name`, `parent`.
     *
     * @param name   human-readable name of the operation this scope represents
     * @param parent enclosing scope, or `None` when this is a root scope
     * @param id     identifier of this scope; defaults to the next ID handed out by the
     *               companion object's counter
     */
    @JsonInclude(Include.NON_NULL)
    @JsonPropertyOrder(Array("id", "name", "parent"))
    private[spark] class RDDOperationScope(
        val name: String,
        val parent: Option[RDDOperationScope] = None,
        val id: String = RDDOperationScope.nextScopeId().toString) {

      /** Serialize this scope (including its parent chain) to a JSON string. */
      def toJson: String = RDDOperationScope.jsonMapper.writeValueAsString(this)

      /**
       * List every scope this scope belongs to, itself included.
       * The outermost ancestor comes first and this scope comes last.
       */
      @JsonIgnore
      def getAllScopes: Seq[RDDOperationScope] = {
        val ancestors = parent match {
          case Some(enclosing) => enclosing.getAllScopes
          case None => Seq.empty[RDDOperationScope]
        }
        ancestors ++ Seq(this)
      }

      /** Two scopes are equal when their id, name and parent are all equal. */
      override def equals(other: Any): Boolean = other match {
        case that: RDDOperationScope =>
          id == that.id && name == that.name && parent == that.parent
        case _ =>
          false
      }

      // Kept consistent with equals: hashes exactly the fields equals compares.
      override def hashCode(): Int = Objects.hashCode(id, name, parent)

      /** The string form of a scope is its JSON representation. */
      override def toString: String = toJson
    }
    
    /**
     * A collection of utility methods to construct a hierarchical representation of RDD scopes.
     * An RDD scope tracks the series of operations that created a given RDD.
     */
    private[spark] object RDDOperationScope extends Logging {
      // Shared mapper used to serialize scopes to JSON and parse them back.
      private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
      // Backing counter for nextScopeId(); incremented atomically on every call.
      private val scopeCounter = new AtomicInteger(0)

      /** Parse a scope from its JSON string form (the inverse of `RDDOperationScope#toJson`). */
      def fromJson(s: String): RDDOperationScope = {
        jsonMapper.readValue(s, classOf[RDDOperationScope])
      }

      /** Return a fresh operation scope ID taken from the object-wide atomic counter. */
      def nextScopeId(): Int = scopeCounter.getAndIncrement

      /**
       * Execute the given body such that all RDDs created in this body will have the same scope.
       * The name of the scope will be the first method name in the stack trace that is not the
       * same as this method's.
       *
       * The scope is created as a child of whatever scope is currently set on `sc`
       * (`ignoreParent` is always `false` on this path).
       *
       * Note: Return statements are NOT allowed in body.
       *
       * @param sc           context whose local properties carry the current scope
       * @param allowNesting whether `withScope` calls inside `body` may create child scopes
       * @param body         code to run inside the new scope
       * @return the value produced by `body`
       */
      private[spark] def withScope[T](
          sc: SparkContext,
          allowNesting: Boolean = false)(body: => T): T = {
        val ourMethodName = "withScope"
        // Skip frames up to the first `withScope` frame, then take the first frame after it
        // whose method name differs: that is the method that called us.
        val callerMethodName = Thread.currentThread.getStackTrace()
          .dropWhile(_.getMethodName != ourMethodName)
          .find(_.getMethodName != ourMethodName)
          .map(_.getMethodName)
          .getOrElse {
            // Log a warning just in case, but this should almost certainly never happen
            logWarning("No valid method name for this RDD operation scope!")
            "N/A"
          }
        withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
      }

      /**
       * Execute the given body such that all RDDs created in this body will have the same scope.
       *
       * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
       * child scopes that are nested within our scope. Otherwise, these calls will take no effect.
       *
       * Additionally, the caller of this method may optionally ignore the configurations and scopes
       * set by the higher level caller. In this case, this method will ignore the parent caller's
       * intention to disallow nesting, and the new scope instantiated will not have a parent. This
       * is useful for scoping physical operations in Spark SQL, for instance.
       *
       * The scope and the no-override flag are stored as local properties on `sc`; both are
       * restored to their previous values when `body` finishes, whether normally or by throwing.
       *
       * Note: Return statements are NOT allowed in body.
       *
       * @param sc           context whose local properties carry the current scope
       * @param name         name of the scope to create
       * @param allowNesting whether `withScope` calls inside `body` may create child scopes
       * @param ignoreParent if true, start a new root scope regardless of the caller's settings
       * @param body         code to run inside the new scope
       * @return the value produced by `body`
       */
      private[spark] def withScope[T](
          sc: SparkContext,
          name: String,
          allowNesting: Boolean,
          ignoreParent: Boolean)(body: => T): T = {
        // Save the old scope to restore it later
        val scopeKey = SparkContext.RDD_SCOPE_KEY
        val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
        val oldScopeJson = sc.getLocalProperty(scopeKey)
        val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
        val oldNoOverride = sc.getLocalProperty(noOverrideKey)
        try {
          if (ignoreParent) {
            // Ignore all parent settings and scopes and start afresh with our own root scope
            sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
          } else if (oldNoOverride == null) {
            // Otherwise, set the scope only if the higher level caller allows us to do so.
            // Nothing has written the flag since it was read above, so the saved value is current.
            sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
          }
          // Optionally disallow the child body to override our scope
          if (!allowNesting) {
            sc.setLocalProperty(noOverrideKey, "true")
          }
          body
        } finally {
          // Remember to restore any state that was modified before exiting
          sc.setLocalProperty(scopeKey, oldScopeJson)
          sc.setLocalProperty(noOverrideKey, oldNoOverride)
        }
      }
    }
  • 相关阅读:
    INVALID_STATE_ERR: DOM Exception 11
    测试用户的网络环境
    CentOS修改IP、DNS、网关
    读《结网》
    命名函数表达式
    JavaScript中的编码函数
    linux下C程序printf没有立即输出的问题及我的Makefile文件
    学习使用vimperator
    thinkpad e40 安装 nvidia显卡驱动之后
    fedora:在命令行下删除文件到回收站
  • 原文地址:https://www.cnblogs.com/moonlightml/p/9055184.html
Copyright © 2011-2022 走看看