zoukankan      html  css  js  c++  java
  • spark sql

    Spark SQL运行机制

    Spark SQL 对 SQL 语句的处理和关系型数据库对 SQL 语句的处理采用了类似的方法,首先会将 SQL 语句进行解析(Parse),然后形成一个 Tree,在后续的如绑定、优化等处理过程都是对 Tree 的操作,而操作的方法是采用 Rule,通过模式匹配,对不同类型的节点采用不同的操作。在整个 SQL语句的处理过程中,Tree 和 Rule 相互配合,完成了解析、绑定(在 Spark SQL中称为 Analysis)、优化、物理计划等过程,最终生成可以执行的物理计划。整个过程概况如下图所示:


    Spark SQL的核心是把已有的RDD,带上Schema信息,然后注册成类似SQL里的”Table”,对其进行SQL查询。这里面主要分两部分,一是生成SchemaRDD,二是执行查询。

    一,SQL生成SchemaRDD:

    对于Spark SQL来说,数据方面,RDD可以来自任何已有的RDD,也可以来自支持的第三方格式,如json file、parquet file。SQLContext会把带case class的RDD隐式转化为SchemaRDD:


    ExsitingRdd单例里会反射出case class的attributes,并把RDD的数据转化成Catalyst的GenericRow(Row和GenericRow是Catalyst里的行表示模型 ),最后返回RDD[Row],即一个 SchemaRDD。这里的具体转化逻辑可以参考ExsitingRdd的productToRowRdd和convertToCatalyst方法。

    之后可以进行SchemaRDD提供的注册table操作、针对Schema复写的部分RDD转化操作、DSL操作、saveAs操作等等。

    二,查询流程:

    sqlContext 总的一个过程如下图所示:


    1 ,SQL 语句经过 SqlParse 解析成 Unresolved LogicalPlan;

    2 ,使用 analyzer 结合数据数据字典(catalog)进行绑定,生成 resolved LogicalPlan;

    3 ,使用 optimizer 对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan;

    4 ,使用 SparkPlan 将 LogicalPlan 转换成 PhysicalPlan;

    5 ,使用 prepareForExecution()将 PhysicalPlan 转换成可执行物理计划;

    6 ,使用 execute()执行可执行物理计划;

    7 ,生成 SchemaRDD。SQLContext里对sql的一个解析和执行流程。

    阶段1:parseSql(sql: String),simple sql parser做词法语法解析,生成LogicalPlan。

    在sql中引入了一种新的RDD,即SchemaRDD,如下:


    构造函数中总共两入参一为SparkContext,另一个LogicalPlan。sql函数的定义如下:


    parseSql(sqlText)负责生成LogicalPlan,parseSql就是SqlParser的一个实例。

    SqlParser这一部分的代码要理解起来关键是要搞清楚StandardTokenParsers 的调用规则,里面有一大堆的符号,如果不理解是什么意思,估计很难理清头绪。

    由于apply函数可以不被显示调用,所以parseSql(sqlText)一句其实会隐式的调用SqlParser中的apply函数如下:


    phrase(query)(newlexical.Scanner(input)) 这句代码是最关键的,翻译过来就是如果输入的input字符串符合Lexical中定义的规则,则继续使用 query 处理。

    看一下query的定义是什么:


    到了这里终于看到有LogicalPlan了, 也就是说将普通的string转换成LogicalPlan在这里发生了 。

    阶段二:analyzer(logicalPlan),把做完词法语法解析的执行计划进行初步分析和映射。

    目前SQLContext内的Analyzer由Catalyst提供,定义如下:new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive =true) 。catalog为SimpleCatalog,catalog是用来注册table和查询relation的。而这里的FunctionRegistry不支持lookupFunction方法,所以该analyzer不支持Function注册,即UDF。

    Analyzer内定义了几批规则:

    阶段三:得到的是初步的logicalPlan后,接下来第三步是optimizer(plan)。

    Optimizer里面也是定义了几批规则,会按序对执行计划进行优化操作。


    阶段四: 优化后的执行计划,还要丢给SparkPlanner处理。

    里面定义了一些策略,目的是根据逻辑执行计划树生成最后可以执行的物理执行计划树,即得到SparkPlan。


    阶段五: 在最终真正执行物理执行计划前,最后还要进行两次规则。

    SQLContext里定义这个过程叫prepareForExecution,这个步骤是额外增加的,直接new RuleExecutor[SparkPlan]进行的。


    阶段六:最后调用SparkPlan的execute()执行计算。

    这个execute()在每种SparkPlan的实现里定义,一般都会递归调用children的execute()方法,所以会触发整棵Tree的计算。

    hiveContext 是Spark提供的用户接口,继承自SqlContext

    既然是继承自SqlContext,那么我们讲普通的SQL与Hiveql分析执行步骤做一个对比,可以得到下图:  


    总的一个过程如下图所示,代码不做过多解释了:


    1,SQL 语句经过 HiveQl.parseSql 解析成 Unresolved LogicalPlan,在这个解析过程中对 hiveql 语句使用 getAst()获取 AST 树,然后再进行解析;

    2,使用 analyzer 结合数据 hive 源数据 Metastore(新的 catalog)进行绑定,生成resolved LogicalPlan;

    3,使用 optimizer 对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;

    4 ,使用 hivePlanner 将 LogicalPlan 转换成 PhysicalPlan;5 使用 prepareForExecution()将 PhysicalPlan 转换成可执行物理计划;

    6 ,使用 execute()执行可执行物理计划;

    7 ,执行后,使用 map(_.copy)将结果导入 SchemaRDD。

    Spark SQL与普通SQL的区别

    我们先来看看传统数据库查询运行的机制,如下图:


    我们假设提交一个很简单的查询:SELECT  a1,a2,a3  FROM tableA  Where  condition。该语句是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成的,分别对应sql查询过程中的Result、Data Source、Operation。一般的数据库系统先将读入的SQL语句进行解析(Parse)(在解析SQL语句的时候,会将SQL语句转换成一个树型结构来进行处理 )分辨出SQL语句中哪些词是关键词(如SELECT、FROM、 WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等等。这一步就可以判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind),这个过程将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定,如果相关的Projection、Data Source等等都是存在的话,就表示这个SQL语句是可以执行的;而在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库 会在这些计划中选择一个最优计划(Optimize)(执行策略树可以优化,优化的过程就是对树中节点进行合并或者顺序调整)。最终执行该计划(Execute),并返回结果。

    与普通SQL主要区别:

    1,由于查询基于Spark平台,所以速度会比普通SQL在性能方面有极大的提升,并且与Hive相比性能有比较大的提升,因为Hive原生基于MapReduce,生成MapReduce Job,Hive on Spark生成Spark Job,充分利用Spark的快速执行能力来缩短HiveQI的响应时间。

    2,内存列存储SQLContext下cache/uncache table的时候会调用列存储模块。该模块借鉴自Shark,目的是当把表数据cache在内存的时候做行转列操作,以便压缩。Spark SQL创建的表会变成内存缓存表,对象变少,GC变少,下次需要表中数据的时候可以使用上次缓存的表,因此效率变快。普通SQL只是在执行过程有时候不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。

    3,Runtime Bytecode Generation,根据查询动态生成代码。当我们进行查询的时候,普通SQL被处理的时候会生成复杂的树,可是Spark SQL通过自己的解释引擎,结合Scala语言的Runtime Reflection特性极大的简化了SQL语句的处理。

    4,Spark SQL的查询会变成Schema RDD,这样的可以执行RDD的很多操作,比如count。这样的特点带来了无限的可能性。而普通的SQL就不可能有这样的性质。

    5,能在Scala代码里面写SQL,支持简单的SQL语法检查,能把RDD指定为Table存储起来,以此完成实时的数据交互。此外支持部分SQL语法的DSL。

    6,Spark SQL支持Parquet文件的读写,并且保留Schema,支持列存储。除此之外可以非常好的使用JDBC和ODBC,因此非常有利于BI Tools。支持jsonFile和jsonRDD,读取json文件之后,转换成SchemaRDD。这些支持都丰富了Spark SQL数据接入场景。

    SparkSQL与其他部分联系

    One Stack rule them all是Spark的既定方针,各个组件相互集成。都是使用RDD进行操作,RDD就作为通用的计算单元,联系各个组件。

    SparkSQL作为与数据库的交互组件,可以简单的进行数据的统计和分析,而更加复杂的计算可以提供给MLlib和GraphX。

    SparkSQL与SparkStreaming

    SparkStreaming可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP套接字)进行类似map、reduce、join、window等复杂操作,将结果保存到外部文件系统、数据库或应用到实时仪表盘。

    Spark Streaming进行实时数据分析得到结果后,通过Spark SQL给用户交互查询。

    SparkSQL与MLlib

    SparkSQL查询出需要进行计算的数据,提供给MLlib。

    例如电商对店铺进行分类,先是使用SparkSQL查询出每个店的销售数量和金额,然后MLlib对其进行分类操作。

    SparkSQL与GraphX

    SparkSQL查询出需要进行计算的数据,提供给GraphX。

    例如商品摆放建议,先使用Sparksql查出店铺的销量、库存和店铺之间的距离等等,来作为图的顶点或边,最后根据规则进行图处理。

    SparkSQL主要用到的类和方法

    SQLContext

    SparkSQL组件入口

    new SparkSQL(sc)

    主要方法:

    sql(String sqlText):执行一个sql查询

    jsonFile(String path):加载一个json文件,作为SchemaRDD

    jsonRDD(RDD(String) json):加载一个存储了json对象的RDD,作为SchemaRDD

    parquetFile(String path):加载一个parquet文件,作为SchemaRDD

    SchemaRDD

    包含RDD常用方法,如take()、join()等

    registerAsTable(String tableName) :将这个RDD注册成表

    registerTempTable(String tableName):将这个RDD注册成临时表

    saveAsParquetFile(String path):将这个RDD中的数据存储为parquet文件

    printSchema():在控制台打印出SchemaRDD的格式

    HiveContext

    hiveql(String hqlQuery)

    hql(String hqlQuery) :操作hive执行查询                                                                                                                                            

  • 相关阅读:
    专家视角 | 小荷的 Oracle Database 18c 新特性快速一瞥
    java.lang.ClassCastException: com.xx.User cannot be cast to com.xx.User
    上传单个文件
    极速体验:Oracle 18c 下载和Scalable Sequence新特性
    开工大吉:Oracle 18c已经发布及新特性介绍
    CentOS7编译安装NodeJS
    .NET 同步与异步之锁(ReaderWriterLockSlim)(八)
    .NET 同步与异步之锁(ReaderWriterLockSlim)(八)
    .NET 同步与异步之锁(ReaderWriterLockSlim)(八)
    .NET 同步与异步之锁(ReaderWriterLockSlim)(八)
  • 原文地址:https://www.cnblogs.com/zhangyunlin/p/6168204.html
Copyright © 2011-2022 走看看