zoukankan      html  css  js  c++  java
  • Spark SQL的介绍和DataFrame的建立及使用

    1.

    Spark SQL定位处理结构化数据的模块。SparkSQL提供相应的优化机制,并支持不同语言的开发API。
    java、scala、Python,类SQL的方法调用(DSL)
    2.

    RDD与Spark SQL的比较说明:
      使用Spark SQL的优势:a.面向结构化数据;b.优化机制;
      RDD缺点:a.没有优化机制,如对RDD执行Filter操作;
         b.RDD类型转换后无法进行模式推断
    3.

    DataFrame/SchemaRDD
      DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。
      Dateframe=RDD(数据集)+Schema(元数据/模型)
      SchemaRDD就是DataFrame的前身,在1.3.0版本后。
      DataFrame存放的是ROW对象。每个Row 对象代表一行记录。      

      SchemaRDD还包含记录的结构信息(即数据字段)
    4.

    创建Spark SQL环境
      a.将SparkSQL依赖库添加至pom.xml文件中
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.2</version>
        </dependency>
      b.创建SparkSQL Context-->SparkSession
        通过SparkSession.builder()创建构造器;
        并调用.appName("sparkSQL").master("local")设置集群模式以及app名称
        最后必须调用getOrCreate()方法创建SparkSession对象。
        val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
      c.加载外部数据源:
        通过SparkSession的read()方法加载不同的数据源:json、CVS、jdbc、textfile、parquert等
        val df = spark.read.textFile("file:///d:/测试数据/users.txt").toDF()
        df.show()

    DF的创建方式

      

      (1)通过SparkSession的createDataFrame(...)方法创建DF对象
        a.将Seq序列转换为DF
        b.将RDD[Product]多元素转换为DF
      (2)通过SparkSession的read读取外部文件调用toDF()
      (3)通过导入隐式转换,可直接将Scala中的序列转换为DF
        val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
        import spark.implicits._
        val list = List(("zhangsan",12,"changchun"),("lilei",25,"haerbin"))
        val df_implicits = list.toDF()

     查看DF的Schema

    1.案例说明:
      val rdd = sc.textFile("file:///d:/测试数据/users.txt").map(x=>x.split(" ")).map(x=>(x(0),x(1),x(2)))
      val df_rdd = spark.createDataFrame(rdd)
      df_rdd.show()
      df_rdd.select("_1","_2").where("_1 like '%o%'").show()
      df_rdd.printSchema()
        root
          |-- _1: string (nullable = true)
          |-- _2: string (nullable = true)
          |-- _3: string (nullable = true)
      通过case用例类可以对DF进行Schema匹配
      case class Person(name:String,age:Int,address:String)

      val rdd = sc.textFile("file:///d:/测试数据/users.txt").map(x=>x.split(" ")).map(x=>new Person(x(0),x(1).toInt,x(2)))
      val df_rdd = spark.createDataFrame(rdd)
      df_rdd.printSchema()
        root
          |-- name: string (nullable = true)
          |-- age: integer (nullable = true)
          |-- address: string (nullable = true)
      df_rdd.show()
        +------+---+-------+
        | name|age|address|
        +------+---+-------+
        | anne| 22| NY|
        | joe| 39| CO|
        |alison| 35| NY|
        +------+---+-------+
    2.实现简单的select操作
      df_rdd.select("name","age").where("name like '%o%'").show()
        +------+---+
        | name|age|
        +------+---+
        | joe| 39|
        |alison| 35|
        | bob| 71|
        +------+---+

     DF的操作方式

    1.显示:
      df_rdd.show()
    2.查询
      df_rdd.select("name").show()
    3.条件查询:
      df_rdd.select($"name",$"age").where("name like '%o%'").show() //注:引入spark.implicits._
        +------+---+
        | name|age|
        +------+---+
        | joe| 39|
        |alison| 35|
        | bob| 71|
        +------+---+
    4.条件查询:
      df_rdd.select($"name",$"age"+1).where("name like '%o%'").show() //$是scala的用法,需要隐式转换 import spark.implicits._
        +------+---------+
        | name|(age + 1)|
        +------+---------+
        | joe| 40|
        |alison| 36|
        | bob| 72|
        +------+---------+
    5.过滤操作
      a.通过过滤表达式:
        df_rdd.filter("age > 36").show()
      b.通过func式编程进行处理,DF中每个元素均为ROW
        df_rdd.filter(x=>{if(x.getAs[Int]("age") > 36) true else false }).show()
    6.分组操作
        df_rdd.groupBy("address").count().show
          +-------+-----+
          |address|count|
          +-------+-----+
          | OR| 2|
          | VA| 2|
          | CA| 2|
          | NY| 3|
          | CO| 1|
          +-------+-----+

  • 相关阅读:
    基于Enterprise Library 6 的AOP实现
    命行下的查询与替换字符串
    软件架构中质量特性
    【redis】突然流量增大,不定时挂死排障记录
    Heritrix 3.1.0 源码解析(二)
    Apache Jackrabbit源码研究(四)
    Heritrix 3.1.0 源码解析(三)
    Apache Jackrabbit源码研究(五)
    Heritrix 3.1.0 源码解析(一)
    JVM 自定义的类加载器的实现和使用
  • 原文地址:https://www.cnblogs.com/lyr999736/p/10202276.html
Copyright © 2011-2022 走看看