zoukankan      html  css  js  c++  java
  • 第四篇:Spark SQL Catalyst源码分析之TreeNode Library

    /** Spark SQL源码分析系列文章*/

        前几篇文章介绍了Spark SQL的Catalyst的核心运行流程SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释。

        

    一、TreeNode类型

       TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个TreeNode组成。TreeNode本身是一个BaseType <: TreeNode[BaseType] 的类型,并且实现了Product这个trait,这样可以存放异构的元素了。
       TreeNode有三种形态:BinaryNodeUnaryNodeLeaf Node
       在Catalyst里,这些Node都是继承自Logical Plan,可以说每一个TreeNode节点就是一个Logical Plan(包含Expression)(直接继承自TreeNode)

       主要继承关系类图如下:

     1、BinaryNode 

    二元节点,即有左右孩子的二叉节点

    [java] view plain copy
     
    1. [[TreeNode]] that has two children, [[left]] and [[right]].  
    2. trait BinaryNode[BaseType <: TreeNode[BaseType]] {  
    3.   def left: BaseType  
    4.   def right: BaseType  
    5.   def children = Seq(left, right)  
    6. }  
    7. abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {  
    8.   self: Product =>  
    9. }  

     节点定义比较简单,左孩子,右孩子都是BaseType。 children是一个Seq(left, right)

    下面列出主要继承二元节点的类,可以当查询手册用 :)

    这里提示下平常常用的二元节点:Join和Union

     2、UnaryNode

     一元节点,即只有一个孩子节点

    [java] view plain copy
     
    1.  A [[TreeNode]] with a single [[child]].  
    2. trait UnaryNode[BaseType <: TreeNode[BaseType]] {  
    3.   def child: BaseType  
    4.   def children = child :: Nil  
    5. }  
    6. abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {  
    7.   self: Product =>  
    8. }  

    下面列出主要继承一元节点的类,可以当查询手册用 :)

    常用的二元节点有,Project,Subquery,Filter,Limit ...等

    3、Leaf Node 

    叶子节点,没有孩子节点的节点。

    [java] view plain copy
     
    1. A [[TreeNode]] with no children.  
    2. trait LeafNode[BaseType <: TreeNode[BaseType]] {  
    3.   def children = Nil  
    4. }  
    5. abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {  
    6.   self: Product =>  
    7.   // Leaf nodes by definition cannot reference any input attributes.  
    8.   override def references = Set.empty  
    9. }  

    下面列出主要继承叶子节点的类,可以当查询手册用 :)

    提示常用的叶子节点: Command类系列,一些Funtion函数,以及Unresolved Relation...etc.

    二、TreeNode 核心方法

      简单介绍一个TreeNode这个类的属性和方法

      currentId
      一颗树里的TreeNode有个唯一的id,类型是java.util.concurrent.atomic.AtomicLong原子类型。

    [java] view plain copy
     
    1. private val currentId = new java.util.concurrent.atomic.AtomicLong  
    2. protected def nextId() = currentId.getAndIncrement()  

      sameInstance
      判断2个实例是否是同一个的时候,只需要判断TreeNode的id。

    [java] view plain copy
     
    1. def sameInstance(other: TreeNode[_]): Boolean = {  
    2.   this.id == other.id  
    3. }  

      fastEquals,更常用的一个快捷的判定方法,没有重写Object.Equals,这样防止scala编译器生成case class equals 方法

    [java] view plain copy
     
    1. def fastEquals(other: TreeNode[_]): Boolean = {  
    2.    sameInstance(other) || this == other  
    3.  }  

      map,flatMap,collect都是递归的对子节点进行应用PartialFunction,其它方法还有很多,篇幅有限这里不一一描述了。

    2.1、核心方法 transform 方法

      transform该方法接受一个PartialFunction,就是就是前一篇文章Analyzer里提到的Batch里面的Rule。
      是会将Rule迭代应用到该节点的所有子节点,最后返回这个节点的副本(一个和当前节点不同的节点,后面会介绍,其实就是利用反射来返回一个修改后的节点)。
      如果rule没有对一个节点进行PartialFunction的操作,就返回这个节点本身。

      来看一个例子:

    [java] view plain copy
     
    1. object GlobalAggregates extends Rule[LogicalPlan] {  
    2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {   //apply方法这里调用了logical plan(TreeNode) 的transform方法来应用一个PartialFunction。  
    3.     case Project(projectList, child) if containsAggregates(projectList) =>  
    4.       Aggregate(Nil, projectList, child)  
    5.   }  
    6.   def containsAggregates(exprs: Seq[Expression]): Boolean = {  
    7.     exprs.foreach(_.foreach {  
    8.       case agg: AggregateExpression => return true  
    9.       case _ =>  
    10.     })  
    11.     false  
    12.   }  
    13. }  

     这个方法真正的调用是transformChildrenDown,这里提到了用先序遍历来对子节点进行递归的Rule应用。
     如果在对当前节点应用rule成功,修改后的节点afterRule,来对其children节点进行rule的应用。

     transformDown方法:

    [java] view plain copy
     
    1. /** 
    2. * Returns a copy of this node where `rule` has been recursively applied to it and all of its 
    3. * children (pre-order). When `rule` does not apply to a given node it is left unchanged. 
    4. * @param rule the function used to transform this nodes children 
    5. */  
    6. ef transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {  
    7.  val afterRule = rule.applyOrElse(this, identity[BaseType])  
    8.  // Check if unchanged and then possibly return old copy to avoid gc churn.  
    9.  if (this fastEquals afterRule) {  
    10.    transformChildrenDown(rule)  //修改前节点this.transformChildrenDown(rule)  
    11.  } else {  
    12.    afterRule.transformChildrenDown(rule) //修改后节点进行transformChildrenDown  
    13.  }  

      最重要的方法transformChildrenDown:
      对children节点进行递归的调用PartialFunction,利用最终返回的newArgs来生成一个新的节点,这里调用了makeCopy()来生成节点。

     transformChildrenDown方法:

    [java] view plain copy
     
    1.  /** 
    2.  * Returns a copy of this node where `rule` has been recursively applied to all the children of 
    3.  * this node.  When `rule` does not apply to a given node it is left unchanged. 
    4.  * @param rule the function used to transform this nodes children 
    5.  */  
    6. def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {  
    7.   var changed = false  
    8.   val newArgs = productIterator.map {  
    9.     case arg: TreeNode[_] if children contains arg =>  
    10.       val newChild = arg.asInstanceOf[BaseType].transformDown(rule) //递归子节点应用rule  
    11.       if (!(newChild fastEquals arg)) {  
    12.         changed = true  
    13.         newChild  
    14.       } else {  
    15.         arg  
    16.       }  
    17.     case Some(arg: TreeNode[_]) if children contains arg =>  
    18.       val newChild = arg.asInstanceOf[BaseType].transformDown(rule)  
    19.       if (!(newChild fastEquals arg)) {  
    20.         changed = true  
    21.         Some(newChild)  
    22.       } else {  
    23.         Some(arg)  
    24.       }  
    25.     case m: Map[_,_] => m  
    26.     case args: Traversable[_] => args.map {  
    27.       case arg: TreeNode[_] if children contains arg =>  
    28.         val newChild = arg.asInstanceOf[BaseType].transformDown(rule)  
    29.         if (!(newChild fastEquals arg)) {  
    30.           changed = true  
    31.           newChild  
    32.         } else {  
    33.           arg  
    34.         }  
    35.       case other => other  
    36.     }  
    37.     case nonChild: AnyRef => nonChild  
    38.     case null => null  
    39.   }.toArray  
    40.   if (changed) makeCopy(newArgs) else this //根据作用结果返回的newArgs数组,反射生成新的节点副本。  
    41. }  

      makeCopy方法,反射生成节点副本  

    [java] view plain copy
     
    1. /** 
    2.   * Creates a copy of this type of tree node after a transformation. 
    3.   * Must be overridden by child classes that have constructor arguments 
    4.   * that are not present in the productIterator. 
    5.   * @param newArgs the new product arguments. 
    6.   */  
    7.  def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {  
    8.    try {  
    9.      val defaultCtor = getClass.getConstructors.head  //反射获取默认构造函数的第一个  
    10.      if (otherCopyArgs.isEmpty) {  
    11.        defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] //反射生成当前节点类型的节点  
    12.      } else {  
    13.        defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type] //如果还有其它参数,++  
    14.      }  
    15.    } catch {  
    16.      case e: java.lang.IllegalArgumentException =>  
    17.        throw new TreeNodeException(  
    18.          this, s"Failed to copy node.  Is otherCopyArgs specified correctly for $nodeName? "  
    19.            + s"Exception message: ${e.getMessage}.")  
    20.    }  
    21.  }  

    三、TreeNode实例

      现在准备从一段sql来出发,画一下这个spark sql的整体树的transformation。
     SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key
     首先,我们先执行一下,在控制台里看一下生成的计划:
    [java] view plain copy
     
    1. <span style="font-size:12px;">sbt/sbt hive/console  
    2. Using /usr/java/default as default JAVA_HOME.  
    3. Note, this will be overridden by -java-home if it is set.  
    4. [info] Loading project definition from /app/hadoop/shengli/spark/project/project  
    5. [info] Loading project definition from /app/hadoop/shengli/spark/project  
    6. [info] Set current project to root (in build file:/app/hadoop/shengli/spark/)  
    7. [info] Starting scala interpreter...  
    8. [info]   
    9. import org.apache.spark.sql.catalyst.analysis._  
    10. import org.apache.spark.sql.catalyst.dsl._  
    11. import org.apache.spark.sql.catalyst.errors._  
    12. import org.apache.spark.sql.catalyst.expressions._  
    13. import org.apache.spark.sql.catalyst.plans.logical._  
    14. import org.apache.spark.sql.catalyst.rules._  
    15. import org.apache.spark.sql.catalyst.types._  
    16. import org.apache.spark.sql.catalyst.util._  
    17. import org.apache.spark.sql.execution  
    18. import org.apache.spark.sql.hive._  
    19. import org.apache.spark.sql.hive.test.TestHive._  
    20. import org.apache.spark.sql.parquet.ParquetTestData  
    21.     
    22. scala> val query = sql("SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key")</span>  

    3.1、UnResolve Logical Plan

      第一步生成UnResolve Logical Plan 如下:
    [java] view plain copy
     
    1. scala> query.queryExecution.logical  
    2. res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
    3. Project [*]  
    4.  Join Inner, Some(('a.key = 'b.key))  
    5.   Subquery a  
    6.    Project [*]  
    7.     UnresolvedRelation None, src, None  
    8.   Subquery b  
    9.    Project [*]  
    10.     UnresolvedRelation None, src, None  
      如果画成树是这样的,仅个人理解:
      我将一开始介绍的三种Node分别用绿色UnaryNode,红色Binary Node 和 蓝色 LeafNode 来表示。

    3.2、Analyzed Logical Plan

      Analyzer会将允用Batch的Rules来对Unresolved Logical  Plan Tree 进行rule应用,这里用来EliminateAnalysisOperators将Subquery给消除掉,Batch("Resolution将Atrribute和Relation给Resolve了,Analyzed Logical Plan Tree如下图:

    3.3、Optimized Plan

      我把Catalyst里的Optimizer戏称为Spark SQL的优化大师,因为整个Spark SQL的优化都是在这里进行的,后面会有文章来讲解Optimizer。
      在这里,优化的不明显,因为SQL本身不复杂
    [java] view plain copy
     
    1. scala> query.queryExecution.optimizedPlan  
    2. res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
    3. Project [key#0,value#1,key#2,value#3]  
    4.  Join Inner, Some((key#0 = key#2))  
    5.   MetastoreRelation default, src, None  
    6.   MetastoreRelation default, src, None  
    生成的树如下图:

    3.4、executedPlan

      最后一步是最终生成的物理执行计划,里面涉及到了Hive的TableScan,涉及到了HashJoin操作,还涉及到了Exchange,Exchange涉及到了Shuffle和Partition操作。
      
    [java] view plain copy
     
    1. scala> query.queryExecution.executedPlan  
    2. res4: org.apache.spark.sql.execution.SparkPlan =   
    3. Project [key#0:0,value#1:1,key#2:2,value#3:3]  
    4.  HashJoin [key#0], [key#2], BuildRight  
    5.   Exchange (HashPartitioning [key#0:0], 150)  
    6.    HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None  
    7.   Exchange (HashPartitioning [key#2:0], 150)  
    8.    HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None  
     生成的物理执行树如图:
     
     

    四、总结:

        本文介绍了Spark SQL的Catalyst框架核心TreeNode类库,绘制了TreeNode继承关系的类图,了解了TreeNode这个类在Catalyst所起到的作用。语法树中的Logical Plan均派生自TreeNode,并且Logical Plan派生出TreeNode的三种形态,即Binary Node, Unary Node, Leaft Node。 正式这几种节点,组成了Spark SQl的Catalyst的语法树。
      TreeNode的transform方法是核心的方法,它接受一个rule,会对当前节点的孩子节点进行递归的调用rule,最后会返回一个TreeNode的copy,这种操作就是transformation,贯穿了Spark SQL执行的几个核心阶段,如Analyze,Optimize阶段。
      最后用一个实际的例子,展示出来Spark SQL的执行树生成流程。
      
      我目前的理解就是这些,如果分析不到位的地方,请大家多多指正。
     
    ——EOF——

    原创文章,转载请注明:

    转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

    本文链接地址:http://blog.csdn.net/oopsoom/article/details/38084079

    注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

    image

    转自:http://blog.csdn.net/oopsoom/article/details/38084079
  • 相关阅读:
    继承 接口 多态
    组合(补充)和 继承
    面向对象初级
    模块和包
    time,random,os,sys,序列化模块
    inline详解
    C++静态数据成员与静态成员函数
    OpenCV Mat数据类型及位数总结(转载)
    拼搏奋斗类
    c++虚函数实现机制(转)
  • 原文地址:https://www.cnblogs.com/sh425/p/7596401.html
Copyright © 2011-2022 走看看