zoukankan      html  css  js  c++  java
  • 大数据之Spark SQL

     

    1、Spark SQL简介

    说SparkSql之前,就不得不说下它的前身-Shark。首先,Hive是一个基于Hadoop的数据仓库工具,用户可以通过HiveQL语句将其转化为MapReduce任务来运行。其主要过程是用户输入HiveQL语句,进入到驱动模块后编译器会进行解析辨析,并有优化器对该操作进行优化计算。接下来将其交给执行器去执行器,执行器会启动一个或多个MapReduce任务。

    Spark为了利用SQL的优势,借鉴了HiveQL转化为MapReduce的经验,诞生了Sparksql的前身,即Shark。为了保证和Hive的兼容,Shark重用了Hive的HiveQL解析、逻辑执行计划翻译、执行计划优化等逻辑,唯一改变的就是将物理执行计划从MapReduce作业改变成Spark作业。相比较于Hive本身,这种SQL-on-Hadoop的性能提升了10~100倍。但是仍有两个大问题:

    1.执行计划完全依赖于Hive,不方便添加新的优化策略。

    2.因为Spark是线程级并行,而MapReduce是进程级运行,所以Spark在兼容Hive的实现上存在线程安全问题。

    于是,后面诞生了Spark SQL。Spark SQL在Hive兼容层面仅依赖HiveQL解析和Hive元数据,即是HiveQL语句被解析成抽象语法树(AST)后,接下来的工作全部由Spark SQL来执行,执行计划生成和优化均由Catalyst(函数式关系查询优化框架)负责。另外,Spark SQL增加了DataFrame,兼容了多种外部数据源的获取和存储功能

     

    2、DataFrame创建和保存

    从Spark2.0以后开始,Spark使用全新的SparkSession接口替代了Spark1.6中的SQLContext以及HiveContext接口。其支持从不同的数据源加载数据,以及将数据转化为DataFrame,而且可以将DataFrame再转化为表,使用SQL语句来操作数据。本篇后续会统一使用SparkSession接口来做演示。启动spark-shell,默认创建SparkContext对象(简称sc)和SparkSession对象(简称spark)。

    1.导入隐式转换的包,支持RDD转化为DataFrame以及后续的SQL操作

    import spark.implicits._
    

    2.读取数据源创建RDD,用read和write方法。下面演示为本地读取,保存在HDFS上,注意本地和集群的区别。

    scala> spark.read.format("csv").load("file:///data/user_info.csv")          //读取csv文件并转化为DataFrame
    scala> spark.read.format("json").load("file:///data/user_info.json")        //读取json文件并转化为DataFrame
    scala> spark.read.format("parquet").load("file:///data/user_info.parquet")  //读取parquet文件并转化为DataFrame
    scala> 
    scala> df.write.format("csv").save("hdfs://ahcuc:8020/data/user_info.csv")          //读取DataFrame并保存为csv文件
    scala> df.write.format("json").save("hdfs://ahcuc:8020/data/user_info.json")        //读取DataFrame并保存为json文件
    scala> df.write.format("parquet").save("hdfs://ahcuc:8020/data/user_info.parquet")  //读取DataFrame并保存为parquet文件
    
     

    3、RDD创建DataFrame

    (1)反射机制推断RDD模式

    利用反射机制来推断包含特定类型的RDD模式(Schema),适用于对已经数据结构的RDD进行转换。

    1.导入隐式转换的包,支持RDD转化为DataFrame以及后续的SQL操作

    scala> import spark.implicits._ 
    import spark.implicits._
    

    2.定义case class,匹配表结构

    scala> case class user_info(name: String, age: Long) 
    defined class user_info
    

    3.读取文本文件,反射机制推断Schema,并转换为DataFrame

    scala> val user_info_DF = spark.sparkContext.textFile("file:///data1/user_info.txt").map(_.split(",")).map(x=>user_info(x(0),x(1).trim.toInt)).toDF()
    user_info_DF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
    scala> user_info_DF.show()
    +------+---+
    |  name|age|
    +------+---+
    |   TOM| 16|
    | LiLei| 20|
    |Maggie| 18|
    +------+---+
    

    4.注册表为临时表,方便后续查询使用

    scala> user_info_DF.createOrReplaceTempView("user_info")
    

    5.使用sql接口调用,返回的是DataFrame

    scala> val results = spark.sql("select * from user_info where age > 18")
    results: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
    
    scala> results.show()
    +-----+---+
    | name|age|
    +-----+---+
    |LiLei| 20|
    +-----+---+
    

    (2)编程方式定义RDD

    若是无法提前定义case class,则可以通过编程方式定义RDD模式构造Schema,并将其应用在已知的RDD上。

    1.制作表头Schema,其描述了模式信息,模式中包含name和age字段。

    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    
    scala> val fields = Array(StructField("name",StringType,true),StructField("age",IntegerType,true))
    fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,IntegerType,true))
    
    scala> val schema = StructType(fields)
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
    

    2.表中的记录

    scala> import org.apache.spark.sql.Row
    import org.apache.spark.sql.Row
    
    scala> val user_info_RDD = spark.sparkContext.textFile("file:///data1/user_info.txt").map(_.split(",")).map(x=>Row(x(0),x(1).trim.toInt))
    user_info_RDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[13] at map at <console>:30
    

    3.拼接起来表头和表中记录

    scala> val user_info_DF = spark.createDataFrame(user_info_RDD,schema)
    user_info_DF: org.apache.spark.sql.DataFrame = [name: string, age: int]
    

    4.注册为临时表

    scala> val results = user_info_DF.createOrReplaceTempView("user_info")
    results: Unit = ()
    

    5.使用sql接口调用,返回的是DataFrame

    scala> val results = spark.sql("select * from user_info where age < 18")
    results: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> results.show()
    +----+---+
    |name|age|
    +----+---+
    | TOM| 16|
    +----+---+
    
     

    4、Spark SQL读写数据库

    (1)通过JDBC连接数据库

    1.读取数据

    //方式一:
    scala> val jdbcDF = spark.read.format("jdbc")
               .option("url", "jdbc:mysql://127.0.0.1:3306/spark")
               .option("driver","com.mysql.jdbc.Driver")
               .option("dbtable", "db.user_info")
               .option("user", "root")
               .option("password", "123456")
               .load();
    scala> jdbcDF.select("name","age").show();
    
    //方式二:
    //创建一个变量用来保存JDBC的链接参数
    scala> import java.util.Properties
    scala> val prop = new Properties()
    scala> prop.put("user","root")
    scala> prop.put("password","123456")
    scala> prop.put("driver","com.mysql.jdbc.Driver")
    
    scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark","db.user_info",prop)
    scala> jdbcDF.select("name","age").show();
    

    2.写入数据

    首先创建DataFrame,过程略。

    //方式一:
    scala> user_info_DF.write
              .format("jdbc")
              .option("url", "jdbc:mysql://127.0.0.1:3306/hive")
              .option("dbtable", "db.user_info")
              .option("user", "root")
              .option("password", "123456")
              .save()
    
    //方式二:
    //创建一个变量用来保存JDBC的链接参数
    scala> import java.util.Properties
    scala> val prop = new Properties()
    scala> prop.put("user","root")
    scala> prop.put("password","123456")
    scala> prop.put("driver","com.mysql.jdbc.Driver")
    
    scala> user_info_DF.write.mode("append").jbdc("jdbc:mysql://127.0.0.1:3306/spark", "db.user_info", prop)
    

    (2)连接Hive读写数据

    1.读入数据

    scala> spark.sql("select * from db.user_info where age > 18")
    

    2.写入数据

    scala> spark.sql("insert into db.user_info_tmp select * from db.user_info where age > 18")
    
     

    5、总结

    本篇主要介绍了Spark SQL的基础知识以及与外部数据存储系统的读写方式。若有不足之处,欢迎指正。

  • 相关阅读:
    logback
    GC
    常用JVM配置参数
    JVM
    linux
    简单的webService 实例
    [转载]Java 工程师成神之路
    ActiveMQ 在mac 上的安装与运行
    subline3 + emmet 加快前端开发效率
    Spring WebMVC 4.1.4返回json时导致的 406(Not Acceptable)
  • 原文地址:https://www.cnblogs.com/Maggieli/p/12706302.html
Copyright © 2011-2022 走看看