zoukankan      html  css  js  c++  java
  • sparksql基础知识一

    目标

    1. 掌握sparksql底层原理

    2. 掌握sparksql中DataFrame和DataSet的数据结构和使用方式

    3. 掌握通过sparksql开发应用程序

    要点

    1.sparksql概述

    1.1 sparksql的前世今生

    • Shark是专门针对于spark的构建大规模数据仓库系统的一个框架

    • Shark与Hive兼容、同时也依赖于Spark版本

    • Hivesql底层把sql解析成了mapreduce程序,Shark是把sql语句解析成了Spark任务

    • 随着性能优化的上限,以及集成SQL的一些复杂的分析功能,发现Hive的MapReduce思想限制了Shark的发展。

    • 最后Databricks公司终止对Shark的开发

      • 决定单独开发一个框架,不在依赖hive,把重点转移到了==sparksql==这个框架上。

    1.2 什么是sparksql

    • Spark SQL is Apache Spark's module for working with structured data.

    • SparkSQL是apache Spark用来处理结构化数据的一个模块

    2. sparksql的四大特性

    • 1、易整合

      • 将SQL查询与Spark程序无缝混合

      • 可以使用不同的语言进行代码开发

        • java

        • scala

        • python

        • R

    • 2、统一的数据源访问

      • 以相同的方式连接到任何数据源

        • sparksql后期可以采用一种统一的方式去对接任意的外部数据源

          SparkSession.read.该数据类型的方法名(该格式数据的路径)
    • 3、兼容hive

      • sparksql可以支持hivesql这种语法 sparksql兼容hivesql

    • 4、支持标准的数据库连接

      • sparksql支持标准的数据库连接JDBC或者ODBC

    3. DataFrame概述

    3.1 DataFrame发展

    • DataFrame前身是schemaRDD,这个schemaRDD是直接继承自RDD,它是RDD的一个实现类

    • 在spark1.3.0之后把schemaRDD改名为DataFrame,它不在继承自RDD,而是自己实现RDD上的一些功能

    • 也可以把dataFrame转换成一个rdd,调用rdd这个方法

      • 例如 val rdd1=dataFrame.rdd

    3.2 DataFrame是什么

    • 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格

    • DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化

    • DataFrame可以从很多数据源构建

      • 比如:已经存在的RDD、结构化文件、外部数据库、Hive表。

    DataFrame = RDD + schema元信息(对数据的结构描述信息)
    DataFrame可以看成是一张mysql表。
    表中有数据,同时表中还有字段的名称和类型,这里的字段的名称和类型就可以理解成Schema信息

    3.3 DataFrame和RDD的优缺点

    • 1、RDD

      • 优点

        • 1、编译时类型安全

          • 开发会进行类型检查,在编译的时候及时发现错误

        • 2、具有面向对象编程的风格

      • 缺点

        • 1、构建大量的java对象占用了大量heap堆空间,导致频繁的GC

          由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC),程序在进行垃圾回收的过程中,所有的任务都是暂停。影响程序执行的效率
        • 2、数据的序列化和反序列性能开销很大

            在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输,然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象
    • 2、DataFrame

      • DataFrame引入了schema元信息和off-heap(堆外)

      • 优点

        • 1、DataFrame引入off-heap,大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存,这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高,它是解决了RDD构建大量的java对象占用了大量heap堆空间,导致频繁的GC这个缺点。

        • 2、DataFrame引入了schema元信息---就是数据结构的描述信息,后期spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略掉。这样一来数据网络传输的数据量是有所减少,数据的序列化和反序列性能开销就不是很大了。它是解决了RDD数据的序列化和反序列性能开销很大这个缺点

      • 缺点

        • DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点

          • 1、编译时类型不安全

            • 编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现

          • 2、不在具有面向对象编程的风格

    4. 读取文件构建DataFrame

    4.1 读取文本文件创建DataFrame

    • 第一种方式

    //加载数据
    val rdd1=sc.textFile("/person.txt").map(x=>x.split(" "))
    //定义一个样例类
    case class Person(id:String,name:String,age:Int)
    //把rdd与样例类进行关联
    val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
    //把rdd转换成DataFrame
    val personDF=personRDD.toDF
    ​
    //打印schema信息
    personDF.printSchema
    ​
    //展示数据
    personDF.show
    • 第二种方式

    val personDF=spark.read.text("/person.txt")
    //org.apache.spark.sql.DataFrame = [value: string]
    //打印schema信息
    personDF.printSchema
    ​
    //展示数据
    personDF.show

    4.2 读取json文件创建DataFrame

    val peopleDF=spark.read.json("/people.json")
    //打印schema信息
    peopleDF.printSchema
    ​
    //展示数据
    peopleDF.show

    4.3 读取parquet文件创建DataFrame

    val usersDF=spark.read.parquet("/users.parquet")
    //打印schema信息
    usersDF.printSchema
    ​
    //展示数据
    usersDF.show

    5. DataFrame常用操作

    5.1 DSL风格语法

    • 就是sparksql中的DataFrame自身提供了一套自己的Api,可以去使用这套api来做相应的处理

    //加载数据
    val rdd1=sc.textFile("/person.txt").map(x=>x.split(" "))
    //定义一个样例类
    case class Person(id:String,name:String,age:Int)
    //把rdd与样例类进行关联
    val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
    //把rdd转换成DataFrame
    val personDF=personRDD.toDF
    ​
    //打印schema信息
    personDF.printSchema
    ​
    //展示数据
    personDF.show
    ​
    //查询指定的字段
    personDF.select("name").show
    personDF.select($"name").show
    personDF.select(col("name").show
                    
    //实现age+1
     personDF.select($"name",$"age",$"age"+1).show   
    ​
    //实现age大于30过滤
     personDF.filter($"age" > 30).show
      
     //按照age分组统计次数
     personDF.groupBy("age").count.show 
       
    //按照age分组统计次数降序
     personDF.groupBy("age").count().sort($"count".desc)show   

    5.2 SQL风格语法

    • 可以把DataFrame注册成一张表,然后通过sparkSession.sql(sql语句)操作

    //DataFrame注册成表
    personDF.createTempView("person")
    ​
    //使用SparkSession调用sql方法统计查询
    spark.sql("select * from person").show
    spark.sql("select name from person").show
    spark.sql("select name,age from person").show
    spark.sql("select * from person where age >30").show
    spark.sql("select count(*) from person where age >30").show
    spark.sql("select age,count(*) from person group by age").show
    spark.sql("select age,count(*) as count from person group by age").show
    spark.sql("select * from person order by age desc").show

    6. DataSet概述

    6.1 DataSet是什么

    • DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。

    • DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。

    6.2 RDD、DataFrame、DataSet的区别

    • 假设RDD中的两行数据长这样

    • 那么DataFrame中的数据长这样

    • Dataset中的数据长这

      • 或者长这样(每行数据是个Object)

    DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。
    (1)DataSet可以在编译时检查类型
    (2)并且是面向对象的编程接口

    6.3 DataFrame与DataSet互相转换

    • 1、把一个DataFrame转换成DataSet

      • val dataSet=dataFrame.as[强类型]

    • 2、把一个DataSet转换成DataFrame

      • val dataFrame=dataSet.toDF

    • 补充说明

      • 可以从dataFrame和dataSet获取得到rdd

        • val rdd1=dataFrame.rdd
          val rdd2=dataSet.rdd

    6.4 构建DataSet

    • 1、 通过sparkSession调用createDataset方法

      val ds=spark.createDataset(1 to 10) //scala集合
      val ds=spark.createDataset(sc.textFile("/person.txt"))  //rdd
    • 2、使用scala集合和rdd调用toDS方法

      sc.textFile("/person.txt").toDS
      List(1,2,3,4,5).toDS
    • 3、把一个DataFrame转换成DataSet

      val dataSet=dataFrame.as[强类型]
    • 4、通过一个DataSet转换生成一个新的DataSet

      List(1,2,3,4,5).toDS.map(x=>x*10)

    7. 通过IDEA开发程序实现把RDD转换DataFrame

    • 添加依赖

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>

    7.1 利用反射机制

    • 定义一个样例类,后期直接映射成DataFrame的schema信息

    • 代码开发

    package com.kaikeba.sql
    ​
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
    ​
    //todo:利用反射机制实现把rdd转成dataFrame
    case class Person(id:String,name:String,age:Int)
    ​
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
    ​
        //1、构建SparkSession对象
        val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
    ​
        //2、获取sparkContext对象
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("warn")
    ​
        //3、读取文件数据
        val data: RDD[Array[String]] = sc.textFile("E:\person.txt").map(x=>x.split(" "))
    ​
        //4、定义一个样例类
    //5、将rdd与样例类进行关联
        val personRDD: RDD[Person] = data.map(x=>Person(x(0),x(1),x(2).toInt))
    ​
        //6、将rdd转换成dataFrame
        //需要手动导入隐式转换
        import spark.implicits._
        val personDF: DataFrame = personRDD.toDF
    ​
        //7、对dataFrame进行相应的语法操作
        //todo:----------------- DSL风格语法-----------------start
        //打印schema
        personDF.printSchema()
        //展示数据
        personDF.show()
    ​
        //获取第一行数据
        val first: Row = personDF.first()
        println("first:"+first)
    ​
        //取出前3位数据
        val top3: Array[Row] = personDF.head(3)
        top3.foreach(println)
    ​
        //获取name字段
        personDF.select("name").show()
        personDF.select($"name").show()
        personDF.select(new Column("name")).show()
        personDF.select("name","age").show()
    ​
        //实现age +1
        personDF.select($"name",$"age",$"age"+1).show()
    ​
        //按照age过滤
        personDF.filter($"age" >30).show()
        val count: Long = personDF.filter($"age" >30).count()
        println("count:"+count)
    ​
        //分组
        personDF.groupBy("age").count().show()
    ​
        personDF.show()
        personDF.foreach(row => println(row))
    ​
        //使用foreach获取每一个row对象中的name字段
        personDF.foreach(row =>println(row.getAs[String]("name")))
        personDF.foreach(row =>println(row.get(1)))
        personDF.foreach(row =>println(row.getString(1)))
        personDF.foreach(row =>println(row.getAs[String](1)))
        //todo:----------------- DSL风格语法--------------------end
    ​
    ​
        //todo:----------------- SQL风格语法-----------------start
        personDF.createTempView("person")
        //使用SparkSession调用sql方法统计查询
        spark.sql("select * from person").show
        spark.sql("select name from person").show
        spark.sql("select name,age from person").show
        spark.sql("select * from person where age >30").show
        spark.sql("select count(*) from person where age >30").show
        spark.sql("select age,count(*) from person group by age").show
        spark.sql("select age,count(*) as count from person group by age").show
        spark.sql("select * from person order by age desc").show
        //todo:----------------- SQL风格语法----------------------end
    //关闭sparkSession对象
        spark.stop()
      }
    }

    7.2 通过StructType直接指定Schema

    • 代码开发

    package com.kaikeba.sql
    ​
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    ​
    //todo;通过动态指定dataFrame对应的schema信息将rdd转换成dataFrame
    object StructTypeSchema {
    ​
      def main(args: Array[String]): Unit = {
        //1、构建SparkSession对象
        val spark: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
    ​
        //2、获取sparkContext对象
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("warn")
    ​
        //3、读取文件数据
        val data: RDD[Array[String]] = sc.textFile("E:\person.txt").map(x=>x.split(" "))
    ​
        //4、将rdd与Row对象进行关联
        val rowRDD: RDD[Row] = data.map(x=>Row(x(0),x(1),x(2).toInt))
    ​
        //5、指定dataFrame的schema信息   
        //这里指定的字段个数和类型必须要跟Row对象保持一致
        val schema=StructType(
          StructField("id",StringType)::
            StructField("name",StringType)::
            StructField("age",IntegerType)::Nil
        )
    ​
        val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema)
        dataFrame.printSchema()
        dataFrame.show()
    ​
        dataFrame.createTempView("user")
        spark.sql("select * from user").show()
    ​
    ​
        spark.stop()
    ​
      }
    ​
    }

    8、sparksql 操作hivesql

    • 添加依赖

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>
    • 代码开发

    package com.kaikeba.sql
    import org.apache.spark.sql.SparkSession
    ​
    ​
    //todo:利用sparksql操作hivesql
    object HiveSupport {
      def main(args: Array[String]): Unit = {
        //1、构建SparkSession对象
        val spark: SparkSession = SparkSession.builder()
          .appName("HiveSupport")
          .master("local[2]")
          .enableHiveSupport() //开启对hive的支持
          .getOrCreate()
        //2、直接使用sparkSession去操作hivesql语句
    //2.1 创建一张hive表
           spark.sql("create table people(id string,name string,age int) row format delimited fields terminated by ','")
    ​
          //2.2 加载数据到hive表中
           spark.sql("load data local inpath './data/kaikeba.txt' into table people ")
    ​
          //2.3 查询
          spark.sql("select * from people").show()
    ​
        spark.stop()
      }
    }
  • 相关阅读:
    使用jackson解析JSON数据
    ANT配置
    Android Webview 与JS交互
    使用ANT将Android打包成Jar包
    单例模式
    工厂模式
    nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use) 错误解决
    项目开发中的文档管理结构模板
    高并发的成熟解决方案
    Yaf(Yet Another Framework)用户手册 yii框架手册
  • 原文地址:https://www.cnblogs.com/lojun/p/11852970.html
Copyright © 2011-2022 走看看