zoukankan      html  css  js  c++  java
  • spark-sql架构与原理

    . 整体架构


    总结为如下图:

     

    Dataframe本质是  数据  +  数据的描述信息(结构元信息)

    所有的上述SQLdataframe操作最终都通过Catalyst翻译成spark程序RDD操作代码

     

    sparkSQL前身是shark,大量依赖Hive项目的jar包与功能,但在上面的扩展越来越难,因此出现了SparkSQL,它重写了分析器,执行器   脱离了对Hive项目的大部分依赖,基本可以独立去运行,只用到两个地方:

    1.借用了hive的词汇分析的jarHiveQL解析器

    2.借用了hivemetastore和数据访问APIhiveCatalog

     

    也就是说上图的左半部分的操作全部用的是sparkSQL本身自带的内置SQL解析器解析SQL进行翻译,用到内置元数据信息(比如结构化文件中自带的结构元信息,RDDschema中的结构元信息)

    右半部分则是走的HiveHQL解析器,还有Hive元数据信息

    因此左右两边的API调用的底层类会有不同

     

    SQLContext使用:

    简单的解析器(scala语言写的sql解析器)【比如:1.在半结构化的文件里面使用sql查询时,是用这个解析器解析的,2.访问(半)结构化文件的时候,通过sqlContext使用schema,类生成Dataframe,然后dataframe注册为表时,registAsTmpTable   然后从这个表里面进行查询时,即使用的简单的解析器,一些hive语法应该是不支持的,有待验证)】

    simpleCatalog【此对象中存放关系(表),比如我们指定的schema信息,类的信息,都是关系信息】

    HiveContext使用:

    HiveQL解析器【支持hivehql语法,如只有通过HiveContext生成的dataframe才能调用saveAsTable操作

    hiveCatalog(存放数据库和表的元数据信息)

    Sparksql的解析与Hiveql的解析的执行流程:

    一个Sql语句转化为实际可执行的Spark的RDD模型需要经过以下几个步骤:


    主要介绍下Spark-SQL里面的主要类成员:

    1.2 SQLContext

    SQL上下文环境,它保存了QueryExecution中所需要的几个类:

    1.2.1 Catalog

    一个存储<tableName,logicalPlan>的map结构,查找关系的目录,注册表,注销表,查询表和逻辑计划关系的类

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. @transient  
    2. protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)  
    3. class SimpleCatalog(val conf: CatalystConf) extends Catalog {  
    4.   val tables = new mutable.HashMap[String, LogicalPlan]()  
    5.   override def registerTable(  
    6.       tableIdentifier: Seq[String],  
    7.       plan: LogicalPlan): Unit = {  
    8.     //转化大小写  
    9.     val tableIdent = processTableIdentifier(tableIdentifier)  
    10.     tables += ((getDbTableName(tableIdent), plan))  
    11.   }  
    12.   override def unregisterTable(tableIdentifier: Seq[String]): Unit = {  
    13.     val tableIdent = processTableIdentifier(tableIdentifier)  
    14.     tables -= getDbTableName(tableIdent)  
    15.   }  
    16.   override def unregisterAllTables(): Unit = {  
    17.     tables.clear()  
    18.   }  
    19.   override def tableExists(tableIdentifier: Seq[String]): Boolean = {  
    20.     val tableIdent = processTableIdentifier(tableIdentifier)  
    21.     tables.get(getDbTableName(tableIdent)) match {  
    22.       case Some(_) => true  
    23.       case None => false  
    24.     }  
    25.   }  
    26.   override def lookupRelation(  
    27.       tableIdentifier: Seq[String],  
    28.       alias: Option[String] = None): LogicalPlan = {  
    29.     val tableIdent = processTableIdentifier(tableIdentifier)  
    30.     val tableFullName = getDbTableName(tableIdent)  
    31.     //  val tables = new mutable.HashMap[String, LogicalPlan](),根据表名获取logicalplan  
    32.     val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))  
    33.     val tableWithQualifiers = Subquery(tableIdent.last, table)  
    34.     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are  
    35.     // properly qualified with this alias.  
    36.     alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)  
    37.   }  
    38.   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {  
    39.     tables.map {  
    40.       case (name, _) => (name, true)  
    41.     }.toSeq  
    42.   }  
    43.   override def refreshTable(databaseName: String, tableName: String): Unit = {  
    44.     throw new UnsupportedOperationException  
    45.   }  
    46. }  

    1.2.2 SparkSQLParser

    将Sql语句解析成语法树,返回一个Logical Plan。它首先拆分不同的SQL(将其分类),然后利用fallback解析。 

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL 
    3.  * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser. 
    4.  * 
    5.  * @param fallback A function that parses an input string to a logical plan 
    6.  */  
    7. private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {  
    8. protected val AS = Keyword("AS")  
    9. protected val CACHE = Keyword("CACHE")  
    10. protected val CLEAR = Keyword("CLEAR")  
    11. protected val IN = Keyword("IN")  
    12. protected val LAZY = Keyword("LAZY")  
    13. protected val SET = Keyword("SET")  
    14. protected val SHOW = Keyword("SHOW")  
    15. protected val TABLE = Keyword("TABLE")  
    16. protected val TABLES = Keyword("TABLES")  
    17. protected val UNCACHE = Keyword("UNCACHE")  
    18. override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | others  
    19. private lazy val cache: Parser[LogicalPlan] =  
    20.   CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {  
    21.     case isLazy ~ tableName ~ plan =>  
    22.       CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)  
    23.   }  
    24. private lazy val uncache: Parser[LogicalPlan] =  
    25.   ( UNCACHE ~ TABLE ~> ident ^^ {  
    26.       case tableName => UncacheTableCommand(tableName)  
    27.     }  
    28.   | CLEAR ~ CACHE ^^^ ClearCacheCommand  
    29.   )  
    30. private lazy val set: Parser[LogicalPlan] =  
    31.   SET ~> restInput ^^ {  
    32.     case input => SetCommandParser(input)  
    33.   }  
    34. private lazy val show: Parser[LogicalPlan] =  
    35.   SHOW ~> TABLES ~ (IN ~> ident).? ^^ {  
    36.     case _ ~ dbName => ShowTablesCommand(dbName)  
    37.   }  
    38. private lazy val others: Parser[LogicalPlan] =  
    39.   wholeInput ^^ {  
    40.     case input => fallback(input)  
    41.   }  
    42. }  

    1.2.3 Analyzer

    语法分析器,Analyzer会使用Catalog和FunctionRegistry将UnresolvedAttribute和UnresolvedRelation转换为catalyst里全类型的对象。例如将

    'UnresolvedRelation[test], None

    转化为

    Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42]org.apache.spark.sql.parquet.ParquetRelation2@2a400010

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. class Analyzer(  
    2.     catalog: Catalog,  
    3.     registry: FunctionRegistry,  
    4.     conf: CatalystConf,  
    5.     maxIterations: Int = 100)  
    6.   extends RuleExecutor[LogicalPlan] with HiveTypeCoercion with CheckAnalysis {  
    7. ……  
    8. }  

    1.2.4 Optimizer

    优化器,将Logical Plan进一步进行优化

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. object DefaultOptimizer extends Optimizer {  
    2.   val batches =  
    3.     // SubQueries are only needed for analysis and can be removed before execution.  
    4.     Batch("Remove SubQueries", FixedPoint(100),  
    5.       EliminateSubQueries) ::  
    6.     Batch("Operator Reordering", FixedPoint(100),  
    7.       UnionPushdown,  
    8.       CombineFilters,  
    9.       PushPredicateThroughProject,  
    10.       PushPredicateThroughJoin,  
    11.       PushPredicateThroughGenerate,  
    12.       ColumnPruning,  
    13.       ProjectCollapsing,  
    14.       CombineLimits) ::  
    15.     Batch("ConstantFolding", FixedPoint(100),  
    16.     NullPropagation,  
    17.       OptimizeIn,  
    18.       ConstantFolding,  
    19.       LikeSimplification,  
    20.       BooleanSimplification,  
    21.       SimplifyFilters,  
    22.       SimplifyCasts,  
    23.       SimplifyCaseConversionExpressions) ::  
    24.     Batch("Decimal Optimizations", FixedPoint(100),  
    25.       DecimalAggregates) ::  
    26.     Batch("LocalRelation", FixedPoint(100),  
    27.       ConvertToLocalRelation) :: Nil  
    28. }  
     例如:

    CombineFilters:递归合并两个相邻的filter。例如:将

    Filter(a>1)

     Filter(b>1)

    Project……

    转化为

    Filter(a>1) AND Filter(b>1)

     Project……

    CombineLimits:合并两个相邻的limit。例如:将select * from (select * from c_picrecord limit 100)a limit 10

    优化为:

    Limit if ((100 < 10)) 100 else 10
    Relation[id#0L,dev_id#1,dev_chnnum#2L,de……

    1.2.5 SparkPlanner

    将LogicalPlan转化为SparkPlan

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. protected[sql] class SparkPlanner extends SparkStrategies {  
    2.   val sparkContext: SparkContext = self.sparkContext  
    3.   val sqlContext: SQLContext = self  
    4.   def codegenEnabled: Boolean = self.conf.codegenEnabled  
    5.   def unsafeEnabled: Boolean = self.conf.unsafeEnabled  
    6.   def numPartitions: Int = self.conf.numShufflePartitions  
    7.   def strategies: Seq[Strategy] =  
    8.     experimental.extraStrategies ++ (  
    9.     DataSourceStrategy ::  
    10.     DDLStrategy ::  
    11.     TakeOrdered ::  
    12.     HashAggregation ::  
    13.     LeftSemiJoin ::  
    14.     HashJoin ::  
    15.     InMemoryScans ::  
    16.       ParquetOperations ::  
    17.       BasicOperators ::  
    18.       CartesianProduct ::  
    19.       BroadcastNestedLoopJoin :: Nil)  
    20.  }  

    比方说:

    Subquery test

    Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42]org.apache.spark.sql.parquet.ParquetRelation2@2a400010

    通过DataSourceStrategy中的
    1. // Scanning non-partitioned HadoopFsRelation  
    2. case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>  
    将其转化为
    PhysicalRDD

    1.2.6 PrepareForExecution

    在SparkPlan中插入Shuffle的操作,如果前后2个SparkPlan的outputPartitioning不一样的话,则中间需要插入Shuffle的动作,比分说聚合函数,先局部聚合,然后全局聚合,局部聚合和全局聚合的分区规则是不一样的,中间需要进行一次Shuffle。
    1. /** 
    2.  * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed. 
    3.  */  
    4. @transient  
    5. protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {  
    6.   val batches =  
    7.     Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil  
    8. }  
    例如

    GeneratedAggregate false,[Coalesce(SUM(PartialCount#44L),0) AS count#43L], false

     GeneratedAggregatetrue, [COUNT(1) AS PartialCount#44L], false

        PhysicalRDDMapPartitionsRDD[1]

    经过PrepareForExecution,转化为

    GeneratedAggregate false,[Coalesce(SUM(PartialCount#44L),0) AS count#43L], false

     Exchange SinglePartition

     GeneratedAggregate true, [COUNT(1) AS PartialCount#44L], false

          PhysicalRDDMapPartitionsRDD[1]

    1.3 QueryExecution

    SQL语句执行环境

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. protected[sql] class QueryExecution(val logical: LogicalPlan) {//logical包含了Aggregate(groupingExprs, aggregates, df.logicalPlan)  
    2.   def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)  
    3.   lazy val analyzed: LogicalPlan = analyzer.execute(logical)  
    4.   lazy val withCachedData: LogicalPlan = {  
    5.     assertAnalyzed()  
    6.     cacheManager.useCachedData(analyzed)  
    7.   }  
    8.   lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)//优化过的LogicalPlan  
    9.   // TODO: Don't just pick the first one...  
    10.   lazy val sparkPlan: SparkPlan = {  
    11.     SparkPlan.currentContext.set(self)  
    12.     //SparkPlanner把LogicalPlan转化为SparkPlan  
    13.     //1.4.1选取的是第一个strategies DataSourceStrategy  
    14.     planner.plan(optimizedPlan).next()  
    15.   }  
    16.   lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)  
    17.   lazy val toRdd: RDD[Row] = {  
    18.     toString  
    19.     executedPlan.execute()  
    20.   }  
    21.   protected def stringOrError[A](f: => A): String =  
    22.     try f.toString catch { case e: Throwable => e.toString }  
    23.   def simpleString: String =  
    24.     s"""== Physical Plan ==  
    25.        |${stringOrError(executedPlan)}  
    26.     """.stripMargin.trim  
    27.   //TODO:如何打印  
    28.   override def toString: String = {  
    29.     def output =  
    30.       analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")  
    31.     // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})  
    32.     // however, the `toRdd` will cause the real execution, which is not what we want.  
    33.     // We need to think about how to avoid the side effect.  
    34.     s"""== Parsed Logical Plan ==  
    35.        |${stringOrError(logical)}  
    36.        |== Analyzed Logical Plan ==  
    37.        |${stringOrError(output)}  
    38.        |${stringOrError(analyzed)}  
    39.        |== Optimized Logical Plan ==  
    40.        |${stringOrError(optimizedPlan)}  
    41.        |== Physical Plan ==  
    42.        |${stringOrError(executedPlan)}  
    43.        |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}  
    44.        |== RDD ==  
    45.     """.stripMargin.trim  
    46.   }  
    47. }  

    这里唯一需要注意的是analyzedoptimizedPlansparkPlanexecutedPlan都为懒变量,也就是说只有真正要用到的时时候才会去执行相应的代码逻辑,没有用到的时候是不会发生任何事情的。

    1.4 LogicalPlan and SparkPlan

    LogicalPlan和SparkPlan都继承自QueryPlan,QueryPlan为泛型类

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {  
    2. }  
    3. abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {  
    4. }  
    5. abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {  
    6. }  

    以上都为抽象类,然后在此基础上又根据不同的类型衍生出不同的树节点

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * A logical plan node with no children.叶子节点,没有子节点 
    3.  */  
    4. abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {  
    5.   self: Product =>  
    6. }  
    7. /** 
    8.  * A logical plan node with single child. 一元节点 
    9.  */  
    10. abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {  
    11.   self: Product =>  
    12. }  
    13. /** 
    14.  * A logical plan node with a left and right child 二元节点. 
    15.  */  
    16. abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {  
    17.   self: Product =>  
    18. }  

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. //叶子节点,没有子节点  
    2. private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {  
    3.   self: Product =>  
    4. }  
    5. //一元节点  
    6. private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] {  
    7.   self: Product =>  
    8. }  
    9. //二元节点  
    10. private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] {  
    11.   self: Product =>  
    12. }  

    其各自真正的具体类为:

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {  
    2.   self: Product =>  
    3. }  

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {  
    2.   self: Product =>  
    3. }  

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {  
    2.   self: Product =>  
    3. }  

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {  
    2.   self: Product =>  
    3. }  

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] {  
    2.   self: Product =>  
    3. }  

    [java] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] {  
    2.   self: Product =>  
    3. }  

    可见Spark-Sql里面二叉树结构贯穿了整个解析过程。


    二. Catalyst

    所有的SQL操作最终都通过Catalyst翻译成spark程序代码

    三. SparkSQL整体架构(前端+后端)


    thriftserver作为一个前端,它其实只是主要分为两大块:

    1.维护与用户的JDBC连接

    2.通过HiveContextAPI提交用户的HQL查询


    正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
  • 相关阅读:
    python之xlwt模块列宽width、行高Heights详解
    Testlink在CentOS、windows安装
    Appium中长按按钮操作
    CentOS oracle Client客户端安装
    WebDriver中自动识别验证码--Python实现
    shell批量重命令文件脚本
    MFC 显示图片
    MFC CEdit控件 自动换行
    第一次社会
    undefined reference 问题各种情况分析
  • 原文地址:https://www.cnblogs.com/candlia/p/11920342.html
Copyright © 2011-2022 走看看