zoukankan      html  css  js  c++  java
  • 47、Spark SQL核心源码深度剖析(DataFrame lazy特性、Optimizer优化策略等)

    一、源码分析

    1、

    ###入口org.apache.spark.sql/SQLContext.scala
    
    sql()方法:
    /**
        * 使用Spark执行一条SQL查询语句,将结果作为DataFrame返回,SQL解析使用的方言,可以
        * 通过spark.sql.dialect参数,来进行设置
        */
      def sql(sqlText: String): DataFrame = {
        // 首先,查看我们通过SQLContext.setConf()方法设置的参数,Spark.sql.dialect,
        // 如果是sql方言,就进入接下来的执行,如果不是sql,就直接报错
        // 这里的conf就是SQLConf
        if (conf.dialect == "sql") {
          // SQLContext的sql()方法正式进入执行阶段,Spark SQL也是有lazy特性的,其实,调用sql()去执行一条SQL语句的时候
          // 默认只会调用SqlParser组件针对SQL语句生成一个Unresolved LogicPlan,然后,将Unresolved LogicPlan和SQLXontext
          // 自身的实例(this),封装为一个DataFrame,返回DataFrame给用户,其中仅仅封装了SQL语句的Unresolved LogicPlan
     
     
          // 在用户拿到了封装了Unresolved LogicPlan的DataFrame之后,可以执行一些show()、select().show()、groupBy().show()
          // 或者拿到DataFrame对应的RDD,执行一系列transformation操作,最后执行一个Action后
          // 才会去触发Spark SQL后续的SQL执行流程,包括Analyzer、Optimizer、SparkPlan、execute PysicalPlan
     
          // 首先看parseSql()方法,传入SQL语句,调用SqlParser解析SQL,获取Unresolved LogicPlan
     
          // parseSql(),总结一下,就是调用了SqlParser的apply()方法,即由SqlParser将SQL语句通过内部的各种select、insert这种词法、语法解析器
          // 来进行解析,然后将SQL语句的各个部分,组装成一个LogicalPlan,但是这里的LogicalPlan,只是一颗语法树,还不知道自己具体执行计划的时候,
          // 数据从哪里来,所以,叫做UnResolved LogicalPlan,解析了SQL,拿到了UnResolved LogicalPlan,会封装成一个DataFrame,返回给用户,
          // 用户此时就开始用DataFrame执行各种操作了
          DataFrame(this, parseSql(sqlText))
        } else {
          sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
        }
      }
    
    
    
    
    setConf()方法:
    protected[sql] lazy val conf: SQLConf = new SQLConf
      /**
        * 如果要给Spark SQL设置一些参数,那么要使用SQLContext.setConf()方法,底层是会将配置信息放入SQLConf对象中的
        */
      def setConf(props: Properties): Unit = conf.setConf(props)
    
    
    
    
    parseSql()方法:
      // sqlParser 实际上是SparkSQLParser的实例,SparkSQLParser里面,又封装了catalyst的SqlParser
      @transient
      protected[sql] val sqlParser = {
        val fallback = new catalyst.SqlParser
        new SparkSQLParser(fallback(_))
      }
      protected[sql] def parseSql(sql: String): LogicalPlan = {
        // parseSql()方法,是SqlParser执行的入口,实际上,会调用SqlParser的apply()方法,来获取一个对SQL语句解析后的LogicalPlan
        ddlParser(sql, false).getOrElse(sqlParser(sql))
      }
    
    
    
    parseSql()会调SqlParser的apply()方法
    SqlParser这个类在org.apache.spark.sql.catalyst包下,其继承了AbstractSparkSQLParser 类
    
    class SqlParser extends AbstractSparkSQLParser {
    
    parseSql()会调SqlParser的apply()方法,会调AbstractSparkSQLParser的apply()方法
    
    private[sql] abstract class AbstractSparkSQLParser
      extends StandardTokenParsers with PackratParsers {
     
      /**
        * 实际上,调用SqlParser的apply()方法,将SQL解析成LogicalPlan时,会调用到SqlParser的父类,AbstractSparkSQLParser
        * 的apply()方法
        */
      def apply(input: String): LogicalPlan = {
        // Initialize the Keywords.
        lexical.initialize(reservedWords)
        // 这个代码的意思,就是用lexical.Scanner,针对SQL语句,来进行语法检查、分析,满足语法检查结果的话,就使用SQL解析器
        // 针对SQL进行解析,包括词法解析(将SQL语句解析成一个个短语,token)、语法解析,最后生成一个Unresolved LogicalPlan
        // 该LogicalPlan仅仅针对SQL语句本身生成,纯语法,不设计任何关联的数据源等等信息
        phrase(start)(new lexical.Scanner(input)) match {
          case Success(plan, _) => plan
          case failureOrError => sys.error(failureOrError.toString)
        }
      }
    }
    
    
    
    
    
    
    ###org.apache.spark.sql.catalyst/AbstractSparkSQLParser.scala
    
    /**
      * 用SqlLexical,对SQL语句,执行一个检查,如果满足检查的话,那么才去分析,
      * 否则,说明SQL语句本身的语法,是有问题的
      */
     
    class SqlLexical extends StdLexical {
      case class FloatLit(chars: String) extends Token {
        override def toString = chars
      }
    }
    
    
    
    
    /**
        * 从这里可以看出来,Spark SQL是支持两种主要的SQL语法的,包括select语句和insert语句
        */
      protected lazy val start: Parser[LogicalPlan] =
        ( (select | ("(" ~> select <~ ")")) *
          ( UNION ~ ALL        ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) }
          | INTERSECT          ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) }
          | EXCEPT             ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)}
          | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
          )
        | insert
        )
     
      /**
        * 这里,就是对select语句执行解析,语句里面可以包括FROM,WHERE,GROUP,HAVING,LIMIT
        */
      protected lazy val select: Parser[LogicalPlan] =
        SELECT ~> DISTINCT.? ~
          repsep(projection, ",") ~
          (FROM   ~> relations).? ~
          (WHERE  ~> expression).? ~
          (GROUP  ~  BY ~> rep1sep(expression, ",")).? ~
          (HAVING ~> expression).? ~
          sortType.? ~
          (LIMIT  ~> expression).? ^^ {
            case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>
              val base = r.getOrElse(NoRelation)
              val withFilter = f.map(Filter(_, base)).getOrElse(base)
              val withProjection = g
                .map(Aggregate(_, assignAliases(p), withFilter))
                .getOrElse(Project(assignAliases(p), withFilter))
              val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
              val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
              val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
              val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
              withLimit
          }
     
      /**
        * 解析INSERT、OVERWRITE这种语法
        */
      protected lazy val insert: Parser[LogicalPlan] =
        INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
          case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
        }
     
      /**
        * 查那些列
        */
      protected lazy val projection: Parser[Expression] =
        expression ~ (AS.? ~> ident.?) ^^ {
          case e ~ a => a.fold(e)(Alias(e, _)())
        }
     
      // Based very loosely on the MySQL Grammar.
      // http://dev.mysql.com/doc/refman/5.0/en/join.html
      /**
        * 会将你的SQL语句里面解析出来的各种token,或者TreeNode,给关联起来,最后组成一颗语法树
        * 语法树封装在LogicalPlan中,但是要注意,此时的LogicalPlan,还是Unresolved LogicalPlan
        */
     
      protected lazy val relations: Parser[LogicalPlan] =
        ( relation ~ rep1("," ~> relation) ^^ {
            case r1 ~ joins => joins.foldLeft(r1) { case(lhs, r) => Join(lhs, r, Inner, None) } }
        | relation
        )


    2、

    ###org.apache.spark.sql/SQLContext.scala
    
    /**
      * 通过这个Join, left: LogicalPlan,right: LogicalPlan
      * 意思就是说,将SQL语句的各个部分,通过Spark SQL的规则,组合拼装成一个语法树
      */
    case class Join(
      left: LogicalPlan,
      right: LogicalPlan,
      joinType: JoinType,
      condition: Option[Expression]) extends BinaryNode {
    }
    
    
    
    
      /**
        * 实际上,在后面操作DataFrame的时候,在实际真正要执行SQL语句,对数据进行查询,返回结果的时候,会触发SQLContext的executePlan()方法的执行,
        * 该方法,实际上会返回一个QueryExecution,这个QueryExecution实际上,会触发整个后续的流程
        */
      protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
     
      protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
    
    
    
    
    
    @DeveloperApi
      protected[sql] class QueryExecution(val logical: LogicalPlan) {
        def assertAnalyzed(): Unit = checkAnalysis(analyzed)
     
        // 用一个Unresolved LogicalPlan去构造一个QueryExecution的实例对象,那么SQL语句的执行就会一步步触发
     
        // Analyzer的apply()方法执行结束后,得到一个Resolved LogicalPlan
        lazy val analyzed: LogicalPlan = analyzer(logical)
        // 会通过CacheManager 执行一个缓存的操作,用一个cacheManager,调用其useCachedData()方法,就是说,如果之前已经缓存过这个执行计划
        // 又再次执行的话,那么,其实可以使用缓存中的数据
        lazy val withCachedData: LogicalPlan = {
          assertAnalyzed
          cacheManager.useCachedData(analyzed)
        }
        // 调用Optimizer的apply()方法,针对Resolved LogicalPlan 调用Optimizer,进行优化,获得Optimized LogicalPlan,获得优化后的逻辑执行计划
        lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
     
        // TODO: Don't just pick the first one...
        // 用SparkPlanner,对Optimizer生成的Optimized LogicalPlan,创建一个SparkPlan
        lazy val sparkPlan: SparkPlan = {
          SparkPlan.currentContext.set(self)
          planner(optimizedPlan).next()
        }
        // executedPlan should not be used to initialize any SparkPlan. It should be
        // only used for execution.
        // 使用SparkPlan 生成一个可以执行的SparkPlan,此时就是PhysicalPlan,物理执行计划,直接可以执行了,已经绑定到了数据源,而且
        // 知道对各个表的join,如何进行join,包括join的时候,spark内部会对小表进行广播
        lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
     
        /** Internal version of the RDD. Avoids copies and has no schema */
        // 最后一步,调用SparkPlan(封装了PhysicalPlan的SparkPlan)的executor()方法,execute()方法,实际上就会去执行物理执行计划
        // execute()方法返回的是RDD[Row],就是一个元素类型为Row的RDD
        lazy val toRdd: RDD[Row] = executedPlan.execute()
     
        protected def stringOrError[A](f: => A): String =
          try f.toString catch { case e: Throwable => e.toString }
     
        def simpleString: String =
          s"""== Physical Plan ==
             |${stringOrError(executedPlan)}
          """.stripMargin.trim
     
        override def toString: String =
          // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
          // however, the `toRdd` will cause the real execution, which is not what we want.
          // We need to think about how to avoid the side effect.
          s"""== Parsed Logical Plan ==
             |${stringOrError(logical)}
             |== Analyzed Logical Plan ==
             |${stringOrError(analyzed)}
             |== Optimized Logical Plan ==
             |${stringOrError(optimizedPlan)}
             |== Physical Plan ==
             |${stringOrError(executedPlan)}
             |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
             |== RDD ==
          """.stripMargin.trim
      }
    
    
    
    
    
    
     /**
        * Analyzer的所在地,QueryExecution实际执行SQL语句的时候,第一步,就是用之前SqlParser解析出来的纯逻辑的封装了语法树的Unresolved LogicalPlan
        * 调用Analyzer的apply()方法,将Unresolved LogicalPlan生成一个Resolved LogicalPlan
        */
      @transient
      protected[sql] lazy val analyzer: Analyzer =
        new Analyzer(catalog, functionRegistry, caseSensitive = true) {
          override val extendedResolutionRules =
            ExtractPythonUdfs ::
            sources.PreInsertCastAndRename ::
            Nil
        }
    
    
    
    Analyzer这个类在org.apache.spark.sql.catalyst.analysis包下
    
    /**
      * Analyzer的父类是RuleExecutor,调用Analyzer的apply()方法,实际上会调用RuleExecutor的apply()方法,并传入一个Unresolved LogicalPlan
      */
    class Analyzer(catalog: Catalog,
                   registry: FunctionRegistry,
                   caseSensitive: Boolean,
                   maxIterations: Int = 100)
      extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
    
    
    
    
    
    ###org.apache.spark.sql.catalyst.rules/RuleExecutor.scalla
    
     /**
        * 调用这个apply()方法,做了一些事情,总重要的一件事情,就是将这个LogicalPlan与它要查询的数据源绑定起来,从而让
        * Unresolved LogicalPlan 变成一个Resolved LogicalPlan
        */
      def apply(plan: TreeType): TreeType = {
        var curPlan = plan
     
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true
    }


    3、Optimizer

    接着看Optimizer,不关心apply()方法,关注它的优化逻辑
    Optimizer在org.apache.spark.sql.catalyst.optimize
    
    object DefaultOptimizer extends Optimizer {
      // 这里的batches是非常重要的,这里封装了每一个Spark SQL版本中,可以对逻辑执行计划执行的优化策略,在这里,重点理解Optimizer的各种优化策略
      // 这样,才清楚,Spark SQL 内部是如何对我们写的SQL语句进行优化的,我们可以再编写SQL语句的时候,直接用优化策略建议的方式来编写SQL语句,传递给
      // Spark SQL执行的SQL语句,本身就已经是最优的,这样,就可以避免在执行SQL解析的时候,进行大量的Spark SQL内部的优化,这样,在某种程度上,也可以提升性能
      //
      val batches =
        Batch("Combine Limits", FixedPoint(100),
          CombineLimits) ::
          // CombineLimits,就是合并limit语句,比如,你的SQL语句中,有多个limit子句,那么在这里会进行合并,取一个并集就可以了,这样的话
          // 在后面SQL执行的时候,limit就执行一次就好,所以,我们再写SQL的时候,尽量就写一个limit
        Batch("ConstantFolding", FixedPoint(100),
          NullPropagation,// 针对NULL的优化,尽量避免出现null的情况,否则,null是很容易产生数据倾斜的
          ConstantFolding,// 针对常量的优化,在这里直接计算可以获得的常量,所以我们自己对可能出现的常量尽量直接给出
          LikeSimplification, // like的简化优化
          BooleanSimplification,// boolean的简化优化
          SimplifyFilters,
          SimplifyCasts,
          SimplifyCaseConversionExpressions,
          OptimizeIn) ::
        Batch("Decimal Optimizations", FixedPoint(100),
          DecimalAggregates) ::
        Batch("Filter Pushdown", FixedPoint(100),
          UnionPushdown, // 将union下推,意思是和filter pushdown ,就是说将union where这种子句,下推到子查询中进行,尽量早的执行union操作和where
                        // 操作,避免在外层查询,针对大量的数据,执行where操作
          CombineFilters, // 合并filter,就是合并where子句,比如子查询中有针对某个字段的where子句,外层查询中,也有针对同样一个字段的where子句,
                          // 那么此时是可以合并where子句的,只保留一个即可,取并集即可,所以自己写SQL的时候哦,也要注意where的使用,如果针对一个字段,写一次就好
          PushPredicateThroughProject,
          PushPredicateThroughJoin,
          PushPredicateThroughGenerate,
          ColumnPruning) :: // ColumnPruning 列剪裁,就是针对你要查询的列进行剪裁,自己写SQL,如果表中有n个字段,但是你只需要查询一个字段, 那么就用select x from 不要使用select * from
        Batch("LocalRelation", FixedPoint(100),
          ConvertToLocalRelation) :: Nil
    }
    
    
    
    
    ###org.apache.spark.sql/SQLContext.scala
    
     /**
        * 用一些策略,比如说DataSourceStrategy,针对逻辑执行计划,执行进一步的具体化和物化
        */
      protected[sql] class SparkPlanner extends SparkStrategies {
        val sparkContext: SparkContext = self.sparkContext
     
        val sqlContext: SQLContext = self
     
        def codegenEnabled = self.conf.codegenEnabled
     
        def numPartitions = self.conf.numShufflePartitions
     
        def strategies: Seq[Strategy] =
          experimental.extraStrategies ++ (
          DataSourceStrategy ::
          DDLStrategy ::
          TakeOrdered ::
          HashAggregation ::
          LeftSemiJoin ::
          HashJoin ::
          InMemoryScans ::
          ParquetOperations ::
          BasicOperators ::
          CartesianProduct ::
          BroadcastNestedLoopJoin :: Nil)
    }
  • 相关阅读:
    MVC模式-----struts2框架(2)
    MVC模式-----struts2框架
    html的<h>标签
    jsp脚本元素
    LeetCode "Paint House"
    LeetCode "Longest Substring with At Most Two Distinct Characters"
    LeetCode "Graph Valid Tree"
    LeetCode "Shortest Word Distance"
    LeetCode "Verify Preorder Sequence in Binary Search Tree"
    LeetCode "Binary Tree Upside Down"
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11315111.html
Copyright © 2011-2022 走看看