zoukankan      html  css  js  c++  java
  • SparkSQL

    Spark SQL

    Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。

    Hive SQL是转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

     SparkSession

    在spark2.0中,引入SparkSession(作为DataSet和DataFrame API的切入点)作为Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext); 为了向后兼容,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

    SparkSession内部封装了sparkContext、SparkConf、SQLContext,所以计算实际上是由sparkContext完成的。

     ---- 为用户提供一个统一的切入点使用Spark 各项功能

     ---- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

     --- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

    DataFrame

    在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

               

    创建

    在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:

      通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

    读取json文件创建DataFrame

    spark读取json按行读取;只要一行符合json的格式即可;
    scala> val rdd = spark.read.json("/opt/module/spark/spark-local/examples/src/main/resources/people.json") rdd: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> rdd.show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
    ① SQL风格语法
    ##转化成sql去执行

    scala> rdd.createTempView("user") //view是table的查询结果,只能查不能改 scala> spark.sql("select * from user").show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala> spark.sql("select * from user where age is not null").show +---+------+ |age| name| +---+------+ | 30| Andy| | 19|Justin| +---+------+

    注意:普通临时view是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

    scala> rdd.createGlobalTempView("emp")  //提升为全局
    scala> spark.sql("select * from user where age is not null").show
    +---+------+
    |age|  name|
    +---+------+
    | 30|  Andy|
    | 19|Justin|
    +---+------+
    
    scala> spark.sql("select * from emp where age is not null").show    //sql默认从当前session中查找,所以查询时需要加上global_temp
    org.apache.spark.sql.AnalysisException: Table or view not found: emp; line 1 pos 14
    scala> spark.sql("select * from global_temp.emp where age is not null").show
    +---+------+
    |age|  name|
    +---+------+
    | 30|  Andy|
    | 19|Justin|
    +---+------+

     ② 以面向对象方式访问;DSL风格语法 模仿面向对象的方式

    scala> rdd.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    
    
    scala> rdd.select("age").show
    +----+
    | age|
    +----+
    |null|
    |  30|
    |  19|
    +----+
    
    
    scala> rdd.select($"age"+1).show
    +---------+
    |(age + 1)|
    +---------+
    |     null|
    |       31|
    |       20|
    +---------+

    代码方式:

    方式一:通过 case class 创建 DataFrames(反射)

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    object TestSparkSql {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
        val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        // 将本地的数据读入 RDD, 并将 RDD 与 case class 关联
        val peopleRdd = sc.sparkContext.textFile("file:\F:\Input\people.txt")
          .map(line => People(line.split(",")(0),line.split(",")(1).trim.toInt))
        import sc.implicits._
        // 将RDD 转换成 DataFrames
        val df: DataFrame = peopleRdd.toDF
        //将DataFrames创建成一个临时的视图
        df.createOrReplaceTempView("people")
        sc.sql("select * from people").show() //使用SQL语句进行查询
        sc.stop()
      }
    }
    //定义case class,相当于表结构
    case class People(var name: String, var age: Int)

    说明:

    ① textFile默认是从hdfs读取文件; 本地文件读取 sc.textFile("路径"),在路径前面加上file:// 表示从本地文件系统读

    ② textFile可直接读取多个文件夹(嵌套)下的多个数据文件,如上边路径可写成  "file:\F:\Input"  读取这个目录下多个文件

    方式二:通过 structType 创建 DataFrames(编程接口),测试代码如下

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    object TestSparkSql {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
        val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        // 将本地的数据读入 RDD
        val peopleRdd = sc.sparkContext.textFile("file:\F:\Input")
        // 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
        import org.apache.spark.sql.Row
        val rowRDD: RDD[Row] = peopleRdd.map(line => {
          val fields = line.split(",")
          Row(fields(0), fields(1).trim.toInt)
        })
        val structType: StructType = StructType(
          //字段名,字段类型,是否可以为空
          StructField("name", StringType, true) ::
            StructField("age", IntegerType, true) :: Nil
        )
        //将DataFrames创建成一个临时的视图
        val df: DataFrame = sc.createDataFrame(rowRDD,structType)
        df.createTempView("people")
        sc.sql("select * from people").show() //使用SQL语句进行查询
        sc.stop()
      }
    }
    View Code

                          

     方式三:读取json文件

    people.json 必须是在一行:

    {"name":"swenna","age":18}
    {"name": "kk","age":20}
    //读取json数据
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    object TestSparkSql {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
        val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        // 将本地的数据读入 RDD
        val df: DataFrame = sc.read.json("file:\F:\Input\people.json")
    
        //将DataFrames创建成一个临时的视图
        df.createOrReplaceTempView("people")
        sc.sql("select * from people").show() //使用SQL语句进行查询
        sc.stop()
      }
    }

                   

    RDD转成DF

    注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._  【spark不是包名,而是sparkSession对象的名称】

    前置条件:导入隐式转换并创建一个RDD

    scala> import spark.implicits._  spark对象中的隐式转换规则,而不是导入包名
    import spark.implicits._
    scala
    > val df = rdd.toDF("id", "name") df: org.apache.spark.sql.DataFrame = [id: bigint, name: string] scala> df.show +----+-------+ | id| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala> df.createTempView("Student") scala> spark.sql("select * from student").show
    scala> val x = sc.makeRDD(List(("a",1), ("b",4), ("c", 3)))
    
    scala> x.collect
    res36: Array[(String, Int)] = Array((a,1), (b,4), (c,3))
    
    scala> x.toDF("name", "count")
    res37: org.apache.spark.sql.DataFrame = [name: string, count: int]
    
    scala> val y = x.toDF("name", "count")
    y: org.apache.spark.sql.DataFrame = [name: string, count: int]
    
    scala> y.show
    +----+-----+
    |name|count|
    +----+-----+
    | a | 1|
    | b | 4|
    | c | 3|
    +----+-----+

    DF--->RDD  直接调用rdd即可

    scala> y.rdd.collect
    res46: Array[org.apache.spark.sql.Row] = Array([a,1], [b,4], [c,3])
    scala> df.rdd.collect
    res49: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

    RDD转换为DataSet

    SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。     DataSet是具有强类型的数据集合,需要提供对应的类型信息。

    scala> case class People(age: BigInt, name: String)
    defined class People
    scala> rdd.collect
    res77: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
    scala> val ds = rdd.as[People]
    ds: org.apache.spark.sql.Dataset[People] = [age: bigint, name: string]
    scala> ds.collect
    res31: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))
    scala> case class Person(name: String, age: Long)
    defined class Person
    
    scala> val caseclassDS = Seq(Person("kris", 20)).toDS()
    caseclassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
    
    scala> caseclassDS.show
    +----+---+
    |name|age|
    +----+---+
    |kris| 20|
    +----+---+
    scala> caseclassDS.collect
    res51: Array[Person] = Array(Person(kris,20))

    通过textFile方法创建rdd并转DS

    scala> val textFileRDD = sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.txt")
    scala> textFileRDD.collect
    res78: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)
    scala> case class Person(name: String, age: Long)
    defined class Person
    
    scala> textFileRDD.map(x=>{val rddMap = x.split(","); Person(rddMap(0), rddMap(1).trim.toInt)}).toDS
    res80: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

    DS ----> RDD 调用rdd方法即可

    scala> val DS = Seq(Person("Andy", 32)).toDS()  用这种方式可创建一个DataSet
    DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
    
    scala> ds.collect
    res76: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))
    scala> ds.rdd.collect
    res75: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))

    DF ---> DS

    spark.read.json(“ path ”)即是DataFrame类型; 

    scala> df.collect
    res72: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
    scala> case class Student(id: BigInt, name: String)
    defined class Student
    scala> df.as[Student]
    res69: org.apache.spark.sql.Dataset[Student] = [id: bigint, name: string]

    DS-->DF

    这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。

    scala> ds.collect
    res73: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))
    
    scala> ds.toDF
    res74: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

     三者的共性

    (1)RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;

    (2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;

    (3)三者有许多共同的函数,如filter,排序等;

    (4)在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)

    互相转化

    RDD关心数据,DataFrame关心结构,DataSet关心类型;

      ① 将RDD转换为DataFrame,需要增加结构信息,所以调用toDF方法,需要增加结构;

      ② 将RDD转换为DataSet,需要增加结构和类型信息,所以需要转换为指定类型后,调用toDS方法;

      ③ 将DataFrame转换为DataSet时,因为已经包含结构信息,只有增加类型信息就可以,所以调用as[类型]

      ④因为DF中本身包含数据,所以转换为RDD时,直接调用rdd即可;

      ⑤因为DS中本身包含数据,所以转换为RDD时,直接调用rdd即可;

      ⑥因为DS本身包含数据结构信息,所以转换为DF时,直接调用toDF即可

    三者的区别

    联系:RDD、DataFrame、DataSet三者的联系是都是spark当中的一种数据类型,RDD是SparkCore当中的,DataFrame和DataSet都是SparkSql中的,它俩底层都基于RDD实现的;

    区别:RDD 优点: ①编译时类型安全 ;②面向对象的编程风格 ; ③直接通过类名点的方式来操作数据; 缺点是通信or IO操作都需要序列化和反序列化的性能开销 ,比较耗费性能; GC的性能开销 ,频繁的创建和销毁对象, 势必会增加GC;

    DataFrame引入了schema和off-heap堆外内存不会频繁GC,减少了内存的开销; 缺点是类型不安全;

    DataSet结合了它俩的优点并且把缺点给屏蔽掉了;

     

    1. RDD: ① RDD一般和spark mlib同时使用; ② RDD不支持sparksql操作

    2. DataFrame:

      1)与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值,

    testDF.foreach{
      line =>
        val col1=line.getAs[String]("col1")
        val col2=line.getAs[String]("col2")
    }

      2)DataFrame与Dataset一般不与spark mlib同时使用

      3)DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如:dataDF.createOrReplaceTempView("tmp")

        spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

      4)DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

    val saveoptions = Map("header" -> "true", "delimiter" -> "	", "path" -> "hdfs://hadoop102:9000/test")  //保存
    datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()  //读取
    
    val options = Map("header" -> "true", "delimiter" -> "	", "path" -> "hdfs://hadoop102:9000/test")
    val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()
    利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定。

    3. Dataset:

      1)Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。

      2)DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    /**
     rdd
     ("a", 1)
     ("b", 1)
     ("a", 1)
    **/
    val test: Dataset[Coltest]=rdd.map{line=>
          Coltest(line._1,line._2)
        }.toDS
    test.map{
          line=>
            println(line.col1)
            println(line.col2)
        }

    可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题

    IDEA创建SparkSQL程序

    object TestSparkSql {
      def main(args: Array[String]): Unit = {
        //创建配置对象
        val conf: SparkConf = new SparkConf().setAppName("SQL").setMaster("local[*]")
        //创建环境对象
        val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        //导入隐式转换
        import sparkSession.implicits._
    
        //执行操作
        // TODO 创建DataFrame
        val df: DataFrame = sparkSession.read.json("input/input.json")
        df.createTempView("user")
        sparkSession.sql("select * from user")
        //df.show()
        // TODO 创建DataSet
        val ds: Dataset[Employ] = Seq(Employ("jing", 18)).toDS()
        //ds.show()
        // TODO 将DataFrame转换为DataSet
        val dfToDs: Dataset[Employ] = df.as[Employ]
        dfToDs.foreach(x => {
          println(x.name + "	" + x.age)
        })
        //TODO 将RDD转换为DataSet
        val rdd: RDD[(String, Int)] = sparkSession.sparkContext.makeRDD(Array(("aa", 19)))
        val employRdd: RDD[Employ] = rdd.map {
          case (name, age) => Employ(name, age)
        }
        //employRdd.toDS().show()
    
        // TODO 将RDD转换为DataFrame
        //rdd.toDF().show()
        val rddToDf: DataFrame = sparkSession.sparkContext.makeRDD(Array(("kris", 18))).toDF("username", "age")
    
        //TODO 将DataFrame转换为RDD[Row]
        df.rdd.foreach(row => {
          println(row.getLong(0)+ "," + row.getString(1))
        })
    
        // TODO 将DataSet转换为RDD[类型]
        val dsToRdd: RDD[Employ] = df.as[Employ].rdd
        sparkSession.stop()
      }
    }
    case class Employ(name: String, age: BigInt)

    用户自定义函数

     Spark SQL数据的加载与保存

    通用加载/保存方法 load和save

    通用的读写方法是  sparkSql只读这parquet file这种类型的文件;  否则要改变它的文件类型需要加.format 
    加上format("json");输出也是这个类型

    scala>val df = spark.read.load("/opt/module/spark/spark-local/examples/src/main/resources/users.parquet").show
    
    scala>df.select("name", " color").write.save("user.parquet") //保存数据
    java.lang.RuntimeException: file:/opt/module/spark/spark-local/examples/src/main/resources/people.json is not a Parquet file. 
    用load读取json数据
    scala> spark.read.format("json").load("/opt/module/spark/spark-local/examples/src/main/resources/people.json").show
    
    df.write.format("json").save("/..")
    
    spark.read.format("json").mode("overwrite").save("/..json")

    MySQL  Spark之读取MySQL数据的方式

    Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

    可在启动shell时指定相关的数据库驱动路径,或者将相关的数据库驱动放到spark的类路径下。

    [kris@hadoop101 jars]$ cp /opt/software/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./
    
    scala> val connectionProperties = new java.util.Properties()
    connectionProperties: java.util.Properties = {}
    
    scala> connectionProperties.put("user", "root")
    res0: Object = null
    
    scala> connectionProperties.put("password", "123456")
    res1: Object = null
    
    scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop101:3306/rdd", "test", connectionProperties)
    jdbcDF2: org.apache.spark.sql.DataFrame = [id: int, name: string]
    
    scala> jdbcDF2.show
    +---+-------+
    | id|   name|
    +---+-------+
    |  1| Google|
    |  2|  Baidu|
    |  3|    Ali|
    |  4|Tencent|
    |  5| Amazon|
    +---+-------+
    
    jdbcDF2.write.mode("append").jdbc("jdbc:mysql://hadoop101:3306/rdd", "test", connectionProperties)
    
    
    scala> val rdd = sc.makeRDD(Array((6, "FaceBook")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[4] at makeRDD at <console>:24
    
    scala> rdd.toDF("id", "name")
    res5: org.apache.spark.sql.DataFrame = [id: int, name: string]
    
    scala> val df = rdd.toDF("id", "name")
    df: org.apache.spark.sql.DataFrame = [id: int, name: string]
    
    scala> df.show
    +---+--------+
    | id|    name|
    +---+--------+
    |  6|FaceBook|
    +---+--------+
    scala> df.write.mode("append").jdbc("jdbc:mysql://hadoop101:3306/rdd", "test", connectionProperties)
    scala> jdbcDF2.show
    +---+--------+
    | id|    name|
    +---+--------+
    |  1|  Google|
    |  2|   Baidu|
    |  3|     Ali|
    |  4| Tencent|
    |  5|  Amazon|
    |  6|FaceBook|
    +---+--------+

    代码

    import java.util.Properties
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object TestSparkSql {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
        val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        //method01(sc)
        method02(sc)
        //method03(sc)
      }
      /**
        * 方式一:不指定查询条件
        * 所有的数据由RDD的一个分区处理,如果你这个表数据量很大,表的所有数据都是由RDD的一个分区处理,很可能会出现OOM
        * @param sc
        */
      def method01(sc: SparkSession): Unit = {
        // 将本地的数据读入 RDD
        val url = "jdbc:mysql://hadoop101/company?"
        val table = "staff"
        val prop = new Properties()
        prop.setProperty("user", "root")
        prop.setProperty("password", "123456")
        //需要传入Mysql的URL、表名、properties(连接数据库的用户名密码)
        val df: DataFrame = sc.read.jdbc(url, table, prop)
        println(df.count()) //3
        println(df.rdd.partitions.size) //1
        df.createOrReplaceTempView("staff")
        sc.sql("select * from staff where id <=2").show()
        //df.show()
        sc.stop()
      }
      /**
        * 方式二:指定数据库字段的范围
        * 通过lowerBound和upperBound 指定分区的范围
        * 通过columnName 指定分区的列(只支持整形)
        * 通过numPartitions 指定分区数量 (不宜过大)
        * 说明:将表的数据分布到RDD的几个分区中,分区的数量由numPartitions参数决定,在理想情况下,每个分区处理相同数量的数据,我们在使用的时候不建议将这个值设置的比较大,因为这可能导致数据库挂掉!这个函数的缺点就是只能使用整形数据字段作为分区关键字。
        * @param sc
        */
      def method02(sc: SparkSession): Unit = {
        val lowerBound = 1
        val upperBound = 100000
        val numPartitions = 5
        val url = "jdbc:mysql://hadoop101/company?user=root&password=123456"
        val prop = new Properties()
        val df: DataFrame = sc.read.jdbc(url, "staff", "id", lowerBound, upperBound,numPartitions,prop)
    
        df.show()
        println(df.count())
        println(df.rdd.partitions.length) //5个分区
      }
    
      /**
        * 方式三:根据任意字段进行分区
        * 通过predicates将数据根据score分为2个区
        * 基于前面两种方法的限制,Spark还提供了根据任意字段进行分区的方法;rdd的分区数量就等于predicates.length
         * @param sc
        */
      def method03(sc: SparkSession) = {
        val predicates = Array[String]("id <=2", "id > 1 and id < 3") //2个分区
        val url = "jdbc:mysql://hadoop101/company?user=root&password=123456"
        val prop = new Properties()
        val df: DataFrame = sc.read.jdbc(url,"staff",predicates,prop)
        println(df.count()) //3
        println(df.rdd.partitions.length) //2
        df.show()
      }
    
    }
    
    
    方式四: 通过load获取,和方式二类似
    options函数支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions选项,与方法二的参数一致。
    其内部实现原理部分和方法二大体一致。同时load方法还支持json、orc等数据源的读取。
    val df: DataFrame = sc.read.format("jdbc").options(Map ("url" -> url, "dbtable" -> "staff")).load() 加载条件查询后的数据,报错: Every derived table must have its own alias,这句话的意思是说每个派生出来的表都必须有一个自己的别名,加了一个没有别名即可
    val df: DataFrame = sc.read.format("jdbc").options(Map ("url" -> url, "dbtable" -> "(select s1.id,s2.name,s1.age from stu1 s1 join stu2 s2 on s1.id = s2.id ) stu")).load()

    Hive   Spark之HiveSupport连接(spark-shell和IDEA)

     spark.sparkContext.setLogLevel("WARN") //设置日志输出级别

    Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及Hive查询语言(HQL)等。spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。

    如果要使用内嵌的Hive,什么都不用做,直接用就可以了。

    可以修改其数据仓库地址,参数为:--conf spark.sql.warehouse.dir=./wear

    scala> spark.sql("create table emp(name String, age Int)").show
    19/04/11 01:10:17 WARN HiveMetaStore: Location: file:/opt/module/spark/spark-local/spark-warehouse/emp specified for non-external table:emp
    
    scala> spark.sql("load data local inpath '/opt/module/spark/spark-local/examples/src/main/resources/people.txt' into table emp").show
    
    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    | default|      emp|      false|
    +--------+---------+-----------+
    scala> spark.sql("select * from emp").show
    
    /opt/module/spark/spark-local/spark-warehouse/emp
    [kris@hadoop101 emp]$ ll
    -rwxr-xr-x. 1 kris kris 32 4月  11 01:10 people.txt

    外部Hive应用

    [kris@hadoop101 spark-local]$ rm -rf metastore_db/ spark-warehouse/
    
    [kris@hadoop101 conf]$ cp hive-site.xml /opt/module/spark/spark-local/conf/
    
    [kris@hadoop101 spark-local]$ bin/spark-shell 
    scala> spark.sql("show tables").show
    +--------+--------------------+-----------+
    |database|           tableName|isTemporary|
    +--------+--------------------+-----------+
    | default|            bigtable|      false|
    | default|            business|      false|
    | default|                dept|      false|
    | default|      dept_partition|      false|
    | default|     dept_partition2|      false|
    | default|     dept_partitions|      false|
    | default|                 emp|      false|
    ...
    
    [kris@hadoop101 spark-local]$ bin/spark-sql 
    log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
    log4j:WARN Please initialize the log4j system properly.
    spark-sql (default)> show tables;

    代码中操作Hive

    log4j.properties

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
    View Code

     拷贝Hadoop中core-site.xml、hdfs-site.xml,Hive中hive-site.xml三个文件到resources中(也可以只拷贝hive-site.xml),集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下;

    Maven所依赖的jar包:

        <dependencies>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.27</version>
            </dependency>
            <!--spark操作Hive所需引入的包 spark版本-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
    
    
        </dependencies>
    View Code
    val sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    支持hive

     测试:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    object TestSparkSql {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
        val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        sc.sql("show tables").show()
        sc.stop()
      }
    }

                  

    SparkSQL 的元数据

    1.1元数据的状态

    SparkSQL 的元数据的状态有两种:

    1、in_memory,用完了元数据也就丢了

    2、hive , 通过hive去保存的,也就是说,hive的元数据存在哪儿,它的元数据也就存在哪儿。

    换句话说,SparkSQL的数据仓库在建立在Hive之上实现的。我们要用SparkSQL去构建数据仓库的时候,必须依赖于Hive。

    2.2Spark-SQL脚本

    如果用户直接运行bin/spark-sql命令。会导致我们的元数据有两种状态:

    1、in-memory状态:如果SPARK-HOME/conf目录下没有放置hive-site.xml文件,元数据的状态就是in-memory

    2、hive状态:如果我们在SPARK-HOME/conf目录下放置了,hive-site.xml文件,那么默认情况下,spark-sql的元数据的状态就是hive.

    伴生对象相当于static,可直接类名.
    给类起别名,相当于属性使用type ..

    spark.sql("select age, addName(name) from user").show
    
    scala> case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
    scala> val tbStockRdd = spark.sparkContext.textFile("/opt/module/datas/sparkData/tbStock.txt")
    tbStockRdd: org.apache.spark.rdd.RDD[String] = /opt/module/datas/sparkData/tbStock.txt MapPartitionsRDD[30] at textFile at <console>:23
    scala> val tbStockDS = tbStockRdd.map(_.split("	")).map(x => tbStock(x(0), x(1), x(2))).toDS
    tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]
    
    scala> tbStockDS.show
    +-----------+----------+----------+
    |ordernumber|locationid|    dateid|
    +-----------+----------+----------+
    |      lj111|        jd| 2018-3-13|
    |      lj112|        jd| 2018-2-13|
    |      lj113|        jd| 2019-1-13|
    |      lj114|        jd| 2019-3-13|
    |      lj115|        jd| 2018-9-13|
    |      lj116|        jd|2018-11-13|
    |      lj117|        jd|2017-12-13|
    |      lj118|        jd| 2017-5-13|
    +-----------+----------+----------+
    
    scala> case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable
    defined class tbStockDetail
    scala> val tbStockDetailRdd = spark.sparkContext.textFile("/opt/module/datas/sparkData/tbStockDetail.txt")
    tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = /opt/module/datas/sparkData/tbStockDetail.txt MapPartitionsRDD[43] at textFile at <console>:23
    scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split("	")).map(x => tbStockDetail(x(0), x(1).trim().toInt, x(2), x(3).trim().toInt, x(4).trim().toDouble,x(5).trim().toDouble)).toDS
    tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]
    
    scala> tbStockDetailDS.show
    +-----------+------+------+------+-----+------+
    |ordernumber|rownum|itemid|number|price|amount|
    +-----------+------+------+------+-----+------+
    |      lj111|    12|item11|    10|100.0| 300.0|
    |      lj112|    12|item12|    10|100.0| 200.0|
    |      lj113|    12|item13|    10|100.0| 300.0|
    |      lj114|    12|item14|    10|100.0| 100.0|
    |      lj115|    12|item15|    10|100.0| 300.0|
    |      lj116|    12|item16|    10|100.0| 700.0|
    |      lj117|    12|item17|    10|100.0| 600.0|
    |      lj118|    12|item18|    10|100.0| 500.0|
    +-----------+------+------+------+-----+------+
    tbstock、tbstockdetail--amount 、tbdate
    计算所有订单中每年的销售单数、销售总额
    三个表连接后以count(distinct a.ordernumber)计销售单数,sum(b.amount)计销售总额
    select 
        theyear, 
        count(tbstock.ordernumber), 
        sum(tbstockdetail.amount) 
    from tbstock join tbstockdetail on tbstock.ordernumber = tbstockdetail.ordernumber 
                 join tbdate on tbdate.dateid = tbstock.dateid 
        group by tbdate.theyear 
        order by tbdate.theyear;
    
    
    统计每年最大金额订单的销售额:
    统计每个订单一共有多少销售额
    select 
        a.dateid, 
        a.ordernumber, 
        sum(b.amount) sumAmount
    from tbstock a join tbstockdetail b on a.ordernumber = b.ordernumber
        group by a.dateid, a.ordernumber
    
    select 
        theyear, 
        max(c.sumAmount) sumOfAmount
    from tbdate join (select a.dateid, a.ordernumber, sum(b.amount) sumAmount
    from tbstock a join tbstockdetail b on a.ordernumber = b.ordernumber
        group by a.dateid, a.ordernumber)c on tbdate.dateid = c.dateid
        group by tbdate.theyear order by tbdate.theyear desc
    
    计算所有订单中每年最畅销货品
    目标:统计每年最畅销货品(哪个货品销售额amount在当年最高,哪个就是最畅销货品)
    1求出每年每个货品的销售额
    每年 tbdate.theyear 
    货品tbstockdetail.itemid
    销售额amount在当年最高 
    select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
    from tbdate join tbstock on tbdate.dateid = tbstock.dateid
    join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
    group by tbdate.theyear, tbstockdetail.itemid
    
    2在第一步的基础上,统计每年 所有 货品中的最大金额
    select aa.theyear, max(sumAmount) maxAmount
        from (
    select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
        from tbdate 
        join tbstock on tbdate.dateid = tbstock.dateid
        join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
            group by tbdate.theyear, tbstockdetail.itemid)aa
            group by aa.theyear
    
    用最大销售额和统计好的每个货品的销售额join,以及用年join,集合得到最畅销货品那一行信息
    每年每个货品的销售额 join 每年所有货品中的最大金额
    select distinct e.theyear, e.itemid, f.maxAmount
        from (
    select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
        from tbdate 
            join tbstock on tbdate.dateid = tbstock.dateid
            join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
        group by tbdate.theyear, tbstockdetail.itemid)e join (select aa.theyear, max(sumAmount) maxAmount
        from (
    select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
        from tbdate join tbstock on tbdate.dateid = tbstock.dateid
            join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
        group by tbdate.theyear, tbstockdetail.itemid)aa
        group by aa.theyear)f on e.theyear = f.theyear and e.sumAmount = f.maxAmount
        order by e.theyear
  • 相关阅读:
    Linux ansible 常用模块二
    Linux之ansible 常用模块
    flask websocket实现用例
    flask--上下文原理
    python Django
    python 并发编程 锁 / 信号量 / 事件 / 队列(进程间通信(IPC)) /生产者消费者模式
    并发编程 process 模块的方法及运用 僵尸与孤儿
    python 并发编程 操作系统 进程 并发.并行 及 同步/异步,阻塞/非阻塞
    python 网络编程粘包解决方案2 + ftp上传 + socketserver
    第一节 机器学习基础
  • 原文地址:https://www.cnblogs.com/shengyang17/p/10683487.html
Copyright © 2011-2022 走看看