  • Spark Study Notes 10 - DS & DF Basic Operations

    Typed operations
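
    The examples in these notes rely on a SparkSession named spark, its implicit conversions, and a Person case class that are never shown; a minimal sketch of that assumed setup (the appName is reused from the later sections):

    // Assumed setup, not in the original notes: the case class used by the typed
    // examples and a local SparkSession whose implicits enable toDS()/toDF() and
    // the 'symbol column syntax.
    import org.apache.spark.sql.SparkSession

    case class Person(name: String, age: Int)

    val spark = SparkSession.builder().master("local[6]").appName("trans").getOrCreate()
    import spark.implicits._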

    flatMap

    With flatMap, each record is converted into a collection, which is then flattened so that its elements are placed back into the Dataset.

     val ds1=Seq("hello spark","hello hadoop").toDS()
        ds1.flatMap(item => item.split(" ")).show()

    map

    map converts every record in the Dataset into another form.

    val ds2=Seq(Person("zhangsan",15),Person("lisi",32)).toDS()
        ds2.map(p => Person(p.name,p.age*2)).show()

    mapPartitions

    mapPartitions works like map, except that map processes one record at a time, while mapPartitions processes one partition (an iterator of records) at a time.

    ds2.mapPartitions(item => {
          // item is an iterator over one whole partition; transform it lazily
          // and return an iterator of the converted records
          val persons = item.map(p => Person(p.name, p.age * 2))
          persons
        }).show()

    transform

    map, mapPartitions, and transform are all transformations; map and mapPartitions work on the data itself, while transform works on the entire dataset. The biggest difference is that transform hands you the Dataset directly to operate on.

    @Test
      def transform(): Unit ={
        val ds=spark.range(10)
        ds.transform(dataset => dataset.withColumn("doubleid",'id*2)).show()
      }

    as

    The main purpose of the as[Type] operator is to convert a weakly typed Dataset into a strongly typed Dataset (a second sketch of this appears after the test below).

    @Test
      def as(): Unit ={
        val structType = StructType(
          Seq(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("gpa", FloatType)
          )
        )
    
        val sourceDF = spark.read
          .schema(structType)
          .option("delimiter", "	")
          .csv("dataset/studenttab10k")
    
        val dataset = sourceDF.as[(String,Int,Float)]
        dataset.show()
      }
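
    A second, hypothetical sketch of the same idea: when a DataFrame's column names and types line up with a case class, as[Person] (the Person class from the earlier examples) recovers a strongly typed Dataset.

    // Hypothetical example; personDF is built inline rather than read from a file.
    val personDF = Seq(("zhangsan", 15), ("lisi", 32)).toDF("name", "age")
    val personDS: Dataset[Person] = personDF.as[Person]
    personDS.show()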

    filter

    Filters the Dataset according to a predicate.

    @Test
      def filter(): Unit ={
        val ds=Seq(Person("zhangsan",15),Person("lisi",32)).toDS()
        ds.filter(person => person.age>20).show()
      }

    groupByKey

    The groupByKey operator returns a KeyValueGroupedDataset rather than a Dataset, so you must first aggregate with the methods of KeyValueGroupedDataset to get back a Dataset before an action can produce a result (another aggregation is sketched after the test below).

    @Test
      def groupByKey(): Unit ={
        val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
        val grouped = ds.groupByKey(p => p.name)
        val result: Dataset[(String, Long)] = grouped.count()
        result.show()
      }
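
    count() is only one of the aggregations available on KeyValueGroupedDataset; as a minimal sketch reusing grouped from the test above, mapGroups folds each group's iterator into a single record:

    // Sum the ages within each name group; the (String, Int) encoder comes from
    // spark.implicits._.
    val totalAge: Dataset[(String, Int)] =
      grouped.mapGroups((name, people) => (name, people.map(_.age).sum))
    totalAge.show()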

    randomSplit

    randomSplit randomly splits one Dataset into several Datasets according to the weights passed in; the array given to randomSplit produces as many Datasets as it has weights. The weights should sum to 1; otherwise they will be normalized.

    @Test
      def randomSplit(): Unit ={
        val ds = spark.range(15)
    val datasets: Array[Dataset[java.lang.Long]] = ds.randomSplit(Array[Double](2, 3))
        datasets.foreach(dataset => dataset.show())
    
    // sample draws a random fraction of the rows, here 40% without replacement
    ds.sample(withReplacement = false, fraction = 0.4).show()
      }

    orderBy

    orderBy, together with the Column API, sorts in ascending or descending order.

    val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
        ds.orderBy('age.desc).show() //select * from .. order by  .. desc

    sort

    ds.sort('age.asc).show()
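
    sort is an alias of orderBy, and both accept several columns at once; a small sketch, with column names taken from the Person examples above:

    // Sort by name ascending, then by age descending within equal names.
    ds.sort('name.asc, 'age.desc).show()
    ds.orderBy('name.asc, 'age.desc).show()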

    dropDuplicates

    dropDuplicates removes rows that are duplicates with respect to the given columns.

     @Test
      def dropDuplicates(): Unit ={
        val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
        ds.distinct().show()
        ds.dropDuplicates("age").show()
      }

    distinct

    Deduplicates using all columns.

    ds.distinct().show()

    Set operations

    Difference, intersection, union, limit

    @Test
      def collection(): Unit ={
        val ds1=spark.range(1,10)
        val ds2=spark.range(5,15)
    
    //difference (except)
        ds1.except(ds2).show()
    
    //intersection
        ds1.intersect(ds2).show()
    
    //union
        ds1.union(ds2).show()
    
        //limit
        ds1.limit(3).show()
    
      }

    Untyped transformations

    Selection

    select: choose which columns appear in the result set

    selectExpr: in SQL, functions such as count(age) and rand() are commonly used in the select clause; selectExpr accepts such SQL expressions directly, and select combined with the expr function achieves a similar effect (a sketch of that combination follows the test below).

    @Test
      def select(): Unit ={
    
        val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
        ds.select('name).show()
        ds.selectExpr("sum(age)").show()
    
      }
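
    A small sketch of the select + expr combination mentioned above, reusing ds from the test:

    import org.apache.spark.sql.functions.expr
    // Equivalent to ds.selectExpr("sum(age)")
    ds.select(expr("sum(age)")).show()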

    withColumn: create a new column in the Dataset, or modify an existing one, via a Column object

    withColumnRenamed: rename a column

    @Test
      def withcolumn(): Unit ={
        import org.apache.spark.sql.functions._
        val ds=Seq(Person("zhangsan",15),Person("zhangsan",20),Person("lisi",32)).toDS()
        ds.withColumn("random",expr("rand()")).show()
        ds.withColumn("name_new",'name).show()
        ds.withColumn("name_jok",'name === "").show()
    
        ds.withColumnRenamed("name","new_name").show()
    
      }

    Dropping columns

    drop: remove a column

    @Test
      def drop(): Unit ={
        import spark.implicits._
        val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
        ds.drop('age).show()
      }

    Grouping

    groupBy: group the data by the given columns

     @Test
      def groupBy(): Unit ={
        import spark.implicits._
        val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
        ds.groupBy('name).count().show()
      }

    Column objects

    Creation

    val spark=SparkSession.builder().master("local[6]").appName("trans").getOrCreate()
      import spark.implicits._
      @Test
      def column(): Unit ={
    
        import spark.implicits._
        val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDF()
    
        val c1: Symbol ='name
    
        val c2: ColumnName =$"name"
    
        import org.apache.spark.sql.functions._
        val c3: Column =col("name")
        //val c4: Column =column("name")
    
        val c5: Column =personDF.col("name")
    
        val c6: Column =personDF.apply("name")
    
        val c7: Column =personDF("name")
    
        personDF.select(c1).show()
        personDF.where(c1 ==="zhangsan").show()
    
      }

    Aliases and conversion

    @Test
      def as(): Unit ={
        val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDF()
        import org.apache.spark.sql.functions._
    //as usage 1: change the data type
        personDF.select(col("age").as[Long]).show()
        personDF.select('age.as[Long]).show()
    //as usage 2: rename the column
        personDF.select(col("age").as("new_age")).show()
        personDF.select('age as 'new_age ).show()
      }

    Adding a column

    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    //add a column
        ds.withColumn("doubled",'age *2).show()

    Operations

    like: fuzzy matching; isin: membership test; sort: sorting

    //fuzzy match
        ds.where('name like "zhang%").show()
    //sort
        ds.sort('age asc).show()
    //membership test (isin)
        ds.where('name isin("zhangsan","wangwu","wxlf")).show()

    Aggregation

    package cn.itcast.spark.sql
    
    import org.apache.spark.sql.{RelationalGroupedDataset, SparkSession}
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}
    import org.junit.Test
    
    class Aggprocessor {
      val spark=SparkSession.builder().master("local[6]").appName("trans").getOrCreate()
      import spark.implicits._
      @Test
      def groupBy(): Unit ={
    
         val schema = StructType(
          List(
            StructField("id", IntegerType),
            StructField("year", IntegerType),
            StructField("month", IntegerType),
            StructField("day", IntegerType),
            StructField("hour", IntegerType),
            StructField("season", IntegerType),
            StructField("pm", DoubleType)
          )
        )
    
    //read the dataset
        val sourceDF=spark.read
          .schema(schema)
          .option("header",value = true)
          .csv("dataset/beijingpm_with_nan.csv")
    
    //drop rows where pm is NaN
        val clearDF=sourceDF.where('pm =!= Double.NaN)
    //group by year and month
        val groupedDF: RelationalGroupedDataset = clearDF.groupBy('year, 'month)
    
        import org.apache.spark.sql.functions._
    //aggregate
        groupedDF.agg(avg('pm) as("pm_avg"))
          .orderBy('pm_avg desc)
          .show()
    
    //method 2: use the built-in avg shortcut
        groupedDF.avg("pm")
          .select($"avg(pm)" as "pm_avg")
          .orderBy('pm_avg desc)
          .show()
    
        groupedDF.max("pm").show()
        groupedDF.min("pm").show()
        groupedDF.sum("pm").show()
        groupedDF.count().show()
        groupedDF.mean("pm").show()
    
    
      }
    
    }

    Joins

    Untyped join

    @Test
      def join(): Unit ={
        val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 0))
          .toDF("id", "name", "cityId")
    
        val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))
          .toDF("id", "name")
    
        val df=person.join(cities, person.col("cityId") === cities.col("id"))
          .select(person.col("id"),
            person.col("name"),
            cities.col("name") as "city")
        
          df.createOrReplaceTempView("user_city")
        spark.sql("select id ,name,city from user_city where city=='Beijing'")
          .show()
      }

    Join types
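
    The join-type tests below use person and cities (and the SQL variants query temporary views with the same names) without showing their setup; presumably they are class-level fields like those defined in join() above, registered as temp views roughly as follows:

    // Assumed shared setup for the join-type examples.
    val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 0))
      .toDF("id", "name", "cityId")
    val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))
      .toDF("id", "name")

    person.createOrReplaceTempView("person")
    cities.createOrReplaceTempView("cities")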

    Cross join: a cross join is the Cartesian product, pairing every row of one table with every row of the other

    @Test
      def crossJoin(): Unit ={
    
        person.crossJoin(cities)
          .where(person.col("cityId") === cities.col("id"))
          .show()
    
        spark.sql("select p.id,p.name,c.name from person  p cross join cities c where  p.cityId = c.id")
          .show()
    
      }

    Inner join: matches the two datasets according to the join condition; only rows that find a match appear in the result set

    @Test
      def inner(): Unit ={
        person.join(cities,person.col("cityId")===cities.col("id"),joinType = "inner")
          .show()
    
        spark.sql("select p.id,p.name,c.name from person p inner join cities c on p.cityId=c.id").show()
      }

    Full outer join:

    @Test
      def fullOuter(): Unit ={
         person.join(cities,person.col("cityId") === cities.col("id"),joinType = "full")
          .show()
    
        spark.sql("select p.id,p.name,c.name from person p full outer join cities c on p.cityId=c.id")
          .show()
      }

    Left outer join

     person.join(cities,person.col("cityId") === cities.col("id"),joinType = "left")
          .show()

    Right outer join

    person.join(cities,person.col("cityId") === cities.col("id"),joinType = "right")
          .show()
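
    For symmetry with the earlier join examples, the SQL forms of the left and right outer joins would presumably look like this:

    spark.sql("select p.id, p.name, c.name from person p left outer join cities c on p.cityId = c.id")
      .show()
    spark.sql("select p.id, p.name, c.name from person p right outer join cities c on p.cityId = c.id")
      .show()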
  • Original article: https://www.cnblogs.com/xiaofengzai/p/14273745.html