zoukankan      html  css  js  c++  java
  • Spark学习之SparkSQL

    SparkSQL 简介

    SparkSQL 是一个即支持 SQL 又支持命令式数据处理的工具

    SparkSQL 不只是一个 SQL 引擎, SparkSQL 也包含了一套对 结构化数据的命令式 API, 事实上, 所有 Spark 中常见的工具, 都是依赖和依照于 SparkSQLAPI 设计的

    发展历程

    发展历程

    解决的问题

    • Spark SQL 使用 Hive 解析 SQL 生成 AST 语法树, 将其后的逻辑计划生成, 优化, 物理计划都自己完成, 而不依赖 Hive
    • 执行计划和优化交给优化器 Catalyst
    • 内建了一套简单的 SQL 解析器, 可以不使用 HQL, 此外, 还引入和 DataFrame 这样的 DSL API, 完全可以不依赖任何 Hive 的组件
    • Shark 只能查询文件, Spark SQL 可以直接降查询作用于 RDD, 这一点是一个大进步

    适用场景

    定义 特点 举例
    结构化数据 有固定的 Schema 有预定义的 Schema 关系型数据库的表
    **半结构化数据 ** 没有固定的 Schema, 但是有结构 没有固定的 Schema, 有结构信息, 数据一般是自描述的 指一些有结构的文件格式, 例如 JSON
    非结构化数据 没有固定 Schema, 也没有结构 没有固定 Schema, 也没有结构 指文档图片之类的格式

    SparkSQL 主要用于处理 结构化数据

    SparkSQL 初体验

    命令式 API 的入门案例

    @Test
      def dsIntro(): Unit = {
        val spark  = new sql.SparkSession.Builder()
          .appName("ds_intro")
          .master("local[2]")
          .getOrCreate()
    
        import spark.implicits._
    
        var sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三",18),Person("李四",10)))
    
        var personDS = sourceRDD.toDS()
        var resultDS = personDS.where('age>10)
          .where('age<20)
          .select('name)
          .as[String]
    
        resultDS.show()
     }
    
    SparkSQL 中有一个新的入口点, 叫做 SparkSession
    SparkSQL 中有一个新的类型叫做 Dataset
    SparkSQL 有能力直接通过字段名访问数据集, 说明 SparkSQL 的 API 中是携带 Schema 信息的
    

    SparkSession

    • SparkContext 作为 RDD 的创建者和入口, 其主要作用有如下两点

      • 创建 RDD, 主要是通过读取文件创建 RDD
      • 监控和调度任务, 包含了一系列组件, 例如 DAGScheduler, TaskSheduler
    • 为什么无法使用 SparkContext 作为 SparkSQL 的入口?

      • SparkContext 在读取文件的时候, 是不包含 Schema 信息的, 因为读取出来的是 RDD
      • SparkContext 在整合数据源如 Cassandra, JSON, Parquet 等的时候是不灵活的, 而 DataFrameDataset 一开始的设计目标就是要支持更多的数据源
      • SparkContext 的调度方式是直接调度 RDD, 但是一般情况下针对结构化数据的访问, 会先通过优化器优化一下

      所以 SparkContext 确实已经不适合作为 SparkSQL 的入口, 所以刚开始的时候 Spark 团队为 SparkSQL 设计了两个入口点, 一个是 SQLContext 对应 Spark 标准的 SQL 执行, 另外一个是 HiveContext 对应 HiveSQL 的执行和 Hive 的支持.

      Spark 2.0 的时候, 为了解决入口点不统一的问题, 创建了一个新的入口点 SparkSession, 作为整个 Spark 生态工具的统一入口点, 包括了 SQLContext, HiveContext, SparkContext 等组件的功能

    • 新的入口应该有什么特性?

      • 能够整合 SQLContext, HiveContext, SparkContext, StreamingContext 等不同的入口点
      • 为了支持更多的数据源, 应该完善读取和写入体系
      • 同时对于原来的入口点也不能放弃, 要向下兼容

    Catalyst 优化器

    RDD 和 SparkSQL 运行时的区别

    • RDD 的运行流程

      RDD运行流程大致运行步骤先将 RDD 解析为由 Stage 组成的 DAG, 后将 Stage 转为 Task 直接运行问题任务会按照代码所示运行, 依赖开发者的优化, 开发者的会在很大程度上影响运行效率解决办法创建一个组件, 帮助开发者修改和优化代码, 但是这在 RDD 上是无法实现的

    • 为什么 RDD 无法自我优化?

      RDD 没有 Schema 信息RDD 可以同时处理结构化和非结构化的数据

    • SparkSQL 提供了什么?

      结构RDD 不同, SparkSQLDatasetSQL 并不是直接生成计划交给集群执行, 而是经过了一个叫做 Catalyst 的优化器, 这个优化器能够自动帮助开发者优化代码也就是说, 在 SparkSQL 中, 开发者的代码即使不够优化, 也会被优化为相对较好的形式去执行为什么 SparkSQL 提供了这种能力?首先, SparkSQL 大部分情况用于处理结构化数据和半结构化数据, 所以 SparkSQL 可以获知数据的 Schema, 从而根据其 Schema 来进行优化

    Catalyst

    为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst, 整个 SparkSQL 的架构大致如下架构

    1. API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句
    2. 收到 SQL 语句以后, 将其交给 Catalyst, Catalyst 负责解析 SQL, 生成执行计划等
    3. Catalyst 的输出应该是 RDD 的执行计划
    4. 最终交由集群运行

    执行流程

    • Step 1 : 解析 SQL, 并且生成 AST (抽象语法树)

      解析

    • Step 2 : 在 AST 中加入元数据信息, 做这一步主要是为了一些优化, 例如 col = col 这样的条件, 下图是一个简略图, 便于理解

      加入元信息

      • score.id → id#1#Lscore.id 生成 id 为 1, 类型是 `Long``
      • ``score.math_score → math_score#2#Lscore.math_score生成id为 2, 类型为Long`
      • people.id → id#3#Lpeople.id生成id为 3, 类型为Long
      • ``people.age → age#4#Lpeople.age生成id为 4, 类型为Long`
    • Step 3 : 对已经加入元数据的 AST, 输入优化器, 进行优化, 从两种常见的优化开始, 简单介绍

      优化

      • 谓词下推 Predicate Pushdown, 将 Filter 这种可以减小数据集的操作下推, 放在 Scan 的位置, 这样可以减少操作时候的数据量谓词下推
      • 列值裁剪 Column Pruning, 在谓词下推后, people 表之上的操作只用到了 id 列, 所以可以把其它列裁剪掉, 这样可以减少处理的数据量, 从而优化处理速度
      • 还有其余很多优化点, 大概一共有一二百种, 随着 SparkSQL 的发展, 还会越来越多, 感兴趣的同学可以继续通过源码了解, 源码在 org.apache.spark.sql.catalyst.optimizer.Optimizer
    • Step 4 : 上面的过程生成的 AST 其实最终还没办法直接运行, 这个 AST 叫做 逻辑计划, 结束后, 需要生成 物理计划, 从而生成 RDD 来运行

      • 在生成物理计划的时候, 会经过成本模型对整棵树再次执行优化, 选择一个更好的计划
      • 在生成物理计划以后, 因为考虑到性能, 所以会使用代码生成, 在机器中运行

      可以使用 queryExecution 方法查看逻辑执行计划, 使用 explain 方法查看物理执行计划

    Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行

    Dataset 的特点

    @Test
      def dataset1() :Unit = {
        //1. 创建SparkSession
        val spark = new sql.SparkSession.Builder()
          .master("local[2]")
          .appName(this.getClass.getName)
          .getOrCreate()
        //2. 导入隐式转换
        import spark.implicits._
        //3. 演示
        var sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三",18),Person("李四",10)))
        val dataset: Dataset[Person] = sourceRDD.toDS()
    
        // Dataset支持强类型的API
        dataset.filter(item => item.age>10).show()
        // Dataset支持强类型的API
        dataset.filter('age>10).show()
        dataset.filter($"age">10).show()
        dataset.filter("age>10").show()
        spark.stop()
      }
    
    1. Dataset 是一个新的 Spark 组件, 其底层还是 RDD
    2. Dataset 提供了访问对象中某个特定字段的能力, 不用像 RDD 一样每次都要针对整个对象做操作
    3. DatasetRDD 不同, 如果想把 Dataset[T] 转为 RDD[T], 则需要对 Dataset 底层的 InternalRow 做转换, 是一个比价重量级的操作
    4. Dataset 是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询 API 和类似 RDD 一样的命令式 API
    5. ``Dataset具有RDD的方便, 同时也具有DataFrame的性能优势, 并且Dataset` 还是强类型的, 能做到类型安全.

    DataFrame 的作用和常见操作

    DataFrame是什么

    DataFrameSparkSQL 中一个表示关系型数据库中 的函数式抽象, 其作用是让 Spark 处理大规模结构化数据的时候更加容易. 一般 DataFrame 可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到 Schema 信息. 也就是说 DataFrame 中有 Schema 信息, 可以像操作表一样操作 DataFrame.

    RDD与DataFrame结构图

    DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

    DataFrame结构

    创建DataFrame

    通过隐式转换创建

    @Test
    def datafream1: Unit = {
      val spark =new SparkSession.Builder()
        .appName(this.getClass.getName)
        .master("local[2]")
        .getOrCreate()
        
      // 必须要导入隐式转换
      // 注意: spark 在此处不是包, 而是 SparkSession 对象
      import spark.implicits._
      val  list = Seq(Person("张三",3),Person("lisi",18))
      val  df = Seq(Person("张三",3),Person("lisi",18)).toDF()
      val df2 = spark.sparkContext.parallelize(list).toDF()
    }
    

    源码

    创建 DataFrame 的时候, 不仅可以包含样例类, 也可以只有普通数据类型, 后通过指定列名来创建

    val df2: DataFrame = Seq(("a", 1), ("b", 1)).toDF("word", "count")
    

    通过外部集合创建

    @Test
      def datafream2: Unit = {
        val spark =new SparkSession.Builder()
          .appName(this.getClass.getName)
          .master("local[2]")
          .getOrCreate()
    
        import spark.implicits._
        val df = spark.read
          .csv("dataset/BeijingPM20100101_20151231_noheader.csv")
      }
    

    其他方式

    val list = Seq(Person("张三",3),Person("lisi",18))
    val df = spark.createDataFrame(list)	
    

    注意:不仅仅可以从 csv 文件创建 DataFrame, 还可以从 Table, JSON, Parquet 等中创建 DataFrame

    常见操作

    DataFrame 支持 SQL 中常见的操作, 例如: select, filter, join, group, sort, join

    1. 首先可以打印 DataFrameSchema, 查看其中所包含的列, 以及列的类型

      val spark: SparkSession = new sql.SparkSession.Builder()
        .appName("hello")
        .master("local[6]")
        .getOrCreate()
      
      val df = spark.read
        .option("header", true)
        .csv("dataset/BeijingPM20100101_20151231.csv")
      
      df.printSchema()
      
    2. 对于大部分计算来说, 可能不会使用所有的列, 所以可以选择其中某些重要的列

      df.select('year, 'month, 'PM_Dongsi)
      
    3. 可以针对某些列进行分组, 后对每组数据通过函数做聚合

      df.select('year, 'month, 'PM_Dongsi)
        .where('PM_Dongsi =!= "Na")
        .groupBy('year, 'month)
        .count()
        .show()
      
    4. 使用SQL操作DataFrame

      //先将 DataFrame 注册为一张临时表
      df.createOrReplaceTempView("temp_table")
      spark.sql("select year,month,count(PM_Dongsi) from temp_table where PM_Dongsi !='NA' group by year,month").show()
      

    总结

    1. DataFrame 是一个类似于关系型数据库表的函数式组件
    2. DataFrame 一般处理结构化数据和半结构化数据
    3. DataFrame 具有数据对象的 Schema 信息
    4. 可以使用命令式的 API 操作 DataFrame, 同时也可以使用 SQL 操作 DataFrame
    5. DataFrame 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建

    Dataset 和 DataFrame 的异同

    DataFrame Dataset 所表达的语义不同

    1. 第一点: DataFrame 表达的含义是一个支持函数式操作的 , 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象

    2. 第二点: DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象

      @Test
        def datafream4: Unit = {
          val spark =new SparkSession.Builder()
            .appName(this.getClass.getName)
            .master("local[2]")
            .getOrCreate()
      
          import spark.implicits._
      
          val  list = Seq(Person("张三",3),Person("lisi",18))
          val df: DataFrame = list.toDF()
          val ds: Dataset[Person] = list.toDS()
          spark.stop()
        }
      
      • DataFrame 就是 Dataset[Row]
      • Dataset 的范型可以是任意类型
    3. DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同

      DataFrame 在进行强类型操作时候, 例如 map 算子, 其所处理的数据类型永远是 `Row

      df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 10) )(RowEncoder.apply(df.schema)).show()
      

      但是对于 Dataset 来讲, 其中是什么类型, 它就处理什么类型

      ds.map( (item: People) => People(item.name, item.age * 10) ).show()
      
    4. DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查

      1. DataFrame 中存放的数据以 Row 表示, 一个 Row 代表一行数据, 这和关系型数据库类似
      2. DataFrame 在进行 map 等操作的时候, DataFrame 不能直接使用 Person 这样的 Scala 对象, 所以无法做到编译时检查
      3. Dataset 表示的具体的某一类对象, 例如 Person, 所以再进行 map 等操作的时候, 传入的是具体的某个 Scala 对象, 如果调用错了方法, 编译时就会被检查出来

    ROW是什么

    Row 对象表示的是一个行

    Row 的操作类似于 Scala 中的 Map 数据类型

    // 一个对象就是一个对象
    val p = Person("张三",18)
    
    // 同样一个对象, 还可以通过一个 Row 对象来表示
     val row = Row("张三",18)
    
    // 获取 Row 中的内容
    println(row.get(1))
    println(row(1))
    
    // 获取时可以指定类型
    println(row.getString(1))
    println(row.getAs[Int](1))
    
    // 同时 Row 也是一个样例类, 可以进行 match
    row match {
      case Row(name, age) => println(name, age)
    }
    

    两者之间的转化

    DataFrameDataset 之间可以非常简单的相互转换

    val spark: SparkSession = new sql.SparkSession.Builder()
      .appName(this.getClass.getName)
      .master("local[6]")
      .getOrCreate()
    
    import spark.implicits._
    
    val df: DataFrame = Seq(People("张三", 15), People("李四", 15)).toDF()
    val ds_fdf: Dataset[People] = df.as[People]
    
    val ds: Dataset[People] = Seq(People("张三", 15), People("李四", 15)).toDS()
    val df_fds: DataFrame = ds.toDF()
    

    总结

    1. DataFrame 就是 Dataset, 他们的方式是一样的, 也都支持 APISQL 两种操作方式
    2. DataFrame 只能通过表达式的形式, 或者列的形式来访问数据, 只有 Dataset 支持针对于整个对象的操作
    3. DataFrame 中的数据表示为 Row, 是一个行的概念
  • 相关阅读:
    媒体查询漫谈——@media Queries
    JavaScript工具函数集
    什么是BFC、IFC、GFC和FFC
    HTTP与HTTPS的区别
    reflow 和 repaint
    客户端检测
    ajax
    批量删除
    数据访问
    登录主页面代码
  • 原文地址:https://www.cnblogs.com/xp-thebest/p/14287762.html
Copyright © 2011-2022 走看看