zoukankan      html  css  js  c++  java
  • spark学习进度17(Catalyst优化器、dataset介绍、dataframe介绍)

     RDD 和 SparkSQL 运行时的区别

    RDD 的运行流程

    1e627dcc1dc31f721933d3e925fa318b

    大致运行步骤

    先将 RDD 解析为由 Stage 组成的 DAG, 后将 Stage 转为 Task 直接运行

    问题

    任务会按照代码所示运行, 依赖开发者的优化, 开发者的会在很大程度上影响运行效率

    解决办法

    创建一个组件, 帮助开发者修改和优化代码, 但是这在 RDD 上是无法实现的

    SparkSQL 提供了什么?

    72e4d163c029f86fafcfa083e6cf6eda

    和 RDD 不同, SparkSQL 的 Dataset 和 SQL 并不是直接生成计划交给集群执行, 而是经过了一个叫做 Catalyst 的优化器, 这个优化器能够自动帮助开发者优化代码

    也就是说, 在 SparkSQL 中, 开发者的代码即使不够优化, 也会被优化为相对较好的形式去执行Catalyst

     

    为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst, 整个 SparkSQL 的架构大致如下

    4d025ea8579395f704702eb94572b8de
    1. API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句

    2. 收到 SQL 语句以后, 将其交给 CatalystCatalyst 负责解析 SQL, 生成执行计划等

    3. Catalyst 的输出应该是 RDD 的执行计划

    4. 最终交由集群运行

     
    67b14d92b21b191914800c384cbed439
     

    总结

    SparkSQL 和 RDD 不同的主要点是在于其所操作的数据是结构化的, 提供了对数据更强的感知和分析能力, 能够对代码进行更深层的优化, 而这种能力是由一个叫做 Catalyst 的优化器所提供的

    Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行

     

    Dataset 的特点

    目标

    1. 理解 Dataset 是什么

    2. 理解 Dataset 的特性

    Dataset 是什么?

    @Test
      def dataset1(): Unit = {
        // 1. 创建 SparkSession
        val spark = new sql.SparkSession.Builder()
          .master("local[6]")
          .appName("dataset1")
          .getOrCreate()
    
        // 2. 导入隐式转换
        import spark.implicits._
    
        // 3. 演示
        val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
        val dataset = sourceRDD.toDS()
    
        // Dataset 支持强类型的 API
        dataset.filter( item => item.age > 10 ).show()
        // Dataset 支持弱类型 API
        dataset.filter( 'age > 10 ).show()
        dataset.filter( $"age" > 10 ).show()
        // Dataset 可以直接编写 SQL 表达式
        dataset.filter("age > 10").show()
      }

    问题1: People 是什么?

    People 是一个强类型的类

    问题2: 这个 Dataset 中是结构化的数据吗?

    非常明显是的, 因为 People 对象中有结构信息, 例如字段名和字段类型

    问题3: 这个 Dataset 能够使用类似 SQL 这样声明式结构化查询语句的形式来查询吗?

    当然可以, 已经演示过了

    问题4: Dataset 是什么?

    Dataset 是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询 API 和类似 RDD 一样的命令式 API

    Dataset 的底层是什么?

    Dataset 最底层处理的是对象的序列化形式, 通过查看 Dataset 生成的物理执行计划, 也就是最终所处理的 RDD, 就可以判定 Dataset 底层处理的是什么形式的数据

    val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15)))
    val internalRDD: RDD[InternalRow] = dataset.queryExecution.toRdd

    dataset.queryExecution.toRdd 这个 API 可以看到 Dataset 底层执行的 RDD, 这个 RDD 中的范型是 InternalRowInternalRow 又称之为 Catalyst Row, 是 Dataset 底层的数据结构, 也就是说, 无论 Dataset 的范型是什么, 无论是 Dataset[Person] 还是其它的, 其最底层进行处理的数据结构都是 InternalRow

    所以, Dataset 的范型对象在执行之前, 需要通过 Encoder 转换为 InternalRow, 在输入之前, 需要把 InternalRow 通过 Decoder 转换为范型对象

    cc610157b92466cac52248a8bf72b76e
    @Test
      def dataset2(): Unit = {
        // 1. 创建 SparkSession
        val spark = new sql.SparkSession.Builder()
          .master("local[6]")
          .appName("dataset1")
          .getOrCreate()
    
        // 2. 导入隐式转换
        import spark.implicits._
    
        // 3. 演示
        val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
        val dataset = sourceRDD.toDS()
    
    //    dataset.explain(true)
        // 无论Dataset中放置的是什么类型的对象, 最终执行计划中的RDD上都是 InternalRow
        val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd
      }

     可以获取 Dataset 对应的 RDD 表示

    在 Dataset 中, 可以使用一个属性 rdd 来得到它的 RDD 表示, 例如 Dataset[T] → RDD[T]

    def dataset3(): Unit = {
        // 1. 创建 SparkSession
        val spark = new sql.SparkSession.Builder()
          .master("local[6]")
          .appName("dataset1")
          .getOrCreate()
    
        // 2. 导入隐式转换
        import spark.implicits._
    
        // 3. 演示
    //    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
    //    val dataset = sourceRDD.toDS()
        val dataset: Dataset[Person] = spark.createDataset(Seq(Person("zhangsan", 10), Person("lisi", 15)))
    
        //    dataset.explain(true)
        // 无论Dataset中放置的是什么类型的对象, 最终执行计划中的RDD上都是 InternalRow
        // 直接获取到已经分析和解析过的 Dataset 的执行计划, 从中拿到 RDD
        val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd
    
        // 通过将 Dataset 底层的 RDD[InternalRow] 通过 Decoder 转成了和 Dataset 一样的类型的 RDD
        val typedRdd: RDD[Person] = dataset.rdd
    
        println(executionRdd.toDebugString)
        println()
        println()
        println(typedRdd.toDebugString)
      }
      使用 Dataset.rdd 将 Dataset 转为 RDD 的形式
      Dataset 的执行计划底层的 RDD

    可以看到 (1) 对比 (2) 对了两个步骤, 这两个步骤的本质就是将 Dataset 底层的 InternalRow 转为 RDD 中的对象形式, 这个操作还是会有点重的, 所以慎重使用 rdd 属性来转换 Dataset 为 RDD

    总结

    1. Dataset 是一个新的 Spark 组件, 其底层还是 RDD

    2. Dataset 提供了访问对象中某个特定字段的能力, 不用像 RDD 一样每次都要针对整个对象做操作

    3. Dataset 和 RDD 不同, 如果想把 Dataset[T] 转为 RDD[T], 则需要对 Dataset 底层的 InternalRow 做转换, 是一个比价重量级的操作

     

    DataFrame 的作用和常见操作

    目标

    1. 理解 DataFrame 是什么

    2. 理解 DataFrame 的常见操作

     

    DataFrame 是 SparkSQL 中一个表示关系型数据库中  的函数式抽象, 其作用是让 Spark 处理大规模结构化数据的时候更加容易. 一般 DataFrame 可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到 Schema 信息. 也就是说 DataFrame 中有 Schema 信息, 可以像操作表一样操作 DataFrame.

    eca0d2e1e2b5ce678161438d87707b61

    DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

    238c241593cd5b0fd06d4d74294680e2

    DataFrame 支持 SQL 中常见的操作, 例如: selectfilterjoingroupsortjoin 等

    @Test
      def dataframe1(): Unit = {
        // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .appName("dataframe1")
          .master("local[6]")
          .getOrCreate()
    
        // 2. 创建 DataFrame
        import spark.implicits._
    
        val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF()
    
        // 3. 看看 DataFrame 可以玩出什么花样
        // select name from ... t where t.age > 10
        dataFrame.where('age > 10)
          .select('name)
          .show()
      }

     

     DataFrame如何创建:

      @Test
      def dataframe2(): Unit = {
        val spark = SparkSession.builder()
          .appName("dataframe1")
          .master("local[6]")
          .getOrCreate()
    
        import spark.implicits._
    
        val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))
    
        // 1. toDF
        val df1 = personList.toDF()
        val df2 = spark.sparkContext.parallelize(personList).toDF()
    
        // 2. createDataFrame
        val df3 = spark.createDataFrame(personList)
    
        // 3. read
        val df4 = spark.read.csv("dataset/BeijingPM20100101_20151231_noheader.csv")
        df4.show()
      }

    DataFrame介绍:

     @Test
      def dataframe3(): Unit = {
        // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("pm analysis")
          .getOrCreate()
    
        import spark.implicits._
    
        // 2. 读取数据集
        val sourceDF: DataFrame = spark.read
          .option("header", value = true)//把第一行弄成header
          .csv("dataset/BeijingPM20100101_20151231.csv")
    
        // 查看 DataFrame 的 Schema 信息, 要意识到 DataFrame 中是有结构信息的, 叫做 Schema
        sourceDF.printSchema()
    
        // 3. 处理
        //     1. 选择列
        //     2. 过滤掉 NA 的 PM 记录
        //     3. 分组 select year, month, count(PM_Dongsi) from ... where PM_Dongsi != NA group by year, month
        //     4. 聚合
        // 4. 得出结论
    //    sourceDF.select('year, 'month, 'PM_Dongsi)
    //      .where('PM_Dongsi =!= "NA")
    //      .groupBy('year, 'month)
    //      .count()
    //      .show()
    
        // 是否能直接使用 SQL 语句进行查询
        // 1. 将 DataFrame 注册为临表
        sourceDF.createOrReplaceTempView("pm")
    
        // 2. 执行查询
        val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year, month")
    
        resultDF.show()
    
        spark.stop()
      }

     

     Dataset 和 DataFrame 的异同

    目标

    1. 理解 Dataset 和 DataFrame 之间的关系

    DataFrame 就是 Dataset

    根据前面的内容, 可以得到如下信息

    1. Dataset 中可以使用列来访问数据, DataFrame 也可以

    2. Dataset 的执行是优化的, DataFrame 也是

    3. Dataset 具有命令式 API, 同时也可以使用 SQL 来访问, DataFrame 也可以使用这两种不同的方式访问

    所以这件事就比较蹊跷了, 两个这么相近的东西为什么会同时出现在 SparkSQL 中呢?

    44fb917304a91eab99d131010448331b

    确实, 这两个组件是同一个东西, DataFrame 是 Dataset 的一种特殊情况, 也就是说 DataFrame 是 Dataset[Row] 的别名

    DataFrame 和 Dataset 所表达的语义不同

    第一点: DataFrame 表达的含义是一个支持函数式操作的 , 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象

    第二点: DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象

    val spark: SparkSession = new sql.SparkSession.Builder()
      .appName("hello")
      .master("local[6]")
      .getOrCreate()
    
    import spark.implicits._
    
    val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()       
    
    val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS() 
      DataFrame 就是 Dataset[Row]
      Dataset 的范型可以是任意类型

    第三点: DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同

    DataFrame 在进行强类型操作时候, 例如 map 算子, 其所处理的数据类型永远是 Row

    df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 10) )(RowEncoder.apply(df.schema)).show()

    但是对于 Dataset 来讲, 其中是什么类型, 它就处理什么类型

    ds.map( (item: People) => People(item.name, item.age * 10) ).show()

    第三点: DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查

    1. DataFrame 中存放的数据以 Row 表示, 一个 Row 代表一行数据, 这和关系型数据库类似

    2. DataFrame 在进行 map 等操作的时候, DataFrame 不能直接使用 Person 这样的 Scala 对象, 所以无法做到编译时检查

    3. Dataset 表示的具体的某一类对象, 例如 Person, 所以再进行 map 等操作的时候, 传入的是具体的某个 Scala 对象, 如果调用错了方法, 编译时就会被检查出来

    val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
    ds.map(person => person.hello) 
      这行代码明显报错, 无法通过编译
     
     
     @Test
      def dataframe4(): Unit = {
        val spark = SparkSession.builder()
          .appName("dataframe1")
          .master("local[6]")
          .getOrCreate()
    
        import spark.implicits._
    
        val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))
    
        // DataFrame 是弱类型的
        val df: DataFrame = personList.toDF()
        df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema))
          .show()
    
        // DataFrame 所代表的弱类型操作是编译时不安全
    //    df.groupBy("name, school")
    
        // Dataset 是强类型的
        val ds: Dataset[Person] = personList.toDS()
        ds.map( (person: Person) => Person(person.name, person.age * 2) )
          .show()
    
        // Dataset 所代表的操作, 是类型安全的, 编译时安全的
    //    ds.filter( person => person.school )
      }

      

     Row

     
     @Test
      def row(): Unit = {
        // 1. Row 如何创建, 它是什么
        // row 对象必须配合 Schema 对象才会有 列名
        val p = Person("zhangsan", 15)
        val row = Row("zhangsan", 15)
    
        // 2. 如何从 Row 中获取数据
        row.getString(0)
        row.getInt(1)
    
        // 3. Row 也是样例类
        row match {
          case Row(name, age) => println(name, age)
        }
    
      }

     
     
  • 相关阅读:
    ASP.NET中无刷新分页
    Http之基础
    ASP.NET中刷新分页
    SQL之Case when 语句
    在线HTML编辑器
    Highcharts 统计图
    Leetcode练习(Python):数组类:第162题:峰值元素是指其值大于左右相邻值的元素。 给定一个输入数组 nums,其中 nums[i] ≠ nums[i+1],找到峰值元素并返回其索引。 数组可能包含多个峰值,在这种情况下,返回任何一个峰值所在位置即可。 你可以假设 nums[-1] = nums[n] = -∞。
    Leetcode练习(Python):数组类:第154题:假设按照升序排序的数组在预先未知的某个点上进行了旋转。 ( 例如,数组 [0,1,2,4,5,6,7] 可能变为 [4,5,6,7,0,1,2] )。 请找出其中最小的元素。 注意数组中可能存在重复的元素。
    Leetcode练习(Python):数组类:第153题:假设按照升序排序的数组在预先未知的某个点上进行了旋转。 ( 例如,数组 [0,1,2,4,5,6,7] 可能变为 [4,5,6,7,0,1,2] )。 请找出其中最小的元素。 你可以假设数组中不存在重复元素。
    Leetcode练习(Python):数组类:第152题:给你一个整数数组 nums ,请你找出数组中乘积最大的连续子数组(该子数组中至少包含一个数字)。
  • 原文地址:https://www.cnblogs.com/dazhi151/p/14264820.html
Copyright © 2011-2022 走看看