zoukankan      html  css  js  c++  java
  • Spark SQL

    Spark SQL简介

    Shark

    Shark即Hive On Spark,Shark再HiveQL方面重用了Hive中HiveQL的解析器、编译器、优化器,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作

    但是Shark存在两个问题:

    • 不方便添加新的优化策略,依赖Hive的计划优化(针对MapReduce)
    • MapReduce是进程级并行,Spark是线程级并行,导致线程安全问题

    因此需要一个新的类SQL组件,就出现了Spark SQL。

    Spark SQL架构

    整体上,与Hive架构类似,具有解析器、优化器、编译器、执行器。将SQL语句转换为抽象语法树,然后把抽象语法树转换成逻辑执行计划,接着优化逻辑执行计划,最后将逻辑执行计划转换为物理执行计划,并执行。

    从HQL被解析为抽象语法树AST后,后面组件代码全部由Spark重写。编译器被替换为Catalyst(函数式关系查询优化框架)。

    小结

    Spark SQL提供了DataFrame API,可以对内部和外部的数据源执行各种关系操作,其次支持大量数据分析算法,可以提高机器学习算法的数据处理能力。

    DataFrame

    大数据时代,用户需要读取结构化数据,也要读取大量非结构化数据。于是增加了DataFrame,具有模式信息的RDD,使用户可以在Spark SQL执行SQL语句,数据既可以来之RDD,也可以来自Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。

    DataFrame与RDD的区别

    • Spark能够轻松从MySQL转换到DataFrame
    • RDD是分布式的Java对象集合,对象内部结构对于RDD来说是未知的
    • DataFrame以RDD为基础,但提供了详细的结构信息

    DataFrame的创建

    从Spark2.0开始Spark使用SparkSession接口替代SQLContext和HiveContext接口实现对数据加载、转换、处理等功能。

    SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成表,然后用SQL语句操作数据,也提供了其他依赖于Hive的功能支持。

    from pyspark import SparkContext,SparkConf
    from pyspark.sql import SparkSession
    spark = SparkSession.bulder.config(conf = SparkConf()).getOrCreate()
    

    在pyspark里,会自动生成sc和spark对象,分别为SparkContext和SparkSession

    //读取text,json,parquet
    spark.read.text("people.txt")
    spark.read.json("people.json")
    spark.read.parquet("people.parquet")
    
    spark.read.format("text").load("people.txt")
    spark.read.format("json").load("people.json")
    

    DataFrame保存

    //df为DataFrame
    df.write.txt("people.txt")
    df.write.json("people.json")
    
    df.write.format("text").save("people.txt")
    df.write.format("json").save("people.json")
    

    DataFrame操作

    • .printSchema(),打印模式信息,显示结构信息
    • .select(),选取列的信息
    • .show(),显示信息
    • .filter(),过滤条件
    • .groupBy(),分组
    • .count(),统计
    • .sort(),排序

    利用反射机制推断RDD

    实现RDD和DataFrame的转换

    from pyspark.sql import Row
    //RDD转换DataFrame,先生成Row对象
    people = spark.SparkContext.textFile("people.txt").
    map(lambda line:line.split(",")).
    map(lambda p:Row(name=p[0],age=int(p[1])))
    
    schemaPeople = spark.createDataFrame(people)
    //把DataFrame注册成临时表,才能查询使用
    schemaPeople.createOrReplaceTempView("people")
    //DataFrame每一个元素都是一行记录,包含两个字段,用p.name和p.age来获取值
    personsDF = spark.sql("select name,age from people where age > 20")
    //DataFrame转换成RDD
    personsRDD = personsDF.rdd.map(lambda p:"Name:"+p.name+","+"Age:"+str(p.age))
    personsRdd.foreach(print) 
    

    使用编程方式定义RDD模式

    当无法得知模式结构时,需要采用编程方式定义RDD模式

    • 第一步:制作表头
    • 第二步:制作记录,使用Row对象
    • 第三步:拼接表头,表记录
    //生成表头
    schemaString = "name age"
    //fields = [name,age] StringType类型 NULL为TRUE
    fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split(" ")]
    schema = StructType(fields)
    //生成表记录
    lines = spark.spark>context.textFile("people.txt")
    parts = lines.map(lambda x:x.split(","))
    people = parts.map(lambda p:Row(p[0],p[1].strip()))
    //把表头和表记录连接
    schemaPeople = spark.createDataFrame(people,schema)
    //注册临时表
    schemaPeople.createOrReplaceTempView("people")
    result = spark.sql("SELECT name,age FROM people")
    result.show()
    

    读写MySQL

    先安装MySQL的JDBC驱动程序,放到spark/jars

    读取数据
    //.option()函数配置数据库等信息
    jdbcDF = spark.read.format("jdbc")
    .option("driver","com.mysql.jdbc.Driver")
    .option("url","jdbc:mysql://localhost:3306/spark").option("dbtable","student")
    .option("user","root").option("password","root").load()
    
    jdbcDF.show()
    
    写入数据
    from pyspark.sql import Row
    from pyspark.sqLtypes import *
    from pyspark import SparkContext,SparkConf
    from pyspark.sql import SparkSession
    
    spark = SparkSession.build.config(conf = SparkConf()).getOrCreate()
    
    //设置模式信息
    schema = StructType([StructField("id",IntegerType(),True),StructField("name",StringType(),True),StructField("gender",StringType(),True),StructField("age",IntegrType(),True)])
    
    //设置两条信息
    studentRDD = spark.sparkContext.parallelize(["3 name1 M 21","4 name2 M 22"]).map(lambda x:x.split(" "))
    
    rowRDD = studentRdd.map(lambda p:Row(int(p[0].strip()),p[1].strip(),p[2].strip(),int(p[3])))
    
    //连接Row对象和模式信息
    studentDF = spark.createDataFrame(rowRDD,schema)
    
    //设置数据库信息,并写入
    prop={}
    prop['user']='root'
    prop['password']='root'
    prop['driver']='com.mysql.jdbc.Driver'
    studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark","student","append",prop)
    
  • 相关阅读:
    全面理解面向对象的 JavaScript
    账号
    移动端 前端框架 amaze ui
    javascript 精典案例分析一览
    前端事件系统(一)
    周总结12
    周总结11
    相比较于其他的同类软件
    团队冲刺第十五天
    团队冲刺第十四天
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12504077.html
Copyright © 2011-2022 走看看