Spark SQL: A First Look

Spark SQL was already introduced at Spark Summit 2013, mostly as a presentation of the Catalyst query optimization framework. After a year of development, at this year's Spark Summit 2014 Databricks announced that it was abandoning development of Shark and switching to Spark SQL, because Shark inherited too much from Hive and its optimization had hit a bottleneck. As shown in the figure:

(figure omitted)

Today I checked out the latest Spark code and gave it a try:

1. Building Spark SQL

    -bash-3.2$ git config --global http.sslVerify false
    -bash-3.2$ git clone https://github.com/apache/spark.git
    Cloning into 'spark'...
    remote: Reusing existing pack: 107821, done.
    remote: Counting objects: 103, done.
    remote: Compressing objects: 100% (72/72), done.
    remote: Total 107924 (delta 20), reused 64 (delta 16)
    Receiving objects: 100% (107924/107924), 69.06 MiB | 3.39 MiB/s, done.
    Resolving deltas: 100% (50174/50174), done.

You still need to build first with sbt/sbt assembly (for how to build against a matching version, see my earlier post on compiling Spark and setting up a cluster).

Running sbt/sbt hive/console will also trigger a build.

The latest Spark SQL ships with a console where you can run interactive queries directly, and it also provides a few examples.

2. Running Spark SQL

The project ships with a test harness for us to play with.

Looking at the log and running find . -name TestHive*, I located it at:

/app/hadoop/shengli/spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveTestHive.scala. If you are interested, open it up, build it, and step through it yourself.

First, start the console:

    sbt/sbt hive/console
    [info] Starting scala interpreter...
    [info] 
    import org.apache.spark.sql.catalyst.analysis._
    import org.apache.spark.sql.catalyst.dsl._
    import org.apache.spark.sql.catalyst.errors._
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.plans.logical._
    import org.apache.spark.sql.catalyst.rules._
    import org.apache.spark.sql.catalyst.types._
    import org.apache.spark.sql.catalyst.util._
    import org.apache.spark.sql.execution
    import org.apache.spark.sql.hive._
    import org.apache.spark.sql.hive.test.TestHive._
    import org.apache.spark.sql.parquet.ParquetTestData
    Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_20).
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> 

Let's see which methods the current runtime provides (tab completion in the console):

    scala> 
    <init>                        DslAttribute                  DslExpression                 DslString                     DslSymbol                     
    ParquetTestData               SqlCmd                        analyzer                      autoConvertJoinSize           binaryToLiteral               
    booleanToLiteral              byteToLiteral                 cacheTable                    cacheTables                   catalog                       
    classOf                       clear                         clone                         configure                     contains                      
    createParquetFile             createSchemaRDD               createTable                   decimalToLiteral              describedTable                
    doubleToLiteral               emptyResult                   eq                            equals                        executePlan                   
    executeSql                    execution                     finalize                      floatToLiteral                get                           
    getAll                        getClass                      getHiveFile                   getOption                     hashCode                      
    hiveDevHome                   hiveFilesTemp                 hiveHome                      hivePlanner                   hiveQTestUtilTables           
    hiveconf                      hiveql                        hql                           inRepoTests                   inferSchema                   
    intToLiteral                  isCached                      joinBroadcastTables           jsonFile                      jsonRDD                       
    loadTestTable                 logger                        logicalPlanToSparkQuery       longToLiteral                 metastorePath                 
    ne                            notify                        notifyAll                     numShufflePartitions          optimizer                     
    originalUdfs                  outputBuffer                  parquetFile                   parseSql                      parser                        
    planner                       prepareForExecution           registerRDDAsTable            registerTestTable             reset                         
    runHive                       runSqlHive                    sessionState                  set                           shortToLiteral                
    sparkContext                  sql                           stringToLiteral               symbolToUnresolvedAttribute   synchronized                  
    table                         testTables                    timestampToLiteral            toDebugString                 toString                      
    uncacheTable                  wait                          warehousePath                 

Notice that this test harness has a testTables member. Since all of these members are lazy, nothing is loaded at startup.

Check which tables the test harness will load:

    scala> testTables
    14/07/02 18:45:59 INFO spark.SecurityManager: Changing view acls to: hadoop
    14/07/02 18:45:59 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop)
    14/07/02 18:46:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
    14/07/02 18:46:00 INFO Remoting: Starting remoting
    14/07/02 18:46:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@web02.dw:42984]
    14/07/02 18:46:00 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@web02.dw:42984]
    14/07/02 18:46:00 INFO spark.SparkEnv: Registering MapOutputTracker
    14/07/02 18:46:00 INFO spark.SparkEnv: Registering BlockManagerMaster
    14/07/02 18:46:00 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140702184600-9e16
    14/07/02 18:46:00 INFO network.ConnectionManager: Bound socket to port 48348 with id = ConnectionManagerId(web02.dw,48348)
    14/07/02 18:46:00 INFO storage.MemoryStore: MemoryStore started with capacity 1097.0 MB
    14/07/02 18:46:00 INFO storage.BlockManagerMaster: Trying to register BlockManager
    14/07/02 18:46:00 INFO storage.BlockManagerInfo: Registering block manager web02.dw:48348 with 1097.0 MB RAM
    14/07/02 18:46:00 INFO storage.BlockManagerMaster: Registered BlockManager
    14/07/02 18:46:00 INFO spark.HttpServer: Starting HTTP Server
    14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031
    14/07/02 18:46:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36260
    14/07/02 18:46:01 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.1.8.207:36260
    14/07/02 18:46:01 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ca40f66c-edc3-484f-b317-d3f512aab244
    14/07/02 18:46:01 INFO spark.HttpServer: Starting HTTP Server
    14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031
    14/07/02 18:46:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:57821
    14/07/02 18:46:01 INFO server.Server: jetty-8.1.14.v20131031
    14/07/02 18:46:02 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    14/07/02 18:46:02 INFO ui.SparkUI: Started SparkUI at http://web02.dw:4040
    metastore path is /tmp/sparkHiveMetastore8060064816530828092
    warehousePath path is /tmp/sparkHiveWarehouse5366068035857129261
    hiveHome path is Some(/home/hadoop/Java/lib/hive-0.6.0)
    hiveDevHome path is None
    res0: scala.collection.mutable.HashMap[String,org.apache.spark.sql.hive.test.TestHive.TestTable] = Map(sales -> TestTable(sales,WrappedArray(<function0>, <function0>)), src -> TestTable(src,WrappedArray(<function0>, <function0>)), src1 -> TestTable(src1,WrappedArray(<function0>, <function0>)), serdeins -> TestTable(serdeins,WrappedArray(<function0>, <function0>)), src_thrift -> TestTable(src_thrift,WrappedArray(<function0>)), srcpart -> TestTable(srcpart,WrappedArray(<function0>)), episodes -> TestTable(episodes,WrappedArray(<function0>, <function0>)), srcpart1 -> TestTable(srcpart1,WrappedArray(<function0>)))
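
Because these members are lazy, a table's DDL and data load only run the first time a query touches it (note the "Loading test table sales" line in the log further down). The completion listing above also shows a loadTestTable method, which presumably forces a table to load up front; a hedged one-liner, with the behaviour assumed from the method name and the log output rather than checked against the source:

    scala> loadTestTable("sales")  // assumed: runs the CREATE TABLE and LOAD DATA statements for the sales test table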

Testing a SELECT statement

 1. First declare a SQL statement.
 2. The test harness uses Hive's metastore and creates an embedded Derby database for it.
 3. It creates the tables listed above and loads their data.
 4. It parses the select * from sales statement.
 5. It builds a SchemaRDD and produces the query plan.
 6. Only when an action is run on the querySales RDD is the SQL actually executed (see the sketch below).
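
Condensed, the whole flow looks roughly like this (a minimal sketch; it assumes the imports that sbt/sbt hive/console brings into scope, with the log output omitted):

    scala> val querySales = sql("select * from sales")  // parse, analyze and plan: a SchemaRDD is returned, nothing executes yet
    scala> querySales.collect()                         // action: the physical plan runs and the rows come back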

Below is the detailed output of the run (the log shows the rough execution steps):
    scala> val querySales = sql("select * from sales")
    14/07/02 18:51:19 INFO test.TestHive$: Loading test table sales
    14/07/02 18:51:19 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)  ([^ ]*)")
           
    14/07/02 18:51:19 INFO parse.ParseDriver: Parse Completed
    14/07/02 18:51:19 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/07/02 18:51:19 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/07/02 18:51:19 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/07/02 18:51:19 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=Driver.run>
    14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=TimeToSubmit>
    14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=compile>
    14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=parse>
    14/07/02 18:51:19 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)  ([^ ]*)")
           
    14/07/02 18:51:19 INFO parse.ParseDriver: Parse Completed
    14/07/02 18:51:19 INFO ql.Driver: </PERFLOG method=parse start=1404298279883 end=1404298279885 duration=2>
    14/07/02 18:51:19 INFO ql.Driver: <PERFLOG method=semanticAnalyze>
    14/07/02 18:51:19 INFO parse.SemanticAnalyzer: Starting Semantic Analysis
    14/07/02 18:51:19 INFO parse.SemanticAnalyzer: Creating table sales position=27
    14/07/02 18:51:20 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
    14/07/02 18:51:20 INFO metastore.ObjectStore: ObjectStore, initialize called
    14/07/02 18:51:20 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
    14/07/02 18:51:21 WARN bonecp.BoneCPConfig: Max Connections < 1. Setting to 20
    14/07/02 18:51:25 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
    14/07/02 18:51:25 INFO metastore.ObjectStore: Initialized ObjectStore
    14/07/02 18:51:26 WARN bonecp.BoneCPConfig: Max Connections < 1. Setting to 20
    14/07/02 18:51:26 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.12.0
    14/07/02 18:51:27 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:27 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:27 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
    14/07/02 18:51:27 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
    14/07/02 18:51:28 INFO ql.Driver: Semantic Analysis Completed
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=semanticAnalyze start=1404298279885 end=1404298288331 duration=8446>
    14/07/02 18:51:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=compile start=1404298279840 end=1404298288340 duration=8500>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=Driver.execute>
    14/07/02 18:51:28 INFO ql.Driver: Starting command: CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)  ([^ ]*)")
           
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=TimeToSubmit start=1404298279840 end=1404298288351 duration=8511>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=runTasks>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=task.DDL.Stage-0>
    14/07/02 18:51:28 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:sales, dbName:default, owner:hadoop, createTime:1404298288, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:string, comment:null), FieldSchema(name:value, type:int, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.RegexSerDe, parameters:{serialization.format=1, input.regex=([^ ]*)        ([^ ]*)}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))
    14/07/02 18:51:28 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=create_table: Table(tableName:sales, dbName:default, owner:hadoop, createTime:1404298288, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:string, comment:null), FieldSchema(name:value, type:int, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.RegexSerDe, parameters:{serialization.format=1, input.regex=([^ ]*)       ([^ ]*)}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=task.DDL.Stage-0 start=1404298288351 end=1404298288589 duration=238>
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=runTasks start=1404298288351 end=1404298288589 duration=238>
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=Driver.execute start=1404298288340 end=1404298288589 duration=249>
    14/07/02 18:51:28 INFO ql.Driver: OK
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=releaseLocks>
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298288590 end=1404298288590 duration=0>
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=Driver.run start=1404298279839 end=1404298288590 duration=8751>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=releaseLocks>
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298288590 end=1404298288590 duration=0>
    14/07/02 18:51:28 INFO parse.ParseDriver: Parsing command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales
    14/07/02 18:51:28 INFO parse.ParseDriver: Parse Completed
    14/07/02 18:51:28 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/07/02 18:51:28 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/07/02 18:51:28 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/07/02 18:51:28 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=Driver.run>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=TimeToSubmit>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=compile>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=parse>
    14/07/02 18:51:28 INFO parse.ParseDriver: Parsing command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales
    14/07/02 18:51:28 INFO parse.ParseDriver: Parse Completed
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=parse start=1404298288629 end=1404298288629 duration=0>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=semanticAnalyze>
    14/07/02 18:51:28 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:28 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/07/02 18:51:28 INFO ql.Driver: Semantic Analysis Completed
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=semanticAnalyze start=1404298288630 end=1404298288942 duration=312>
    14/07/02 18:51:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=compile start=1404298288628 end=1404298288943 duration=315>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=Driver.execute>
    14/07/02 18:51:28 INFO ql.Driver: Starting command: LOAD DATA LOCAL INPATH 'sql/hive/src/test/resources/data/files/sales.txt' INTO TABLE sales
    14/07/02 18:51:28 INFO ql.Driver: </PERFLOG method=TimeToSubmit start=1404298288628 end=1404298288943 duration=315>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=runTasks>
    14/07/02 18:51:28 INFO ql.Driver: <PERFLOG method=task.COPY.Stage-0>
    14/07/02 18:51:28 INFO exec.Task: Copying data from file:/app/hadoop/spark/sql/hive/src/test/resources/data/files/sales.txt to file:/tmp/hive-hadoop/hive_2014-07-02_18-51-28_629_2309366591646930035-1/-ext-10000
    14/07/02 18:51:28 INFO exec.Task: Copying file: file:/app/hadoop/spark/sql/hive/src/test/resources/data/files/sales.txt
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=task.COPY.Stage-0 start=1404298288943 end=1404298289037 duration=94>
    14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=task.MOVE.Stage-1>
    14/07/02 18:51:29 INFO exec.Task: Loading data to table default.sales from file:/tmp/hive-hadoop/hive_2014-07-02_18-51-28_629_2309366591646930035-1/-ext-10000
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: alter_table: db=default tbl=sales newtbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=alter_table: db=default tbl=sales newtbl=sales
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=task.MOVE.Stage-1 start=1404298289037 end=1404298289196 duration=159>
    14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=task.STATS.Stage-2>
    14/07/02 18:51:29 INFO exec.StatsTask: Executing stats task
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: alter_table: db=default tbl=sales newtbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=alter_table: db=default tbl=sales newtbl=sales
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO exec.Task: Table default.sales stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 13, raw_data_size: 0]
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=task.STATS.Stage-2 start=1404298289196 end=1404298289282 duration=86>
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=runTasks start=1404298288943 end=1404298289282 duration=339>
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=Driver.execute start=1404298288943 end=1404298289282 duration=339>
    14/07/02 18:51:29 INFO ql.Driver: OK
    14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=releaseLocks>
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298289282 end=1404298289282 duration=0>
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=Driver.run start=1404298288628 end=1404298289282 duration=654>
    14/07/02 18:51:29 INFO ql.Driver: <PERFLOG method=releaseLocks>
    14/07/02 18:51:29 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404298289282 end=1404298289282 duration=0>
    14/07/02 18:51:29 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/07/02 18:51:29 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/07/02 18:51:29 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=sales
    14/07/02 18:51:29 INFO storage.MemoryStore: ensureFreeSpace(355913) called with curMem=0, maxMem=1150314086
    14/07/02 18:51:29 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 347.6 KB, free 1096.7 MB)
    14/07/02 18:51:29 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/07/02 18:51:29 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    querySales: org.apache.spark.sql.SchemaRDD = 
    SchemaRDD[0] at RDD at SchemaRDD.scala:100
    == Query Plan ==
    HiveTableScan [key#2,value#3], (MetastoreRelation default, sales, None), None

Execute the Spark SQL query:
    scala> querySales.collect()
    14/07/02 18:57:32 WARN snappy.LoadSnappy: Snappy native library is available
    14/07/02 18:57:32 WARN snappy.LoadSnappy: Snappy native library not loaded
    14/07/02 18:57:32 INFO mapred.FileInputFormat: Total input paths to process : 1
    14/07/02 18:57:32 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Got job 0 (collect at SparkPlan.scala:52) with 3 output partitions (allowLocal=false)
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:52)
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Missing parents: List()
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[5] at map at SparkPlan.scala:52), which has no missing parents
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[5] at map at SparkPlan.scala:52)
    14/07/02 18:57:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Re-computing pending task lists.
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 3606 bytes in 20 ms
    14/07/02 18:57:32 INFO executor.Executor: Running task ID 0
    14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
    14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:0+6
    14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 0 is 1947
    14/07/02 18:57:32 INFO executor.Executor: Sending result for 0 directly to driver
    14/07/02 18:57:32 INFO executor.Executor: Finished task ID 0
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 3606 bytes in 0 ms
    14/07/02 18:57:32 INFO executor.Executor: Running task ID 1
    14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 0 in 243 ms on localhost (progress: 1/3)
    14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:6+6
    14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 1 is 1948
    14/07/02 18:57:32 INFO executor.Executor: Sending result for 1 directly to driver
    14/07/02 18:57:32 INFO executor.Executor: Finished task ID 1
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Starting task 0.0:2 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Serialized task 0.0:2 as 3606 bytes in 1 ms
    14/07/02 18:57:32 INFO executor.Executor: Running task ID 2
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 1 in 36 ms on localhost (progress: 2/3)
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
    14/07/02 18:57:32 INFO storage.BlockManager: Found block broadcast_0 locally
    14/07/02 18:57:32 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/sales/sales.txt:12+1
    14/07/02 18:57:32 INFO executor.Executor: Serialized size of result for 2 is 1721
    14/07/02 18:57:32 INFO executor.Executor: Sending result for 2 directly to driver
    14/07/02 18:57:32 INFO executor.Executor: Finished task ID 2
    14/07/02 18:57:32 INFO scheduler.TaskSetManager: Finished TID 2 in 96 ms on localhost (progress: 3/3)
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
    14/07/02 18:57:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    14/07/02 18:57:32 INFO scheduler.DAGScheduler: Stage 0 (collect at SparkPlan.scala:52) finished in 0.366 s
    14/07/02 18:57:32 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.454512333 s
    res1: Array[org.apache.spark.sql.Row] = Array([Joe,2], [Hank,2])

Result:
    Array([Joe,2], [Hank,2])
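
The result of sql() is itself a SchemaRDD, so it can be registered as a table and queried again, or pushed through ordinary RDD transformations. A hedged sketch; registerAsTable shows up in the SchemaRDD completion listing further below, but its exact signature is assumed here:

    scala> querySales.registerAsTable("sales_result")   // assumed: registers this SchemaRDD under a table name
    scala> sql("select * from sales_result").collect()  // the registered name can then be queried like any other table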

Query plan optimization: note in the example below that the nested query SELECT * FROM (SELECT * FROM src) a is collapsed into a single HiveTableScan; the redundant projection has been optimized away.

    scala> val query = sql("SELECT * FROM (SELECT * FROM src) a")
    query: org.apache.spark.sql.SchemaRDD = 
    SchemaRDD[6] at RDD at SchemaRDD.scala:100
    == Query Plan ==
    HiveTableScan [key#6,value#7], (MetastoreRelation default, src, None), None
    
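To see where that collapse happens, the SchemaRDD exposes a queryExecution member (it appears in the completion listing below), which carries the logical, optimized and physical plans. A hedged example; the printed format is from my recollection of the 1.0 code and may differ:

    scala> query.queryExecution
    // expected to print, roughly:
    //   == Logical Plan ==            the nested Project over src
    //   == Optimized Logical Plan ==  the redundant projection removed
    //   == Physical Plan ==           HiveTableScan [key#6,value#7], (MetastoreRelation default, src, None), None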
    

3. Spark SQL LINQ

In the Spark family of frameworks, everything ultimately revolves around the RDD. SchemaRDD provides a LINQ-like API on top of it: methods such as take, where, and so on.

    scala> query.
    ++                         aggregate                  as                         asInstanceOf               baseLogicalPlan            baseSchemaRDD              
    cache                      cartesian                  checkpoint                 coalesce                   collect                    compute                    
    context                    count                      countApprox                countApproxDistinct        countByValue               countByValueApprox         
    dependencies               distinct                   filter                     filterWith                 first                      flatMap                    
    flatMapWith                fold                       foreach                    foreachPartition           foreachWith                generate                   
    getCheckpointFile          getPartitions              getStorageLevel            glom                       groupBy                    id                         
    insertInto                 intersection               isCheckpointed             isInstanceOf               iterator                   join                       
    keyBy                      limit                      map                        mapPartitions              mapPartitionsWithContext   mapPartitionsWithIndex     
    mapPartitionsWithSplit     mapWith                    max                        min                        name                       name_=                     
    orderBy                    partitioner                partitions                 persist                    pipe                       preferredLocations         
    printSchema                queryExecution             randomSplit                reduce                     registerAsTable            repartition                
    sample                     saveAsObjectFile           saveAsParquetFile          saveAsTable                saveAsTextFile             schemaString               
    select                     setName                    sortBy                     sparkContext               sqlContext                 subtract                   
    take                       takeOrdered                takeSample                 toArray                    toDebugString              toJavaRDD                  
    toJavaSchemaRDD            toLocalIterator            toSchemaRDD                toString                   top                        union                      
    unionAll                   unpersist                  where                      zip                        zipPartitions              zipWithIndex               
    zipWithUniqueId            


Note the single quote in front of key: this is Catalyst's query DSL (the quote makes key a Scala Symbol, which the DSL converts into an attribute reference). I will write a detailed post about it later:
    scala> query.where('key === 100).collect()
    14/07/02 19:07:55 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/07/02 19:07:55 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/07/02 19:07:55 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=src
    14/07/02 19:07:55 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_table : db=default tbl=src
    14/07/02 19:07:55 INFO storage.MemoryStore: ensureFreeSpace(358003) called with curMem=713876, maxMem=1150314086
    14/07/02 19:07:55 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 349.6 KB, free 1096.0 MB)
    14/07/02 19:07:55 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/07/02 19:07:55 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    14/07/02 19:07:55 INFO mapred.FileInputFormat: Total input paths to process : 1
    14/07/02 19:07:55 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Got job 2 (collect at SparkPlan.scala:52) with 2 output partitions (allowLocal=false)
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Final stage: Stage 2(collect at SparkPlan.scala:52)
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Parents of final stage: List()
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Missing parents: List()
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52), which has no missing parents
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52)
    14/07/02 19:07:55 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
    14/07/02 19:07:55 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
    14/07/02 19:07:55 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 3854 bytes in 0 ms
    14/07/02 19:07:55 INFO executor.Executor: Running task ID 5
    14/07/02 19:07:55 INFO storage.BlockManager: Found block broadcast_3 locally
    14/07/02 19:07:55 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/src/kv1.txt:0+2906
    14/07/02 19:07:55 INFO executor.Executor: Serialized size of result for 5 is 1951
    14/07/02 19:07:55 INFO executor.Executor: Sending result for 5 directly to driver
    14/07/02 19:07:55 INFO scheduler.TaskSetManager: Starting task 2.0:1 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
    14/07/02 19:07:55 INFO scheduler.TaskSetManager: Serialized task 2.0:1 as 3854 bytes in 0 ms
    14/07/02 19:07:55 INFO executor.Executor: Finished task ID 5
    14/07/02 19:07:55 INFO executor.Executor: Running task ID 6
    14/07/02 19:07:55 INFO scheduler.TaskSetManager: Finished TID 5 in 44 ms on localhost (progress: 1/2)
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Completed ResultTask(2, 0)
    14/07/02 19:07:55 INFO storage.BlockManager: Found block broadcast_3 locally
    14/07/02 19:07:55 INFO rdd.HadoopRDD: Input split: file:/tmp/sparkHiveWarehouse5366068035857129261/src/kv1.txt:2906+2906
    14/07/02 19:07:55 INFO executor.Executor: Serialized size of result for 6 is 1951
    14/07/02 19:07:55 INFO executor.Executor: Sending result for 6 directly to driver
    14/07/02 19:07:55 INFO executor.Executor: Finished task ID 6
    14/07/02 19:07:55 INFO scheduler.TaskSetManager: Finished TID 6 in 19 ms on localhost (progress: 2/2)
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Completed ResultTask(2, 1)
    14/07/02 19:07:55 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
    14/07/02 19:07:55 INFO scheduler.DAGScheduler: Stage 2 (collect at SparkPlan.scala:52) finished in 0.062 s
    14/07/02 19:07:55 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.06947625 s
    res6: Array[org.apache.spark.sql.Row] = Array([100,val_100], [100,val_100])
The query returns the two rows whose key is 100.
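
These DSL operators compose; select, where and limit all appear in the completion listing above. A hedged sketch, assuming they chain the way their names suggest (I have not traced every signature):

    scala> query.where('key === 100).select('value).collect()  // keep only the value column of the matching rows
    scala> query.select('key).limit(5).collect()               // take five keys from src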


4. Summary

Spark SQL provides the Catalyst query optimization framework: SQL is parsed into a logical execution plan, the plan is optimized, and the optimized plan finally becomes RDD operations. Many frameworks, one API; simple and consistent.

That is all for this post; I will continue with deeper dives into related topics.


    ——EOF——

Original article. If you repost it, please credit http://blog.csdn.net/oopsoom/article/details/36440821
