  • A Tour of SparkSQL

    1. Prepare the data file employee.txt

    1001,Gong Shaocheng,1
    1002,Li Dachao,1
    1003,Qiu Xin,1
    1004,Cheng Jiangzhong,2
    1005,Wo Binggang,3

    Put the file into HDFS:

    [root@jfp3-1 spark-studio]# hdfs dfs -put employee.txt /user/spark_studio
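
    To double-check the upload before moving on, the file can be printed back from HDFS (same path as above):

    [root@jfp3-1 spark-studio]# hdfs dfs -cat /user/spark_studio/employee.txt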

    2. Start the Spark shell

    [root@jfp3-1 spark-1.0.0-bin-hadoop2]# ./bin/spark-shell --master spark://192.168.0.71:7077

    3. Write the script

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    
    case class Employee(employeeId: String, name: String, departmentId: Int)
    
    // Create an RDD of Employee objects and register it as a table.
    val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt").map(_.split(",")).map(p => Employee(p(0), p(1), p(2).trim.toInt))
    employees.registerAsTable("employee")
    
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")
    
    // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    fsis.map(t => "Name: " + t(0)).collect().foreach(println)
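
    If the snippet above is saved to a file, it can also be loaded into the shell in one step with the REPL's :load command (the file name here is only an example):

    scala> :load /root/spark-studio/sparksql_demo.scala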

    4. Run it in the shell

    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@17124319
    
    scala> import sqlContext._
    import sqlContext._
    
    scala> case class Employee(employeeId: String, name: String, departmentId: Int)
    defined class Employee
    
    scala> val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt").map(_.split(",")).map(p => Employee(p(0), p(1), p(2).trim.toInt))
    14/06/18 09:54:25 INFO MemoryStore: ensureFreeSpace(138763) called with curMem=0, maxMem=309225062
    14/06/18 09:54:25 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.5 KB, free 294.8 MB)
    employees: org.apache.spark.rdd.RDD[Employee] = MappedRDD[3] at map at <console>:19
    
    scala> employees.registerAsTable("employee")
    
    scala> val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")
    14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    fsis: org.apache.spark.sql.SchemaRDD = 
    SchemaRDD[6] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    Project [name#1:1]
     Filter (departmentId#2:2 = 1)
      ExistingRdd [employeeId#0,name#1,departmentId#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174
    
    scala> fsis.map(t => "Name: " + t(0)).collect().foreach(println)
    14/06/18 09:55:27 INFO FileInputFormat: Total input paths to process : 1
    14/06/18 09:55:27 INFO SparkContext: Starting job: collect at <console>:20
    14/06/18 09:55:27 INFO DAGScheduler: Got job 0 (collect at <console>:20) with 2 output partitions (allowLocal=false)
    14/06/18 09:55:27 INFO DAGScheduler: Final stage: Stage 0(collect at <console>:20)
    14/06/18 09:55:27 INFO DAGScheduler: Parents of final stage: List()
    14/06/18 09:55:27 INFO DAGScheduler: Missing parents: List()
    14/06/18 09:55:27 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[9] at map at <console>:20), which has no missing parents
    14/06/18 09:55:27 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[9] at map at <console>:20)
    14/06/18 09:55:27 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
    14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 1: jfp3-2 (NODE_LOCAL)
    14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:0 as 3508 bytes in 2 ms
    14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 2: jfp3-3 (NODE_LOCAL)
    14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:1 as 3508 bytes in 0 ms
    14/06/18 09:55:28 INFO TaskSetManager: Finished TID 1 in 1266 ms on jfp3-3 (progress: 1/2)
    14/06/18 09:55:28 INFO TaskSetManager: Finished TID 0 in 1276 ms on jfp3-2 (progress: 2/2)
    14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 1)
    14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 0)
    14/06/18 09:55:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    14/06/18 09:55:28 INFO DAGScheduler: Stage 0 (collect at <console>:20) finished in 1.284 s
    14/06/18 09:55:28 INFO SparkContext: Job finished: collect at <console>:20, took 1.386154401 s
    Name: Gong Shaocheng
    Name: Li Dachao
    Name: Qiu Xin

    5. Save the data in Parquet format and run SQL against it
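
    The log below only shows the Parquet file being read back; the write step itself was not captured. A minimal sketch of how it would likely be done with the Spark 1.0 SchemaRDD API, reusing the employees RDD registered in step 3 (the exact call is an assumption; the path matches what is read below):

    // employees is implicitly converted to a SchemaRDD via import sqlContext._
    // saveAsParquetFile writes the rows, together with their schema, to the given HDFS directory
    employees.saveAsParquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")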

    scala> val parquetFile = sqlContext.parquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")
    14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    parquetFile: org.apache.spark.sql.SchemaRDD = 
    SchemaRDD[13] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    ParquetTableScan [employeeId#9,name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None
    
    
    scala> parquetFile.registerAsTable("parquetFile")
    
    
    scala> val telcos = sql("SELECT name FROM parquetFile WHERE departmentId = 3")
    14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    14/06/18 10:24:37 INFO MemoryStore: ensureFreeSpace(180579) called with curMem=138763, maxMem=309225062
    14/06/18 10:24:37 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 176.3 KB, free 294.6 MB)
    telcos: org.apache.spark.sql.SchemaRDD = 
    SchemaRDD[14] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    Project [name#10:0]
     Filter (departmentId#11:1 = 3)
      ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None
    
    scala> telcos.collect().foreach(println)
    14/06/18 10:24:40 INFO FileInputFormat: Total input paths to process : 2
    14/06/18 10:24:40 INFO ParquetInputFormat: Total input paths to process : 2
    14/06/18 10:24:40 INFO ParquetFileReader: reading summary file: hdfs://jfp3-1:8020/user/spark_studio/employee.parquet/_metadata
    14/06/18 10:24:40 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
    14/06/18 10:24:40 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
    14/06/18 10:24:40 INFO SparkContext: Starting job: collect at <console>:20
    14/06/18 10:24:40 INFO DAGScheduler: Got job 2 (collect at <console>:20) with 2 output partitions (allowLocal=false)
    14/06/18 10:24:40 INFO DAGScheduler: Final stage: Stage 2(collect at <console>:20)
    14/06/18 10:24:40 INFO DAGScheduler: Parents of final stage: List()
    14/06/18 10:24:40 INFO DAGScheduler: Missing parents: List()
    14/06/18 10:24:40 INFO DAGScheduler: Submitting Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    Project [name#10:0]
     Filter (departmentId#11:1 = 3)
      ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None), which has no missing parents
    14/06/18 10:24:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    Project [name#10:0]
     Filter (departmentId#11:1 = 3)
      ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None)
    14/06/18 10:24:40 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
    14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:0 as TID 4 on executor 2: jfp3-3 (NODE_LOCAL)
    14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:0 as 3116 bytes in 1 ms
    14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:1 as TID 5 on executor 0: jfp3-4 (NODE_LOCAL)
    14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:1 as 3116 bytes in 1 ms
    14/06/18 10:24:40 INFO DAGScheduler: Completed ResultTask(2, 0)
    14/06/18 10:24:40 INFO TaskSetManager: Finished TID 4 in 200 ms on jfp3-3 (progress: 1/2)
    14/06/18 10:24:42 INFO DAGScheduler: Completed ResultTask(2, 1)
    14/06/18 10:24:42 INFO TaskSetManager: Finished TID 5 in 2162 ms on jfp3-4 (progress: 2/2)
    14/06/18 10:24:42 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
    14/06/18 10:24:42 INFO DAGScheduler: Stage 2 (collect at <console>:20) finished in 2.177 s
    14/06/18 10:24:42 INFO SparkContext: Job finished: collect at <console>:20, took 2.210887848 s
    [Wo Binggang]

    6. DSL syntax support
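
    The log below collects a SchemaRDD named all whose definition is not shown. Judging from the query plan (a projection of name over a filter on departmentId >= 1), it was most likely built with the Spark 1.0 language-integrated DSL roughly as follows (a sketch; the exact expression is an assumption):

    // 'departmentId and 'name are Scala symbols that the DSL (pulled in by import sqlContext._)
    // resolves to columns of the employees table defined in step 3
    val all = employees.where('departmentId >= 1).select('name)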

    scala> all.collect().foreach(println)
    14/06/18 10:37:45 INFO SparkContext: Starting job: collect at <console>:24
    14/06/18 10:37:45 INFO DAGScheduler: Got job 6 (collect at <console>:24) with 2 output partitions (allowLocal=false)
    14/06/18 10:37:45 INFO DAGScheduler: Final stage: Stage 6(collect at <console>:24)
    14/06/18 10:37:45 INFO DAGScheduler: Parents of final stage: List()
    14/06/18 10:37:45 INFO DAGScheduler: Missing parents: List()
    14/06/18 10:37:45 INFO DAGScheduler: Submitting Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    Project [name#19:1]
     Filter (departmentId#20:2 >= 1)
      ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174), which has no missing parents
    14/06/18 10:37:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    Project [name#19:1]
     Filter (departmentId#20:2 >= 1)
      ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174)
    14/06/18 10:37:45 INFO TaskSchedulerImpl: Adding task set 6.0 with 2 tasks
    14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:0 as TID 200 on executor 2: jfp3-3 (NODE_LOCAL)
    14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:0 as 3541 bytes in 0 ms
    14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:1 as TID 201 on executor 1: jfp3-2 (NODE_LOCAL)
    14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:1 as 3541 bytes in 1 ms
    14/06/18 10:37:45 INFO TaskSetManager: Finished TID 200 in 33 ms on jfp3-3 (progress: 1/2)
    14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 0)
    14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 1)
    14/06/18 10:37:45 INFO TaskSetManager: Finished TID 201 in 37 ms on jfp3-2 (progress: 2/2)
    14/06/18 10:37:45 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 
    14/06/18 10:37:45 INFO DAGScheduler: Stage 6 (collect at <console>:24) finished in 0.039 s
    14/06/18 10:37:45 INFO SparkContext: Job finished: collect at <console>:24, took 0.052556716 s
    [Gong Shaocheng]
    [Li Dachao]
    [Qiu Xin]
    [Cheng Jiangzhong]
    [Wo Binggang]
  • Original article: https://www.cnblogs.com/littlesuccess/p/3794045.html