zoukankan      html  css  js  c++  java
  • SparkSQL花式查询

     需求:针对personDF中的数据使用SQL和DSL两种方式进行各种查询

    package cn.itcast.sql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
     * Author itcast
     * Desc 演示SparkSQL-SQL和DSL两种方式实现各种查询
     */
    object Demo04_Query {
      def main(args: Array[String]): Unit = {
        //TODO 0.准备环境
        val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
    
        //TODO 1.加载数据
        val lines: RDD[String] = sc.textFile("data/input/person.txt")
    
        //TODO 2.处理数据
        val personRDD: RDD[Person] = lines.map(line => {
          val arr: Array[String] = line.split(" ")
          Person(arr(0).toInt, arr(1), arr(2).toInt)
        })
    
        //RDD-->DF
        import spark.implicits._
        val personDF: DataFrame = personRDD.toDF()
        personDF.printSchema()
        personDF.show()
        /*
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 29|
    |  3|  wangwu| 25|
    |  4| zhaoliu| 30|
    |  5|  tianqi| 35|
    |  6|    kobe| 40|
    +---+--------+---+
         */
    
        //TODO ===========SQL==============
        //注册表名
        //personDF.registerTempTable("")//过期的
        //personDF.createOrReplaceGlobalTempView("")//创建全局的,夸SparkSession也可以用,但是生命周期太长!
        personDF.createOrReplaceTempView("t_person")//创建临时的,当前SparkSession也可以用
    
        //=1.查看name字段的数据
        spark.sql("select name from t_person").show()
        //=2.查看 name 和age字段数据
        spark.sql("select name,age from t_person").show()
        //=3.查询所有的name和age,并将age+1
        spark.sql("select name,age,age+1 from t_person").show()
        //=4.过滤age大于等于25的
        spark.sql("select name,age from t_person where age >= 25").show()
        //=5.统计年龄大于30的人数
        spark.sql("select count(*) from t_person where age > 30").show()
        //=6.按年龄进行分组并统计相同年龄的人数
        spark.sql("select age,count(*) from t_person group by age").show()
        //=7.查询姓名=张三的
        spark.sql("select * from t_person where name = 'zhangsan'").show()
    
        //TODO ===========DSL:面向对象的SQL==============
        //=1.查看name字段的数据
        //personDF.select(personDF.col("name"))
        personDF.select("name").show()
        //=2.查看 name 和age字段数据
        personDF.select("name","age").show()
        //=3.查询所有的name和age,并将age+1
        //personDF.select("name","age","age+1").show()//错误的:cannot resolve '`age+1`' given input columns: [age, id, name];;
        //注意$是把字符串转为了Column列对象
        personDF.select($"name",$"age",$"age" + 1).show()
        //注意'是把列名转为了Column列对象
        personDF.select('name,'age,'age + 1).show()
        //=4.过滤age大于等于25的
        personDF.filter("age >= 25").show()
        personDF.filter($"age" >= 25).show()
        personDF.filter('age >= 25).show()
        //=5.统计年龄大于30的人数
        val count: Long = personDF.where('age > 30).count() //where底层filter
        println("年龄大于30的人数为:"+count)
        //=6.按年龄进行分组并统计相同年龄的人数
        personDF.groupBy('age).count().show()
        //=7.查询姓名=张三的
        personDF.filter("name = 'zhangsan'").show()
        personDF.filter($"name"==="zhangsan").show()
        personDF.filter('name ==="zhangsan").show()
        personDF.filter('name =!="zhangsan").show()
    
        //TODO 3.输出结果
        //TODO 4.关闭资源
        spark.stop()
      }
      case class Person(id:Int,name:String,age:Int)
    }

    结果:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 29|
    |  3|  wangwu| 25|
    |  4| zhaoliu| 30|
    |  5|  tianqi| 35|
    |  6|    kobe| 40|
    +---+--------+---+
    
    +--------+
    |    name|
    +--------+
    |zhangsan|
    |    lisi|
    |  wangwu|
    | zhaoliu|
    |  tianqi|
    |    kobe|
    +--------+
    
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 20|
    |    lisi| 29|
    |  wangwu| 25|
    | zhaoliu| 30|
    |  tianqi| 35|
    |    kobe| 40|
    +--------+---+
    
    +--------+---+---------+
    |    name|age|(age + 1)|
    +--------+---+---------+
    |zhangsan| 20|       21|
    |    lisi| 29|       30|
    |  wangwu| 25|       26|
    | zhaoliu| 30|       31|
    |  tianqi| 35|       36|
    |    kobe| 40|       41|
    +--------+---+---------+
    
    +-------+---+
    |   name|age|
    +-------+---+
    |   lisi| 29|
    | wangwu| 25|
    |zhaoliu| 30|
    | tianqi| 35|
    |   kobe| 40|
    +-------+---+
    
    +--------+
    |count(1)|
    +--------+
    |       2|
    +--------+
    
    +---+--------+
    |age|count(1)|
    +---+--------+
    | 20|       1|
    | 40|       1|
    | 35|       1|
    | 25|       1|
    | 29|       1|
    | 30|       1|
    +---+--------+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    +---+--------+---+
    
    +--------+
    |    name|
    +--------+
    |zhangsan|
    |    lisi|
    |  wangwu|
    | zhaoliu|
    |  tianqi|
    |    kobe|
    +--------+
    
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 20|
    |    lisi| 29|
    |  wangwu| 25|
    | zhaoliu| 30|
    |  tianqi| 35|
    |    kobe| 40|
    +--------+---+
    
    +--------+---+---------+
    |    name|age|(age + 1)|
    +--------+---+---------+
    |zhangsan| 20|       21|
    |    lisi| 29|       30|
    |  wangwu| 25|       26|
    | zhaoliu| 30|       31|
    |  tianqi| 35|       36|
    |    kobe| 40|       41|
    +--------+---+---------+
    
    +--------+---+---------+
    |    name|age|(age + 1)|
    +--------+---+---------+
    |zhangsan| 20|       21|
    |    lisi| 29|       30|
    |  wangwu| 25|       26|
    | zhaoliu| 30|       31|
    |  tianqi| 35|       36|
    |    kobe| 40|       41|
    +--------+---+---------+
    
    +---+-------+---+
    | id|   name|age|
    +---+-------+---+
    |  2|   lisi| 29|
    |  3| wangwu| 25|
    |  4|zhaoliu| 30|
    |  5| tianqi| 35|
    |  6|   kobe| 40|
    +---+-------+---+
    
    +---+-------+---+
    | id|   name|age|
    +---+-------+---+
    |  2|   lisi| 29|
    |  3| wangwu| 25|
    |  4|zhaoliu| 30|
    |  5| tianqi| 35|
    |  6|   kobe| 40|
    +---+-------+---+
    
    +---+-------+---+
    | id|   name|age|
    +---+-------+---+
    |  2|   lisi| 29|
    |  3| wangwu| 25|
    |  4|zhaoliu| 30|
    |  5| tianqi| 35|
    |  6|   kobe| 40|
    +---+-------+---+
    
    年龄大于30的人数为:2
    +---+-----+
    |age|count|
    +---+-----+
    | 20|    1|
    | 40|    1|
    | 35|    1|
    | 25|    1|
    | 29|    1|
    | 30|    1|
    +---+-----+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    +---+--------+---+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    +---+--------+---+
    
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    +---+--------+---+
    
    +---+-------+---+
    | id|   name|age|
    +---+-------+---+
    |  2|   lisi| 29|
    |  3| wangwu| 25|
    |  4|zhaoliu| 30|
    |  5| tianqi| 35|
    |  6|   kobe| 40|
    +---+-------+---+

    案例:

    WordCount

    package cn.itcast.sql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    /**
     * Author itcast
     * Desc 演示SparkSQL-SQL和DSL两种方式实现WordCount
     */
    object Demo05_WordCount {
      def main(args: Array[String]): Unit = {
        //TODO 0.准备环境
        val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
    
    
        //TODO 1.加载数据
        val df: DataFrame = spark.read.text("data/input/words.txt")
        val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")
        df.printSchema()
        df.show()
        ds.printSchema()
        ds.show()
        /*
    root
     |-- value: string (nullable = true)
    
    +----------------+
    |           value|
    +----------------+
    |hello me you her|
    |   hello you her|
    |       hello her|
    |           hello|
    +----------------+
         */
        //TODO 2.处理数据
        //df.flatMap(_.split(" "))//注意:df没有泛型,不能直接使用split
        val words: Dataset[String] = ds.flatMap(_.split(" "))
        words.printSchema()
        words.show()
        /*
        root
     |-- value: string (nullable = true)
    
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |  you|
    |  her|
    |hello|
    |  you|
    |  her|
    |hello|
    |  her|
    |hello|
    +-----+
         */
        //TODO ===SQL===
        words.createOrReplaceTempView("t_words")
        val sql:String =
          """
            |select value,count(*) as counts
            |from t_words
            |group by value
            |order by counts desc
            |""".stripMargin
        spark.sql(sql).show()
    
        //TODO ===DSL===
        words.groupBy('value)
            .count()
            .orderBy('count.desc)
            .show()
    
        //TODO 3.输出结果
        //TODO 4.关闭资源
        spark.stop()
      }
    }

    案例:电影数据分析

    https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples

    package cn.itcast.sql
    
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    /**
     * Author itcast
     * Desc 演示SparkSQL-完成电影数据分析
     */
    object Demo07_MovieDataAnalysis {
      def main(args: Array[String]): Unit = {
        //TODO 0.准备环境
        val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
          .config("spark.sql.shuffle.partitions", "4")//本次测试时将分区数设置小一点,实际开发中可以根据集群规模调整大小,默认200
          .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
    
        //TODO 1.加载数据
        val ds: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")
    
        //TODO 2.处理数据
        val movieDF: DataFrame = ds.map(line => {
          val arr: Array[String] = line.split("	")
          (arr(1), arr(2).toInt)
        }).toDF("movieId", "score")
        movieDF.printSchema()
        movieDF.show()
        /*
       +-------+-----+
      |movieId|score|
      +-------+-----+
      |    242|    3|
      |    302|    3|
         */
    
        //需求:统计评分次数>200的电影平均分Top10
    
        //TODO ======SQL
        //注册表
        movieDF.createOrReplaceTempView("t_movies")
        val sql: String =
          """
            |select movieId,avg(score) as avgscore,count(*) as counts
            |from t_movies
            |group by movieId
            |having counts > 200
            |order by avgscore desc
            |limit 10
            |""".stripMargin
        spark.sql(sql).show()
        /*
        +-------+------------------+------+
        |movieId|          avgscore|counts|
        +-------+------------------+------+
        |    318| 4.466442953020135|   298|
        |    483|  4.45679012345679|   243|
        |     64| 4.445229681978798|   283|
        |    603|4.3875598086124405|   209|
      .....
         */
    
    
        //TODO ======DSL
        import org.apache.spark.sql.functions._
        movieDF.groupBy('movieId)
          .agg(
            avg('score) as "avgscore",
            count("movieId") as "counts"
          ).filter('counts > 200)
          .orderBy('avgscore.desc)
          .limit(10)
          .show()
    
        //TODO 3.输出结果
        //TODO 4.关闭资源
        spark.stop()
      }
    }

  • 相关阅读:
    [HAOI2015] 按位或
    [CF662C] Binary Table
    逻辑、集合运算上的卷积一览(FMT、FWT,……)
    从零开始的伯努利数
    [LGP2000] 拯救世界
    [BZOJ4180] 字符串计数
    [清华集训2017] 生成树计数
    [CF911G] Mass Change Queries
    微信公众号服务器配置(校验)
    mariadb数据库通过.ibd恢复过程(知道数据库结构的情况下)
  • 原文地址:https://www.cnblogs.com/a155-/p/14504928.html
Copyright © 2011-2022 走看看