
    Spark SQL

    DataFrame

    Creating a DataFrame

    scala> spark
    res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4e8598d9
    

    View the file formats that Spark's data source reader can create a DataFrame from

    scala> spark.read.
    csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
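
    Any of these formats is read the same way. A minimal sketch for a CSV source (the file path and options here are assumptions, not from the original session):

    val csvDF = spark.read
      .option("header", "true")       // treat the first line as column names
      .option("inferSchema", "true")  // let Spark infer the column types
      .csv("hdfs://192.168.83.101:9000/datas/people.csv")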
    

    Create a DataFrame by reading a JSON file

    scala> val df = spark.read.json("hdfs://192.168.83.101:9000/datas/2.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    

    Show the result

    scala> df.show()
    +---+--------+
    |age|    name|
    +---+--------+
    | 20|zhangsan|
    | 20|    lisi|
    | 20|  wangwu|
    +---+--------+
    

    Create a temporary view (a view is read-only; it cannot be modified)

    scala> df.createTempView("emp")
    

    Query the temporary view

    scala> spark.sql("select * from emp").show
    +---+--------+
    |age|    name|
    +---+--------+
    | 20|zhangsan|
    | 20|    lisi|
    | 20|  wangwu|
    +---+--------+
    
    A temporary view is scoped to the current Session; once the Session ends, the view is gone. For a view that is visible across the whole application, use a global temporary view.
    A global temporary view must be referenced through its full path, e.g. global_temp.people
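
    A minimal sketch of the global variant, reusing df and the people name from the example above (not from the original session):

    df.createGlobalTempView("people")
    spark.sql("select * from global_temp.people").show()
    spark.newSession().sql("select * from global_temp.people").show()  // still visible from a new session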
    

    DSL syntax style

    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    

    Select the name column

    scala> df.select("name").show()
    +--------+
    |    name|
    +--------+
    |zhangsan|
    |    lisi|
    |  wangwu|
    +--------+
    

    Select the name column together with age + 1

    scala> df.select($"name",$"age" + 1).show()
    +--------+---------+
    |    name|(age + 1)|
    +--------+---------+
    |zhangsan|       21|
    |    lisi|       21|
    |  wangwu|       21|
    +--------+---------+
    

    Filter rows where age is less than 21

    scala> df.filter($"age" < 21).show()
    +---+--------+
    |age|    name|
    +---+--------+
    | 20|zhangsan|
    | 20|    lisi|
    | 20|  wangwu|
    +---+--------+
    

    Group by age and count the rows

    scala> df.groupBy("age").count().show()
    +---+-----+
    |age|count|
    +---+-----+
    | 20|    3|
    +---+-----+
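
    These DSL operators compose into longer pipelines. A sketch chaining filter, groupBy and agg on the same df (the aliases cnt and avgAge are illustrative):

    import org.apache.spark.sql.functions._   // avg, count, ...

    df.filter($"age" >= 20)
      .groupBy("age")
      .agg(count("name").as("cnt"), avg("age").as("avgAge"))
      .show()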
    

    Converting an RDD to a DataFrame

    scala> val rdd = sc.makeRDD(List(1,2,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at <console>:24
    
    scala> val df = rdd.toDF("id")
    df: org.apache.spark.sql.DataFrame = [id: int]
    
    scala> df.show()
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    +---+
    

    Converting an RDD of tuples to a DataFrame

    scala> val rdd = sc.makeRDD(List((1,"zhangsan",20),(2,"lisi",30),(3,"wangwu",40)))
    rdd: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[45] at makeRDD at <console>:24
    
    scala> val df = rdd.toDF("id","name","age")
    df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
    
    scala> df.show()
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 30|
    |  3|  wangwu| 40|
    +---+--------+---+
    

    Creating a DataFrame from a case class

    scala> case class People(name:String,age:Int)
    defined class People
    
    scala> val rdd = sc.makeRDD(List(("zhangsan",20),("lisi",30),("wangwu",40)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[55] at makeRDD at <console>:24
    
    scala> val peopleRDD = rdd.map(t=>{People(t._1,t._2)})
    peopleRDD: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[56] at map at <console>:28
    
    scala> peopleRDD.toDF
    res25: org.apache.spark.sql.DataFrame = [name: string, age: int]
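
    When no case class is available, a schema can also be attached explicitly with Row and StructType; a minimal sketch using the same rdd of (String, Int) tuples:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val rowRDD = rdd.map(t => Row(t._1, t._2))
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))
    val peopleDF = spark.createDataFrame(rowRDD, schema)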
    

    Converting a DataFrame to an RDD

    Simply call .rdd on the DataFrame

    scala> df.rdd
    res27: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[64] at rdd at <console>:29
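
    The resulting RDD holds Row objects, so column values are read back by name or position, for example:

    df.rdd.map(row => row.getAs[String]("name")).collect()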
    

    DataSet

    First define a case class

    scala> case class Person(name:String,age:Long)
    defined class Person
    

    Create a Dataset

    scala> val caseClassDS = Seq(Person("Andy",32)).toDS()
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
    

    Access the Dataset (much the same as with a DataFrame)

    scala> caseClassDS.show()
    +----+---+
    |name|age|
    +----+---+
    |Andy| 32|
    +----+---+
    

    Converting an RDD to a Dataset

    scala> case class Person(name:String,age:Long)
    defined class Person
    
    scala> val mapRDD = rdd.map(t=>{Person(t._1,t._2)})
    mapRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[65] at map at <console>:28
    
    scala> mapRDD.toDS
    res30: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
    
    scala> mapRDD.toDS.show()
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 20|
    |    lisi| 30|
    |  wangwu| 40|
    +--------+---+
    

    Converting a Dataset to an RDD

    scala> val ds = mapRDD.toDS
    ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
    
    scala> ds.rdd
    res32: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[73] at rdd at <console>:33
    

    Converting a DataFrame to a Dataset

    scala> rdd.collect
    res33: Array[(String, Int)] = Array((zhangsan,20), (lisi,30), (wangwu,40))
    
    scala> val df = rdd.toDF("name","age")
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> df.as[Person]
    res34: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
    

    Converting a Dataset to a DataFrame

    scala> val ds = df.as[Person]
    ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
    
    scala> ds.toDF
    res35: org.apache.spark.sql.DataFrame = [name: string, age: int]
    

    Creating a project in IDEA

    package com.bigdata.sparksql
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    
    object SparkSQL01_Demo {
      def main(args: Array[String]): Unit = {
        //TODO Create the configuration object
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo")
        val sc = new SparkContext(sparkConf)
        //TODO Create the Spark SQL environment object
        val spark = SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", "C:/Users/RenZetong/Desktop/桌面备份/SparkSQLDev/spark-warehouse").getOrCreate()
        //TODO Build a DataFrame
        val frame = spark.read.json("input/2.json")
        frame.show()
        spark.stop()
      }
    }
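
    Running this in IDEA requires the spark-sql module on the classpath. A minimal build.sbt sketch (the Scala and Spark versions are assumptions; match them to your cluster):

    scalaVersion := "2.11.12"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.4.8",
      "org.apache.spark" %% "spark-sql"  % "2.4.8"
    )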
    

    Querying a DataFrame with SQL syntax

    package com.bigdata.sparksql
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkSQL02_SQL {
      def main(args: Array[String]): Unit = {
        //TODO Create the configuration object
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo")
        val sc = new SparkContext(sparkConf)
        //TODO Create the Spark SQL environment object
        val spark = SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", "C:/Users/RenZetong/Desktop/桌面备份/SparkSQLDev/spark-warehouse").getOrCreate()
        //TODO Build a DataFrame
        val frame = spark.read.json("input/2.json")
        //Register the frame as a temporary view
        frame.createTempView("user")
        //Query it with SQL syntax
        spark.sql("select * from user").show()
        spark.stop()
      }
    }
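
    Note that createTempView fails if a view with the same name already exists; createOrReplaceTempView is the idempotent alternative:

    frame.createOrReplaceTempView("user")
    spark.sql("select name from user where age >= 20").show()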
    

    Converting between RDD, DataFrame, and Dataset

    package com.bigdata.sparksql
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkSQL03_Transform {
      def main(args: Array[String]): Unit = {
        //TODO Create the configuration object
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo")
        val sc = new SparkContext(sparkConf)
        //TODO Create the Spark SQL environment object
        val spark = SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", "C:/Users/RenZetong/Desktop/桌面备份/SparkSQLDev/spark-warehouse").getOrCreate()
    
        //The implicit conversions must be imported before converting
        import spark.implicits._
    
        //Create an RDD
        val rdd = spark.sparkContext.makeRDD(List((1,"张三",22),(2,"李四",20),(3,"王五",30)))
        //Convert to a DataFrame
        val df = rdd.toDF("id","name","age")
        //Convert to a Dataset
        val ds = df.as[User]
        //Convert back to a DataFrame
        val df1 = ds.toDF()
        //Convert back to an RDD
        val rdd1 = df1.rdd
    
        rdd1.foreach(
          row=>{
            println(row.getInt(0) + "," + row.getString(1) + "," + row.getInt(2))
          }
        )
        spark.stop()
      }
    }
    //TODO A case class that describes the table structure
    case class User(id:Int,name:String,age:Int)
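
    Because the Dataset keeps the User type, its rows can also be processed with plain lambdas instead of Row getters; a short sketch reusing the ds from the code above:

    ds.filter(user => user.age > 21)
      .map(user => user.name)
      .show()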
    

    User-defined functions (UDF)

    In the shell window, users can register their own functions through spark.udf

    scala> var df = spark.read.json("hdfs://192.168.83.101:9000/datas/2.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> spark.udf.register("addName",(x:String)=>"Name:" + x)
    res1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
    
    scala> df.createTempView("user")
    
    scala> spark.sql("select * from user").show()
    +---+--------+
    |age|    name|
    +---+--------+
    | 20|zhangsan|
    | 20|    lisi|
    | 21|  wangwu|
    | 21|   liliu|
    | 18|   liyue|
    +---+--------+
    
    scala> spark.sql("select addName(name)  from user").show()
    +-------------+
    |    UDF(name)|
    +-------------+
    |Name:zhangsan|
    |    Name:lisi|
    |  Name:wangwu|
    |   Name:liliu|
    |   Name:liyue|
    +-------------+
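
    The same UDF can be used from application code as well; a minimal sketch (frame and the user view are assumed to exist as in the earlier examples):

    import org.apache.spark.sql.functions.{col, udf}

    spark.udf.register("addName", (x: String) => "Name:" + x)   // for the SQL syntax
    val addName = udf((x: String) => "Name:" + x)                // for the DSL syntax
    frame.select(addName(col("name"))).show()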
    

    User-defined aggregate functions (weakly typed)

    Both the strongly typed Dataset and the weakly typed DataFrame provide common aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions.

    Weakly typed user-defined aggregate function: implemented by extending UserDefinedAggregateFunction

    package com.bigdata.sparksql
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}
    
    object SparkSQL04_UDAF {
      def main(args: Array[String]): Unit = {
        //TODO Create the configuration object
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo")
        val sc = new SparkContext(sparkConf)
        //TODO Create the Spark SQL environment object
        val spark = SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", "C:/Users/RenZetong/Desktop/桌面备份/SparkSQLDev/spark-warehouse").getOrCreate()
        //TODO Create and register the custom aggregate function
        val udaf = new MyAgeAvgFunction
        spark.udf.register("avgAge", udaf)
    
        //Use the aggregate function
        val frame = spark.read.json("input/2.json")
    
        frame.createTempView("user")
        spark.sql("select avgAge(age) from user").show()
    
        spark.stop()
      }
    }
    
    //TODO Declare a weakly typed user-defined aggregate function
    //1. Extend UserDefinedAggregateFunction
    //2. Implement its methods
    class MyAgeAvgFunction extends UserDefinedAggregateFunction {
    
      //TODO Schema of the function's input
      override def inputSchema: StructType = {
        //The input has a single field, age, of type Long
        new StructType().add("age", LongType)
      }
    
      //TODO Schema of the aggregation buffer used during the computation
      override def bufferSchema: StructType = {
        new StructType().add("sum", LongType).add("count", LongType)
      }
    
      //TODO Data type of the value returned once the computation is finished
      override def dataType: DataType = DoubleType
    
      //TODO Whether the function is deterministic (the same input always yields the same output)
      override def deterministic: Boolean = true
    
      //TODO Initialize the buffer before the computation starts
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L //the first buffer field, "sum"
        buffer(1) = 0L //the second buffer field, "count"
      }
    
      //TODO Update the buffer with each input row
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getLong(0) + input.getLong(0)
        buffer(1) = buffer.getLong(1) + 1
      }
    
      //TODO Merge the buffers from multiple nodes
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        //new_sum = sum + sum
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        //new_count = count + count
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
    
      //TODO Compute the final result
      override def evaluate(buffer: Row): Any = {
        buffer.getLong(0).toDouble / buffer.getLong(1)
      }
    }
    
    

    User-defined aggregate functions (strongly typed)

    package com.bigdata.sparksql
    
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkSQL04_UDAF_Class {
      def main(args: Array[String]): Unit = {
        //TODO Create the configuration object
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo")
        val sc = new SparkContext(sparkConf)
        //TODO Create the Spark SQL environment object
        val spark = SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", "C:/Users/RenZetong/Desktop/桌面备份/SparkSQLDev/spark-warehouse").getOrCreate()
        //The implicit conversions must be imported before converting
        import spark.implicits._
    
        //Create the aggregate function object
        val udaf = new MyAgeAvgClassFunction
        //Turn the aggregate function into a query column
        val avgCol = udaf.toColumn.name("avgAge")
    
        val frame = spark.read.json("input/2.json")
    
        val userDS = frame.as[UserBean]
    
        //Apply the function
        userDS.select(avgCol).show()
    
        spark.stop()
      }
    }
    
    case class UserBean(name: String, age: BigInt)
    
    case class AvgBuffer(var sum: BigInt, var count: Int)
    
    //TODO Declare a strongly typed user-defined aggregate function
    //1. Extend Aggregator
    //2. Implement its methods
    
    class MyAgeAvgClassFunction extends Aggregator[UserBean, AvgBuffer, Double] {
      //TODO Initialize the buffer
      override def zero: AvgBuffer = {
        AvgBuffer(0, 0)
      }
    
      /**
       * Aggregate each input row into the buffer
       */
      override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
        b.sum = b.sum + a.age
        b.count = b.count + 1
        b
      }
    
      //TODO Merge buffers
      override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
        b1.sum = b1.sum + b2.sum
        b1.count = b1.count + b2.count
        b1
      }
    
      //TODO Compute the final result
      override def finish(reduction: AvgBuffer): Double = {
        reduction.sum.toDouble / reduction.count
      }
    
      //TODO Encoder for the buffer type
      override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
    
      //TODO Encoder for the output type
      override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
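
    On Spark 3.0 and later (an assumption; the code in this post targets the Spark 2.x API), UserDefinedAggregateFunction is deprecated and an Aggregator whose input is the aggregated value itself can instead be registered for SQL use via functions.udaf. A sketch, reusing the user view from earlier:

    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.{Encoder, Encoders, functions}

    case class AvgBuf(var sum: Long, var count: Long)

    object AgeAvg extends Aggregator[Long, AvgBuf, Double] {
      override def zero: AvgBuf = AvgBuf(0L, 0L)
      override def reduce(b: AvgBuf, a: Long): AvgBuf = { b.sum += a; b.count += 1; b }
      override def merge(b1: AvgBuf, b2: AvgBuf): AvgBuf = { b1.sum += b2.sum; b1.count += b2.count; b1 }
      override def finish(b: AvgBuf): Double = b.sum.toDouble / b.count
      override def bufferEncoder: Encoder[AvgBuf] = Encoders.product
      override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }

    //assuming a SparkSession named spark and the user view from the earlier examples
    spark.udf.register("avgAge3", functions.udaf(AgeAvg, Encoders.scalaLong))
    spark.sql("select avgAge3(age) from user").show()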
    
    