zoukankan      html  css  js  c++  java
  • sparkSQL-1

    spark SQL

    1.sparksql由来

    • shark是专门针对于spark构建大规模数据仓库系统的一个框架。它依赖于Hive,同时兼容spark。hivesql底层是把sql解析成mapreduce程序,shark(sql解析引擎)是把sql后期解析成了spark任务。
    • 后期由于程序对性能要求比较高,发现mapreduce思想限制了shark发展,同时由于shark依赖于hive,同时跟spark也需要有一定兼容性,后期它版本升级之后,都会导致shark的版本升级。最终把shark这个框架废弃,spark自己开发一个框架,不再依赖hive。开发了sql解析引擎sparksql。

    2.sparksql?

    • SparkSQL是apache Spark用来处理结构化数据的一个模块。它提供了一个编程抽象叫做DataFrame,并且作为分布式SQL查询引擎的作用。

    3.sparksql的特性

    • 易整合:

      • 可以将sparksql与spark程序(RDD)进行混合使用
      • 可以使用不同语言进行代码开发:java,scala,python,R
    • 统一的数据源访问

      • sparksql后期可以采用一种统一方式去对接任意的外部数据源对接。
    • 兼容Hive

      • sparksql可以支持hivesql
    • 支持标准数据连接

      • 通过JDBC 或 ODBC 连接数据

    4.DataFrame概述

    • dataFram前身是SchemaRDD,从Spark 1.3.0 开始将SchemaRDD更名为DataFrame,与SchemaRDD的主要区别是:DataFrame不再直接继承RDD,而是自己实现RDD的绝大多数方法,当然你仍然可以在DataFrame上调用RDD方法将其转换为一个RDD.

    • 在Spark中,DataFrame是一种以RDD为基础分部署数据集,类似于传统二维数据表。DataFrame带有Schema元信息,即DataFrame所表示的二位表数据集的每一列都带有名称和类型,但底层做了更多优化,DataFrame可以从很多数据源构建,比如:已经存在的RDD,结构化文件,外部数据库,Hive表。

    5.DataFrame与RDD区别

    • RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息(就是列的名称和类型),使得Spark SQL可以进行某些形式的执行优化。DataFrame和普通的RDD的逻辑框架区别如下所示:

    1.左侧RDD[Person] 虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。
    2.而右侧的DataFrame却提供了详细的结构信息,使得Spark SQl可以清楚知道该数据集中包含的列和每列的类型是什么。DataFrame相比RDD多了数据结构信息(schema).这样看起来像一张表。并且DataFrame还引入了off-heap,意味着JVM堆以外的内存,这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存. 由于Spark理解schema, 所以知道该如何操作。
    	相比之下:RDD是分布式Java对象集合,DtataFrame是分布式Row对象集合,DataFrame除了提供比RDD更丰富的算子以外,更重要的特点是提升执行效率,减少数据读取一级执行计划优化。
    	有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。
    	不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。
    

    6DataFrame与RDD的优缺点

    • RDD优缺点
    优点:
    	1.编译时类型安全:编译时能检查类型错误。
    	2.面向对象编程风格:直接通过对象调用的方法的形式来操作数据
    缺点:
    	1.序列化和反序列化性能开销:无论是集群之间通信,还是IO操作都需要对对象的结构和数据进行序列化和反序列化。
    	2.GC性能开销:频繁的创建和销毁对象, 势必会增加GC
    
    • DataFrame优缺点
    通过引入schema和off-heap(不在堆里面的内存,指的是除了不在堆的内存,使用操作系统上的内存),解决了RDD的缺点, Spark通过schame就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了;通过off-heap引入,可以快速的操作数据,避免大量的GC。但是却丢了RDD的优点,DataFrame不是类型安全的, API也不是面向对象风格的。
    

    7.DataFrame的创建

    • 进入spark-shell
    spark-shell --master spark://linux01:7077 --executor-memory 1g --total-executor-cores 1
    
    • 代码
    # 加载数据
    scala> val rdd1=sc.textFile("/u.txt").map(x=>x.split(" "))
    # Person是样例类
    scala> case class Person(name:String,age:Int)
    # 把rdd与样例类进行关联
    scala> val personRDD = rdd1.map(x=>Person(x(0),x(1).toInt))
    # 转换DataFrame
    scala> val personDF = personRDD.toDF
    # 打印schema元数据信息
    scala> personDF.printSchema
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
    # 查看数据
    scala> personDF.show
    +------+---+
    |  name|age|
    +------+---+
    | James| 80|
    | Curry| 70|
    |  Dunk| 50|
    |  Wade| 70|
    |  Bosh| 65|
    |  Kebe| 80|
    |Junkai|100|
    +------+---+
    
    # 直接从文件读取文本文件数据
    scala> val df = spark.read.text("/u.txt")
    scala> df.show
    +----------+
    |     value|
    +----------+
    |  James 80|
    |  Curry 70|
    |   Dunk 50|
    |   Wade 70|
    |   Bosh 65|
    |   Kebe 80|
    |Junkai 100|
    +----------+
    # 读取json文件
    val peopleDF = spark.read.json("/people.json")
    # 读取parquet类型压缩文件
    val usersDF = spark.read.parquet("/users.parquet")
    

    8.DataFrame常用操作

    8.1DSL风格语法

    • SparkSQL中DataFrame自身提供一套自己的Api,可以去使用这套Api来做相应处理。

    • 代码示例:

    # 准备数据
    person.txt
    	1 James 100
        2 Jordan 95
        3 Kebe 94
        4 Dunk 92
        5 Curry 90
        6 Wade 89
        7 Bosh 88
        8 Lin 60
    scala> val rdd1=sc.textFile("/person.txt").map(x=>x.split(" "))
    scala> case class Person(id:String,name:String,age:Int)
    scala> val personRDD = rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
    scala> val personDF = personRDD.toDF
    # 当前表结构
    scala> personDF.show
    +---+------+---+
    | id|  name|age|
    +---+------+---+
    |  1| James|100|
    |  2|Jordan| 95|
    |  3|  Kebe| 94|
    |  4|  Dunk| 92|
    |  5| Curry| 90|
    |  6|  Wade| 89|
    |  7|  Bosh| 88|
    |  8|   Lin| 60|
    +---+------+---+
    
    # 1查询name列
    scala> personDF.select(col("name")).show
    # 2查询name,age,id
    scala> personDF.select("name","age","id").show
    # 3将age加1
    scala> personDF.select($"name",$"age"+1,$"id").show
    # 4.将age大于80用户查出
    personDF.filter($"age">80).show
    # 5.将age大于80用户数量
    personDF.filter($"age">80).count
    # 6. 分组统计不同段的数量
    personDF.groupBy("age").count.show
    # 7.降序排序
    personDF.groupBy("age").count.sort($"count".desc).show
    # 8.分组后as别名
    personDF.groupBy("age").agg(count("age").as("cnt")).show
    
    

    8.2 SQL风格语法

    • 可以把DataFrame注册成一张表,然后通过sparkSession.sql(sql语句)查询
    # 9.创建视图
    personDF.createTempView("person")
    # 10.根据视图查询
    spark.sql("select * from person").show
    spark.sql("select name,age from person where age>80").show
    spark.sql("select count(*) from person group by age").show
    spark.sql("select * from person order by age desc").show
    

    9.DateSet

    • DataSet 是分布式的数据集合,DataSet提供了强类型支持,也是在RDD每行数据加了类型约束,DataSet时在Spark1.6中添加的新的接口,它集中了RDD的优点以及使用了Spark SQL优化执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map/flatmap/filter)进行多种操作。

    • DateFrame ,DateSet,RDD区别

      • 假设RDD中的两行数据长这样:

      • 那么DataFrame中的数据长这样:

      • 那么Dataset中的数据长这样:

    • DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。

      1.DataSet可以在编译时检查类型
      2.并且是面向对象的编程接口
      
    • 相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传运行),到提交到集群运行时才发现错误,这会浪费大量的时间,这也是引入Dataset的一个重要原因。

    • DataFrame和DataSet 互相转换

      • 把DataFrame转换成DataSet

        val ds = df.as[String]
        
      • 把DataSet转换成DataFrame

        val df = ds.toDF
        
      • 通过DataFrame 和 DataSet 获取RDD

        DataFrame 转 RDD
        	val rdd1 = df.rdd      df是DataFrame
        DataSet 转 RDD
        	val rdd2 = ds.rdd      ds是DataSet
        
    • 构建DataSet方式

      # 1.通过sparkSession调用createDataset创建DataSet
      val ds1 = spark.createDataset(1 to 10)
      val ds2 = spark.createDataset(sc.textFile("/person.txt"))
      # 2.scala集合和rdd调用toDS方法
      sc.textFile("/person.txt").toDS
      List(1,2,3,4,5).toDS.show
      # 3.把一个DataFrame转换成DataSet
      val dataSet = dataFrame.as[强类型]
      # 4.通过DataSet转换成一个新的DataSet
      List(1,2,3,4,5).toDS.map(x => x*10).show
      

    10idea使用SparkSQL

    • RDD到DataFrame: rdd2.toDF
    • DataFrame 到 DataSet: df.as[Person]

    idea使用SparkSQL

    pom.xml添加如下
    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-sql_2.11</artifactId>
    	<version>2.2.0</version>
    </dependency>
    

    10.1利用反射机制定义样例类,后期直接映射DataFrame的schema信息

    • 在idea把RDD转换DataFrame可以利用反射机制(定义一个样例类,后期直接映射成DataFrame的schema信息)
    • 代码示例:
    package cn.doit.sparksql
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
    
    case class Person(id:String,name:String,age:Int)
    
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
        // 1.构建 SparkSession 对象
        val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
        // 2.获取sparkContext对象
        val sc: SparkContext = spark.sparkContext
        // 3.设置级别
        sc.setLogLevel("warn")
        // 4.读取文件数据
        val data:RDD[Array[String]] = sc.textFile("E:\person.txt").map(x => x.split(" "))
        // 5.将RDD与样例类进行关联
        val personRDD:RDD[Person] = data.map(x => Person(x(0), x(1), x(2).toInt))
        // 6. 将RDD转换DataFrame
          // 需要手动导入隐式转换
          import spark.implicits._
        val personDF:DataFrame = personRDD.toDF
        // 7. 对dataFrame进行相应语法操作
          // todo: -----DSL风格语法---start
          // 打印schema
          personDF.printSchema()
          // 展示数据
          personDF.show(20,truncate = true)
          // 获取第一行数据
          val first: Row = personDF.first()
          println("first", first)
          // head 取出前三行
          val top3: Array[Row] = personDF.head(3)
          top3.foreach(println)
          // 获取name字段 三种方式
          personDF.select("name").show()
          personDF.select($"name").show()
          personDF.select(new Column("name")).show()
          // 实现age + 1
          personDF.select($"name",$"age"+1).show()
          // 按照age过滤
          personDF.filter($"age" > 30).show()
          val count:Long = personDF.filter($"age" > 30).count()
          println("count:" + count)
          // 分组
          personDF.groupBy("age").count().show()
          // 展示每一行
          personDF.foreach(row => println(row))
          //  使用foreach获取每一个row 中的 name字段
          personDF.foreach(row => println(row.getAs[String]("name")))
          personDF.foreach(row => println(row.get(1)))
          personDF.foreach(row => println(row.getString(1)))
          personDF.foreach(row => println(row.getAs[String](1)))
          // todo: -----DSL风格语法----end
          // todo: -----SQL风格语法----start
          // 创建视图
          personDF.createTempView("person")
          spark.sql("select * from person").show()
          spark.sql("select name from person where age > 30").show()
          // todo: -----SQL风格语法----end
        // 关闭sparksession
        spark.stop()
    
      }
    }
    

    10.2通过StructType 直接指定Schema

    package cn.doit.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    // todo: 通过动态指定dataFrame对应schema信息,将RDD转换成dataFrame
    object StructTypeSchema {
      def main(args: Array[String]): Unit = {
        // 1.构建 SparkSession 对象
        val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
        // 2.获取sparkContext对象
        val sc: SparkContext = spark.sparkContext
        // 3.设置级别
        sc.setLogLevel("warn")
        // 4.读取文件数据
        val data:RDD[Array[String]] = sc.textFile("E:\person.txt").map(x => x.split(" "))
        // 5.将RDD与Row对象进行关联, Row可以接受任意数量的变量
        val rowRDD:RDD[Row] = data.map(x => Row(x(0), x(1), x(2).toInt))
        // 6.指定dataFrame的schema信息
        // StructType 里面是一个集合,必须要和Row对象保持一致
        val schema = StructType(
          StructField("id", StringType)::
          StructField("name", StringType)::
          StructField("age",IntegerType)::Nil
        )
        val dataFrame:DataFrame = spark.createDataFrame(rowRDD,schema)
        dataFrame.printSchema()
        dataFrame.show()
        dataFrame.createTempView("person")
        spark.sql("select * from person").show()
        spark.stop()
        sc.stop()
      }
    }
    

    11.spark操作HiveSql

    • pom添加
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    
    • 代码
    package cn.doit.sparksql
    
    import org.apache.spark.sql.SparkSession
    
    object HiveSupport {
      def main(args: Array[String]): Unit = {
        // 1.构建 SparkSession 对象
        val spark: SparkSession = SparkSession.builder()
          .appName("HiveSupport")
          .master("local[2]")
          .enableHiveSupport()//开启Hive支持
          .getOrCreate()
        // 2.直接使用SparkSession去操作hivesql语句
        spark.sql("create table people(id string, name string, age int) row format delimited fields terminated by ','")
        // 加载数据到hive表中
        spark.sql("load data local inpath './data/users' into table people")
        spark.sql("select * from people").show()
        spark.stop()
      }
    }
    
  • 相关阅读:
    (Java实现) 洛谷 P1106 删数问题
    (Java实现) 洛谷 P1603 斯诺登的密码
    (Java实现) 洛谷 P1036 选数
    (Java实现) 洛谷 P1012 拼数
    (Java实现) 洛谷 P1028 数的计算
    (Java实现) 洛谷 P1553 数字反转(升级版)
    (Java实现) 洛谷 P1051 谁拿了最多奖学金
    (Java实现) 洛谷 P1051 谁拿了最多奖学金
    (Java实现) 洛谷 P1106 删数问题
    目测ZIP的压缩率
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14974245.html
Copyright © 2011-2022 走看看