zoukankan      html  css  js  c++  java
  • 02spark sql

    1、概念

      Spark SQL是一个用来处理结构化数据的Spark组件。

      优点:

          ①SparkSQL是一个SQL解析引擎,将SQL解析成特殊的RDD(DataFrame),然后在Spark集群中运行
          ②SparkSQL是用来处理结构化数据的(先将非结构化的数据转换成结构化数据)
          ③SparkSQL支持两种编程API 1.SQL方式 2.DataFrame的方式(DSL)
          ④SparkSQL兼容hive(元数据库、SQL语法、UDF、序列化、反序列化机制) //UDF很重要,自定义函数
          ⑤支持多种输入输出(mysql,json,csv,parquet等)

    2、DataFrame、Dataset、RDD

     

      ①共同点:
    
            1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,都是spark的数据集合抽象。
    
            2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action时,三者才会开始遍历运算
                val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
                rdd.map{line=>
                      println("运行")
                      line._1
                }                                    //不会运行

    3、三者都有partition的概念、都有filter、map等常用算子 var predata=data.repartition(24).mapPartitions(func) //有分区概念,说明是分布式 ②区别: RDD: 1、RDD不支持sparksql操作 2、会频繁的创建销毁对象,会增加GC开销 //比如map操作,对每行进行遍历,每行都会创建line等对象,然后销毁 注:我们可以利用mapPartitions方法来重载RDD单个分片内的数据创建方式,用复用可变对象的方式来减小对象分配和GC的开销。 注:我们写的sql、算子里面其实Spark SQL在框架内部已经在各种可能的情况下尽量重用了对象 3、无论是集群间的通信,还是IO操作都需要对对象的结构和数据进行序列化和反序列化,RDD由于带上了元数据,所以序列化、反序列化的开销比较大 注:由于DataFrame每行的数据类型相同,所以序列化不需要带上元数据 4、由于它基本和hadoop一样万能的,对于行数据没有特定的数据结构,因此没有针对特殊场景的优化,只能在执行的逻辑上进行优化 总结:GC、序列化开销大,不能针对性优化 DataFrame: 1、结构化数据处理非常方便,支持多种输入输出(mysql,json,csv,parquet等) 2、DataFrame引入了schema和off-heap(数据存储使用堆外内存,增加元数据) schema : RDD每一行的数据, 结构都是一样的. 这个结构就存储在schema中,并且Spark对schema进行了保存. Spark通过schame就能够读懂数据,    因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了. off-heap : 意味着JVM堆外内存, 这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中,     当要操作数据时(比如shuffle), 就直接操作off-heap内存. 由于Spark理解schema, 所以知道该如何操作,也同样减少了GC,提升了运行速度 注:比如说shuffle,每个分区的类型都一样,就不用每个分区都序列化schema了,去读一下原来Spark保存的schema就行 数据存储在堆外内存,也就直接从堆外去取就行,也同样较少了GC,提升了运行速度 注:schema包含了 表头(表的描述信息),描述了有多少列,每一列数叫什么名字、什么类型、能不能为空 注:DataFrame = RDD+Schema。类似于传统数据库中的二维表格。 3、RDD执行作业只能在调度阶段进行简单通用的优化,而DataFrame带有数据集内部的结构,可以根据这些信息进行针对性的优化,最终实现优化运行效率 4、hive兼容,支持hql、udf(自定义函数)等 //udf自定义函数很重要,hive中的函数、都支持!!! 缺点: 1.编译时不能类型转化安全检查,运行时才能确定是否有问题 2.对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象 注:dataframe里面都是一个个的Row对象,即便存入的是样例类,也是Row,其实可以将Row理解为Object,虽然也能很好的拿出每一列数据 总结:GC、序列化开销小,堆外保存(序列化文件),能针对性优化,支持SQL,支持多种数据来源与保存,只是不能自定义对象 Dataset:
    1.和RDD一样,只不过支持自定义对象存储 2.类型转化安全,代码友好 注:在进行toDF、toDS操作都需要这个包进行支持:import x.implicits._ //这里的x是SparkSession的变量名 注:dataframe每行的类型为Row(object),Dataset可以为Person、Animal等自定义类,但都是一张表,类似于数据库中的表一样! 注:所以说DF是弱类型,DS是强类型 val col1=line.getAs[String]("col1") //因为是弱类型,所以根据列名获取某一列的值 val col1=line.col1 //强类型,直接点就行了 ③转化 rdd、toDF、toDS val rdd1=testDF.rdd 特例: val ds1 = df.as[Person] ④常用方法:    df.show()    df.printSchema() //打印底层的物理执行计划 df.explain() //收集计算结果到dirver端 df.collect() //收集结果到driver端

    3、Parquet

        Parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:

        a)可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

        b)压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。

        c)只读取需要的列,支持向量运算,能够获取更好的扫描性能。

        注:Parquet是spark其推荐的数据存储格式(默认存储为parquet)。
        注:parquet本质是对shuffle数据进行了压缩,减少了网络开销,提高了shuffle速度。其他的压缩还有orc等

    4、自定义函数

        spark sql、hive都是这三种:

          UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等(一进一出)
          UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等(多进一出)
          UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap(多进多出)

    5、Join优化

      ①小表 join 大表

          join操作尽量以大表为基表,join默认右边为基表,左连接,左表为基表,右连接,右表为基表。

      ②Broadcast Join

          维度表:一般指固定的、变动较少的表,例如联系人、物品种类等,一般数据有限。
          事实表:一般记录流水,比如销售清单等,通常随着时间的增长不断膨胀。

          eg:因为Join操作是对两个表中key值相同的记录进行连接,在SparkSQL中,对两个表做Join最直接的方式是先根据key
            分区,再在每个分区中把key值相同的记录拿出来做连接操作。但这样就不可避免地涉及到shuffle,而shuffle在
            Spark中是比较耗时的操作,我们应该尽可能的设计Spark应用使其避免大量的shuffle。

            为了避免shuffle,我们可以将大小有限的维度表的全部数据分发到每个节点
            上,供事实表使用。executor存储维度表的全部数据,一定程度上牺牲了空间,换取shuffle操作大量的耗时,这在
            SparkSQL中称作Broadcast Join

          Broadcast Join的条件:

            1. 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
            2. 基表不能被广播,比如left outer join时,只能广播右表

      ③Shuffle Hash Join

          当一侧的表比较小时,我们选择将其广播出去以避免shuffle,提高性能。但因为被广播的表首先被collect到driver
          段,然后被冗余分发到每个executor上,所以当表比较大时,采用broadcast join会对driver端和executor端造成
          较大的压力。

          Shuffle Hash Join分为两步:
            1. 对两张表分别按照join keys进行重分区,即shuffle,目的是为了让有相同join keys值的记录分到对应的分区中
            2. 对对应分区中的数据进行join,此处先将小表分区构造为一张hash表,然后根据大表分区中记录的join keys值拿出来进行匹配

       ④Sort Merge Join

          比Shuffle Hash Join多了一层排序,每个分区进行排序,当join时候,但能过滤掉很多数据

  • 相关阅读:
    SSH综合练习-仓库管理系统-第二天
    SSH综合练习-第1天
    Spring第三天
    Spring第二天
    Spring第一天
    【pandas】pandas.Series.str.split()---字符串分割
    【剑指offer】和为s的两个数字
    【剑指offer】数组中只出现一次的数字
    【剑指offer】输入一颗二叉树的根节点,求二叉树的深度,C++实现
    【剑指offer】输入一颗二叉树的根节点,判断是不是平衡二叉树,C++实现
  • 原文地址:https://www.cnblogs.com/lihaozong2013/p/10586997.html
Copyright © 2011-2022 走看看