zoukankan      html  css  js  c++  java
  • SparkSQL

    1、SparkSql概述

    1、什么是SparkSql?

    SparkSql用于处理结构化数据,底层还是RDD

    2、SparkSql的两个数据抽象: DataFrame、DataSet

    1、什么是DataFrame

    DataFrame可以当做一个二维表格,有schema信息<有列名、列类型>
    DataFrame只关注列不关注行的类型,不管每个元素<每行>是什么类型,表现出来都是Row类型

    2、什么是DataSet

    DataSet可以当做一个二维表格,有schema信息<有列名、列类型>
    DataSet即关注列也关注行的类型,每个的数据类型是啥,表现出来就是啥

    3、DataFrame与DataSet的区别:

    1、DataFrame是弱类型,DataSet是强类型
    2、DataFrame是运行期安全,编译器不安全。DataSet是编译器安全,运行期也安全

    4、DataFrame与DataSet的使用时机:

    1、如果是将rdd转成sparksql编程,
    此时如果rdd里面的元素类型是样例类,转成DataSet或者DataFrame都可以
    此时如果rdd里面的元素类型元组,推荐转成DataFrame,可以通过toDF指定列名
    2、如果想要使用map、flatMap这种写函数的强类型算子,推荐使用DataSet

    5、RDD、DataFrame、DataSet的联系

    1、RDD、DataFrame、DataSet都是弹性分布式数据集
    2、RDD、DataFrame、DataSet都是惰性执行的,都需要调用action算子之后才会真正执行
    3、RDD、DataFrame、DataSet都有分区
    4、RDD、DataFrame、DataSet有很多共同的函数: map、flatMap、filter..
    5、RDD、DataFrame、DataSet都是数据在内存与磁盘中动态存储

    2、SparkSql编程

    1、创建SparkSession: SparkSession.builder().master("").appName(..).getOrCreate()

    2、DataFrame创建:

    1、通过toDF方法

    要想使用toDF方法必须导入隐式转换: import sparksession对象名.implicits._
    1、集合.toDF()
    2、rdd.toDF()
    toDF有两个重载的方式,如果调用的是无参的toDF,此时会生成默认的列名<如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>
    所以如果元素是元组,可以有参的toDF方法指定列名<指定的列名的个数必须与列的个数要相同>

    2、通过读取文件: spark.read.csv/json/jdbc..

    3、通过其他DataFrame衍生

    3、DataSet创建

    1、通过toDS方法

    要想使用toDS方法必须导入隐式转换: import sparksession对象名.implicits._
    1、集合.toDS()
    2、rdd.toDS()
    toDS方法生成的DataSet此时会生成默认的列名<如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>

    2、通过读取文件: spark.read.textFile()

    3、通过其他DataFrame衍生

    4、SparkSql编程的两种方式:

    1、SQL风格

    1、将df/ds注册成表:
    createTempView:: 注册成临时表
    createOrReplaceTempView: 注册成临时表[如果表已经存在会替换],只能在当前SparkSession中使用,后续只在使用表的时候直接用 表名 既可以
    createGlobalTempView:注册成全局表
    createOrReplaceGlobalTempView: 注册成全局表,可以在多个sparkSession中使用,后续在使用的时候,必须通过 global_temp.表名 的方式使用
    2、sql编写: spark.sql("sql语句")

    2、DSL风格: 使用select、filter、where、groupBy等api变成

    常用的DSL api:

    1、过滤:

    1、filter("过滤条件") // filter("age>20")
    2、where("过滤条件") // where("age>20")

    2、去重:

    1、distinct: 只有所有列都相同才会去重
    2、dropDuplicates: 当指定列相同的时候就会去重

    3、列裁剪: selectExpr("字段名","函数(字段名)","字段名 as 别名")

    5、RDD、DataFrame、DataSet转换

    1、RDD转DataFrame: rdd.toDF/rdd.toDF(列名,列名,..)
    2、DataFrame转rdd: df.rdd
    3、Rdd转DataSet: rdd.toDS
    4、DataSet转rdd: val rdd:RDD[DataSet元素类型] = ds.rdd
    5、DataFrame转DataSet: val ds:DataSet[类型] = df.as[类型]
    DataFrame转DataSet的时候,
    如果as后面的类型是样例类,需要样例类的属性名要与列名一致。
    如果as后面的类型是元组,需要元组的个数 = 列的个数,类型也要一致
    6、DataSet转DataFrame: ds.toDF/ds.toDF(列名,列名,..)

    6、Row类型的取值: row.getAs[ 列的类型 ] ( "列名" )

    7、自定义函数:

    1、自定义UDF函数:

    1、定义普通函数

    val func = (id:Int) => id+"-001"

    2、注册udf函数: spark.udf.register("函数名",函数)

    spark.udf.register("myfunc",func)

    3、通过sql使用函数:

    spark.sql("select myfunc(id) from 表名")

    2、自定义udaf函数

    1、弱类型udaf:

    1、定义class继承UserDefinedAggregateFunction
    2、重写抽象方法

    def inputSchema: StructType <定义udaf参数类型>
    def bufferSchema: StructType <定义中间变量的参数类型>
    def dataType: DataType <定义最终结果类型>
    def deterministic: Boolean <一致性>
    def initialize(buffer: MutableAggregationBuffer): Unit <初始化中间变量>
    def update(buffer: MutableAggregationBuffer, input: Row): Unit <每次传入组中一个值,更新中间变量>
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit <合并所有task的统计结果>
    def evaluate(buffer: Row): Any <获取最终结果>

    3、注册udaf:

    1、创建自定义udaf对象: val obj = new xxx
    2、注册: spark.udf.register("函数名",obj)

    2、强类型的udaf

    1、定义class继承Aggregator[IN,BUF,OUT]

    IN: udaf参数类型
    BUF: 中间变量类型
    OUT: 最终结果类型

    2、重写方法

    def zero: Buff <中间变量赋初始值>
    def reduce(buff: Buff, age: Int): Buff <在每个分区中先预聚合,每个传入一个元素,更新中间结果>
    def merge(b1: Buff, b2: Buff): Buff <对所有分区的结果再次聚合>
    def finish(reduction: Buff): Double <获取最终结果>
    def bufferEncoder: Encoder[Buff] <对中间结果类型编码>
    def outputEncoder: Encoder[Double] <对最终结果类型编码>

    3、注册

    1、创建udaf对象: val obj = new XXX
    2、导入隐式转换,使用udaf函数:
    import org.apache.spark.sql.functions._
    val uobj = udaf(obj)
    3、注册: spark.udf.register("函数名",uobj)

    3、数据读取与保存

    1、读取

    1、文件读取:

    1、spark.read

    .format() --指定文件读取格式[csv/json/text/parquet/orc]
    .option().option().. --指定读取的参数
    .load(path) --指定加载路径的数据
    在读取文件的时候,一般只有csv文件才需要配置option,csv文件常用的option:
    sep: 指定字段之间的分隔符
    header: 指定是否以文件的第一行作为列名
    inferSchema: 指定是否自动推断字段的类型

    2、spark.read[.option()].csv/json/csv/parquet/orc

    2、mysql数据读取

    1、spark.read

    .format() --指定文件读取格式[jdbc]
    .option().option().. --指定读取的参数<账号、密码、driver、表、url>
    .load() --指定加载路径的数据

    2、spark.read.jdbc(url,表名,参数设置): 此种方式读取jdbc的时候分区数 = 1,只能用于数据量小的场景

    spark.read.jdbc(url,表名,分区条件参数,参数设置): 此种方式读取jdbc的时候分区数=分区条件参数数组的元素个数。<不常用>
    val arr = Array("age<20","age>=20 and age<40","age>=40")
    spark.read.jdbc("jdbc:mysql://xx:3306/test","person",arr,参数设置)
    spark.read.jdbc(url,表名,mysql字段名,lowerBound,uperBound,分区数,参数设置): <工作常用>
    此种方式读取的时候,分区数 = (uperBound-lowerBound) > 分区数 ? 分区数 : uperBound-lowerBound

    2、数据保存:

    1、df/ds.write

    .mode() --指定写入模式
    .format() --指定数据写入的格式[csv/json/parquet/orc/jdbc]
    .option() --指定数据写入的时候需要的参数
    csv文件写入的时候指定的option:
    header: 写入的时候是否将列名也写入
    sep: 写入的时候指定字段之间的分隔符
    .save() --数据保存

    2、df/ds.write.mode(..).csv/json/parquet/orc/jdbc

    常用的写入模式:
    SaveMode.Overwrite: 如果指定的路径/表已经存在,则覆盖历史数据<数据写入HDFS的时候使用>
    SaveMode.Append: 如果指定的路径/表已经存在,则追加数据<数据写入mysql的时候使用>
    如果写入mysql的时候,主键数据已经存在,此时不能使用append,需要通过foreachPartitions对数据进行更新写入

    3、hive的数据读取和保存

    1、读取数据:

    1、在创建SparkSession的时候通过enableHiveSupport需要开启hive的支持:

    SparkSession.builder().master(..).appName(..).enableHiveSupport().getOrCreate

    2、直接在代码中通过spark.sql("查询hive表数据")

    2、保存数据到hive表:

    df/ds.write.mode(..).saveAsTable("hive表") <不常用,一般都是将数据写入HDFS>

    4、多维聚合:<对多个维度按照不同的组合进行聚合>

    grouping sets:
    语法: select 维度1,维度2,..,聚合函数 from 表 group by 维度1,维度2,.. grouping sets( (维度1),(维度1,维度2),(..) )
    grouping sets后面的字段名必须是group by后面的字段
    案例:
    select A,B,C,count(1) from person group by A,B,C grouping sets( (A),(A,B),(B,C),(A,B,C) )
    等价于:
    select A,null B,null C,count(1) from person group by A
    union
    select A,B,null C,count(1) from person group by A,B
    union
    select null A,B,C,count(1) from person group by B,C
    union
    select A,B,C,count(1) from person group by A,B,C

    作者:Ya
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    Win7 64位系统上Hadoop单机模式的安装及开发环境搭建
    HBase配置文件设置
    YARN HA 配置文件设置
    Hadoop的配置文件设置(HDFS HA)
    记一次java heap space的解决办法
    记一次sql优化——left join不走索引问题
    js黑魔法
    css坑了我一下下之line-height
    target-densitydpi=device-dpi会使其他ui插件布局变小
    redis缓存过期key优化-缓存不释放
  • 原文地址:https://www.cnblogs.com/1463490Ya/p/15526711.html
Copyright © 2011-2022 走看看