
    3 Spark SQL Explained

    3.1 The New Entry Point: SparkSession

    In older versions, Spark SQL provided two SQL query entry points: SQLContext, for Spark's own SQL queries, and HiveContext, for queries against Hive. SparkSession is Spark's newest SQL query entry point and is essentially a combination of SQLContext and HiveContext, so any API available on SQLContext or HiveContext can also be used on SparkSession. SparkSession wraps a sparkContext internally, so the actual computation is still carried out by the sparkContext.

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    SparkSession.builder is used to create a SparkSession.

    The import spark.implicits._ brings in implicit conversions, such as converting an RDD to a DataFrame (the toDF and toDS methods), and enables the $-notation for referring to columns.
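    A minimal sketch of what the implicits enable, assuming a spark-shell-style session where spark is the SparkSession built above (the sample rows are made up for illustration):

    import spark.implicits._

    // toDF on a local Seq comes from the implicits
    val people = Seq(("Alice", 29), ("Bob", 31)).toDF("name", "age")
    // the $"..." column syntax comes from the implicits as well
    people.filter($"age" > 30).show()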

    If Hive support is needed, create the session with enableHiveSupport:

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .enableHiveSupport()
      .getOrCreate()
    
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    3.2 Creating DataFrames

    In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame: convert an existing RDD, query a Hive table, or read from a Spark data source.

    Creating one from a Spark data source:

    val df = spark.read.json("examples/src/main/resources/people.json")    
    // Displays the content of the DataFrame to stdout
    df.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    Converting from an existing RDD:

    /** Contents of examples/src/main/resources/people.txt:
    Michael, 29
    Andy, 30
    Justin, 19
    **/
    scala> val peopleRdd = sc.textFile("examples/src/main/resources/people.txt")
    peopleRdd: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at <console>:24
    
    scala> val peopleDF3 = peopleRdd.map(_.split(",")).map(paras => (paras(0),paras(1).trim().toInt)).toDF("name","age")
    peopleDF3: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> peopleDF3.show()
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+

    Creating DataFrames by querying Hive tables is covered in the data sources chapter.

    3.3 Common DataFrame Operations

    3.3.1 DSL-Style Syntax

    
    
    val df = spark.read.json("examples/src/main/resources/people.json")    
    // This import is needed to use the $-notation
    import spark.implicits._
    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+
    
    // Select everybody, but increment the age by 1
    df.select($"name", $"age" + 1).show()
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+
    
    // Select people older than 21
    df.filter($"age" > 21).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+
    
    // Count people by age
    df.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+

     

    3.3.2 SQL-Style Syntax

    val df = spark.read.json("examples/src/main/resources/people.json")

    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")

    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

     

    A temporary view is scoped to the session: once the session ends, the view is gone. If you need a view that stays valid across sessions within the same application, use a global temporary view. Note that a global temporary view must be referenced by its full path, e.g., global_temp.people.
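    A quick sketch of the scope difference, assuming the people temporary view registered above (the exception text is approximate):

    // The session-scoped view is not visible from another session;
    // this throws an AnalysisException ("Table or view not found: people"):
    // spark.newSession().sql("SELECT * FROM people").show()

    // The global view remains reachable through the global_temp database:
    spark.newSession().sql("SELECT * FROM global_temp.people").show()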

    3.4 Creating Datasets

    A Dataset is a strongly typed collection of data, so the corresponding type information must be provided.

    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface
    case class Person(name: String, age: Long)
    
    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+
    
    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    
    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    3.5 Interoperating Between RDDs and Datasets

    Spark SQL supports two ways of converting an existing RDD into a Dataset; in both, the Dataset must obtain the schema of the RDD. The first infers the schema via reflection, which suits cases where the column names are known in advance. The second applies a schema to the RDD through a programmatic interface, which handles cases where the columns are known only at runtime.

    3.5.1 Inferring the Schema via Reflection

    Spark SQL can automatically convert an RDD containing case class instances into a DataFrame. The case class defines the table structure, and its attribute names become the column names via reflection. Case classes can also contain complex types such as Seqs or Arrays.

    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._
    
    // Create an RDD of Person objects from a text file, convert it to a Dataframe
    val peopleDF = spark.sparkContext
    .textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
    .toDF()
    
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    
    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    
    // The columns of a row in the result can be accessed by field index on the Row object
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    
    // or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    
    // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    // Primitive types and case classes can be also defined as
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    
    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
    // Array(Map("name" -> "Justin", "age" -> 19))

    3.5.2 Specifying the Schema Programmatically

      If case classes cannot be defined ahead of time, a DataFrame can be created in three steps:

      1. Create an RDD of Rows from the original RDD;

      2. Create the schema, represented by a StructType, matching the structure of the Rows in step 1;

      3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.

    import org.apache.spark.sql.types._
    
    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
    
    // The schema is encoded in a string; in practice it would typically be generated dynamically by the program
    val schemaString = "name age"
    
    // Generate the schema (an Array[StructField]) based on the schema string
    val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))
    
    // Alternatively, pattern-match on the field name to assign per-column types:
    // val fields = schemaString.split(" ").map {
    //   case "name" => StructField("name", StringType, nullable = true)
    //   case "age"  => StructField("age", IntegerType, nullable = true)
    // }
    
    val schema = StructType(fields)
    
    // Convert records of the RDD (people) to Rows
    import org.apache.spark.sql._
    val rowRDD = peopleRDD
    .map(_.split(","))
    .map(attributes => Row(attributes(0), attributes(1).trim))
    
    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    
    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    
    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
    // +-------------+
    // |        value|
    // +-------------+
    // |Name: Michael|
    // |   Name: Andy|
    // | Name: Justin|
    // +-------------+

    3.6 Summary of Conversions Between the Three Types

    RDD, DataFrame, and Dataset have a great deal in common, but each suits different scenarios, so conversion among the three is frequently needed.

      DataFrame/Dataset to RDD:

      This conversion is simple:

    val rdd1=testDF.rdd
    val rdd2=testDS.rdd

      RDD to DataFrame:

    import spark.implicits._
    val testDF = rdd.map { line =>
      (line._1, line._2)
    }.toDF("col1", "col2")

    Typically a tuple gathers the fields of one row together, and the column names are then specified in toDF.

      RDD to Dataset:

    import spark.implicits._
    // Define the field names and types
    case class Coltest(col1: String, col2: Int) extends Serializable
    val testDS = rdd.map { line =>
      Coltest(line._1, line._2)
    }.toDS

      Note that when defining the type of each row (the case class), the field names and types are already given; afterwards you only need to fill the case class with values.

      Dataset to DataFrame:

      This one is also simple, since it merely wraps the case class into a Row:

    import spark.implicits._
    val testDF = testDS.toDF

      DataFrame to Dataset:

    import spark.implicits._
    // Define the field names and types
    case class Coltest(col1: String, col2: Int) extends Serializable
    val testDS = testDF.as[Coltest]

    With this method, once the type of each column is given, the as method converts the DataFrame into a Dataset. This is extremely convenient when the data arrives as a DataFrame but individual fields need to be processed, as sketched below.
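    A short sketch, reusing the Coltest case class and testDS from above (import spark.implicits._ supplies the encoder for the result):

    val processed = testDS.map { row =>  // row is a Coltest, not an untyped Row
      row.col1 + ": " + (row.col2 + 1)   // fields are accessed by name with compile-time types
    }
    processed.show()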

    When using these conversions, always add import spark.implicits._, otherwise toDF and toDS will not be available.
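    Without the import, the conversion does not even compile; the compiler error is roughly the following (sc and the sample tuple are illustrative):

    // error: value toDF is not a member of org.apache.spark.rdd.RDD[(String, Int)]
    import spark.implicits._   // bringing the implicits into scope fixes it
    val ok = sc.parallelize(Seq(("a", 1))).toDF("col1", "col2")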

    3.7 User-Defined Functions

      Users can define their own functions through the spark.udf facility.

    3.7.1 User-Defined UDFs

    scala> val df = spark.read.json("examples/src/main/resources/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.show()
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    
    scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
    res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
    
    scala> df.createOrReplaceTempView("people")
    
    scala> spark.sql("Select addName(name), age from people").show()
    +-----------------+----+
    |UDF:addName(name)| age|
    +-----------------+----+
    |     Name:Michael|null|
    |        Name:Andy|  30|
    |      Name:Justin|  19|
    +-----------------+----+
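
    The same logic can also be applied on the DataFrame side without registering it for SQL; a brief sketch using the udf helper from org.apache.spark.sql.functions (addNameCol is an illustrative name):

    import org.apache.spark.sql.functions.udf

    val addNameCol = udf((x: String) => "Name:" + x)
    df.select(addNameCol(df("name")), df("age")).show()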

    3.7.2 User-Defined Aggregate Functions

      Both strongly typed Datasets and untyped DataFrames come with built-in aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions.
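    As a point of reference, a minimal sketch of the built-in aggregates applied to the employees data used below (salaries 3000, 4500, 3500, 4000):

    import org.apache.spark.sql.functions._

    val employees = spark.read.json("examples/src/main/resources/employees.json")
    employees.select(count(col("salary")), avg(col("salary")), max(col("salary")), min(col("salary"))).show()
    // yields count = 4, avg = 3750.0, max = 4500, min = 3000 for this sample data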

      Untyped user-defined aggregate function: extend UserDefinedAggregateFunction to implement one. The following implements a custom aggregate that computes the average salary.

    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    
    object MyAverage extends UserDefinedAggregateFunction {
      // Data type of this aggregate function's input arguments
      def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
      // Data types of the values in the aggregation buffer
      def bufferSchema: StructType = {
        StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
      }
      // Data type of the return value
      def dataType: DataType = DoubleType
      // Whether this function always returns the same output for the same input
      def deterministic: Boolean = true
      // Initializes the aggregation buffer
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        // Running total of the salaries
        buffer(0) = 0L
        // Running count of the salaries
        buffer(1) = 0L
      }
      // Updates the buffer with a new input row (merging data within the same executor)
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getLong(0) + input.getLong(0)
          buffer(1) = buffer.getLong(1) + 1
        }
      }
      // Merges aggregation buffers from different executors
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
      // Calculates the final result
      def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
    }
    
    // Register the function
    spark.udf.register("myAverage", MyAverage)
    
    val df = spark.read.json("examples/src/main/resources/employees.json")
    df.createOrReplaceTempView("employees")
    df.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+
    
    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+

    Typed user-defined aggregate function: extend Aggregator to implement a strongly typed custom aggregate, again computing the average salary.

    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.SparkSession
    // Since the function is strongly typed, case classes define the input and buffer types
    case class Employee(name: String, salary: Long)
    case class Average(var sum: Long, var count: Long)

    object MyAverage extends Aggregator[Employee, Average, Double] {
      // A zero value for this aggregation: total salary and count, both starting at 0
      def zero: Average = Average(0L, 0L)
      // Combine two values to produce a new value. For performance, the function may modify `buffer`
      // and return it instead of constructing a new object
      def reduce(buffer: Average, employee: Employee): Average = {
        buffer.sum += employee.salary
        buffer.count += 1
        buffer
      }
      // Merge intermediate results from different executors
      def merge(b1: Average, b2: Average): Average = {
        b1.sum += b2.sum
        b1.count += b2.count
        b1
      }
      // Compute the final output
      def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
      // Encoder for the intermediate value type; it must convert to the case class.
      // Encoders.product is the encoder for Scala tuples and case classes
      def bufferEncoder: Encoder[Average] = Encoders.product
      // Encoder for the final output value type
      def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
    
    import spark.implicits._
    
    val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
    ds.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+
    
    // Convert the function to a `TypedColumn` and give it a name
    val averageSalary = MyAverage.toColumn.name("average_salary")
    val result = ds.select(averageSalary)
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+

     

     
