zoukankan      html  css  js  c++  java
  • Spark SQL源码解析(三)Analysis阶段分析

    Spark SQL原理解析前言:
    Spark SQL源码剖析(一)SQL解析框架Catalyst流程概述

    Spark SQL源码解析(二)Antlr4解析Sql并生成树

    Analysis阶段概述

    首先,这里需要引入一个新概念,前面介绍SQL parse阶段,会使用antlr4,将一条SQL语句解析成语法树,然后使用antlr4的访问者模式遍历生成语法树,也就是Logical Plan。但其实,SQL parse这一阶段生成的Logical Plan是被称为Unresolved Logical Plan。所谓unresolved,就是说SQL语句中的对象都是未解释的。

    比如说一条语句SELECT col FROM sales,当我们不知道col的具体类型(Int,String,还是其他),甚至是否在sales表中有col这一个列的时候,就称之为是Unresolved的。

    而在analysis阶段,主要就是解决这个问题,也就是将Unresolved的变成Resolved的。Spark SQL通过使用Catalyst rule和Catalog来跟踪数据源的table信息。并对Unresolved应用如下的rules(rule可以理解为一条一条的规则,当匹配到树某些节点的时候就会被应用)。

    • 从Catalog中,查询Unresolved Logical Plan中对应的关系(relations)
    • 根据输入属性名(比如上述的col列),映射到具体的属性
    • 确定哪些属性引用相同的值并赋予它们唯一的ID(这个是论文中的内容,看不是很明白,不过主要是方便后面优化器实现的)
    • Propagating and coercing types through expressions,这个看着也是有点迷,大概是对数据进行强制转换,方便后续对1 + col 这样的数据进行处理。

    而处理过后,就会真正生成一棵Resolved Logical Plan,接下来就去看看源码里面是怎么实现的吧。

    Analysis阶段详细解析

    通过跟踪调用代码,在调用完SQL parse的内容后,就会跑去org.apache.spark.sql.execution.QueryExecution这个类中执行,后面包括Logical Optimization阶段,Physical Planning阶段,生成RDD任务阶段都是在这个类中进行调度的。不过此次只介绍Analysis。

    在QueryExecution中,会去调用org.apache.spark.sql.catalyst.Analyzer这个类,这个类是继承自org.apache.spark.sql.catalyst.rules.RuleExecutor,记住这个,后面还有很多个阶段都是通过继承这个类实现的,实现原理也和Analysis阶段相似。

    继承自RuleExecutor的类,包括这里的Analyzer类,都是在自身实现大量的rule,然后注册到batch变量中,这里大概贴点代码瞅瞅。

    class Analyzer(
        catalog: SessionCatalog,
        conf: SQLConf,
        maxIterations: Int)
      extends RuleExecutor[LogicalPlan] with CheckAnalysis {
      
      ......其他代码
        lazy val batches: Seq[Batch] = Seq(
        Batch("Hints", fixedPoint,
          new ResolveHints.ResolveBroadcastHints(conf),
          ResolveHints.ResolveCoalesceHints,
          ResolveHints.RemoveAllHints),
        Batch("Simple Sanity Check", Once,
          LookupFunctions),
        Batch("Substitution", fixedPoint,
          CTESubstitution,
          WindowsSubstitution,
          EliminateUnions,
          new SubstituteUnresolvedOrdinals(conf)),
          
        ......其他代码
      
    

    先大概说下batches这个变量吧,batches是由Batch的列表构成。而Batch的具体签名如下:

    abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
      ......其他代码
      protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
      ......其他代码
    }
    

    一个Batch由策略Strategy,和一组Rule构成,其中策略Strategy主要是区分迭代次数用的,按我的理解,某些rule可以迭代多次,越多次效果会越好,类似机器学习的学习过程。而策略Strategy会规定迭代一次还是固定次数。而rule就是具体的应用规则了,这个先略过。

    在Analyzer这个类中,你会发现很大篇幅的代码都是各种各样rule的实现。然后最终,Analyzer会去调用super.execute()方法,也就是调用父类(RuleExecutor)的方法执行具体逻辑。而父类又会去调用这个batches变量,循环来与Sql Parse阶段生成的Unresolved Logical Plan做匹配,匹配到了就执行具体的验证。还是贴下代码看看吧。

    abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
      def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
        //遍历Analyzer中定义的batchs变量
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true
    
          // Run until fix point (or the max number of iterations as specified in the strategy.
    	  //这里的continue决定是否再次循环,由batch的策略(固定次数或单次),以及该batch对plan的作用效果这两者控制
          while (continue) {
    	    //调用foldLeft让batch中每条rule应用于plan,然后就是执行对应rule规则逻辑了
            curPlan = batch.rules.foldLeft(curPlan) {
              case (plan, rule) =>
                val startTime = System.nanoTime()
                val result = rule(plan)
                val runTime = System.nanoTime() - startTime
    
                if (!result.fastEquals(plan)) {
                  queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
                  queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
                  logTrace(
                    s"""
                      |=== Applying Rule ${rule.ruleName} ===
                      |${sideBySide(plan.treeString, result.treeString).mkString("
    ")}
                    """.stripMargin)
                }
                queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
                queryExecutionMetrics.incNumExecution(rule.ruleName)
    
                // Run the structural integrity checker against the plan after each rule.
                if (!isPlanIntegral(result)) {
                  val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                    "the structural integrity of the plan is broken."
                  throw new TreeNodeException(result, message, null)
                }
    
                result
            }
            iteration += 1
    		//策略的生效地方
            if (iteration > batch.strategy.maxIterations) {
              // Only log if this is a rule that is supposed to run more than once.
              if (iteration != 2) {
                val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
                if (Utils.isTesting) {
                  throw new TreeNodeException(curPlan, message, null)
                } else {
                  logWarning(message)
                }
              }
              continue = false
            }
    
            if (curPlan.fastEquals(lastPlan)) {
              logTrace(
                s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
              continue = false
            }
            lastPlan = curPlan
          }
    
          if (!batchStartPlan.fastEquals(curPlan)) {
            logDebug(
              s"""
                |=== Result of Batch ${batch.name} ===
                |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("
    ")}
              """.stripMargin)
          } else {
            logTrace(s"Batch ${batch.name} has no effect.")
          }
        }
    
        curPlan
      }
    
    }
    

    其实这个类的逻辑不难懂,就是遍历batchs变量,而每个batch又会去使用scala的foldLeft函数,遍历应用里面的每条rule。然后根据Batch的策略以及将新生成的Plan与旧的Plan比较,决定是否要再次遍历。然后最后将新生成的Plan输出。

    如果不清楚scala的foldLeft函数内容,可以百度下看看,不难懂的。然后跟RuleExecutor有关的基本都是这个套路,区别只在于rule的不同。

    接下来我们来看看具体是如果应用一条rule,将Unresolved LogicalPlan转换成Resolved LogicalPlan吧。

    Rule介绍

    前面说到,在Analyzer中重写了Batchs变量,Batchs包含多个Batch,每个Batch又有多个Rule,所以不可能全部看过来,庆幸的是,要了解Unresolved LogicalPlan转换成Resolved LogicalPlan,只需要看一个Rule就行,那就是ResolveRelations这个Rule,我们就只介绍这个Rule来管中窥豹。

    各自Rule基本都是object类型,也就是静态的,且继承自Rule这个抽象类,Rule很简单,就一个ruleName变量喝一个apply方法用以实现逻辑,然后就没了。所以重点还是在继承后的实现逻辑。

    前面提到,从Unresolved到Resolved的过程,可以理解为就是将SQL语句中的类型和字段,映射到实体表中的字段信息。而存储实体表元数据信息的,是Catalog,到具体的类,是org.apache.spark.sql.catalyst.catalog.SessionCatalog。

    我们来看看具体的逻辑代码:

    object ResolveRelations extends Rule[LogicalPlan] {
        ......其他代码
        def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
          case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
            EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
              case v: View =>
                u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
              case other => i.copy(table = other)
            }
          case u: UnresolvedRelation => resolveRelation(u)
        }
    	......其他代码
    }
    

    逻辑其实也蛮简单的,就是匹配UnresolvedRelation(就是Unresolved的节点),然后递归去Catlog中获取对应的元数据信息,递归将它及子节点变成Resoulved。不过还有个要提的是,SQL中对应的,有可能是文件数据,或是数据库中的表,抑或是视图(view),针对文件数据是不会转换的,转换成Resolved会在后面进行。而表和视图则会立即转换。

    最后,接上一篇的例子,接着来看看,经过Analysis阶段后,LogicalPlan变成什么样吧,上一篇SQL parse使用的示例代码:

        //生成DataFrame
        val df = Seq((1, 1)).toDF("key", "value")
        df.createOrReplaceTempView("src")
        //调用spark.sql
        val queryCaseWhen = sql("select key from src ")
    

    经过上次介绍的SQL parse后是变成这样:

    'Project ['key]
    +- 'UnresolvedRelation `src`
    

    这里的涵义上篇已介绍,不再赘述,而经过本次的Analysis后,则会变成这样

    Project [key#5]
    +- SubqueryAlias `src`
       +- Project [_1#2 AS key#5, _2#3 AS value#6]
          +- LocalRelation [_1#2, _2#3]
    

    可以发现,主要就是对UnresolvedRelation进行展开,现在我们可以发现src有两个字段,分别是key和value及其对应的别名(1#2,2#3)。这里还有一个SubqueryAlias,这个我也不是很明白,按源码里面的说法,这里的subquery仅用来提供属性的作用域信息,Analysis阶段过后就就可以将其删除,所以在Optimization阶段后会发现SubqueryAlias消失了。

    小结

    OK,那今天就先介绍到这里吧,主要综述了Analysis的内容,然后介绍RuleExecution的逻辑,最后简单看了个Rule的具体内容以及承接SQL parse阶段的例子。有兴趣的童鞋可以自己去顺着思路翻源码看看。

    以上~

  • 相关阅读:
    would clobber existing tag
    已成功与服务器建立连接,但是在登录前的握手期间发生错误。 (provider: TCP 提供程序, error: 0
    C#搭建简单的http服务器,访问静态资源
    使用iis反向代理
    WorkerServices部署为Windows服务
    mongo 操作数据库的方式
    odoo db_name 指定多个数据库
    odoo 如何设置字段变更跟踪
    odoo qweb 视图使用widget
    odoo 代码片段比较全的扩展
  • 原文地址:https://www.cnblogs.com/listenfwind/p/12795934.html
Copyright © 2011-2022 走看看