Spark SQL Study Notes (Getting to Know Spark SQL)

    A first look at Spark SQL

    Spark SQL is a Spark module for processing structured data. The core programming abstraction it provides is the DataFrame.

    DataFrame: it can be built from many sources, including structured data files, Hive tables, external relational databases, and existing RDDs.
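
    A minimal sketch of building DataFrames from a few of these sources (Spark 1.x SQLContext API, matching the examples below; the paths, table names, and connection settings are placeholders, not from this post):

    // Assumes an existing SparkContext `sc`; every path and credential here is a placeholder.
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // 1) Structured data file (JSON here; Parquet works the same way via read.parquet)
    val fromFile = sqlContext.read.json("hdfs://master:9000/some/path/students.json")

    // 2) External relational database over JDBC
    val fromJdbc = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/testdb")
      .option("dbtable", "students")
      .option("user", "root")
      .option("password", "secret")
      .load()

    // 3) Hive tables require a HiveContext (org.apache.spark.sql.hive.HiveContext);
    //    building a DataFrame from an RDD is covered later in this post.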

    Creating a DataFrame

    Data file students.json

    {"id":1, "name":"leo", "age":18}
    {"id":2, "name":"jack", "age":19}
    {"id":3, "name":"marry", "age":17}
    
    

    Creating a DataFrame in spark-shell

    // Upload the file to the HDFS directory
    hadoop@master:~/wujiadong$ hadoop fs -put students.json /student/2016113012/spark
    // Start spark-shell
    hadoop@slave01:~$ spark-shell
    // Import SQLContext
    scala> import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SQLContext
    // Create a SQLContext object so we can operate on the data
    scala> val sql = new SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@27acd9a7
    // Read the data
    scala> val students = sql.read.json("hdfs://master:9000/student/2016113012/spark/students.json")
    students: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
    // Display the data
    scala> students.show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 18|  1|  leo|
    | 19|  2| jack|
    | 17|  3|marry|
    +---+---+-----+
    
    

    Common DataFrame operations

    scala> students.show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 18|  1|  leo|
    | 19|  2| jack|
    | 17|  3|marry|
    +---+---+-----+
    
    scala> students.printSchema
    root
     |-- age: long (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     
     
    scala> students.select("name").show
    +-----+
    | name|
    +-----+
    |  leo|
    | jack|
    |marry|
    +-----+ 
    
    scala> students.select(students("name"),students("age")+1).show
    +-----+---------+
    | name|(age + 1)|
    +-----+---------+
    |  leo|       19|
    | jack|       20|
    |marry|       18|
    +-----+---------+
    
    scala> students.filter(students("age")>18).show
    +---+---+----+
    |age| id|name|
    +---+---+----+
    | 19|  2|jack|
    +---+---+----+
    
    
    scala> students.groupBy("age").count().show
    +---+-----+                                                                     
    |age|count|
    +---+-----+
    | 19|    1|
    | 17|    1|
    | 18|    1|
    +---+-----+
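
    The same operations can also be expressed in SQL once the DataFrame is registered as a temporary table. A quick sketch (not in the original session) using the `sql` SQLContext and `students` DataFrame created above; registerTempTable is the Spark 1.x API that also appears in the submitted jobs below:

    scala> students.registerTempTable("students")
    scala> sql.sql("SELECT name, age + 1 FROM students").show
    scala> sql.sql("SELECT * FROM students WHERE age > 18").show
    scala> sql.sql("SELECT age, COUNT(*) AS count FROM students GROUP BY age").show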
    
    

    Two ways to convert an RDD into a DataFrame

    1) Reflection-based approach
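
    The input file students.txt itself is not shown in the post; judging from the run output further down, it is presumably a plain comma-separated file along these lines:

    1,leo,17
    2,marry,17
    3,jack,18
    4,tom,19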

    package wujiadong_sparkSQL
    
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by Administrator on 2017/3/5.
      */
    object RDDDataFrameReflection {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("rdddatafromareflection")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val fileRDD = sc.textFile("hdfs://master:9000/student/2016113012/data/students.txt")
        val lineRDD = fileRDD.map(line => line.split(","))
    // Associate each line of the RDD with the case class
        val studentsRDD = lineRDD.map(x => Students(x(0).toInt,x(1),x(2).toInt))
    // In Scala, converting an RDD to a DataFrame via reflection requires manually importing the implicit conversions
        import sqlContext.implicits._
        val studentsDF = studentsRDD.toDF()
    // Register a temporary table
        studentsDF.registerTempTable("t_students")
        val df = sqlContext.sql("select * from t_students")
        df.rdd.foreach(row => println(row(0)+","+row(1)+","+row(2)))
        df.rdd.saveAsTextFile("hdfs://master:9000/student/2016113012/data/out")
    
    
      }
    
    }
    // The case class must be defined outside the method, at the top level
    case class Students(id:Int,name:String,age:Int)
    
    

    Run output

    hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.RDDDataFrameReflection  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
    17/03/05 22:46:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/03/05 22:46:48 INFO Slf4jLogger: Slf4jLogger started
    17/03/05 22:46:48 INFO Remoting: Starting remoting
    17/03/05 22:46:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.131:34921]
    17/03/05 22:46:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    17/03/05 22:46:51 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    17/03/05 22:47:00 INFO FileInputFormat: Total input paths to process : 1
    17/03/05 22:47:07 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
    17/03/05 22:47:07 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    17/03/05 22:47:07 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
    17/03/05 22:47:07 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
    17/03/05 22:47:07 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
    1,leo,17
    2,marry,17
    3,jack,18
    4,tom,19
    17/03/05 22:47:10 INFO FileOutputCommitter: Saved output of task 'attempt_201703052247_0001_m_000000_1' to hdfs://master:9000/student/2016113012/data/out/_temporary/0/task_201703052247_0001_m_000000
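
    For reference only (not part of the original job): the spark-shell session above already shows a deprecation warning when constructing SQLContext. On Spark 2.x the same reflection-based conversion would look roughly like the sketch below, with SparkSession replacing SQLContext and createOrReplaceTempView replacing registerTempTable:

    // Rough Spark 2.x sketch of the same job; names and paths follow the example above.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("rdddatafromareflection").getOrCreate()
    import spark.implicits._

    val studentsDF = spark.sparkContext
      .textFile("hdfs://master:9000/student/2016113012/data/students.txt")
      .map(_.split(","))
      .map(x => Students(x(0).toInt, x(1), x(2).toInt))
      .toDF()

    studentsDF.createOrReplaceTempView("t_students")
    spark.sql("select * from t_students").show()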
    
    
    

    2) Programmatic interface approach

    package wujiadong_sparkSQL
    
    
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by Administrator on 2017/3/5.
      */
    object RDDDataFrameBianchen {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RDDDataFrameBianchen")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
    // Create an RDD from the specified path
        val studentsRDD = sc.textFile("hdfs://master:9000/student/2016113012/data/students.txt").map(_.split(","))
    // Map the RDD to an RDD of Rows
        val RowRDD = studentsRDD.map(x => Row(x(0).toInt,x(1),x(2).toInt))
    // Programmatically construct the schema (metadata)
        val schema = StructType(
          List(
            StructField("id",IntegerType,true),
            StructField("name",StringType,true),
            StructField("age",IntegerType,true)
          )
        )
    // Apply the schema to the Row RDD
        val studentsDF = sqlContext.createDataFrame(RowRDD,schema)
    // Register a temporary table
        studentsDF.registerTempTable("t_students")
        val df = sqlContext.sql("select * from t_students order by age")
        df.rdd.collect().foreach(row => println(row))
      }
    
    }
    
    
    

    Run output

    hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.RDDDataFrameBianchen --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
    17/03/06 11:07:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/03/06 11:07:27 INFO Slf4jLogger: Slf4jLogger started
    17/03/06 11:07:27 INFO Remoting: Starting remoting
    17/03/06 11:07:28 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.131:49756]
    17/03/06 11:07:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    17/03/06 11:07:38 INFO FileInputFormat: Total input paths to process : 1
    17/03/06 11:07:44 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
    17/03/06 11:07:44 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    17/03/06 11:07:44 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
    17/03/06 11:07:44 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
    17/03/06 11:07:44 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
    [1,leo,17]                                                                      
    [2,marry,17]
    [3,jack,18]
    [4,tom,19]
    17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
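
    One note on the design choice (a general observation, not something the post states): the programmatic approach is mainly useful when the columns are not known until runtime, because the StructType can be assembled from data such as a header string. A small hypothetical sketch:

    // Hypothetical example: build the schema from a runtime string of column names.
    import org.apache.spark.sql.types._

    val schemaString = "id name age"   // could come from a config file or a header line
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    )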
    
    
    

    DataFrame vs. RDD

    1) In Spark, a DataFrame is a distributed dataset built on top of RDDs, similar to a two-dimensional table in a traditional database.

    2) The main difference between a DataFrame and an RDD is that a DataFrame carries schema metadata: every column of the two-dimensional table it represents has a name and a type.
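
    A tiny sketch of what this schema metadata looks like in practice, using the `students` DataFrame from the spark-shell session above:

    students.schema         // StructType(StructField(age,LongType,true), StructField(id,LongType,true), ...)
    students.printSchema()  // prints each column with its name, type, and nullability
    val rows = students.rdd // drops back down to an RDD[org.apache.spark.sql.Row]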

