
    Spark SQL (4): From Unresolved Plan to Analyzed Plan

    The third post in this series covered how the Unresolved Plan is generated; the next step is converting it into an Analyzed Plan. This step mainly involves QueryExecution, the Analyzer, and the catalog.

    spark.sql() -> Dataset.ofRows:

     def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
        val qe = sparkSession.sessionState.executePlan(logicalPlan)
        qe.assertAnalyzed()
        new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
      }
    

      In the method above, qe.assertAnalyzed() triggers analysis of the plan tree. Part of the reason for doing this eagerly is to bind the plan against the metadata in the catalog and surface errors early, e.g. a function name or column name that cannot be resolved.
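
    To see this eagerness in action, here is a minimal sketch (the view name t and the column no_such_col are made up for illustration): the AnalysisException is thrown by spark.sql() itself, before any action runs.

        import org.apache.spark.sql.{AnalysisException, SparkSession}

        val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
        spark.range(10).createOrReplaceTempView("t")
        try {
          // "no_such_col" does not exist, so assertAnalyzed throws here,
          // even though no action (show/collect) has been triggered yet.
          spark.sql("SELECT no_such_col FROM t")
        } catch {
          case e: AnalysisException => println(s"caught at analysis time: ${e.message}")
        }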

    The actual analysis happens the first time QueryExecution's lazy val analyzed is evaluated:

     def assertAnalyzed(): Unit = analyzed
     lazy val analyzed: LogicalPlan = {
        SparkSession.setActiveSession(sparkSession)
        sparkSession.sessionState.analyzer.executeAndCheck(logical)
      }
    

      So when are QueryExecution, the Analyzer, and the catalog instantiated?

    They are created in SparkSession:

     lazy val sessionState: SessionState = {
        parentSessionState
          .map(_.clone(this))
          .getOrElse {
            val state = SparkSession.instantiateSessionState(
              SparkSession.sessionStateClassName(sparkContext.conf),
              self)
            initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
            state
          }
      }
    

      The SessionState is actually constructed via instantiateSessionState() -> BaseSessionStateBuilder.build():

      /**
       * Build the [[SessionState]].
       */
      def build(): SessionState = {
        new SessionState(
          session.sharedState,
          conf,
          experimentalMethods,
          functionRegistry,
          udfRegistration,
          () => catalog,
          sqlParser,
          () => analyzer,
          () => optimizer,
          planner,
          streamingQueryManager,
          listenerManager,
          () => resourceLoader,
          createQueryExecution,
          createClone)
      }
    

      Analyzer:

    protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
        override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
          new FindDataSourceTable(session) +:
            new ResolveSQLOnFile(session) +:
            customResolutionRules
    
        override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
          PreprocessTableCreation(session) +:
            PreprocessTableInsertion(conf) +:
            DataSourceAnalysis(conf) +:
            customPostHocResolutionRules
    
        override val extendedCheckRules: Seq[LogicalPlan => Unit] =
          PreWriteCheck +:
            PreReadCheck +:
            HiveOnlyCheck +:
            customCheckRules
      }
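
    The customResolutionRules, customPostHocResolutionRules and customCheckRules slots above are extension points. A hedged sketch of how a user rule can land there via SparkSessionExtensions (available since Spark 2.2; MyNoOpRule is a made-up name):

        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
        import org.apache.spark.sql.catalyst.rules.Rule

        // A do-nothing resolution rule, only to show the hook.
        case class MyNoOpRule(session: SparkSession) extends Rule[LogicalPlan] {
          override def apply(plan: LogicalPlan): LogicalPlan = plan
        }

        val spark = SparkSession.builder()
          .master("local[*]")
          .withExtensions(_.injectResolutionRule(MyNoOpRule)) // ends up in customResolutionRules
          .getOrCreate()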
    

      As the code above shows, the Analyzer carries references to the catalog and the SQLConf, so this is where the bulk of the analysis happens. The entry point is:

    sparkSession.sessionState.analyzer.executeAndCheck(logical)
    

      The executeAndCheck invoked on that line is defined in Analyzer:

     def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
        val analyzed = execute(plan)
        try {
          checkAnalysis(analyzed)
          EliminateBarriers(analyzed)
        } catch {
          case e: AnalysisException =>
            val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
            ae.setStackTrace(e.getStackTrace)
            throw ae
        }
      }
    

      The line val analyzed = execute(plan) actually calls RuleExecutor.execute:

    def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true
    
          // Run until fix point (or the max number of iterations as specified in the strategy).
          while (continue) {
            curPlan = batch.rules.foldLeft(curPlan) {
              case (plan, rule) =>
                val startTime = System.nanoTime()
                val result = rule(plan)
                val runTime = System.nanoTime() - startTime
    
                if (!result.fastEquals(plan)) {
                  queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
                  queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
                  logTrace(
                    s"""
                      |=== Applying Rule ${rule.ruleName} ===
                      |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                    """.stripMargin)
                }
                queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
                queryExecutionMetrics.incNumExecution(rule.ruleName)
    
                // Run the structural integrity checker against the plan after each rule.
                if (!isPlanIntegral(result)) {
                  val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                    "the structural integrity of the plan is broken."
                  throw new TreeNodeException(result, message, null)
                }
    
                result
            }
            iteration += 1
            if (iteration > batch.strategy.maxIterations) {
              // Only log if this is a rule that is supposed to run more than once.
              if (iteration != 2) {
                val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
                if (Utils.isTesting) {
                  throw new TreeNodeException(curPlan, message, null)
                } else {
                  logWarning(message)
                }
              }
              continue = false
            }
    
            if (curPlan.fastEquals(lastPlan)) {
              logTrace(
                s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
              continue = false
            }
            lastPlan = curPlan
          }
    
          if (!batchStartPlan.fastEquals(curPlan)) {
            logDebug(
              s"""
                |=== Result of Batch ${batch.name} ===
                |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
              """.stripMargin)
          } else {
            logTrace(s"Batch ${batch.name} has no effect.")
          }
        }
    
        curPlan
      }
    }
    

      As the code above shows, rule application inside a batch exits either when the iteration count exceeds the maximum allowed by the batch's strategy, or when an iteration leaves the plan identical to the previous one (a fixed point). RuleExecutor holds a Seq[Batch]; subclasses define the concrete batches, each of which pairs an execution strategy with a list of rules. RuleExecutor.execute itself just walks the batches in order and applies each rule; the concrete batches and rules are supplied by the subclass.
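
    The control flow is easier to see with the metrics and logging stripped away. A simplified sketch of the same fixed-point loop (illustrative only, with a plain type parameter standing in for TreeType):

        // Apply all rules repeatedly until the plan stops changing or the
        // strategy's iteration budget is exhausted -- the same two exit
        // conditions as RuleExecutor.execute above.
        def runToFixedPoint[T](start: T, rules: Seq[T => T], maxIterations: Int): T = {
          var cur = start
          var last = cur
          var iteration = 1
          var continue = true
          while (continue) {
            cur = rules.foldLeft(cur)((plan, rule) => rule(plan))
            iteration += 1
            if (iteration > maxIterations) continue = false
            if (cur == last) continue = false // fixed point reached
            last = cur
          }
          cur
        }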

    From here we can focus on the batches overridden in Analyzer and see exactly which rules and strategies it defines:

    lazy val batches: Seq[Batch] = Seq(
        Batch("Hints", fixedPoint,
          new ResolveHints.ResolveBroadcastHints(conf),
          ResolveHints.RemoveAllHints),
        Batch("Simple Sanity Check", Once,
          LookupFunctions),
        Batch("Substitution", fixedPoint,
          CTESubstitution,
          WindowsSubstitution,
          EliminateUnions,
          new SubstituteUnresolvedOrdinals(conf)),
        Batch("Resolution", fixedPoint,
          ResolveTableValuedFunctions ::
          ResolveRelations :: // resolve table names and bind them to relation nodes in the tree
          ResolveReferences ::
          ResolveCreateNamedStruct ::
          ResolveDeserializer ::
          ResolveNewInstance ::
          ResolveUpCast ::
          ResolveGroupingAnalytics ::
          ResolvePivot ::
          ResolveOrdinalInOrderByAndGroupBy ::
          ResolveAggAliasInGroupBy ::
          ResolveMissingReferences ::
          ExtractGenerator ::
          ResolveGenerate ::
          ResolveFunctions :: // resolve function calls
          ResolveAliases ::
          ResolveSubquery ::
          ResolveSubqueryColumnAliases ::
          ResolveWindowOrder ::
          ResolveWindowFrame ::
          ResolveNaturalAndUsingJoin ::
          ExtractWindowExpressions ::
          GlobalAggregates ::
          ResolveAggregateFunctions ::
          TimeWindowing ::
          ResolveInlineTables(conf) ::
          ResolveTimeZone(conf) ::
          ResolvedUuidExpressions ::
          TypeCoercion.typeCoercionRules(conf) ++
          extendedResolutionRules : _*),
        Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
        Batch("View", Once,
          AliasViewChild(conf)),
        Batch("Nondeterministic", Once,
          PullOutNondeterministic),
        Batch("UDF", Once,
          HandleNullInputsForUDF),
        Batch("FixNullability", Once,
          FixNullability),
        Batch("Subquery", Once,
          UpdateOuterReferences),
        Batch("Cleanup", fixedPoint,
          CleanupAliases)
      )
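
    The combined effect of these batches is easy to observe by comparing a query's plan before and after analysis (a small sketch; t is a hypothetical temp view):

        val df = spark.sql("SELECT id + 1 FROM t")
        println(df.queryExecution.logical)  // parsed, unresolved plan ('id, UnresolvedRelation `t`)
        println(df.queryExecution.analyzed) // attributes resolved and typed after the batches run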
    

      Here we look at ResolveRelations and ResolveFunctions to see how tables and functions are bound against the catalog:

        def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
          case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
            EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
              case v: View =>
                u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
              case other => i.copy(table = other)
            }
          case u: UnresolvedRelation => resolveRelation(u)
        }
    
    // Look up the table with the given name from catalog. The database we used is decided by the
        // precedence:
        // 1. Use the database part of the table identifier, if it is defined;
        // 2. Use defaultDatabase, if it is defined(In this case, no temporary objects can be used,
        //    and the default database is only used to look up a view);
        // 3. Use the currentDb of the SessionCatalog.
        private def lookupTableFromCatalog(
            u: UnresolvedRelation,
            defaultDatabase: Option[String] = None): LogicalPlan = {
          val tableIdentWithDb = u.tableIdentifier.copy(
            database = u.tableIdentifier.database.orElse(defaultDatabase))
          try {
            catalog.lookupRelation(tableIdentWithDb)
          } catch {
            case e: NoSuchTableException =>
              u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e)
            // If the database is defined and that database is not found, throw an AnalysisException.
            // Note that if the database is not defined, it is possible we are looking up a temp view.
            case e: NoSuchDatabaseException =>
              u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
                s"database ${e.db} doesn't exist.", e)
          }
        }
    

      The above is the table-binding process: it calls lookupTableFromCatalog to look up a matching relation in the catalog. A failed lookup is exactly what surfaces as the familiar "Table or view not found" error, as in the sketch below.
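
        // Hedged sketch: a miss in lookupTableFromCatalog is what the user sees
        // as this AnalysisException (the table name here is made up).
        try {
          spark.sql("SELECT * FROM no_such_table")
        } catch {
          case e: org.apache.spark.sql.AnalysisException =>
            println(e.message) // Table or view not found: no_such_table
        }

    Function matching works in a similar way; ResolveFunctions.apply transforms unresolved expressions: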

    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
          case q: LogicalPlan =>
            q transformExpressions {
              case u if !u.childrenResolved => u // Skip until children are resolved.
              case u: UnresolvedAttribute if resolver(u.name, VirtualColumn.hiveGroupingIdName) =>
                withPosition(u) {
                  Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
                }
              case u @ UnresolvedGenerator(name, children) =>
                withPosition(u) {
                  catalog.lookupFunction(name, children) match {
                    case generator: Generator => generator
                    case other =>
                      failAnalysis(s"$name is expected to be a generator. However, " +
                        s"its class is ${other.getClass.getCanonicalName}, which is not a generator.")
                  }
                }
              case u @ UnresolvedFunction(funcId, children, isDistinct) =>
                withPosition(u) {
                  catalog.lookupFunction(funcId, children) match {
                    // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
                    // the context of a Window clause. They do not need to be wrapped in an
                    // AggregateExpression.
                    case wf: AggregateWindowFunction =>
                      if (isDistinct) {
                        failAnalysis(s"${wf.prettyName} does not support the modifier DISTINCT")
                      } else {
                        wf
                      }
                    // We get an aggregate function, we need to wrap it in an AggregateExpression.
                    case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
                    // This function is not an aggregate function, just return the resolved one.
                    case other =>
                      if (isDistinct) {
                        failAnalysis(s"${other.prettyName} does not support the modifier DISTINCT")
                      } else {
                        other
                      }
                  }
                }
            }
        }
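
    The catalog.lookupFunction used above is implemented in SessionCatalog: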
    
    
    def lookupFunction(
          name: FunctionIdentifier,
          children: Seq[Expression]): Expression = synchronized {
        // Note: the implementation of this function is a little bit convoluted.
        // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
        // (built-in, temp, and external).
        if (name.database.isEmpty && functionRegistry.functionExists(name)) {
          // This function has been already loaded into the function registry.
          return functionRegistry.lookupFunction(name, children)
        }
    
        // If the name itself is not qualified, add the current database to it.
        val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
        val qualifiedName = name.copy(database = Some(database))
    
        if (functionRegistry.functionExists(qualifiedName)) {
          // This function has been already loaded into the function registry.
          // Unlike the above block, we find this function by using the qualified name.
          return functionRegistry.lookupFunction(qualifiedName, children)
        }
    
        // The function has not been loaded to the function registry, which means
        // that the function is a permanent function (if it actually has been registered
        // in the metastore). We need to first put the function in the FunctionRegistry.
        // TODO: why not just check whether the function exists first?
        val catalogFunction = try {
          externalCatalog.getFunction(database, name.funcName)
        } catch {
          case _: AnalysisException => failFunctionLookup(name)
          case _: NoSuchPermanentFunctionException => failFunctionLookup(name)
        }
        loadFunctionResources(catalogFunction.resources)
        // Please note that qualifiedName is provided by the user. However,
        // catalogFunction.identifier.unquotedString is returned by the underlying
        // catalog. So, it is possible that qualifiedName is not exactly the same as
        // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
        // At here, we preserve the input from the user.
        registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
        // Now, we need to create the Expression.
        functionRegistry.lookupFunction(qualifiedName, children)
      }
    

      Function binding is thus resolved through the functionRegistry, falling back to the external catalog for permanent functions.
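
    For example (a minimal sketch; plus_one and t are made-up names), a temporary UDF registered on the session goes straight into the FunctionRegistry, so the first branch of lookupFunction resolves it:

        // Registering a temp function puts it into the session's FunctionRegistry...
        spark.udf.register("plus_one", (x: Long) => x + 1)
        // ...so ResolveFunctions -> catalog.lookupFunction finds it by its
        // unqualified name, without consulting the external catalog.
        spark.sql("SELECT plus_one(id) FROM t").show()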

    Readers interested in the remaining rules and strategies can explore them in the source code. After this step, the Analyzed Plan has been produced.
