zoukankan      html  css  js  c++  java
  • 第十篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 query

    /** Spark SQL源码分析系列文章*/

        前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的。

        那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式。

    一、引子

    本例使用hive console里查询cache后的src表。
    select value from src

    当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用。

    即parse后,会形成InMemoryRelation结点,最后执行物理计划时,会调用InMemoryColumnarTableScan这个结点的方法。

    如下:

    [java] view plain copy
     
    1. scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)  
    2. 14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src  
    3. 14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed  
    4. exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution =   
    5. == Parsed Logical Plan ==  
    6. Project [value#5]  
    7.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
    8.   
    9. == Analyzed Logical Plan ==  
    10. Project [value#5]  
    11.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
    12.   
    13. == Optimized Logical Plan ==  
    14. Project [value#5]  
    15.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
    16.   
    17. == Physical Plan ==  
    18. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查询内存中表的入口  
    19.   
    20. Code Generation: false  
    21. == RDD ==  

    二、InMemoryColumnarTableScan

    InMemoryColumnarTableScan是Catalyst里的一个叶子结点,包含了要查询的attributes,和InMemoryRelation(封装了我们缓存的In-Columnar Storage数据结构)。
    执行叶子节点,出发execute方法对内存数据进行查询。
    1、查询时,调用InMemoryRelation,对其封装的内存数据结构的每个分区进行操作。
    2、获取要请求的attributes,如上,查询请求的是src表的value属性。
    3、根据目的查询表达式,来获取在对应存储结构中,请求列的index索引。
    4、通过ColumnAccessor来对每个buffer进行访问,获取对应查询数据,并封装为Row对象返回。

    [java] view plain copy
     
    1. private[sql] case class InMemoryColumnarTableScan(  
    2.     attributes: Seq[Attribute],  
    3.     relation: InMemoryRelation)  
    4.   extends LeafNode {  
    5.   
    6.   
    7.   override def output: Seq[Attribute] = attributes  
    8.   
    9.   
    10.   override def execute() = {  
    11.     relation.cachedColumnBuffers.mapPartitions { iterator =>  
    12.       // Find the ordinals of the requested columns.  If none are requested, use the first.  
    13.       val requestedColumns = if (attributes.isEmpty) {  
    14.         Seq(0)  
    15.       } else {  
    16.         attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //根据表达式exprId找出对应列的ByteBuffer的索引  
    17.       }  
    18.   
    19.   
    20.       iterator  
    21.         .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根据索引取得对应请求列的ByteBuffer,并封装为ColumnAccessor。  
    22.         .flatMap { columnAccessors =>  
    23.           val nextRow = new GenericMutableRow(columnAccessors.length) //Row的长度  
    24.           new Iterator[Row] {  
    25.             override def next() = {  
    26.               var i = 0  
    27.               while (i < nextRow.length) {  
    28.                 columnAccessors(i).extractTo(nextRow, i) //根据对应index和长度,从byterbuffer里取得值,封装到row里  
    29.                 i += 1  
    30.               }  
    31.               nextRow  
    32.             }  
    33.   
    34.   
    35.             override def hasNext = columnAccessors.head.hasNext  
    36.           }  
    37.         }  
    38.     }  
    39.   }  
    40. }  

    查询请求的列,如下:

    [java] view plain copy
     
    1. scala> exe.optimizedPlan  
    2. res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
    3. Project [value#5]  
    4.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
    5.   
    6.   
    7. scala> val relation =  exe.optimizedPlan(1)  
    8. relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
    9. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
    10.   
    11.   
    12. scala> val request_relation = exe.executedPlan  
    13. request_relation: org.apache.spark.sql.execution.SparkPlan =   
    14. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))  
    15.   
    16.   
    17. scala> request_relation.output //请求的列,我们请求的只有value列  
    18. res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)  
    19.   
    20. scala> relation.output //默认保存在relation中的所有列  
    21. res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)  
    22.   
    23.   
    24. scala> val attributes = request_relation.output   
    25. attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)  



    整个流程很简洁,关键步骤是第三步。根据ExprId来查找到,请求列的索引
    attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
    [java] view plain copy
     
    1. //根据exprId找出对应ID  
    2. scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))  
    3. attr_index: Seq[Int] = ArrayBuffer(1) //找到请求的列value的索引是1, 我们查询就从Index为1的bytebuffer中,请求数据  
    4.   
    5. scala> relation.output.foreach(e=>println(e.exprId))  
    6. ExprId(4)    //对应<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>  
    7. ExprId(5)  
    8.   
    9. scala> request_relation.output.foreach(e=>println(e.exprId))  
    10. ExprId(5)  

    三、ColumnAccessor

    ColumnAccessor对应每一种类型,类图如下:

    最后返回一个新的迭代器:

    [java] view plain copy
     
    1. new Iterator[Row] {  
    2.   override def next() = {  
    3.     var i = 0  
    4.     while (i < nextRow.length) { //请求列的长度  
    5.       columnAccessors(i).extractTo(nextRow, i)//调用columnType.setField(row, ordinal, extractSingle(buffer))解析buffer  
    6.       i += 1  
    7.     }  
    8.     nextRow//返回解析后的row  
    9.   }  
    10.   
    11.   override def hasNext = columnAccessors.head.hasNext  
    12. }  

    四、总结

        Spark SQL In-Memory Columnar Storage的查询相对来说还是比较简单的,其查询思想主要和存储的数据结构有关。

        即存储时,按每列放到一个bytebuffer,形成一个bytebuffer数组。

        查询时,根据请求列的exprId查找到上述数组的索引,然后使用ColumnAccessor对buffer中字段进行解析,最后封装为Row对象,返回。

    ——EOF——

    创文章,转载请注明:

    转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

    本文链接地址:http://blog.csdn.net/oopsoom/article/details/39577419

    注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

    image

    转自:http://blog.csdn.net/oopsoom/article/details/39577419

  • 相关阅读:
    深度解析VC中的消息传递机制(上)
    DLL的远程注入技术
    一些游戏编程的书[转]
    [转]小小C的C++之歌
    Windows Server 2008无法使用arp命令添加静态MAC绑定
    如何调用未公开的API函数[转]
    IOCP中的socket错误和资源释放处理方法
    TinyXML应用例子
    微软C/C++ 编译器选项参考
    [摘录]这几本游戏编程书籍你看过吗?
  • 原文地址:https://www.cnblogs.com/sh425/p/7596432.html
Copyright © 2011-2022 走看看