zoukankan      html  css  js  c++  java
  • SparkSQL

    SparkSQL



    数据分析的方式

    数据分析的方式大致上可以划分为 SQL 和 命令式两种

    • 命令式

      在前面的 RDD 部分, 非常明显可以感觉的到是命令式的, 主要特征是通过一个算子, 可以得到一个结果, 通过结果再进行后续计算

      1. sc.textFile("...")
      2. .flatMap(_.split(" "))
      3. .map((_, 1))
      4. .reduceByKey(_ + _)
      5. .collect()

      命令式的优点

      命令式的优点操作粒度更细, 能够控制数据的每一个处理环节操作更明确, 步骤更清晰, 容易维护支持非结构化数据的操作

      命令式的缺点

      需要一定的代码功底

      写起来比较麻烦

    • SQL

      对于一些数据科学家, 要求他们为了做一个非常简单的查询, 写一大堆代码, 明显是一件非常残忍的事情, 所以 SQL on Hadoop 是一个非常重要的方向.

      SELECT name, age, school FROM students WHERE age > 10

      SQL 的优点

      表达非常清晰, 比如说这段 SQL 明显就是为了查询三个字段, 又比如说这段 SQL 明显能看到是想查询年龄大于 10 岁的条目

      SQL 的缺点

      想想一下 3 层嵌套的 SQL, 维护起来应该挺力不从心的吧

      试想一下, 如果使用 SQL 来实现机器学习算法, 也挺为难的吧

    SQL 擅长数据分析和通过简单的语法表示查询, 命令式操作适合过程式处理和算法性的处理. 在 Spark 出现之前, 对于结构化数据的查询和处理, 一个工具一向只能支持 SQL 或者命令式, 使用者被迫要使用多个工具来适应两种场景, 并且多个工具配合起来比较费劲.

    而 Spark 出现了以后, 统一了两种数据处理范式, 是一种革新性的进步.

    SparkSQL 的适用场景

    定义特点特点举例
    结构化数据有固定的 Schema有预定义的 Schema
    半结构化数据没有固定的 Schema, 但是有结构没有固定的 Schema, 有结构信息, 数据一般是自描述的
    非结构化数据没有固定 Schema, 也没有结构没有固定 Schema, 也没有结构

    结构化数据

    一般指数据有固定的 Schema, 例如在用户表中, name 字段是 String 型, 那么每一条数据的 name 字段值都可以当作 String 来使用

    1. +----+--------------+---------------------------+-------+---------+
    2. | id | name | url | alexa | country |
    3. +----+--------------+---------------------------+-------+---------+
    4. | 1 | Google | https://www.google.cm/ | 1 | USA |
    5. | 2 | 淘宝 | https://www.taobao.com/ | 13 | CN |
    6. | 3 | 菜鸟教程 | http://www.runoob.com/ | 4689 | CN |
    7. | 4 | 微博 | http://weibo.com/ | 20 | CN |
    8. | 5 | Facebook | https://www.facebook.com/ | 3 | USA |
    9. +----+--------------+---------------------------+-------+---------+

    半结构化数据

    一般指的是数据没有固定的 Schema, 但是数据本身是有结构的

    1. {
    2. "firstName": "John",
    3. "lastName": "Smith",
    4. "age": 25,
    5. "phoneNumber":
    6. [
    7. {
    8. "type": "home",
    9. "number": "212 555-1234"
    10. },
    11. {
    12. "type": "fax",
    13. "number": "646 555-4567"
    14. }
    15. ]
    16. }

    SparkSQL 处理什么数据的问题?

    • Spark 的 RDD 主要用于处理 非结构化数据 和 半结构化数据
    • SparkSQL 主要用于处理 结构化数据

    SparkSQL 相较于 RDD 的优势在哪?

    • SparkSQL 提供了更好的外部数据源读写支持
      • 因为大部分外部数据源是有结构化的, 需要在 RDD 之外有一个新的解决方案, 来整合这些结构化数据源
    • SparkSQL 提供了直接访问列的能力
      • 因为 SparkSQL 主要用做于处理结构化数据, 所以其提供的 API 具有一些普通数据库的能力

    SparkSQL 初体验

    SparkSession

    • SparkContext 作为 RDD 的创建者和入口, 其主要作用有如下两点

      创建 RDD, 主要是通过读取文件创建 RDD监控和调度任务, 包含了一系列组件, 例如 DAGSchedulerTaskSheduler

    • 为什么无法使用 SparkContext 作为 SparkSQL 的入口?

      SparkContext 在读取文件的时候, 是不包含 Schema 信息的, 因为读取出来的是 RDD``SparkContext 在整合数据源如 CassandraJSONParquet 等的时候是不灵活的, 而 DataFrame 和 Dataset 一开始的设计目标就是要支持更多的数据源SparkContext 的调度方式是直接调度 RDD, 但是一般情况下针对结构化数据的访问, 会先通过优化器优化一下

    所以 SparkContext 确实已经不适合作为 SparkSQL 的入口, 所以刚开始的时候 Spark 团队为 SparkSQL 设计了两个入口点, 一个是 SQLContext 对应 Spark 标准的 SQL执行, 另外一个是 HiveContext 对应 HiveSQL 的执行和 Hive 的支持.

    在 Spark 2.0 的时候, 为了解决入口点不统一的问题, 创建了一个新的入口点 SparkSession, 作为整个 Spark 生态工具的统一入口点, 包括了 SQLContextHiveContextSparkContext 等组件的功能

    • 新的入口应该有什么特性?

      能够整合 SQLContextHiveContextSparkContextStreamingContext 等不同的入口点为了支持更多的数据源, 应该完善读取和写入体系同时对于原来的入口点也不能放弃, 要向下兼容

    命令式 API与SQL 的入门案例

    SparkSQL 中有一个新的类型叫做 Dataset

    1. case class People(name: String, age: Int)
    2. val spark: SparkSession = new sql.SparkSession.Builder()
    3. .appName("hello")
    4. .master("local[6]")
    5. .getOrCreate()
    6. import spark.implicits._
    7. val peopleRDD: RDD[People] = spark.sparkContext.parallelize(Seq(People("zhangsan", 9), People("lisi", 15)))
    8. val peopleDS: Dataset[People] = peopleRDD.toDS()
    9. val teenagers: Dataset[String] = peopleDS.where('age > 10)
    10. .where('age < 20)
    11. .select('name)
    12. .as[String]
    13. /*
    14. +----+
    15. |name|
    16. +----+
    17. |lisi|
    18. +----+
    19. */
    20. teenagers.show()
    21. ////DF
    22. @Test
    23. def test2(): Unit ={
    24. val spark: SparkSession = new SparkSession.Builder().appName("test2").master("local[6]").getOrCreate()
    25. import spark.implicits._
    26. val sourceRDD: RDD[People] = spark.sparkContext.parallelize(Seq(People("zhangsan", 10), People("lisi", 15)))
    27. val df: DataFrame = sourceRDD.toDF()
    28. //创建临时表并赋予名字
    29. df.createOrReplaceTempView("person")
    30. val result: DataFrame = spark.sql("select name from person where age > 10 and age <20")
    31. result.show()
    32. }
    33. /*
    34. +----+
    35. |name|
    36. +----+
    37. |lisi|
    38. +----+
    39. *以往使用 SQL 肯定是要有一个表的, 在 Spark 中, 并不存在表的概念, 但是有一个近似的概念, 叫做 DataFrame, 所以一般情况下要先通过 DataFrame 或者 Dataset 注册一张临时表, 然后使用 SQL 操作这张临时表
    40. */

    DataFrame & Dataset

    eca0d2e1e2b5ce678161438d87707b61

    SparkSQL 最大的特点就是它针对于结构化数据设计, 所以 SparkSQL 应该是能支持针对某一个字段的访问的, 而这种访问方式有一个前提, 就是 SparkSQL 的数据集中, 要 包含结构化信息, 也就是俗称的 Schema

    而 SparkSQL 对外提供的 API 有两类, 一类是直接执行 SQL, 另外一类就是命令式. SparkSQL 提供的命令式 API 就是 DataFrame 和 Dataset, 暂时也可以认为 DataFrame就是 Dataset, 只是在不同的 API 中返回的是 Dataset 的不同表现形式


    Catalyst

    SparkSQL 和 RDD 不同的主要点是在于其所操作的数据是结构化的, 提供了对数据更强的感知和分析能力, 能够对代码进行更深层的优化, 而这种能力是由一个叫做 Catalyst 的优化器所提供的

    Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行


    Dataset 的特点

    Dataset API例子

    filter方法

    1. val spark: SparkSession = new sql.SparkSession.Builder()
    2. .appName("hello")
    3. .master("local[6]")
    4. .getOrCreate()
    5. import spark.implicits._
    6. //创建dataset
    7. val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15)))
    8. // 方式1: 通过对象来处理
    9. dataset.filter(item => item.age > 10).show()
    10. // 方式2: 通过字段来处理
    11. dataset.filter('age > 10).show()
    12. // 方式3: 通过类似SQL的表达式来处理
    13. dataset.filter("age > 10").show()

    Dataset 是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询 API 和类似 RDD 一样的命令式 API

    Dataset 的底层是什么?

    Dataset 最底层处理的是对象的序列化形式, 通过查看 Dataset 生成的物理执行计划, 也就是最终所处理的 RDD, 就可以判定 Dataset 底层处理的是什么形式的数据

    1. val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15)))
    2. val internalRDD: RDD[InternalRow] = dataset.queryExecution.toRdd
    3. dataset.queryExecution.toRdd` 这个 `API` 可以看到 `Dataset` 底层执行的 `RDD`, 这个 `RDD` 中的范型是 `InternalRow`, `InternalRow` 又称之为 `Catalyst Row`, 是 `Dataset` 底层的数据结构, 也就是说, 无论 `Dataset` 的范型是什么, 无论是 `Dataset[Person]` 还是其它的, 其最底层进行处理的数据结构都是 `InternalRow

    所以, Dataset 的范型对象在执行之前, 需要通过 Encoder 转换为 InternalRow, 在输入之前, 需要把 InternalRow 通过 Decoder 转换为范型对象

    cc610157b92466cac52248a8bf72b76e

    总结

    1. Dataset 是一个新的 Spark 组件, 其底层还是 RDD
    2. Dataset 提供了访问对象中某个特定字段的能力, 不用像 RDD 一样每次都要针对整个对象做操作
    3. Dataset 和 RDD 不同, 如果想把 Dataset[T] 转为 RDD[T], 则需要对 Dataset 底层的 InternalRow 做转换, 是一个比价重量级的操作

    DataFrame 的作用和常见操作

    DataFrame 是什么?

    DataFrame 是 SparkSQL 中一个表示关系型数据库中  的函数式抽象, 其作用是让 Spark 处理大规模结构化数据的时候更加容易. 一般 DataFrame 可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到 Schema 信息. 也就是说 DataFrame 中有 Schema 信息, 可以像操作表一样操作 DataFrame.

    eca0d2e1e2b5ce678161438d87707b61

    DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

    238c241593cd5b0fd06d4d74294680e2

    DataFrame 支持 SQL 中常见的操作, 例如: selectfilterjoingroupsortjoin 等

    测试

    1. @Test
    2. def test5(): Unit ={
    3. val spark: SparkSession = SparkSession.builder().appName("test5").master("local[6]").getOrCreate()
    4. import spark.implicits._
    5. val people: DataFrame = Seq(People("zhangshan",19),People("lsi",9),People("zhaoliu",19)).toDF()
    6. people.groupBy('age)
    7. .count()
    8. .show()
    9. //+---+-----+
    10. //|age|count|
    11. //+---+-----+
    12. //| 19| 2|
    13. //| 9| 1|
    14. //+---+-----+
    15. }

    其他方式:

    通过隐式转换创建 DataFrame

    841503b4240e7a8ecac62d92203e9943

    根据源码可以知道, toDF 方法可以在 RDD 和 Seq 中使用

    通过集合创建 DataFrame 的时候, 集合中不仅可以包含样例类, 也可以只有普通数据类型, 后通过指定列名来创建

    1. val spark: SparkSession = new sql.SparkSession.Builder()
    2. .appName("hello")
    3. .master("local[6]")
    4. .getOrCreate()
    5. import spark.implicits._
    6. val df1: DataFrame = Seq("nihao", "hello").toDF("text")
    7. /*
    8. +-----+
    9. | text|
    10. +-----+
    11. |nihao|
    12. |hello|
    13. +-----+
    14. */
    15. df1.show()
    16. val df2: DataFrame = Seq(("a", 1), ("b", 1)).toDF("word", "count")
    17. /*
    18. +----+-----+
    19. |word|count|
    20. +----+-----+
    21. | a| 1|
    22. | b| 1|
    23. +----+-----+
    24. */
    25. df2.show()

    通过外部集合创建 DataFrame

    1. val spark: SparkSession = new sql.SparkSession.Builder()
    2. .appName("hello")
    3. .master("local[6]")
    4. .getOrCreate()
    5. val df = spark.read
    6. .option("header", true)
    7. .csv("dataset/BeijingPM20100101_20151231.csv")
    8. df.show(10)
    9. df.printSchema()

    1568204176599


    其他操作

    需求: 查看每个月的统计数量

    • Step 1: 首先可以打印 DataFrame 的 Schema, 查看其中所包含的列, 以及列的类型
    • Step 2: 对于大部分计算来说, 可能不会使用所有的列, 所以可以选择其中某些重要的列
    • Step 3: 可以针对某些列进行分组, 后对每组数据通过函数做聚合
    1. @Test
    2. def test8(): Unit ={
    3. val spark: SparkSession = SparkSession.builder().master("local[6]").appName("tets8").getOrCreate()
    4. import spark.implicits._
    5. val df: DataFrame = spark.read
    6. //参数1;"header"指定第一行为表结构
    7. .option("header", value = true)
    8. .csv("dataset/BeijingPM20100101_20151231.csv")
    9. df.select('year, 'month, 'PM_Dongsi)
    10. .where('PM_Dongsi =!= "Na")
    11. .groupBy('year, 'month)
    12. .count()
    13. .show()
    14. }

    1568206025476


    • 使用 SQL 操作 DataFrame

    使用 SQL 来操作某个 DataFrame 的话, SQL 中必须要有一个 from 子句, 所以需要先将 DataFrame 注册为一张临时表

    1. @Test
    2. def test9(): Unit ={
    3. val spark: SparkSession = SparkSession.builder().appName("hello")
    4. .master("local[6]")
    5. .getOrCreate()
    6. val df: DataFrame = spark.read
    7. .option("header", true).csv("dataset/BeijingPM20100101_20151231.csv")
    8. df.createOrReplaceTempView("temp_table")
    9. spark.sql("select year,month,count(*) from temp_table where PM_Dongsi != 'NA' group by year,month order by year").show()
    10. }

    1568206446811



    Dataset 和 DataFrame 的异同

    DataFrame 就是 Dataset

    DataFrame 和 Dataset 所表达的语义不同

    第一点: DataFrame 表达的含义是一个支持函数式操作的 表, 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象

    第二点: DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象

    第三点: DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同

    第四点: DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查

    Row 对象表示的是一个 

    Row 的操作类似于 Scala 中的 Map 数据类型

    DataFrame  Dataset 之间可以非常简单的相互转换

    1. val spark: SparkSession = new sql.SparkSession.Builder()
    2. .appName("hello")
    3. .master("local[6]")
    4. .getOrCreate()
    5. import spark.implicits._
    6. val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()
    7. val ds_fdf: Dataset[People] = df.as[People]
    8. val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
    9. val df_fds: DataFrame = ds.toDF()

    总结

    1. DataFrame 就是 Dataset, 他们的方式是一样的, 也都支持 API 和 SQL 两种操作方式
    2. DataFrame 只能通过表达式的形式, 或者列的形式来访问数据, 只有 Dataset 支持针对于整个对象的操作
    3. DataFrame 中的数据表示为 Row, 是一个行的概念

    数据读写

    初识 DataFrameReader

    SparkSQL 的一个非常重要的目标就是完善数据读取, 所以 SparkSQL 中增加了一个新的框架, 专门用于读取外部数据源, 叫做 DataFrameReader

    DataFrameReader 由如下几个组件组成

    组件解释
    schema结构信息, 因为 Dataset 是有结构的, 所以在读取数据的时候, 就需要有 Schema 信息, 有可能是从外部数据源获取的, 也有可能是指定的
    option连接外部数据源的参数, 例如 JDBC 的 URL, 或者读取 CSV 文件是否引入 Header 等
    format外部数据源的格式, 例如 csvjdbcjson 等

    DataFrameReader 有两种访问方式, 一种是使用 load 方法加载, 使用 format 指定加载格式, 还有一种是使用封装方法, 类似 csvjsonjdbc 等

    1. val spark: SparkSession = ...
    2. // 使用 load 方法
    3. val fromLoad: DataFrame = spark
    4. .read
    5. .format("csv")
    6. .option("header", true)
    7. .option("inferSchema", true)
    8. .load("dataset/BeijingPM20100101_20151231.csv")
    9. // Using format-specific load operator
    10. val fromCSV: DataFrame = spark
    11. .read
    12. .option("header", true)
    13. .option("inferSchema", true)
    14. .csv("dataset/BeijingPM20100101_20151231.csv")

    但是其实这两种方式本质上一样, 因为类似 csv 这样的方式只是 load 的封装

    e8af7d7e5ec256de27b2e40c8449a906


    初识 DataFrameWriter

    对于 ETL 来说, 数据保存和数据读取一样重要, 所以 SparkSQL 中增加了一个新的数据写入框架, 叫做 DataFrameWriter

    DataFrameWriter 中由如下几个部分组成

    组件解释
    source写入目标, 文件格式等, 通过 format 方法设定
    mode写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode方法设定
    extraOptions外部参数, 例如 JDBC 的 URL, 通过 optionsoption 设定
    partitioningColumns类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定
    bucketColumnNames类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定
    sortColumnNames用于排序的列, 通过 sortBy 设定

    mode 指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等

    Scala 对象表示字符串表示解释
    SaveMode.ErrorIfExists"error"将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错
    SaveMode.Append"append"将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table
    SaveMode.Overwrite"overwrite"将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标
    SaveMode.Ignore"ignore"将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS

    DataFrameWriter 也有两种使用方式, 一种是使用 format 配合 save, 还有一种是使用封装方法, 例如 csvjsonsaveAsTable 等

    1. val spark: SparkSession = ...
    2. val df = spark.read
    3. .option("header", true)
    4. .csv("dataset/BeijingPM20100101_20151231.csv")
    5. // 使用 save 保存, 使用 format 设置文件格式
    6. df.write.format("json").save("dataset/beijingPM")
    7. // 使用 json 保存, 因为方法是 json, 所以隐含的 format 是 json
    8. df.write.json("dataset/beijingPM1")

    默认没有指定 format, 默认的 format 是 Parquet

    总结

    1. 类似 DataFrameReaderWriter 中也有 formatoptions, 另外 schema 是包含在 DataFrame 中的
    2. DataFrameWriter 中还有一个很重要的概念叫做 mode, 指定写入模式, 如果目标集合已经存在时的行为
    3. DataFrameWriter 可以将数据保存到 Hive 表中, 所以也可以指定分区和分桶信息

    读写 Parquet 格式文件

    在ETL中,spark将文件读写到HDFS可能会使用到Parquet

    为了能够保存比较复杂的数据, 并且保证性能和压缩率, 通常使用 Parquet 是一个比较不错的选择.

    1. //默认不指定 format 的时候, 默认就是读写 Parquet 格式的文件
    2. val spark: SparkSession = new sql.SparkSession.Builder()
    3. .appName("hello")
    4. .master("local[6]")
    5. .getOrCreate()
    6. val df = spark.read
    7. .option("header", value = true)
    8. .csv("dataset/911.csv")
    9. // 保存 Parquet 文件
    10. df.write.mode("override").save("dataset/911.parquet")
    11. // 读取 Parquet 文件
    12. val dfFromParquet = spark.read.parquet("dataset/911.parquet")
    13. dfFromParquet.createOrReplaceTempView("911")
    14. spark.sql("select * from 911 where zip > 19000 and zip < 19400").show()

    写入 Parquet 的时候可以指定分区

    1. val spark: SparkSession = new sql.SparkSession.Builder()
    2. .appName("hello")
    3. .master("local[6]")
    4. .getOrCreate()
    5. // 从 CSV 中读取内容
    6. val dfFromParquet = spark.read.option("header", value = true).csv("dataset/BeijingPM20100101_20151231.csv")
    7. // 保存为 Parquet 格式文件, 不指定 format 默认就是 Parquet
    8. dfFromParquet.write.partitionBy("year", "month").save("dataset/beijing_pm")

    读写 JSON 格式文件

    什么时候会用到 JSON ?

    00a2a56f725d86b5c27463f109c43d8c

    在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.

    在业务系统中, JSON 是一个非常常见的数据格式, 在前后端交互的时候也往往会使用 JSON, 所以从业务系统获取的数据很大可能性是使用 JSON 格式, 所以就需要 Spark 能够支持 JSON 格式文件的读取

    读写 JSON 文件

    1. val spark: SparkSession = new sql.SparkSession.Builder()
    2. .appName("hello")
    3. .master("local[6]")
    4. .getOrCreate()
    5. val dfFromParquet = spark.read.load("dataset/beijing_pm")
    6. // 将 DataFrame 保存为 JSON 格式的文件
    7. dfFromParquet.repartition(1)
    8. .write.format("json")
    9. .save("dataset/beijing_pm_json")

    如果不重新分区, 则会为 DataFrame 底层的 RDD 的每个分区生成一个文件, 为了保持只有一个输出文件, 所以重新分区

    保存为 JSON 格式的文件有一个细节需要注意, 这个 JSON 格式的文件中, 每一行是一个独立的 JSON, 但是整个文件并不只是一个 JSON 字符串, 所以这种文件格式很多时候被成为 JSON Line 文件, 有时候后缀名也会变为 jsonl

    Spark 读取 JSON Line 文件的时候, 会自动的推断类型信息

    Spark 可以从一个保存了 JSON 格式字符串的 Dataset[String] 中读取 JSON 信息, 转为 DataFrame

    1. val spark: SparkSession = ...
    2. import spark.implicits._
    3. val peopleDataset = spark.createDataset(
    4. """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    5. spark.read.json(peopleDataset).show()

    总结

    1. JSON 通常用于系统间的交互, Spark 经常要读取 JSON 格式文件, 处理, 放在另外一处
    2. 使用 DataFrameReader 和 DataFrameWriter 可以轻易的读取和写入 JSON, 并且会自动处理数据类型信息

    SparkSQL 整合 Hive

    整合什么 ?

    • MetaStore, 元数据存储

      SparkSQL 内置的有一个 MetaStore, 通过嵌入式数据库 Derby 保存元信息, 但是对于生产环境来说, 还是应该使用 Hive 的 MetaStore, 一是更成熟, 功能更强, 二是可以使用 Hive 的元信息

    • 查询引擎

      SparkSQL 内置了 HiveSQL 的支持, 所以无需整合

    MetaStore

    Hive 的 MetaStore 是一个 Hive 的组件, 一个 Hive 提供的程序, 用以保存和访问表的元数据, 整个 Hive 的结构大致如下

    20190523011946

    由上图可知道, 其实 Hive 中主要的组件就三个, HiveServer2 负责接受外部系统的查询请求, 例如 JDBCHiveServer2 接收到查询请求后, 交给 Driver 处理, Driver 会首先去询问 MetaStore 表在哪存, 后 Driver 程序通过 MR 程序来访问 HDFS 从而获取结果返回给查询请求者

    而 Hive 的 MetaStore 对 SparkSQL 的意义非常重大, 如果 SparkSQL 可以直接访问 Hive的 MetaStore, 则理论上可以做到和 Hive 一样的事情, 例如通过 Hive 表查询数据

    而 Hive 的 MetaStore 的运行模式有三种

    • 内嵌 Derby 数据库模式

      这种模式不必说了, 自然是在测试的时候使用, 生产环境不太可能使用嵌入式数据库, 一是不稳定, 二是这个 Derby 是单连接的, 不支持并发

    • Local 模式

      Local 和 Remote 都是访问 MySQL 数据库作为存储元数据的地方, 但是 Local 模式的 MetaStore 没有独立进程, 依附于 HiveServer2 的进程

    • Remote 模式

      和 Loca 模式一样, 访问 MySQL 数据库存放元数据, 但是 Remote 的 MetaStore 运行在独立的进程中

    我们显然要选择 Remote 模式, 因为要让其独立运行, 这样才能让 SparkSQL 一直可以访问

    在 Hive 中创建表

    第一步, 需要先将文件上传到集群中, 使用如下命令上传到 HDFS 中

    1. hdfs dfs -mkdir -p /dataset
    2. hdfs dfs -put studenttabl10k /dataset/

    第二步, 使用 Hive 或者 Beeline 执行如下 SQL

    1. CREATE DATABASE IF NOT EXISTS spark_integrition;
    2. USE spark_integrition;
    3. CREATE EXTERNAL TABLE student
    4. (
    5. name STRING,
    6. age INT,
    7. gpa string
    8. )
    9. ROW FORMAT DELIMITED
    10. FIELDS TERMINATED BY ' '
    11. LINES TERMINATED BY ' '
    12. STORED AS TEXTFILE
    13. LOCATION '/dataset/hive';
    14. LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student;

    通过 SparkSQL 查询 Hive 的表

    查询 Hive 中的表可以直接通过 spark.sql(…) 来进行, 可以直接在其中访问 Hive 的 MetaStore, 前提是一定要将 Hive 的配置文件拷贝到 Spark 的 conf 目录

    1. scala> spark.sql("use spark_integrition")
    2. scala> val resultDF = spark.sql("select * from student limit 10")
    3. scala> resultDF.show()

    通过 SparkSQL 创建 Hive 表

    通过 SparkSQL 可以直接创建 Hive 表, 并且使用 LOAD DATA 加载数据

    1. val createTableStr =
    2. """
    3. |CREATE EXTERNAL TABLE student
    4. |(
    5. | name STRING,
    6. | age INT,
    7. | gpa string
    8. |)
    9. |ROW FORMAT DELIMITED
    10. | FIELDS TERMINATED BY ' '
    11. | LINES TERMINATED BY ' '
    12. |STORED AS TEXTFILE
    13. |LOCATION '/dataset/hive'
    14. """.stripMargin
    15. spark.sql("CREATE DATABASE IF NOT EXISTS spark_integrition1")
    16. spark.sql("USE spark_integrition1")
    17. spark.sql(createTableStr)
    18. spark.sql("LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student")
    19. spark.sql("select * from student limit").show()

    目前 SparkSQL 支持的文件格式有 sequencefilercfileorcparquettextfileavro, 并且也可以指定 serde 的名称

    使用 SparkSQL 处理数据并保存进 Hive 表

    前面都在使用 SparkShell 的方式来访问 Hive, 编写 SQL, 通过 Spark 独立应用的形式也可以做到同样的事, 但是需要一些前置的步骤, 如下

    • Step 1: 导入 Maven 依赖

      <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> </dependency>

    • Step 2: 配置 SparkSession

      如果希望使用 SparkSQL 访问 Hive 的话, 需要做两件事开启 SparkSession 的 Hive 支持经过这一步配置, SparkSQL 才会把 SQL 语句当作 HiveSQL 来进行解析设置 WareHouse 的位置虽然 hive-stie.xml 中已经配置了 WareHouse 的位置, 但是在 Spark 2.0.0 后已经废弃了 hive-site.xml 中设置的 hive.metastore.warehouse.dir, 需要在 SparkSession 中设置 WareHouse 的位置设置 MetaStore 的位置val spark = SparkSession .builder() .appName("hive example") .config("spark.sql.warehouse.dir", "hdfs://node01:8020/dataset/hive") .config("hive.metastore.uris", "thrift://node01:9083")

      `.enableHiveSupport()

      .getOrCreate()设置WareHouse的位置设置MetaStore的位置开启Hive` 支持

    配置好了以后, 就可以通过 DataFrame 处理数据, 后将数据结果推入 Hive 表中了, 在将结果保存到 Hive 表的时候, 可以指定保存模式

    1. val schema = StructType(
    2. List(
    3. StructField("name", StringType),
    4. StructField("age", IntegerType),
    5. StructField("gpa", FloatType)
    6. )
    7. )
    8. val studentDF = spark.read
    9. .option("delimiter", " ")
    10. .schema(schema)
    11. .csv("dataset/studenttab10k")
    12. val resultDF = studentDF.where("age < 50")
    13. resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark_integrition1.student")

    通过 mode 指定保存模式, 通过 saveAsTable 保存数据到 Hive


    JDBC

    准备 MySQL 环境

    在使用 SparkSQL 访问 MySQL 之前, 要对 MySQL 进行一些操作, 例如说创建用户, 表和库等

    • Step 1: 连接 MySQL 数据库

      在 MySQL 所在的主机上执行如下命令

      1. mysql -u root -p
    • Step 2: 创建 Spark 使用的用户

      登进 MySQL 后, 需要先创建用户

    1. CREATE USER 'spark'@'%' IDENTIFIED BY 'Spark123!';
    2. GRANT ALL ON spark_test.* TO 'spark'@'%';

    Step 3: 创建库和表

    1. CREATE DATABASE spark_test;
    2. USE spark_test;
    3. CREATE TABLE IF NOT EXISTS `student`(
    4. `id` INT AUTO_INCREMENT,
    5. `name` VARCHAR(100) NOT NULL,
    6. `age` INT NOT NULL,
    7. `gpa` FLOAT,
    8. PRIMARY KEY ( `id` )
    9. )ENGINE=InnoDB DEFAULT CHARSET=utf8;

    使用 SparkSQL 向 MySQL 中写入数据

    其实在使用 SparkSQL 访问 MySQL 是通过 JDBC, 那么其实所有支持 JDBC 的数据库理论上都可以通过这种方式进行访问

    在使用 JDBC 访问关系型数据的时候, 其实也是使用 DataFrameReader, 对 DataFrameReader 提供一些配置, 就可以使用 Spark 访问 JDBC, 有如下几个配置可用

    属性含义
    url要连接的 JDBC URL
    dbtable要访问的表, 可以使用任何 SQL 语句中 from 子句支持的语法
    fetchsize数据抓取的大小(单位行), 适用于读的情况
    batchsize数据传输的大小(单位行), 适用于写的情况
    isolationLevel事务隔离级别, 是一个枚举, 取值 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READSERIALIZABLE, 默认为 READ_UNCOMMITTED

    读取数据集, 处理过后存往 MySQL 中的代码如下

    1. val spark = SparkSession
    2. .builder()
    3. .appName("hive example")
    4. .master("local[6]")
    5. .getOrCreate()
    6. val schema = StructType(
    7. List(
    8. StructField("name", StringType),
    9. StructField("age", IntegerType),
    10. StructField("gpa", FloatType)
    11. )
    12. )
    13. val studentDF = spark.read
    14. .option("delimiter", " ")
    15. .schema(schema)
    16. .csv("dataset/studenttab10k")
    17. studentDF.write.format("jdbc").mode(SaveMode.Overwrite)
    18. .option("url", "jdbc:mysql://node01:3306/spark_test")
    19. .option("dbtable", "student")
    20. .option("user", "spark")
    21. .option("password", "Spark123!")
    22. .save()

    运行程序

    如果是在本地运行, 需要导入 Maven 依赖

    1. <dependency>
    2. <groupId>mysql</groupId>
    3. <artifactId>mysql-connector-java</artifactId>
    4. <version>5.1.47</version>
    5. </dependency>

    如果使用 Spark submit 或者 Spark shell 来运行任务, 需要通过 --jars 参数提交 MySQL 的 Jar 包, 或者指定 --packages 从 Maven 库中读取

    1. bin/spark-shell --packages mysql:mysql-connector-java:5.1.47 --repositories http://maven.aliyun.com/nexus/content/groups/public/

    从 MySQL 中读取数据

    读取 MySQL 的方式也非常的简单, 只是使用 SparkSQL 的 DataFrameReader 加上参数配置即可访问

    1. spark.read.format("jdbc")
    2. .option("url", "jdbc:mysql://node01:3306/spark_test")
    3. .option("dbtable", "student")
    4. .option("user", "spark")
    5. .option("password", "Spark123!")
    6. .load()
    7. .show()

    默认情况下读取 MySQL 表时, 从 MySQL 表中读取的数据放入了一个分区, 拉取后可以使用 DataFrame 重分区来保证并行计算和内存占用不会太高, 但是如果感觉 MySQL 中数据过多的时候, 读取时可能就会产生 OOM, 所以在数据量比较大的场景, 就需要在读取的时候就将其分发到不同的 RDD 分区

    属性含义
    partitionColumn指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 ID
    lowerBoundupperBound确定步长的参数, lowerBound - upperBound 之间的数据均分给每一个分区, 小于 lowerBound 的数据分给第一个分区, 大于 upperBound 的数据分给最后一个分区
    numPartitions分区数量
    1. spark.read.format("jdbc")
    2. .option("url", "jdbc:mysql://node01:3306/spark_test")
    3. .option("dbtable", "student")
    4. .option("user", "spark")
    5. .option("password", "Spark123!")
    6. .option("partitionColumn", "age")
    7. .option("lowerBound", 1)
    8. .option("upperBound", 60)
    9. .option("numPartitions", 10)
    10. .load()
    11. .show()

    有时候可能要使用非数字列来作为分区依据, Spark 也提供了针对任意类型的列作为分区依据的方法

    1. val predicates = Array(
    2. "age < 20",
    3. "age >= 20, age < 30",
    4. "age >= 30"
    5. )
    6. val connectionProperties = new Properties()
    7. connectionProperties.setProperty("user", "spark")
    8. connectionProperties.setProperty("password", "Spark123!")
    9. spark.read
    10. .jdbc(
    11. url = "jdbc:mysql://node01:3306/spark_test",
    12. table = "student",
    13. predicates = predicates,
    14. connectionProperties = connectionProperties
    15. )
    16. .show()

    SparkSQL 中并没有直接提供按照 SQL 进行筛选读取数据的 API 和参数, 但是可以通过 dbtable 来曲线救国, dbtable 指定目标表的名称, 但是因为 dbtable 中可以编写 SQL, 所以使用子查询即可做到

    1. spark.read.format("jdbc")
    2. .option("url", "jdbc:mysql://node01:3306/spark_test")
    3. .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu")
    4. .option("user", "spark")
    5. .option("password", "Spark123!")
    6. .option("partitionColumn", "age")
    7. .option("lowerBound", 1)
    8. .option("upperBound", 60)
    9. .option("numPartitions", 10)
    10. .load()
    11. .show()

    Dataset (DataFrame) 的基础操作

    有类型操作

    分类算子解释
    转换flatMap通过 flatMap 可以将一条数据转为一个数组, 后再展开这个数组放入 Dataset``import spark.implicits._ val ds = Seq("hello world", "hello pc").toDS() ds.flatMap( _.split(" ") ).show()
    mapmap 可以将数据集中每条数据转为另一种形式import spark.implicits._ val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS() ds.map( person => Person(person.name, person.age * 2) ).show()
    mapPartitionsmapPartitions 和 map 一样, 但是 map 的处理单位是每条数据, mapPartitions 的处理单位是每个分区import spark.implicits._ val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS() ds.mapPartitions( iter => { val returnValue = iter.map( item => Person(item.name, item.age * 2) ) returnValue } ) .show()
    transformmap 和 mapPartitions 以及 transform 都是转换, map 和 mapPartitions 是针对数据, 而 transform 是针对整个数据集, 这种方式最大的区别就是 transform 可以直接拿到 Dataset 进行操作20190526111401import spark.implicits._ val ds = spark.range(5) ds.transform( dataset => dataset.withColumn("doubled", 'id * 2) )
    asas[Type] 算子的主要作用是将弱类型的 Dataset 转为强类型的 Dataset, 它有很多适用场景, 但是最常见的还是在读取数据的时候, 因为 DataFrameReader 体系大部分情况下是将读出来的数据转换为 DataFrame的形式, 如果后续需要使用 Dataset 的强类型 API, 则需要将 DataFrame 转为 Dataset. 可以使用 as[Type] 算子完成这种操作import spark.implicits._ val structType = StructType( Seq( StructField("name", StringType), StructField("age", IntegerType), StructField("gpa", FloatType) ) ) val sourceDF = spark.read .schema(structType) .option("delimiter", " ") .csv("dataset/studenttab10k") val dataset = sourceDF.as[Student] dataset.show()
    过滤filterfilter 用来按照条件过滤数据集import spark.implicits._ val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS() ds.filter( person => person.name == "lisi" ).show()
    聚合groupByKeygrouByKey 算子的返回结果是 KeyValueGroupedDataset, 而不是一个 Dataset, 所以必须要先经过 KeyValueGroupedDataset 中的方法进行聚合, 再转回 Dataset, 才能使用 Action 得出结果其实这也印证了分组后必须聚合的道理import spark.implicits._ val ds = Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)).toDS() ds.groupByKey( person => person.name ).count().show()
    切分randomSplitrandomSplit 会按照传入的权重随机将一个 Dataset 分为多个 Dataset, 传入 randomSplit 的数组有多少个权重, 最终就会生成多少个 Dataset, 这些权重的加倍和应该为 1, 否则将被标准化val ds = spark.range(15) val datasets: Array[Dataset[lang.Long]] = ds.randomSplit(Array[Double](2, 3)) datasets.foreach(dataset => dataset.show())
    samplesample 会随机在 Dataset 中抽样val ds = spark.range(15) ds.sample(withReplacement = false, fraction = 0.4).show()
    排序orderByorderBy 配合 Column 的 API, 可以实现正反序排列import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.orderBy("age").show() ds.orderBy('age.desc).show()
    sort其实 orderBy 是 sort 的别名, 所以它们所实现的功能是一样的import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.sort('age.desc).show()
    分区coalesce减少分区, 此算子和 RDD 中的 coalesce 不同, Dataset 中的 coalesce 只能减少分区数, coalesce 会直接创建一个逻辑操作, 并且设置 Shuffle 为 false``val ds = spark.range(15) ds.coalesce(1).explain(true)
    repartitionsrepartitions 有两个作用, 一个是重分区到特定的分区数, 另一个是按照某一列来分区, 类似于 SQL 中的 DISTRIBUTE BY``val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.repartition(4) ds.repartition('name)
    去重dropDuplicates使用 dropDuplicates 可以去掉某一些列中重复的行import spark.implicits._ val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15))) ds.dropDuplicates("age").show()
    distinct当 dropDuplicates 中没有传入列名的时候, 其含义是根据所有列去重, dropDuplicates() 方法还有一个别名, 叫做 distinct20190525182912所以, 使用 distinct 也可以去重, 并且只能根据所有的列来去重import spark.implicits._ val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15))) ds.distinct().show()
    集合操作exceptexcept 和 SQL 语句中的 except 一个意思, 是求得 ds1中不存在于 ds2 中的数据, 其实就是差集val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.except(ds2).show()
    intersect求得两个集合的交集val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.intersect(ds2).show()
    union求得两个集合的并集val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.union(ds2).show()
    limit限制结果集数量val ds = spark.range(1, 10) ds.limit(3).show()

    无类型转换

    分类算子解释
    选择selectselect 用来选择某些列出现在结果集中import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.select($"name").show()
    selectExpr在 SQL 语句中, 经常可以在 select 子句中使用 count(age)rand() 等函数, 在 selectExpr 中就可以使用这样的 SQL 表达式, 同时使用 select 配合 expr 函数也可以做到类似的效果import spark.implicits._ import org.apache.spark.sql.functions._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.selectExpr("count(age) as count").show() ds.selectExpr("rand() as random").show() ds.select(expr("count(age) as count")).show()
    withColumn通过 Column 对象在 Dataset 中创建一个新的列或者修改原来的列import spark.implicits._ import org.apache.spark.sql.functions._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.withColumn("random", expr("rand()")).show()
    withColumnRenamed修改列名import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.withColumnRenamed("name", "new_name").show()
    剪除drop剪掉某个列import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.drop('age).show()
    聚合groupBy按照给定的行进行分组import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.groupBy('name).count().show()

    Column 对象

    分类操作解释
    创建'单引号 ' 在 Scala 中是一个特殊的符号, 通过 ' 会生成一个 Symbol 对象, Symbol对象可以理解为是一个字符串的变种, 但是比字符串的效率高很多, 在 Spark 中, 对 Scala中的 Symbol 对象做了隐式转换, 转换为一个 ColumnName 对象, ColumnName是 Column 的子类, 所以在 Spark 中可以如下去选中一个列val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import spark.implicits._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c1: Symbol = 'name
    $同理, $ 符号也是一个隐式转换, 同样通过 spark.implicits 导入, 通过 $ 可以生成一个 Column 对象val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import spark.implicits._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c2: ColumnName = $"name"
    colSparkSQL 提供了一系列的函数, 可以通过函数实现很多功能, 在后面课程中会进行详细介绍, 这些函数中有两个可以帮助我们创建 Column 对象, 一个是 col, 另外一个是 column``val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import org.apache.spark.sql.functions._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c3: sql.Column = col("name")
    columnval spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import org.apache.spark.sql.functions._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c4: sql.Column = column("name")
    Dataset.col前面的 Column 对象创建方式所创建的 Column 对象都是 Free 的, 也就是没有绑定任何 Dataset, 所以可以作用于任何 Dataset, 同时, 也可以通过 Dataset 的 col 方法选择一个列, 但是这个 Column 是绑定了这个 Dataset 的, 所以只能用于创建其的 Dataset 上val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c5: sql.Column = personDF.col("name")
    Dataset.apply可以通过 Dataset 对象的 apply 方法来获取一个关联此 Dataset 的 Column 对象val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c6: sql.Column = personDF.apply("name")``apply 的调用有一个简写形式val c7: sql.Column = personDF("name")
    别名和转换as[Type]as 方法有两个用法, 通过 as[Type] 的形式可以将一个列中数据的类型转为 Type 类型personDF.select(col("age").as[Long]).show()
    as(name)通过 as(name) 的形式使用 as 方法可以为列创建别名personDF.select(col("age").as("age_new")).show()
    添加列withColumn通过 Column 在添加一个新的列时候修改 Column 所代表的列的数据personDF.withColumn("double_age", 'age * 2).show()
    操作like通过 Column 的 API, 可以轻松实现 SQL 语句中 LIKE 的功能personDF.filter('name like "%zhang%").show()
    isin通过 Column 的 API, 可以轻松实现 SQL 语句中 ISIN 的功能personDF.filter('name isin ("hello", "zhangsan")).show()
    sort在排序的时候, 可以通过 Column 的 API 实现正反序personDF.sort('age.asc).show() personDF.sort('age.desc).show()

    缺失值处理

    缺失值的处理思路

    如果想探究如何处理无效值, 首先要知道无效值从哪来, 从而分析可能产生的无效值有哪些类型, 在分别去看如何处理无效值

    什么是缺失值

    一个值本身的含义是这个值不存在则称之为缺失值, 也就是说这个值本身代表着缺失, 或者这个值本身无意义, 比如说 null, 比如说空字符串

    20190527220736

    关于数据的分析其实就是统计分析的概念, 如果这样的话, 当数据集中存在缺失值, 则无法进行统计和分析, 对很多操作都有影响

    缺失值如何产生的

    20190527215718

    Spark 大多时候处理的数据来自于业务系统中, 业务系统中可能会因为各种原因, 产生一些异常的数据

    例如说因为前后端的判断失误, 提交了一些非法参数. 再例如说因为业务系统修改 MySQL 表结构产生的一些空值数据等. 总之在业务系统中出现缺失值其实是非常常见的一件事, 所以大数据系统就一定要考虑这件事.

    缺失值的类型

    常见的缺失值有两种

    • nullNaN 等特殊类型的值, 某些语言中 null 可以理解是一个对象, 但是代表没有对象, NaN是一个数字, 可以代表不是数字

      针对这一类的缺失值, Spark 提供了一个名为 DataFrameNaFunctions 特殊类型来操作和处理

    • "Null""NA"" " 等解析为字符串的类型, 但是其实并不是常规字符串数据

      针对这类字符串, 需要对数据集进行采样, 观察异常数据, 总结经验, 各个击破

    1. DataFrameNaFunctions

    DataFrameNaFunctions 使用 Dataset 的 na 函数来获取

    1. val df = ...
    2. val naFunc: DataFrameNaFunctions = df.na

    当数据集中出现缺失值的时候, 大致有两种处理方式, 一个是丢弃, 一个是替换为某值, DataFrameNaFunctions 中包含一系列针对空值数据的方案

    • DataFrameNaFunctions.drop 可以在当某行中包含 null 或 NaN 的时候丢弃此行
    • DataFrameNaFunctions.fill 可以在将 null 和 NaN 充为其它值
    • DataFrameNaFunctions.replace 可以把 null 或 NaN 替换为其它值, 但是和 fill 略有一些不同, 这个方法针对值来进行替换

    如何使用 SparkSQL 处理 null 和 NaN ?

    首先要将数据读取出来, 此次使用的数据集直接存在 NaN, 在指定 Schema 后, 可直接被转为 Double.NaN

    1. val schema = StructType(
    2. List(
    3. StructField("id", IntegerType),
    4. StructField("year", IntegerType),
    5. StructField("month", IntegerType),
    6. StructField("day", IntegerType),
    7. StructField("hour", IntegerType),
    8. StructField("season", IntegerType),
    9. StructField("pm", DoubleType)
    10. )
    11. )
    12. val df = spark.read
    13. .option("header", value = true)
    14. .schema(schema)
    15. .csv("dataset/beijingpm_with_nan.csv")

    对于缺失值的处理一般就是丢弃和填充

    • 丢弃包含 null 和 NaN 的行

      当某行数据所有值都是 null 或者 NaN 的时候丢弃此行df.na.drop("all").show()当某行中特定列所有值都是 null 或者 NaN 的时候丢弃此行df.na.drop("all", List("pm", "id")).show()当某行数据任意一个字段为 null 或者 NaN 的时候丢弃此行df.na.drop().show() df.na.drop("any").show()当某行中特定列任意一个字段为 null 或者 NaN 的时候丢弃此行df.na.drop(List("pm", "id")).show() df.na.drop("any", List("pm", "id")).show()

    • 填充包含 null 和 NaN 的列

      填充所有包含 null 和 NaN 的列df.na.fill(0).show()填充特定包含 null 和 NaN 的列df.na.fill(0, List("pm")).show()根据包含 null 和 NaN 的列的不同来填充import scala.collection.JavaConverters._ df.na.fill(Map[String, Any]("pm" -> 0).asJava).show

    如何使用 SparkSQL 处理异常字符串 ?

    读取数据集, 这次读取的是最原始的那个 PM 数据集

    1. val df = spark.read
    2. .option("header", value = true)
    3. .csv("dataset/BeijingPM20100101_20151231.csv")

    使用函数直接转换非法的字符串

    1. df.select('No as "id", 'year, 'month, 'day, 'hour, 'season,
    2. when('PM_Dongsi === "NA", 0)
    3. .otherwise('PM_Dongsi cast DoubleType)
    4. .as("pm"))
    5. .show()

    使用 where 直接过滤

    1. df.select('No as "id", 'year, 'month, 'day, 'hour, 'season, 'PM_Dongsi)
    2. .where('PM_Dongsi =!= "NA")
    3. .show()

    使用 DataFrameNaFunctions 替换, 但是这种方式被替换的值和新值必须是同类型

    1. df.select('No as "id", 'year, 'month, 'day, 'hour, 'season, 'PM_Dongsi)
    2. .na.replace("PM_Dongsi", Map("NA" -> "NaN"))
    3. .show()

    聚合

    groupBy(单聚合

    groupBy 算子会按照列将 Dataset 分组, 并返回一个 RelationalGroupedDataset 对象, 通过 RelationalGroupedDataset 可以对分组进行聚合

    加载实验数据

    1. private val spark = SparkSession.builder()
    2. .master("local[6]")
    3. .appName("aggregation")
    4. .getOrCreate()
    5. import spark.implicits._
    6. private val schema = StructType(
    7. List(
    8. StructField("id", IntegerType),
    9. StructField("year", IntegerType),
    10. StructField("month", IntegerType),
    11. StructField("day", IntegerType),
    12. StructField("hour", IntegerType),
    13. StructField("season", IntegerType),
    14. StructField("pm", DoubleType)
    15. )
    16. )
    17. private val pmDF = spark.read
    18. .schema(schema)
    19. .option("header", value = true)
    20. .csv("dataset/pm_without_null.csv")

    使用 functions 函数进行聚合

    1. import org.apache.spark.sql.functions._
    2. val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year)
    3. groupedDF.agg(avg('pm) as "pm_avg")
    4. .orderBy('pm_avg)
    5. .show()

    除了使用 functions 进行聚合, 还可以直接使用 RelationalGroupedDataset 的 API 进行聚合

    1. groupedDF.avg("pm")
    2. .orderBy('pm_avg)
    3. .show()
    4. groupedDF.max("pm")
    5. .orderBy('pm_avg)
    6. .show()

    多维聚合

    我们可能经常需要针对数据进行多维的聚合, 也就是一次性统计小计, 总计等, 一般的思路如下

    准备数据

    1. private val spark = SparkSession.builder()
    2. .master("local[6]")
    3. .appName("aggregation")
    4. .getOrCreate()
    5. import spark.implicits._
    6. private val schemaFinal = StructType(
    7. List(
    8. StructField("source", StringType),
    9. StructField("year", IntegerType),
    10. StructField("month", IntegerType),
    11. StructField("day", IntegerType),
    12. StructField("hour", IntegerType),
    13. StructField("season", IntegerType),
    14. StructField("pm", DoubleType)
    15. )
    16. )
    17. private val pmFinal = spark.read
    18. .schema(schemaFinal)
    19. .option("header", value = true)
    20. .csv("dataset/pm_final.csv")

    进行多维度聚合

    1. import org.apache.spark.sql.functions._
    2. val groupPostAndYear = pmFinal.groupBy('source, 'year)
    3. .agg(sum("pm") as "pm")
    4. val groupPost = pmFinal.groupBy('source)
    5. .agg(sum("pm") as "pm")
    6. //结果集中添加一列可以使用lit ,
    7. .select('source, lit(null) as "year", 'pm)
    8. groupPostAndYear.union(groupPost)
    9. .sort('source, 'year asc_nulls_last, 'pm)
    10. .show()

    rollup 操作符(在单聚合的基础上多一个总聚合

    rollup 操作符其实就是 groupBy 的一个扩展, rollup 会对传入的列进行滚动 groupBygroupBy 的次数为列数量 + 1, 最后一次是对整个数据集进行聚合

    创建数据集

    1. import org.apache.spark.sql.functions._
    2. val sales = Seq(
    3. ("Beijing", 2016, 100),
    4. ("Beijing", 2017, 200),
    5. ("Shanghai", 2015, 50),
    6. ("Shanghai", 2016, 150),
    7. ("Guangzhou", 2017, 50)
    8. ).toDF("city", "year", "amount")

    rollup 的操作

    1. sales.rollup("city", "year")
    2. .agg(sum("amount") as "amount")
    3. .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
    4. .show()
    5. /**
    6. * 结果集:
    7. * +---------+----+------+
    8. * | city|year|amount|
    9. * +---------+----+------+
    10. * | Shanghai|2015| 50| <-- 上海 2015 的小计
    11. * | Shanghai|2016| 150|
    12. * | Shanghai|null| 200| <-- 上海的总计
    13. * |Guangzhou|2017| 50|
    14. * |Guangzhou|null| 50|
    15. * | Beijing|2016| 100|
    16. * | Beijing|2017| 200|
    17. * | Beijing|null| 300|
    18. * | null|null| 550| <-- 整个数据集的总计
    19. * +---------+----+------+
    20. */
    21. import org.apache.spark.sql.functions._
    22. sales.rollup("city","year")
    23. .agg(sum('amount)as("amount_sum"))
    24. .sort($"city".desc_nulls_last,$"year".asc_nulls_last)
    25. .createOrReplaceTempView("test1")
    26. spark.sql("select t.*,null test from test1 t").show()
    27. /*
    28. *
    29. +---------+----+----------+----+
    30. | city|year|amount_sum|test|
    31. +---------+----+----------+----+
    32. | Shanghai|2015| 50|null|
    33. | Shanghai|2016| 150|null|
    34. | Shanghai|null| 200|null|
    35. |Guangzhou|2017| 50|null|
    36. |Guangzhou|null| 50|null|
    37. | Beijing|2016| 100|null|
    38. | Beijing|2017| 200|null|
    39. | Beijing|null| 300|null|
    40. | null|null| 550|null|
    41. +---------+----+----------+----+
    42. */

    如果使用基础的 groupBy 如何实现效果?

    1. val cityAndYear = sales
    2. .groupBy("city", "year") // 按照 city 和 year 聚合
    3. .agg(sum("amount") as "amount")
    4. val city = sales
    5. .groupBy("city") // 按照 city 进行聚合
    6. .agg(sum("amount") as "amount")
    7. .select($"city", lit(null) as "year", $"amount")
    8. val all = sales
    9. .groupBy() // 全局聚合
    10. .agg(sum("amount") as "amount")
    11. .select(lit(null) as "city", lit(null) as "year", $"amount")
    12. cityAndYear
    13. .union(city)
    14. .union(all)
    15. .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
    16. .show()
    17. /**
    18. * 统计结果:
    19. * +---------+----+------+
    20. * | city|year|amount|
    21. * +---------+----+------+
    22. * | Shanghai|2015| 50|
    23. * | Shanghai|2016| 150|
    24. * | Shanghai|null| 200|
    25. * |Guangzhou|2017| 50|
    26. * |Guangzhou|null| 50|
    27. * | Beijing|2016| 100|
    28. * | Beijing|2017| 200|
    29. * | Beijing|null| 300|
    30. * | null|null| 550|
    31. * +---------+----+------+
    32. */

    很明显可以看到, 在上述案例中, rollup 就相当于先按照 cityyear 进行聚合, 后按照 city 进行聚合, 最后对整个数据集进行聚合, 在按照 city 聚合时, year 列值为 null, 聚合整个数据集的时候, 除了聚合列, 其它列值都为 null


    cube(在rollup的总聚合上再多出中间的其他聚合)

    cube 的功能和 rollup 是一样的, 但也有区别, 区别如下

    • rollup(A, B).sum©

      其结果集中会有三种数据形式: A B CA null Cnull null C

      不知道大家发现没, 结果集中没有对 B 列的聚合结果

    • cube(A, B).sum©

      其结果集中会有四种数据形式: A B CA null Cnull null Cnull B C

      不知道大家发现没, 比 rollup 的结果集中多了一个 null B C, 也就是说, rollup 只会按照第一个列来进行组合聚合, 但是 cube 会将全部列组合聚合

    1. import org.apache.spark.sql.functions._
    2. pmFinal.cube('source, 'year)
    3. .agg(sum("pm") as "pm_total")
    4. .sort('source.asc_nulls_last, 'year.asc_nulls_last)
    5. .show()
    6. /**
    7. * 结果集为
    8. *
    9. * +-------+----+---------+
    10. * | source|year| pm_total|
    11. * +-------+----+---------+
    12. * | dongsi|2013| 735606.0|
    13. * | dongsi|2014| 745808.0|
    14. * | dongsi|2015| 752083.0|
    15. * | dongsi|null|2233497.0|
    16. * |us_post|2010| 841834.0|
    17. * |us_post|2011| 796016.0|
    18. * |us_post|2012| 750838.0|
    19. * |us_post|2013| 882649.0|
    20. * |us_post|2014| 846475.0|
    21. * |us_post|2015| 714515.0|
    22. * |us_post|null|4832327.0|
    23. * | null|2010| 841834.0| <-- 新增
    24. * | null|2011| 796016.0| <-- 新增
    25. * | null|2012| 750838.0| <-- 新增
    26. * | null|2013|1618255.0| <-- 新增
    27. * | null|2014|1592283.0| <-- 新增
    28. * | null|2015|1466598.0| <-- 新增
    29. * | null|null|7065824.0|
    30. * +-------+----+---------+
    31. */

    join连接

    例子:

    1. val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 0))
    2. .toDF("id", "name", "cityId")
    3. val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))
    4. .toDF("id", "name")
    5. person.join(cities, person.col("cityId") === cities.col("id"))
    6. .select(person.col("id"),
    7. person.col("name"),
    8. cities.col("name") as "city")
    9. .show()
    10. /**
    11. * 执行结果:
    12. *
    13. * +---+------+---------+
    14. * | id| name| city|
    15. * +---+------+---------+
    16. * | 0| Lucy| Beijing|
    17. * | 1| Lily| Beijing|
    18. * | 2| Tim|Guangzhou|
    19. * | 3|Danial| Beijing|
    20. * +---+------+---------+
    21. */

    连接类型

    准备

    1. private val spark = SparkSession.builder()
    2. .master("local[6]")
    3. .appName("aggregation")
    4. .getOrCreate()
    5. import spark.implicits._
    6. val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 3))
    7. .toDF("id", "name", "cityId")
    8. person.createOrReplaceTempView("person")
    9. val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))
    10. .toDF("id", "name")
    11. cities.createOrReplaceTempView("cities")
    连接类型类型字段解释
    交叉连接cross解释交叉连接就是笛卡尔积, 就是两个表中所有的数据两两结对交叉连接是一个非常重的操作, 在生产中, 尽量不要将两个大数据集交叉连接, 如果一定要交叉连接, 也需要在交叉连接后进行过滤, 优化器会进行优化20190529120732SQL 语句select * from person cross join cities``Dataset 操作person.crossJoin(cities) .where(person.col("cityId") === cities.col("id")) .show()
    内连接inner解释内连接就是按照条件找到两个数据集关联的数据, 并且在生成的结果集中只存在能关联到的数据20190529115831SQL 语句select * from person inner join cities on person.cityId = cities.id``Dataset 操作person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "inner") .show()
    全外连接outerfullfullouter解释内连接和外连接的最大区别, 就是内连接的结果集中只有可以连接上的数据, 而外连接可以包含没有连接上的数据, 根据情况的不同, 外连接又可以分为很多种, 比如所有的没连接上的数据都放入结果集, 就叫做全外连接20190529120033SQL 语句select * from person full outer join cities on person.cityId = cities.id``Dataset 操作person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "full") // "outer", "full", "full_outer" .show()
    左外连接leftouterleft解释左外连接是全外连接的一个子集, 全外连接中包含左右两边数据集没有连接上的数据, 而左外连接只包含左边数据集中没有连接上的数据20190529120139SQL 语句select * from person left join cities on person.cityId = cities.id``Dataset 操作person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "left") // leftouter, left .show()
    LeftAntileftanti解释LeftAnti 是一种特殊的连接形式, 和左外连接类似, 但是其结果集中没有右侧的数据, 只包含左边集合中没连接上的数据20190529120454SQL 语句select * from person left anti join cities on person.cityId = cities.id``Dataset 操作person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "left_anti") .show()
    LeftSemileftsemi解释和 LeftAnti 恰好相反, LeftSemi 的结果集也没有右侧集合的数据, 但是只包含左侧集合中连接上的数据20190529120406SQL 语句select * from person left semi join cities on person.cityId = cities.id``Dataset 操作person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "left_semi") .show()
    右外连接rightouterright解释右外连接和左外连接刚好相反, 左外是包含左侧未连接的数据, 和两个数据集中连接上的数据, 而右外是包含右侧未连接的数据, 和两个数据集中连接上的数据20190529120222SQL 语句select * from person right join cities on person.cityId = cities.id``Dataset 操作person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "right") // rightouter, right .show()

    UDF

    //需求:将id不满8位,补足8位[补0]

    test.json

    1. {"id":"1","name":"a","clazz":1,"score":80}
    2. {"id":"02","name":"b","clazz":1,"score":78}
    3. {"id":"003","name":"c","clazz":1,"score":95}
    4. {"id":"05","name":"d","clazz":2,"score":74}
    5. {"id":"06","name":"e","clazz":2,"score":92}
    6. {"id":"7","name":"f","clazz":3,"score":99}
    7. {"id":"8","name":"g","clazz":3,"score":99}
    8. {"id":"9","name":"h","clazz":3,"score":45}
    9. {"id":"10","name":"i","clazz":3,"score":55}
    10. {"id":"11","name":"j","clazz":3,"score":78}

    UDFtest.scala

    1. import org.apache.spark.sql.SparkSession
    2. /** *
    3. *
    4. * @Author : Le
    5. * @CreatDate : 2019/9/15
    6. * **/
    7. object test1 {
    8. def main(args: Array[String]): Unit = {
    9. //需求:将id不满8位,补足8位[补0]
    10. //1、创建SparkSession
    11. val spark = SparkSession.builder().master("local[3]").appName("test").getOrCreate()
    12. //2、读取json文件
    13. spark.read.json("dataset/test.json").createOrReplaceTempView("student")
    14. //3、自定义udf
    15. def increPrfix(id:String):String={
    16. "0"*(8-id.length)+id
    17. }
    18. //4、注册udf函数
    19. spark.udf.register("increPrfix",increPrfix _)
    20. //5、使用
    21. spark.sql("select increPrfix(id) id,name,clazz,score from student").show
    22. }

    UDAFtest

    1. import org.apache.spark.sql.SparkSession
    2. object UdafTest {
    3. def main(args: Array[String]): Unit = {
    4. //1、创建SparkSession
    5. val spark = SparkSession.builder().master("local[3]").appName("test").getOrCreate()
    6. //2、读取json文件,创建中间表
    7. spark.read.json("data/test.json").createOrReplaceTempView("student")
    8. //3、注册udaf函数 ,需要定义一个新的类
    9. spark.udf.register("myAvg",new MyUdaf)
    10. //4、使用
    11. spark.sql("select myAvg(score) from student").show
    12. }
    13. }

    MyUdaf.scala

    1. import org.apache.spark.sql.Row
    2. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    3. import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructType}
    4. /**
    5. * 自定义udaf函数,实现avg功能
    6. */
    7. class MyUdaf extends UserDefinedAggregateFunction{
    8. //输入值的类型 指定输入值的类型
    9. override def inputSchema: StructType = {
    10. new StructType().add("input",DoubleType)
    11. }
    12. //缓冲区 指在计算过程中需要用到的中间变量[需要用到两个中间变量:一个是输入的总和,一个是输入的总条数]
    13. override def bufferSchema: StructType = {
    14. new StructType().add("sum",DoubleType).add("total",IntegerType)
    15. }
    16. //指最终的计算结果的类型
    17. override def dataType: DataType = {
    18. DoubleType
    19. }
    20. //指数据的一致性 一般赋值true
    21. override def deterministic: Boolean = true
    22. //初始化缓冲区 也就是指定中间变量的初始值
    23. override def initialize(buffer: MutableAggregationBuffer): Unit = {
    24. //指定sum的初始值为0
    25. buffer(0) = 0.0
    26. //指定total的初始值为0
    27. buffer(1) = 0
    28. }
    29. //进入一条数据就对对中间变量进行更新
    30. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    31. //更新sum = 缓冲区中的sum+传进来的score
    32. buffer(0) = buffer.getDouble(0) + input.getDouble(0)
    33. //更新total = 缓冲区中的total+1
    34. buffer(1) = buffer.getInt(1) + 1
    35. }
    36. //合并缓冲区的数据
    37. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    38. buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    39. buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
    40. }
    41. //计算最终结果 = sum/total
    42. override def evaluate(buffer: Row): Any = {
    43. buffer.getDouble(0) / buffer.getInt(1)
    44. }
    45. }

    开窗函数

    1. import org.apache.spark.sql.SparkSession
    2. object Window {
    3. def main(args: Array[String]): Unit = {
    4. /**
    5. * 常用开窗函数:(最常用的应该是1.2.3 的排序)
    6. * --排序函数
    7. * 1、row_number() over(partition by ... order by ...)
    8. * 2、rank() over(partition by ... order by ...)
    9. * 3、dense_rank() over(partition by ... order by ...)
    10. * --聚合函数
    11. * 4、count() over(partition by ... order by ...)
    12. * 5、max() over(partition by ... order by ...)
    13. * 6、min() over(partition by ... order by ...)
    14. * 7、sum() over(partition by ... order by ...)
    15. * 8、avg() over(partition by ... order by ...)
    16. * 9、first_value() over(partition by ... order by ...)
    17. * 10、last_value() over(partition by ... order by ...)
    18. */
    19. //需求: 获得班级成绩前两名的学生信息
    20. //1、创建SparkSession
    21. val spark = SparkSession.builder().master("local[3]").appName("test").getOrCreate()
    22. //2、读取文件注册成临时表
    23. spark.read.json("data/test.json").createOrReplaceTempView("student")
    24. //3、使用开窗函数获得学生信息
    25. //row_number rank dense_rank必须要指定order by
    26. spark.sql(
    27. """
    28. |select t.id,t.name,t.clazz,t.score from(
    29. |select s.id,s.name,s.clazz,s.score,row_number() over(partition by s.clazz order by s.score) rn
    30. | from student s) t where t.rn<=2
    31. """.stripMargin).show
    32. /**
    33. * {"id":"003","name":"c","clazz":1,"score":95} 1
    34. * {"id":"1","name":"a","clazz":1,"score":80} 2
    35. * {"id":"02","name":"b","clazz":1,"score":78} 3
    36. *
    37. * {"id":"06","name":"e","clazz":2,"score":92} 1
    38. * {"id":"05","name":"d","clazz":2,"score":74} 2
    39. *
    40. * {"id":"7","name":"f","clazz":3,"score":99} 1
    41. * {"id":"8","name":"g","clazz":3,"score":99} 2
    42. * {"id":"11","name":"j","clazz":3,"score":78} 3
    43. * {"id":"10","name":"i","clazz":3,"score":55} 4
    44. * {"id":"9","name":"h","clazz":3,"score":45} 5
    45. *
    46. */
    47. spark.sql(
    48. """select t.id,t.name,t.clazz,t.score from(
    49. |select s.*,rank() over(partition by s.clazz order by s.score desc) rn
    50. | from student s) t where t.rn<=2
    51. """.stripMargin)//.show
    52. /**
    53. * +-----+---+----+-----+---+
    54. * |clazz| id|name|score| rn|
    55. * +-----+---+----+-----+---+
    56. * | 1|003| c| 95| 1|
    57. * | 1| 1| a| 80| 2|
    58. * | 1| 02| b| 78| 3|
    59. *
    60. * | 3| 7| f| 99| 1|
    61. * | 3| 8| g| 99| 1|2
    62. * | 3| 8| g| 99| 1|3
    63. * | 3| 11| j| 78| 4|
    64. * | 3| 10| i| 55| 5|
    65. * | 3| 9| h| 45| 6|
    66. *
    67. * | 2| 06| e| 92| 1|
    68. * | 2| 05| d| 74| 2|
    69. */
    70. spark.sql(
    71. """
    72. |select s.*,dense_rank() over(partition by s.clazz order by s.score desc) rn
    73. | from student s
    74. """.stripMargin)//.show
    75. /**
    76. * +-----+---+----+-----+---+
    77. * |clazz| id|name|score| rn|
    78. * +-----+---+----+-----+---+
    79. * | 1|003| c| 95| 1|
    80. * | 1| 1| a| 80| 2|
    81. * | 1| 02| b| 78| 3|
    82. *
    83. * | 3| 7| f| 99| 1|
    84. * | 3| 8| g| 99| 1|
    85. * | 3| 11| j| 78| 2|
    86. * | 3| 10| i| 55| 3|
    87. * | 3| 9| h| 45| 4|
    88. *
    89. * | 2| 06| e| 92| 1|
    90. * | 2| 05| d| 74| 2|
    91. * +-----+---+----+-----+---+
    92. */
    93. //聚合与开窗函数结合的时候
    94. // 1、聚合函数(需要指定字段)
    95. // 2、over(可以不用指定partition by 与order by),如果不指定就是指全局
    96. spark.sql(
    97. """
    98. |select s.*,max(s.score) over() max_score
    99. | from student s
    100. """.stripMargin)//.show
    101. /**
    102. * +-----+---+----+-----+---------+
    103. * |clazz| id|name|score|max_score|
    104. * +-----+---+----+-----+---------+
    105. * | 1| 1| a| 80| 95|
    106. * | 1| 02| b| 78| 95|
    107. * | 1|003| c| 95| 95|
    108. * | 3| 7| f| 99| 99|
    109. * | 3| 8| g| 99| 99|
    110. * | 3| 9| h| 45| 99|
    111. * | 3| 10| i| 55| 99|
    112. * | 3| 11| j| 78| 99|
    113. * | 2| 05| d| 74| 92|
    114. * | 2| 06| e| 92| 92|
    115. * +-----+---+----+-----+---------+
    116. */
    117. }
    118. }

    :_* ` 把数组集合中的数据遍历到方法参数中

    sql

    1. name,price,crawl_time,market,province,city table
    2. 1、每个省份农产品市场的个数
    3. select provice,count(distinct market) from table group by provice
    4. 2、没有农产品市场的省份
    5. select b.* from table a right join table2 on a.province = b.province
    6. where a.province is null
    7. 3、根据农产品类型数据,统计前三名
    8. select name,count(1)
    9. from table group by name
    10. order by count(1) desc
    11. limit 3
    12. 4、根据农产品类型数量,统计每个省份前三名
    13. select province,name from(
    14. select province,name,rank() over(partition by province order by count(*) desc) rn
    15. from table
    16. group by province,name) t where t.rn<=3
    17. 5、计算山西省每种农产品价格波动
    18. select name,(sum(price)-max(price)-min(price))/(count(1)-2) price
    19. from table where provice = '山西'
    20. group by name

    总结

    1. 1、读取
    2. spark.read
    3. format: 指定数据读取的类型
    4. option: 指定读取时的属性:headerinfreschema...
    5. schema: 指定读取后数据的schema信息
    6. load: 加载数据
    7. 简洁:spark.read.csv
    8. 2、写入
    9. df.write.mode(SaveMode.Append).csv
    10. 写入模式:
    11. SaveMode.Append:追加
    12. SaveMode.Overwrite:覆盖
    13. 3parquet:
    14. 读取:
    15. 1spark.read.format("parquet").load
    16. 2spark.read.parquet(目录名/具体文件名)
    17. 写入:
    18. 1df.write.mode(..).parquet(目录名)
    19. 2df.write.partitionBy(分区字段).mode.parquet
    20. 4json:
    21. 读取:
    22. 1spark.read.format("json").load
    23. 2spark.read.json(目录名/具体文件名)
    24. 写入:
    25. df.write.json(...)
    26. DataFrame或者DataSet转为json格式: df.toJson
    27. 读取json格式的RDDspark.read.json(RDD[String])
    28. 5hive
    29. 编程:
    30. 1、指定metastore的地址: hive.meatstore.uris
    31. 2、指定warehouse路径: spark.sql.warehouse.dir
    32. 3、开启hive支持: enableHiveSupport
    33. 读取:spark.sql("select * from hive表")
    34. 写入:df.write.mode.saveAsTable(hive表名)
    35. 6mysql
    36. 读取: spark.read.jdbc(url,table,prop)
    37. 写入: df.write.mode.jdbc(url,table,prop)
    38. 7
    39. 有类型:
    40. 1mapflatMapmapPartitiontransform[函数只有一个参数:DataSet]
    41. 2、将DataFrame转为DataSet: df.as[待转类型]
    42. 3filter
    43. 1filter(函数)
    44. 2filter(sql表达式)
    45. 3filter(column对象)
    46. 4groupByKey(需要指定key)
    47. 5split(Array(5,2,3)) //Array中有几个值就分为几份,Array中的值为每一份的权重
    48. 6orderBy
    49. 5
    50. distinct: 所有列的值都必须相同才能去重
    51. dropDuplicates: 指定列的值都必须相同才能去重
    52. 6、集合
    53. 差集、交集、并集
    54. 无类型:
    55. 1、选择
    56. 1select
    57. 2selectExpr
    58. 2、分组
    59. groupBy
    60. 3Column
    61. 1、创建
    62. 1、无绑定
    63. 1'列名 : import spark.implicats._
    64. 2、$"列名" : import spark.implicats._
    65. 3、col("列名") : import org.apache.spark.sql.functions._
    66. 4、Column("列名") : import org.apache.spark.sql.functions._
    67. 2、有绑定
    68. 1、dataset.col("列名")
    69. 2、dataset.apply("列名")
    70. 2、操作
    71. 1、别名:
    72. col("列名") as "别名"
    73. 2、类型转换
    74. col("列名").as[类型]
    75. 3、其他操作
    76. like
    77. isin
    78. ....
    79. 8、缺失值
    80. 缺失值: null、"",NaN、"Null" 等都叫缺失值
    81. API: df.na
    82. drop:
    83. any:
    84. 如果一行数据中有任意一列的值为NaN或者null就删除该行
    85. all:
    86. 如果一行数据中所有列的值全部为NaN或者null才会删除该行
    87. 针对特定列:
    88. 以上两种规则只针对指定的列
    89. fill:
    90. 对NaN或者null的值进行填充
    91. replace:
    92. 针对指定的值进行替换
    93. 字符串缺失值的处理:
    94. select中使用when
    95. where进行过滤
  • 相关阅读:
    BeanFactory 简介以及它 和FactoryBean的区别
    由kill 和 kill -9 引发的Linux signal 学习
    验证整数和小数的正则表达式
    重构!重构!重构!
    Java常用命令:jps、jstack、jmap、jstat(带有实例教程)
    子网掩码是4个255代表什么?
    常见的访问控制模型 Access Control Policy:RBAC,DAC,MAC,ABAC
    安装驱动
    大话数据治理-01什么是治理,治理什么数据
    提高 nginx 服务器 安全性,稳定性、性能 --经验总结-持续更新
  • 原文地址:https://www.cnblogs.com/leccoo/p/11525214.html
Copyright © 2011-2022 走看看