zoukankan      html  css  js  c++  java
  • spark SQL学习三

    1 Dataset(DataFrame)的基础操作

    1.1 有类型操作

    分类算子解释

    转换

    flatMap

    通过 flatMap 可以将一条数据转为一个数组, 后再展开这个数组放入 Dataset

    import spark.implicits._
    val ds = Seq("hello world", "hello pc").toDS()
    ds.flatMap( _.split(" ") ).show()

    map

    map 可以将数据集中每条数据转为另一种形式

    import spark.implicits._
    val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
    ds.map( person => Person(person.name, person.age * 2) ).show()

    mapPartitions

    mapPartitions 和 map 一样, 但是 map 的处理单位是每条数据, mapPartitions 的处理单位是每个分区

    import spark.implicits._
    val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
    ds.mapPartitions( iter => {
      val returnValue = iter.map(
      item => Person(item.name, item.age * 2)
      ) returnValue
      } )
      .show()

    transform

    map 和 mapPartitions 以及 transform 都是转换, map 和 mapPartitions 是针对数据, 而 transform 是针对整个数据集, 这种方式最大的区别就是 transform 可以直接拿到 Dataset 进行操作

    import spark.implicits._
    val ds = spark.range(5)
    ds.transform( dataset => dataset.withColumn("doubled", 'id * 2) )

    as

    as[Type] 算子的主要作用是将弱类型的 Dataset 转为强类型的 Dataset, 它有很多适用场景, 但是最常见的还是在读取数据的时候, 因为 DataFrameReader 体系大部分情况下是将读出来的数据转换为 DataFrame 的形式, 如果后续需要使用 Dataset 的强类型 API, 则需要将 DataFrame 转为 Dataset. 可以使用 as[Type] 算子完成这种操作

    import spark.implicits._
    val structType = StructType( Seq( StructField("name", StringType), StructField("age", IntegerType), StructField("gpa", FloatType) ) )
    val sourceDF = spark.read .schema(structType) .option("delimiter", " ") .csv("dataset/studenttab10k")
    val dataset = sourceDF.as[Student] dataset.show()

    过滤

    filter

    filter 用来按照条件过滤数据集

    import spark.implicits._
    val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS() ds.filter( person => person.name == "lisi" ).show()

    聚合

    groupByKey

    grouByKey 算子的返回结果是 KeyValueGroupedDataset, 而不是一个 Dataset, 所以必须要先经过 KeyValueGroupedDataset 中的方法进行聚合, 再转回 Dataset, 才能使用 Action 得出结果

    其实这也印证了分组后必须聚合的道理

    import spark.implicits._ val ds = Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)).toDS() ds.groupByKey( person => person.name ).count().show()

    切分

    randomSplit

    randomSplit 会按照传入的权重随机将一个 Dataset 分为多个 Dataset, 传入 randomSplit 的数组有多少个权重, 最终就会生成多少个 Dataset, 这些权重的加倍和应该为 1, 否则将被标准化

    val ds = spark.range(15) val datasets: Array[Dataset[lang.Long]] = ds.randomSplit(Array[Double](2, 3)) datasets.foreach(dataset => dataset.show())

    sample

    sample 会随机在 Dataset 中抽样

    val ds = spark.range(15) ds.sample(withReplacement = false, fraction = 0.4).show()

    排序

    orderBy

    orderBy 配合 Column 的 API, 可以实现正反序排列

    import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.orderBy("age").show() ds.orderBy('age.desc).show()

    sort

    其实 orderBy 是 sort 的别名, 所以它们所实现的功能是一样的

    import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.sort('age.desc).show()

    分区

    coalesce

    减少分区, 此算子和 RDD 中的 coalesce 不同, Dataset 中的 coalesce 只能减少分区数, coalesce 会直接创建一个逻辑操作, 并且设置 Shuffle 为 false

    val ds = spark.range(15) ds.coalesce(1).explain(true)

    repartitions

    repartitions 有两个作用, 一个是重分区到特定的分区数, 另一个是按照某一列来分区, 类似于 SQL 中的 DISTRIBUTE BY

    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.repartition(4) ds.repartition('name)

    去重

    dropDuplicates

    使用 dropDuplicates 可以去掉某一些列中重复的行

    import spark.implicits._ val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15))) ds.dropDuplicates("age").show()

    distinct

    当 dropDuplicates 中没有传入列名的时候, 其含义是根据所有列去重, dropDuplicates() 方法还有一个别名, 叫做 distinct

    所以, 使用 distinct 也可以去重, 并且只能根据所有的列来去重

    import spark.implicits._ val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15))) ds.distinct().show()

    集合操作

    except

    except 和 SQL 语句中的 except 一个意思, 是求得 ds1 中不存在于 ds2 中的数据, 其实就是差集

    val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.except(ds2).show()

    intersect

    求得两个集合的交集

    val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.intersect(ds2).show()

    union

    求得两个集合的并集

    val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.union(ds2).show()

    limit

    限制结果集数量

    val ds = spark.range(1, 10) ds.limit(3).show()

    1.2 无类型转换

    分类算子解释

    选择

    select

    select 用来选择某些列出现在结果集中

    import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.select($"name").show()

    selectExpr

    在 SQL 语句中, 经常可以在 select 子句中使用 count(age)rand() 等函数, 在 selectExpr 中就可以使用这样的 SQL 表达式, 同时使用 select 配合 expr 函数也可以做到类似的效果

    import spark.implicits._ import org.apache.spark.sql.functions._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.selectExpr("count(age) as count").show() ds.selectExpr("rand() as random").show() ds.select(expr("count(age) as count")).show()

    withColumn

    通过 Column 对象在 Dataset 中创建一个新的列或者修改原来的列

    import spark.implicits._ import org.apache.spark.sql.functions._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.withColumn("random", expr("rand()")).show()

    withColumnRenamed

    修改列名

    import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.withColumnRenamed("name", "new_name").show()

    剪除

    drop

    剪掉某个列

    import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.drop('age).show()

    聚合

    groupBy

    按照给定的行进行分组

    import spark.implicits._ val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() ds.groupBy('name).count().show()

    1.3 Column对象

    Column 表示了 Dataset 中的一个列, 并且可以持有一个表达式, 这个表达式作用于每一条数据, 对每条数据都生成一个值

    分类操作解释

    创建

    '

    单引号 ' 在 Scala 中是一个特殊的符号, 通过 ' 会生成一个 Symbol 对象, Symbol 对象可以理解为是一个字符串的变种, 但是比字符串的效率高很多, 在 Spark 中, 对 Scala 中的 Symbol 对象做了隐式转换, 转换为一个 ColumnName 对象, ColumnName 是 Column 的子类, 所以在 Spark 中可以如下去选中一个列

    val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import spark.implicits._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c1: Symbol = 'name

    $

    同理, $ 符号也是一个隐式转换, 同样通过 spark.implicits 导入, 通过 $ 可以生成一个 Column 对象

    val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import spark.implicits._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c2: ColumnName = $"name"

    col

    SparkSQL 提供了一系列的函数, 可以通过函数实现很多功能, 在后面课程中会进行详细介绍, 这些函数中有两个可以帮助我们创建 Column 对象, 一个是 col, 另外一个是 column

    val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import org.apache.spark.sql.functions._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c3: sql.Column = col("name")

    column

    val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() import org.apache.spark.sql.functions._ val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c4: sql.Column = column("name")

    Dataset.col

    前面的 Column 对象创建方式所创建的 Column 对象都是 Free 的, 也就是没有绑定任何 Dataset, 所以可以作用于任何 Dataset, 同时, 也可以通过 Dataset 的 col 方法选择一个列, 但是这个 Column 是绑定了这个 Dataset 的, 所以只能用于创建其的 Dataset 上

    val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c5: sql.Column = personDF.col("name")

    Dataset.apply

    可以通过 Dataset 对象的 apply 方法来获取一个关联此 Dataset 的 Column 对象

    val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate() val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS() val c6: sql.Column = personDF.apply("name")

    apply 的调用有一个简写形式

    val c7: sql.Column = personDF("name")

    别名和转换

    as[Type]

    as 方法有两个用法, 通过 as[Type] 的形式可以将一个列中数据的类型转为 Type 类型

    personDF.select(col("age").as[Long]).show()

    as(name)

    通过 as(name) 的形式使用 as 方法可以为列创建别名

    personDF.select(col("age").as("age_new")).show()

    添加列

    withColumn

    通过 Column 在添加一个新的列时候修改 Column 所代表的列的数据

    personDF.withColumn("double_age", 'age * 2).show()

    操作

    like

    通过 Column 的 API, 可以轻松实现 SQL 语句中 LIKE 的功能

    personDF.filter('name like "%zhang%").show()

    isin

    通过 Column 的 API, 可以轻松实现 SQL 语句中 ISIN 的功能

    personDF.filter('name isin ("hello", "zhangsan")).show()

    sort

    在排序的时候, 可以通过 Column 的 API 实现正反序

    personDF.sort('age.asc).show() personDF.sort('age.desc).show()

    1.4 缺失值处理

    处理 null 和 NaN

    删除

    • 当某行数据所有值都是 null 或者 NaN 的时候丢弃此行:df.na.drop("all").show()
    • 当某行中特定列所有值都是 null 或者 NaN 的时候丢弃此行:df.na.drop("all", List("pm", "id")).show()
    • 当某行数据任意一个字段为 null 或者 NaN 的时候丢弃此行:df.na.drop().show()  df.na.drop("any").show()
    • 当某行中特定列任意一个字段为 null 或者 NaN 的时候丢弃此行:df.na.drop(List("pm", "id")).show() df.na.drop("any", List("pm", "id")).show()

    填充

    • 填充所有包含 null 和 NaN 的列:df.na.fill(0).show()
    • 填充特定包含 null 和 NaN 的列:df.na.fill(0, List("pm")).show()

    处理字符串

    • 使用函数直接转换非法的字符串

        df.select('No as "id", 'year, 'month, 'day, 'hour, 'season,
          when('PM_Dongsi === "NA", 0)
          .otherwise('PM_Dongsi cast DoubleType)
          .as("pm"))
          .show()

    • 使用 where 直接过滤

        df.select('No as "id", 'year, 'month, 'day, 'hour, 'season, 'PM_Dongsi)
          .where('PM_Dongsi =!= "NA")
          .show()

    转载自:https://www.cnblogs.com/MoooJL/p/14275061.html

  • 相关阅读:
    Python 存储引擎 数据类型 主键
    Python 数据库
    Python 线程池进程池 异步回调 协程 IO模型
    Python GIL锁 死锁 递归锁 event事件 信号量
    Python 进程间通信 线程
    Python 计算机发展史 多道技术 进程 守护进程 孤儿和僵尸进程 互斥锁
    Python 异常及处理 文件上传事例 UDP socketserver模块
    Python socket 粘包问题 报头
    Django基础,Day7
    Django基础,Day6
  • 原文地址:https://www.cnblogs.com/022414ls/p/14458315.html
Copyright © 2011-2022 走看看