zoukankan      html  css  js  c++  java
  • Spark08-SparkSQL之DataFrame

    一、DataFrame

    1、DataFrame是什么

    DataFrame 是SparkSQL中一个表示关系型数据库中表的函数式抽象,其作用是让Spark 处理大规模结构化数据的时候更加容易.一般DataFrame可以处理结构化的数据,或者是半结构化的数据,因为这两类数据中都可以获取到Schema信息。也就是说DataFrame中有 Schema 信息,可以像操作表一样操作DataFrame .

    DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

    DataFrame 支持 SQL 中常见的操作, 例如: selectfilterjoingroupsortjoin 等。

    class DataFrame {
      @Test
      def dataFrame(): Unit ={
        //1. 创建SparkSession
        val spark = SparkSession.builder() // 更简单的创建 Session 方式
          .appName("dataFrame1").master("local[6]").getOrCreate()
    
        // 2. 创建 DataFrame
        import spark.implicits._
        val dataFrame: sql.DataFrame = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDF()
    
        // 3. DataFrame 的一些操作
        dataFrame.where('age > 15)
          .select('name).show()
        }
    
    }
    case class Person(name:String, age:Int)

    2、数据处理

    —般处理数据都差不多是ETL三个步骤:E->抽取,T->处理、转换,L->装载,落地

    Spark代码编写的套路:

      1、创建 DataFrame(或者Dataset、 RDD),制造或者读取数据

      2、通过 DataFrame(或者Dataset、 RDD)的API来进行数据处理

      3、通过 DataFrame(或者Dataset、 RDD)进行数据落地

    DataFrame 如何创建?

    有三种方式,toDF,createDataFrame,DataFrameReader。代码如下。

    class DataFrame {/**
       * 介绍创建 DataFrame 的三种方法
       */
      @Test
      def createDF(): Unit ={
        //1. 创建SparkSession
        val spark = SparkSession.builder() // 更简单的创建 Session 方式
          .appName("dataFrame1").master("local[6]").getOrCreate()
    
        // 2. 创建 DataFrame
        import spark.implicits._
    
        val personList = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25))
        // (1) toDF
        val df1 = personList.toDF() // 作用于普通序列
        val df2 = spark.sparkContext.parallelize(personList).toDF() // 作用于 RDD 对象
    
        // (2) createDataFrame
        val df3 = spark.createDataFrame(personList)
    
        // (3) read
        val df4 = spark.read.csv("../dataset/BeijingPM20100101_20151231_noheader.csv")
    
        // 运行展示一下read csv 的结果
        df4.show()
      }
    }

    DataFrame支持什么操作?

    我们通过一个案例入门DataFrame 的操作,需求是查看北京雾霾数据集上每个月的统计数量。

    方法一:命令式

      @Test
      def caseDF(): Unit ={
        // 1. 创建 SparkSession
        val spark = SparkSession.builder().master("local[6]").appName("Case_Beijing").getOrCreate()
    
        import spark.implicits._
    
        // 2. 读取数据集
        val sourceDF = spark.read
          .option("header", value = true) // 设置头信息,默认数据集中的第一行
          .csv("../dataset/BeijingPM20100101_20151231.csv")
        sourceDF.show()
    
        // 3. 处理数据集(与处理 RDD 时的思路完全不同)
        // 3.1 选择列
        sourceDF.select('year, 'month, 'PM_Dongsi)
    
        // 3.2 过滤到空(NA)的PM记录
          .where('PM_Dongsi =!= "NA")
    
        // 3.3 分组 select year, month, count(PM_Dongsi) from ... where PM_Dongsi != 'NA' group by year, month
          .groupBy('year, 'month)
    
        // 3.4 聚合
          .count()
    
          // 得出结论
          .show()
    
        spark.stop()
      }

    方法二:直接使用sql

    @Test
      def case_sql(): Unit ={
        // 1. 创建 SparkSession
        val spark = SparkSession.builder().master("local[6]").appName("Case_Beijing").getOrCreate()
    
        import spark.implicits._
    
        // 2. 读取数据集
        val sourceDF = spark.read
          .option("header", value = true) // 设置头信息,默认数据集中的第一行
          .csv("../dataset/BeijingPM20100101_20151231.csv")
        sourceDF.show()
    
        // 3. 直接使用 sql 进行查询
        // 3.1 将 DataFrame 注册为临表
        sourceDF.createOrReplaceTempView("pm")
    
        // 3.2 执行查询
        val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year, month")
    
        resultDF.show()
        spark.stop()
      }

    3、总结

    1. DataFrame 是一个类似于关系型数据库表的函数式组件

    2. DataFrame 一般处理结构化数据和半结构化数据

    3. DataFrame 具有数据对象的 Schema 信息

    4. 可以使用命令式的 API 操作 DataFrame, 同时也可以使用 SQL 操作 DataFrame

    5. DataFrame 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建

    二、DataFrame 与 Dataset 的异同

    1、DataFrame 就是 Dataset

    根据前面的内容,可以得到如下信息

    1. Dataset中可以使用列来访问数据,DataFrame也可以

    2. Dataset 的执行是优化的,DataFrame 也是

    3.Dataset 具有命令式 API ,同时也可以使用SQL来访问, DataFrame也可以使用这两种不同的方式访问

    那么为什么两个相同的东西会同时出现在 Spark SQL 中呢?我们看这二者在Scala源码中的表示。

     

    确实,这两个组件是同一个东西, DataFrame 是 Dataset 的一种特殊情况,也就是说 DataFrame 是 Dataset[Row] 的别名

    2、DataFrame 与 Dataset 表达的语义不同

    第一点: DataFrame 表达的含义是一个支持函数式操作的 , 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象

    第二点: DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象
        val spark = SparkSession.builder().appName("df1").master("local[6]").getOrCreate()
        import spark.implicits._
        val df: sql.DataFrame = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDF()
        val ds: sql.Dataset[Person] = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDS()
    可知,DataFrame是弱类型,Dataset是强类型。
    DataFrame就是Dataset [ row ] ,Dataset的泛型可以是任意类型。
    第三点: DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同
      @Test
      def dataFrame2(): Unit ={
        val spark = SparkSession.builder().appName("df1").master("local[6]").getOrCreate()
        val personList = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25))
    
        import spark.implicits._
    
        val df: sql.DataFrame = personList.toDF()
        // 首先通过最后的解码器,设置程序按照 df 的 schema结构,然后row对象 => Row(row.get(0), row.getAs[Int](1) * 2)
        // 即第一位不变,第二位强转为Int然后 * 2, 这里取出对象中的元素用 get 方法, 强转用 getAs
        df.map( (row:Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema))
          .show()
    
        val ds = personList.toDS()
        // 这里取出对象元素用 person.name, 即 对象.属性名
        ds.map( (person: Person) => Person(person.name, person.age * 2) )
          .show()
      }
    第四点: DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查

     3、Row是什么?

    Row 对象表示的是一个

    Row 的操作类似于 Scala 中的 Map 数据类型

      @Test
      def row(): Unit ={
        // 1、 row 如何创建,它是什么
        val p = Person("zhangsan",19)
        val row = Row("zhangsan",19)
        // 两者区别,对于p,列是有名字的,name, age
        // 对于 row, 列是没有名字的,若想有列名,必须配合schema
    
        // 2、如何从  row 中获取数据
        row.getString(0)
        row.getInt(1)
    
        // Row 也是样例类
        row match {
          case Row(name, age) => println(name, age)
        }
      }

     4、DataFrame 和 Dataset 之间的相互转换

    val spark: SparkSession = new sql.SparkSession.Builder()
      .appName("hello")
      .master("local[6]")
      .getOrCreate()
    
    import spark.implicits._
    
    val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()
    val ds_fdf: Dataset[People] = df.as[People]
    
    val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
    val df_fds: DataFrame = ds.toDF()

    5、总结

    1. DataFrame 就是 Dataset, 他们的方式是一样的, 也都支持 API 和 SQL 两种操作方式

    2. DataFrame 只能通过表达式的形式, 或者列的形式来访问数据, 只有 Dataset 支持针对于整个对象的操作

    3. DataFrame 中的数据表示为 Row, 是一个行的概念

    对Spark SQL 的两种API:Dataset 和 DataFrame 的一些理解

    DataFrame 和 Dataset 就是 SparkSQL 最重要的两个API,同时,DataFrame 就是放置了 Row 对象 的 Dataset,DataFrame 和 Dataset 同时支持两个编程范式,我们可以在DataFrame 中使用 .where .filter .map .flatMap等,还支持 SQL语句。也就是说我们可以使用 spark.sql(...)查询得到一个DataFrame对象,然后再根据 .flatMap() ... Map()等,我们可以先使用sql语句,在使用命令式的api来进行操作,非常灵活。所以集成了两大编程范式的优点。

  • 相关阅读:
    redis乐观锁
    redis
    解决创建Redis容器没有conf配置文件
    redis缓存配置
    Docker架构
    Flask获取数据的一些方法
    Nginx正向代理、反向代理与负载均衡
    Sanic
    Dockerfile详解
    Centos7上安装docker
  • 原文地址:https://www.cnblogs.com/dongao/p/14400092.html
Copyright © 2011-2022 走看看