zoukankan      html  css  js  c++  java
  • 第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

    上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的。

        /** Spark SQL源码分析系列文章*/

      (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077

    一、Sources包核心

        Spark SQL在Spark1.2中提供了External DataSource API,开发者可以根据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。

        在Spark SQL源代码的org/spark/sql/sources目录下,我们会看到关于External DataSource的相关代码。这里特别介绍几个:

        1、DDLParser 

        专门负责解析外部数据源SQL的SqlParser,解析create temporary table xxx using options (key 'value', key 'value') 创建加载外部数据源表的语句。

    [java] view plain copy
     
    1. protected lazy val createTable: Parser[LogicalPlan] =  
    2.    CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {  
    3.      case tableName ~ provider ~ opts =>  
    4.        CreateTableUsing(tableName, provider, opts)  
    5.    }  

        2、CreateTableUsing

       一个RunnableCommand,通过反射从外部数据源lib中实例化Relation,然后注册到为temp table。

    [java] view plain copy
     
    1. private[sql] case class CreateTableUsing(  
    2.     tableName: String,  
    3.     provider: String,  // org.apache.spark.sql.json   
    4.     options: Map[String, String]) extends RunnableCommand {  
    5.   
    6.   def run(sqlContext: SQLContext) = {  
    7.     val loader = Utils.getContextOrSparkClassLoader  
    8.     val clazz: Class[_] = try loader.loadClass(provider) catch { //do reflection  
    9.       case cnf: java.lang.ClassNotFoundException =>  
    10.         try loader.loadClass(provider + ".DefaultSource") catch {  
    11.           case cnf: java.lang.ClassNotFoundException =>  
    12.             sys.error(s"Failed to load class for data source: $provider")  
    13.         }  
    14.     }  
    15.     val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] //json包DefaultDataSource  
    16.     val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))//创建JsonRelation  
    17.   
    18.     sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注册  
    19.     Seq.empty  
    20.   }  
    21. }  

        2、DataSourcesStrategy

        在 Strategy 一文中,我已讲过Streategy的作用,用来Plan生成物理计划的。这里提供了一种专门为了解析外部数据源的策略。

        最后会根据不同的BaseRelation生产不同的PhysicalRDD。不同的BaseRelation的scan策略下文会介绍。

    [java] view plain copy
     
    1. private[sql] object DataSourceStrategy extends Strategy {  
    2.   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
    3.     case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>  
    4.       pruneFilterProjectRaw(  
    5.         l,  
    6.         projectList,  
    7.         filters,  
    8.         (a, f) => t.buildScan(a, f)) :: Nil  
    9.     ......  
    10.     case l @ LogicalRelation(t: TableScan) =>  
    11.       execution.PhysicalRDD(l.output, t.buildScan()) :: Nil  
    12.   
    13.     case _ => Nil  
    14.   }  

       3、interfaces.scala 

        该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们只需实现该接口即可。里面比较重要的trait RelationProvider 和 BaseRelation,下文会详细介绍。

        4、filters.scala

        该Filter定义了如何在加载外部数据源的时候,就进行过滤。注意哦,是加载外部数据源到Table里的时候,而不是Spark里进行filter。这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。

       5、LogicalRelation

       封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。

            

    二、External DataSource注册流程

    用spark sql下sql/json来做示例, 画了一张流程图,如下:
     
     
    注册外部数据源的表的流程:
    1、提供一个外部数据源文件,比如json文件。
    2、提供一个实现了外部数据源所需要的interfaces的类库,比如sql下得json包,在1.2版本后改为了External Datasource实现。
    3、引入SQLContext,使用DDL创建表,如create temporary table xxx using options (key 'value', key 'value') 
    4、External Datasource的DDLParser将对该SQL进行Parse
    5、Parse后封装成为一个CreateTableUsing类的对象。该类是一个RunnableCommand,其run方法会直接执行创建表语句。
    6、该类会通过反射来创建一个org.apache.spark.sql.sources.RelationProvider,该trait定义要createRelation,如json,则创建JSONRelation,若avro,则创建AvroRelation。
    7、得到external releation后,直接调用SQLContext的baseRelationToSchemaRDD转换为SchemaRDD
    8、最后registerTempTable(tableName) 来注册为Table,可以用SQL来查询了。
     

    三、External DataSource解析流程

    先看图,图如下:
     
    Spark SQL解析SQL流程如下:
    1、Analyzer通过Rule解析,将UnresolvedRelation解析为JsonRelation。
    2、通过Parse,Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0)  
    3、通过sources下得DataSourceStrategy将LogicalPlan映射到物理计划PhysicalRDD。
    4、PhysicalRDD里包含了如何查询外部数据的规则,可以调用execute()方法来执行Spark查询。
     

    四、External Datasource Interfaces

    在第一节我已经介绍过,主要的interfaces,主要看一下BaseRelation和RelationProvider。
    如果我们要实现一个外部数据源,比如avro数据源,支持spark sql操作avro file。那么久必须定义AvroRelation来继承BaseRelation。同时也要实现一个RelationProvider。
     
    BaseRelation:
    是外部数据源的抽象,里面存放了schema的映射,和如何scan数据的规则。
    [java] view plain copy
     
    1. abstract class BaseRelation {  
    2.   def sqlContext: SQLContext  
    3.   def schema: StructType  
    [java] view plain copy
     
    1. abstract class PrunedFilteredScan extends BaseRelation {  
    2.   def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]  
    3. }  
    1、schema我们如果自定义Relation,必须重写schema,就是我们必须描述对于外部数据源的Schema。
    2、buildScan我们定义如何查询外部数据源,提供了4种Scan的策略,对应4种BaseRelation。
     
     
    我们支持4种BaseRelation,分为TableScan, PrunedScan,PrunedFilterScan,CatalystScan。
       1、TableScan
              默认的Scan策略。
       2、PrunedScan
              这里可以传入指定的列,requiredColumns,列裁剪,不需要的列不会从外部数据源加载。
       3、PrunedFilterScan
              在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。
       4、CatalystScan
               Catalyst的支持传入expressions来进行Scan。支持列裁剪和Filter。
     
    RelationProvider:
    我们要实现这个,接受Parse后传入的参数,来生成对应的External Relation,就是一个反射生产外部数据源Relation的接口。
    [java] view plain copy
     
    1. trait RelationProvider {  
    2.   /** 
    3.    * Returns a new base relation with the given parameters. 
    4.    * Note: the parameters' keywords are case insensitive and this insensitivity is enforced 
    5.    * by the Map that is passed to the function. 
    6.    */  
    7.   def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation  
    8. }  

    五、External Datasource定义示例

    在Spark1.2之后,json和parquet也改为通过实现External API来进行外部数据源查询的。
    下面以json的外部数据源定义为示例,说明是如何实现的:
     
    1、JsonRelation
     
    定义处理对于json文件的,schema和Scan策略,均基于JsonRDD,细节可以自行阅读JsonRDD。
     
    [java] view plain copy
     
    1. private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(  
    2.     @transient val sqlContext: SQLContext)  
    3.   extends TableScan {  
    4.   
    5.   private def baseRDD = sqlContext.sparkContext.textFile(fileName) //读取json file  
    6.   
    7.   override val schema =  
    8.     JsonRDD.inferSchema(  // jsonRDD的inferSchema方法,能自动识别json的schema,和类型type。  
    9.       baseRDD,  
    10.       samplingRatio,  
    11.       sqlContext.columnNameOfCorruptRecord)  
    12.   
    13.   override def buildScan() =  
    14.     JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) //这里还是JsonRDD,调用jsonStringToRow查询返回Row  
    15. }  
     
    2、DefaultSource
    parameters中可以获取到options中传入的path等自定义参数。
    这里接受传入的参数,来狗仔JsonRelation。
    [java] view plain copy
     
    1. private[sql] class DefaultSource extends RelationProvider {  
    2.   /** Returns a new base relation with the given parameters. */  
    3.   override def createRelation(  
    4.       sqlContext: SQLContext,  
    5.       parameters: Map[String, String]): BaseRelation = {  
    6.     val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))  
    7.     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)  
    8.   
    9.     JSONRelation(fileName, samplingRatio)(sqlContext)  
    10.   }  
    11. }  
    六、总结
      External DataSource源码分析下来,可以总结为3部分。
      1、外部数据源的注册流程
      2、外部数据源Table查询的计划解析流程
      3、如何自定义一个外部数据源,重写BaseRelation定义外部数据源的schema和scan的规则。定义RelationProvider,如何生成外部数据源Relation。
      
      External Datasource此部分API还有可能在后续的build中改动,目前只是涉及到了查询,关于其它的操作还未涉及。
    ——EOF——
     

    原创文章,转载请注明:

    转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

    本文链接地址:http://blog.csdn.net/oopsoom/article/details/42064075  

    注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

    image

    转自:http://blog.csdn.net/oopsoom/article/details/42064075
  • 相关阅读:
    nowcoderD Xieldy And His Password
    Codeforces681D Gifts by the List
    nowcoder80D applese的生日
    Codeforces961E Tufurama
    Codeforces957 Mahmoud and Ehab and yet another xor task
    nowcoder82E 无向图中的最短距离
    nowcoder82B 区间的连续段
    Codeforces903E Swapping Characters
    Codeforces614C Peter and Snow Blower
    Codeforces614D Skills
  • 原文地址:https://www.cnblogs.com/sh425/p/7596435.html
Copyright © 2011-2022 走看看