zoukankan      html  css  js  c++  java
  • Spark SQL源代码分析之核心流程

        /** Spark SQL源代码分析系列文章*/

        自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,并且发展速度异常迅猛,究其原因,个人觉得有下面2点:

        1、整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里。这样能够应用于多种任务,流处理,批处理,包含机器学习里都能够引入Sql。
        2、效率:由于Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里。

        前一段时间測试过Shark,而且对Spark SQL也进行了一些測试,可是还是忍不住对Spark SQL一探到底,就从源码的角度来看一下Spark SQL的核心运行流程吧。

    一、引子

    先来看一段简单的Spark SQL程序:
    1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    2. import sqlContext._
    3.case class Person(name: String, age: Int)
    4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
    5.people.registerAsTable("people")
    6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

    程序前两句1和2生成SQLContext,导入sqlContext以下的all,也就是执行SparkSQL的上下文环境。
    程序3,4两句是载入数据源注冊table
    第6句是真正的入口,是sql函数,传入一句sql,先会返回一个SchemaRDD。这一步是lazy的,直到第七句的collect这个action运行时,sql才会运行。


     二、SQLCOntext

    SQLContext是运行SQL的上下文对象,首先来看一下它Hold的有哪些成员:

    Catalog  

     一个存储<tableName,logicalPlan>的map结构,查找关系的文件夹,注冊表,注销表,查询表和逻辑计划关系的类。


    SqlParser 

     Parse 传入的sql来对语法分词,构建语法树,返回一个logical plan


    Analyzer 

      logical plan的语法分析器


    Optimizer 

     logical Plan的优化器


    LogicalPlan 

    逻辑计划,由catalyst的TreeNode组成,能够看到有3种语法树


    SparkPlanner 

    包括不同策略的优化策略来优化物理运行计划


    QueryExecution 

    sql运行的环境上下文


    就是这些对象组成了Spark SQL的执行时,看起来非常酷,有静态的metadata存储,有分析器、优化器、逻辑计划、物理计划、执行执行时。
    那这些对象是怎么相互协作来运行sql语句的呢?

    三、Spark SQL运行流程

    话不多说,先上图,这个图我用一个在线作图工具process on话的,画的不好,图能达意即可:

          

    核心组件都是绿色的方框,每一步流程的结果都是蓝色的框框,调用的方法是橙色的框框。

    先概括一下,大致的运行流程是:
    Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD

    更详细的运行流程:

         sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 採用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 运行sql生成RDD


    3.1、Parse SQL

     回到開始的程序,我们调用sql函数,事实上是SQLContext里的sql函数它的实现是new一个SchemaRDD,在生成的时候就调用parseSql方法了。
    	  /**
       * Executes a SQL query using Spark, returning the result as a SchemaRDD.
       *
       * @group userf
       */
       def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
       结果是会生成一个逻辑计划
       @transient
      protected[sql] val parser = new catalyst.SqlParser  
    
    
      protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)

     3.2、Analyze to Execution

    当我们调用SchemaRDD里面的collect方法时,则会初始化QueryExecution,開始启动运行。
     override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
    我们能够非常清晰的看到运行步骤:

    protected abstract class QueryExecution {
        def logical: LogicalPlan
    
        lazy val analyzed = analyzer(logical)  //首先分析器会分析逻辑计划
        lazy val optimizedPlan = optimizer(analyzed) //随后优化器去优化分析后的逻辑计划
        // TODO: Don't just pick the first one...
        lazy val sparkPlan = planner(optimizedPlan).next() //依据策略生成plan物理计划
        // executedPlan should not be used to initialize any SparkPlan. It should be
        // only used for execution.
        lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最后生成已经准备好的Spark Plan
    
        /** Internal version of the RDD. Avoids copies and has no schema */
        lazy val toRdd: RDD[Row] = executedPlan.execute() //最后调用toRDD方法运行任务将结果转换为RDD
    
        protected def stringOrError[A](f: => A): String =
          try f.toString catch { case e: Throwable => e.toString }
    
        def simpleString: String = stringOrError(executedPlan)
    
        override def toString: String =
          s"""== Logical Plan ==
             |${stringOrError(analyzed)}
             |== Optimized Logical Plan ==
             |${stringOrError(optimizedPlan)}
             |== Physical Plan ==
             |${stringOrError(executedPlan)}
          """.stripMargin.trim
      }

    至此整个流程结束。

      四、总结:

      通过分析SQLContext我们知道了Spark SQL都包括了哪些组件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包括Physical Plan),QueryExecution.
      通过调试代码,知道了Spark SQL的运行流程:
    sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 採用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 运行sql生成RDD
      
      随后还会对里面的每一个组件对象进行研究,看看catalyst到底做了哪些优化。
      
      ——EOF——

    原创文章:转载请注明出自:http://blog.csdn.net/oopsoom/article/details/37658021

  • 相关阅读:
    购物网站数据库表
    C#Excel的导入与导出
    DataTable过滤重复字段
    压力测试~一套完整的压力测试项目文档
    压力测试~测试工具的使用
    C#调用本机摄像头
    linq学习笔记
    EasyPlayer RTSP播放器对RTSP播放地址url的通用兼容修改意见
    我们计划为EasyDSS定制开发一款超低延时的EasyPlayer Flash播放器
    EasyRTMP+EasyDSS实现一套完整的紧急视频回传直播与存储回放方案
  • 原文地址:https://www.cnblogs.com/hrhguanli/p/4084490.html
Copyright © 2011-2022 走看看