zoukankan      html  css  js  c++  java
  • Spark SQL源码解析(四)Optimization和Physical Planning阶段解析

    Spark SQL原理解析前言:

    Spark SQL源码剖析(一)SQL解析框架Catalyst流程概述

    Spark SQL源码解析(二)Antlr4解析Sql并生成树

    Spark SQL源码解析(三)Analysis阶段分析

    前面已经介绍了SQL parse,将一条SQL语句使用antlr4解析成语法树并使用访问者模式生成Unresolved LogicalPlan,然后是Analysis阶段将Unresolved LogicalPlan转换成Resolved LogicalPlan。这一篇我们介绍Optimization阶段,和生成Physical Planning阶段。

    经过这两个阶段后,就差不多要到最后转换成Spark的RDD任务了。

    Spark SQL Optimization阶段概述

    先来看看Logical Optimization阶段。

    上一篇我们讨论了Analysis阶段如何生成一个真正的Logical Plan树。这一阶段听名字就知道是优化阶段,Spark SQL中有两个部分的优化,第一部分就是这里,是rule-base阶段的优化,就是根据各种关系代数的优化规则,对生成的Logical Plan适配,匹配到就进行相应的优化逻辑。这些规则大概有:投影消除,constant folding,替换null值,布尔表达式简化等等。当然大部分规则细节我也不是很清楚,仅仅能从名字推断一二。这

    同时还可以添加自己的优化rule,也比较容易实现,论文中就给出了一段自定义优化rule的代码:

    object DecimalAggregates extends Rule[LogicalPlan] {
      /** Maximum number of decimal digits in a Long */
      val MAX_LONG_DIGITS = 18
      def apply(plan: LogicalPlan): LogicalPlan = {
        plan transformAllExpressions {
          case Sum(e @ DecimalType.Expression(prec , scale))
            if prec + 10 <= MAX_LONG_DIGITS =>
              MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale)
      }
    }
    
    

    这段代码的大意是自定义了一个rule,如果匹配到SUM的表达式,那就执行相应的逻辑,论文里描述这里是找到对应的小数并将其转换为未缩放的64位LONG。具体逻辑看不是很明白不过不重要,重要的是编写自己的优化rule很方便就是。

    顺便点一下另一种优化,名字叫做cost-base优化(CBO),是发生在Physical Planning阶段的,这里就先卖个关子,后面说到的时候再讨论吧。

    然后看到源码的时候,会发现Optimizer这个类也是继承自RuleExecutor,继承这个类之后的流程基本都是一样的。前面分析Analysis阶段的时候已经有详细介绍过这个流程,这里就不展开说了。

    其实这优化器的重点应该是各种优化规则,这里我觉得更多的是设计到关系代数表达式优化理论方面的知识,这部分我也不甚精通,所以也就不说了。对这块感兴趣的童鞋可以看看网上别人的文章,这里顺便列几个可能有帮助的博客,

    下面还是来看看最开始的例子进行Optimization阶段后会变成什么样吧,先看看之前的示例代码:

        val df = Seq((1, 1)).toDF("key", "value")
        df.createOrReplaceTempView("src")
        val queryCaseWhen = sql("select key from src ")
    

    然后在Optimization优化阶段后,变成了:

    Project [_1#2 AS key#5]
    +- LocalRelation [_1#2, _2#3]
    

    好吧,看起来没什么变化,与Analysis阶段相比,也就少了个SubqueryAlias ,符合预期。不过也对,就一条SELECT语句能优化到哪去啊。

    Physical Planning生成阶段概述

    相比较于Logical Plan,Physical plan算是Spark可以去执行的东西了,当然本质上它也是一棵树。

    前面说到,Spark有一种cost-based的优化。主要就在这一阶段,在这一阶段,会生成一个或多个Physical Plan,然后使用cost model预估各个Physical Plan的处理性能,最后选择一个最优的Physical Plan。这里最主要优化的是join操作,当触发join操作的时候,会根据左右两边的数据集判断,然后决定使用Broadcast join,还是传统的Hash join,抑或是MergeSort join,有关这几种join的区别这里就不详细解释了,有兴趣童鞋可以百度看看。

    除了cost-based优化,这一阶段也依旧会有rule-based优化,所以说RuleExecutor这个类是很重要的,前面提到的Analysis阶段也好,Optimization阶段也好,包括这里的Physical Plan阶段,只要是涉及到rule-based优化,都会跟RuleExecutor这个类扯上关系。当然这样无疑是极大使用了面向对象的特性,不同的阶段编写不同的rule就行,一次编写,到处复用。

    Physical Planning源码分析

    首先是在QueryExecution中调度,

    class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
      ......其他代码
      lazy val sparkPlan: SparkPlan = {
        SparkSession.setActiveSession(sparkSession)
        // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
        //       but we will implement to choose the best plan.
        planner.plan(ReturnAnswer(optimizedPlan)).next()
      }
      ......其他代码
    }
      
    

    这里的planner是org.apache.spark.sql.execution.SparkPlanner这个类,而这个类继承自org.apache.spark.sql.catalyst.planning.QueryPlanner,plan()方法也是在父类QueryPlanner中实现的。和RuleExecution类似,QueryPlanner中有一个返回Seq[GenericStrategy[PhysicalPlan]]的方法:def strategies: Seq[GenericStrategy[PhysicalPlan]],这个方法会在子类(也就是SparkPlanner)重写,然后被QueryPlanner的plan()方法调用。

    我们来看看SparkPlanner中strategies方法的重写,再来看QueryPlanner的plan()方法吧。

    class SparkPlanner(
        val sparkContext: SparkContext,
        val conf: SQLConf,
        val experimentalMethods: ExperimentalMethods)
      extends SparkStrategies {
      ......其他代码
      override def strategies: Seq[Strategy] =
        experimentalMethods.extraStrategies ++
          extraPlanningStrategies ++ (
          PythonEvals ::
          DataSourceV2Strategy ::
          FileSourceStrategy ::
          DataSourceStrategy(conf) ::
          SpecialLimits ::
          Aggregation ::
          Window ::
          JoinSelection ::
          InMemoryScans ::
          BasicOperators :: Nil)
    	......其他代码
    	
    

    strategies()返回策略列表,是生成策略GenericStrategy,这是个具体的抽象类,位于org.apache.spark.sql.catalyst.planning包。所谓生成策略,就是决定如果根据Logical Plan生成Physical Plan的策略。比如上面介绍的join操作可以生成Broadcast join,Hash join,抑或是MergeSort join,就是一种生成策略,具体的类就是上面代码中的JoinSelection。每个生成策略GenericStrategy都是object,其apply()方法返回的是Seq[SparkPlan],这里的SparkPlan就是PhysicalPlan(注意:下文会将SparkPlan和PhysicalPlan混着用)。

    明白了生成策略后,就可以来看看QueryPlanner的plan()方法了。

    abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
      ......其他代码
      def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
        // Obviously a lot to do here still...
    
        // Collect physical plan candidates.
        val candidates = strategies.iterator.flatMap(_(plan))	//迭代调用并平铺,变成Iterator[SparkPlan]
    
        // The candidates may contain placeholders marked as [[planLater]],
        // so try to replace them by their child plans.
        val plans = candidates.flatMap { candidate =>
          val placeholders = collectPlaceholders(candidate)
    
          if (placeholders.isEmpty) {
            // Take the candidate as is because it does not contain placeholders.
            Iterator(candidate)
          } else {
            // Plan the logical plan marked as [[planLater]] and replace the placeholders.
            placeholders.iterator.foldLeft(Iterator(candidate)) {
              case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
                // Plan the logical plan for the placeholder.
                val childPlans = this.plan(logicalPlan)	
    
                candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
                  childPlans.map { childPlan =>
                    // Replace the placeholder by the child plan
                    candidateWithPlaceholders.transformUp {
                      case p if p.eq(placeholder) => childPlan
                    }
                  }
                }
            }
          }
        }
    
        val pruned = prunePlans(plans)
        assert(pruned.hasNext, s"No plan for $plan")
        pruned
      }
      
      ......其他代码
    }
    

    这里的流程其实不难,主要工作其实就是调用各个生成策略GenericStrategy的apply()方法,生成Iterator[SparkPlan]。后面很大部分代码是处理占位符,按我的理解,在生成Logical Plan的时候,可能有些无意义的占位符,这种需要使用子节点替换调它。倒数第三行prunePlans()方法按注释说是用来去掉bad plan的,但看实际代码只是原封不动返回。

    这样最终就得到一个Iterator[SparkPlan],每个SparkPlan就是可执行的物理操作了。

    大致流程就是如此,当然具体到一些生成策略没有细说,包括输入源策略,聚合策略等等,每一个都蛮复杂的,这里就不细说,有兴趣可以自行查阅。

    对了,最后还要看看示例代码到这一步变成什么样了,先上示例代码:

        //生成DataFrame
        val df = Seq((1, 1)).toDF("key", "value")
        df.createOrReplaceTempView("src")
        //调用spark.sql
        val queryCaseWhen = sql("select key from src ")
    

    经过Physical Planning阶段后,变成如下:

    Project [_1#2 AS key#5]
    +- LocalTableScan [_1#2, _2#3]
    

    对比上面的optimized阶段,直观看就是LocalRelation变成LocalTableScan。变得更加具体了,但实际上,Project也变了,虽然打印名字相同,但一个的类型是Project,本质上是LogicalPlan。而一个是ProjectExec,本质上是SparkPlan(也就是PhysicalPlan)。这一点通过断点看的更清楚。

    到这一步已经很解决终点了,后面再经过一个Preparations阶段就能生成RDD了,剩下的部分留待下篇介绍吧。

    以上~

  • 相关阅读:
    程序中图片透明 函数(使用SetBkColor API函数)
    编程中使用锁
    event内存泄漏
    diskcache
    linux内核管理
    Vue
    Paxos算法
    索引以及页和区
    CoreRT
    二叉树
  • 原文地址:https://www.cnblogs.com/listenfwind/p/12886205.html
Copyright © 2011-2022 走看看