zoukankan      html  css  js  c++  java
  • Spark SQL原理详解及优化器

    一.简介

      从Spark 1.3开始,Spark SQL正式发布。而之前的另一个基于Spark的SQL开源项目Shark随之停止更新,基于Spark的最佳SQL计算就是Spark SQL。Spark SQL是Spark的一个模块,专门用于处理结构化数据。Spark SQL与Spark核心及其他模块之间的关系如下:

      

      Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方法有很多种,包括SQL和Dataset API。计算结果时,将使用相同的执行引擎,而与用的表达计算API或语言无关。这种同一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供最自然的方式表达给定的转换。

    二.Dataset & DataFrame

        数据集时数据的分布式集合。数据集是Spark 1.6中添加的新接口,它具有RDD的优点【强类型输入,使用强大的Lambda函数的能力】和Spark SQL的优化执行引擎的优点。数据集可以从JVM对象中构造,然后使用功能性的转换【操作map、flatMap、filter等】。Dataset API在Scala和Java中都可使用。Python不支持Dataset API。但是由于Python的动态特性,Dataset API的许多优点已经可用。R语言与之类似。

        DataFrame从概念上讲,它等效于关系数据库中的表或R/Python中的数据框,但是在后台进行了更丰富的优化,可以从多种来源构造DataFrame。例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中都可以使用。在Scala和Java中,DataFrame表示由Row构成的数据集。在Scala API中,DataFrame只是类型Dataset[Row]的别名。而在Java API中,用户需要使用Dataset<Row>来代表DataFrame。

    三.整体架构

      

      注意:Spark SQL是Spark Core之上的一个模块,所有SQL操作最终都通过Catalyst翻译成类似的Spark程序代码被Spark Core调度执行,其过程也有Job、Stage、Task的概念。

    四.全局临时视图

      Spark SQL中的临时视图是有会话作用域的,如果创建它的会话终止,它将消失。如果要在所有会话中共享一个临时视图并保存活动状态,直到Spark应用程序终止,则需要创建全局临时视图。全局临时视图与系统保留的数据库global_temp相关联,必须使用限定名称来引用它,代码例子如下:

    df.createGlobalTempView("people")
    // 全局临时视图与系统保留的数据库global_temp spark.sql(
    "select * from global_temp.people").show()
    // 全局临时视图垮会话
    spark.newSession().sql("select * from global_temp.people").show()

    五.创建数据集

      数据集与RDD相似,但是它们不是使用Java或Kryo进行序列化,而是使用专门的Encoder对对象进行序列化以进行网络处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但是编码器是动态生成的代码,并使用一种格式,该格式允许Spark执行许多操作,例如过滤,排序和哈希处理,而无需将字节反序列化为对象。

    object DataSetDeml {
    
      //设置日志级别
      Logger.getLogger("org").setLevel(Level.WARN)
      // 放在引用的函数外部
      case class Person(name : String, age : Long)
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("Spark SQL").master("local[2]").getOrCreate()
    
        // 数据集直接的转换
        import spark.implicits._
    
        // 使用样例类创建数据集
        val caseClassDS = Seq(Person("Andy", 32)).toDS()
        caseClassDS.show()
    
        val primitiveDS = Seq(1, 2, 3).toDS()
        primitiveDS.map(_ + 1).show()
      }
    }

      执行结果:

      

      

    六.与RDD互操作

      Spark SQL支持两种将现有RDD转换为数据集的方法。第一种方法使用反射来推断包含特定对象类型的RDD的架构。这种基于反射的方法可以使代码更简洁,并且当编写Spark应用程序时已经了解架构时,可以很好地工作。

      创建数据集的第二种方法是通过编程界面,该界面允许构造模式,然后将其应用于现有的RDD。尽管此方法较为冗长,但可以在运行时才知道列及其类型的情况下构造数据集。

      1.使用反射

    object DataFrameDeml {
    
      //设置日志级别
      Logger.getLogger("org").setLevel(Level.WARN)
      // 放在引用的函数外部
      case class Technology(name : String, level : Long, age : Long)
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("Spark SQL").master("local[2]").getOrCreate()
    
        // 数据集直接的转换
        import spark.implicits._
    
        val technology = spark.sparkContext
          .textFile("D:\software\spark-2.4.4\data\sql\dataframe.txt")
          .map(_.split(","))
          .map(row => Technology(row(0), row(1).toLong, row(2)toLong))
          .toDF()
    
        technology.show()
        // 注册临时视图
        technology.createOrReplaceTempView("technology")
    
        // SQL查询
        val level_2 = spark.sql("select name,age from technology where level = 2")
        level_2.show()
    
        // 指定编码器
        implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
        val result = level_2.map(row => row.getValuesMap[Any](List("name", "age")))
        result.show(false)
      }
    }

      执行结果:

      

      

      

      2.使用模式

    val userData = Array(
          "2015,11,www.baidu.com", "2016,14,www.google.com",
          "2017,13,www.apache.com", "2015,21,www.spark.com",
          "2016,32,www.hadoop.com", "2017,18,www.solr.com",
          "2017,14,www.hive.com"
        )
    
        val userDataRDD = sc.parallelize(userData) // 转化为RDD
        val userDataType = userDataRDD.map(line => {
            val Array(age, id, url) = line.split(",")
            Row(age, id.toInt, url)
          })
        val structTypes = StructType(Array(
          StructField("age", StringType, true),
          StructField("id", IntegerType, true),
          StructField("url", StringType, true)
        ))
        // RDD转化为DataFrame
        val userDataFrame = sqlContext.createDataFrame(userDataType,structTypes)

    七.Catalyst执行优化器

      1 Catalyst最主要的数据结构是树,所有的SQL语句都会用树结构来存储,树中的每个节点都有一个类,以及0或多个子节点。Scala中定义的新的节点类型都是TreeNode这个类的子类,这些对象是不可变的。

      2 Catalyst另外一个重要的概念是规则,基本上,所有的优化都是基于规则的。

      3 执行过程

        1 分析阶段

          分析逻辑树,解决引用。

          使用Catalyst规则和Catalog对象来跟踪所有数据源中的表,以解决所有未辨识的属性。

        2 逻辑优化

        3 物理计划

          Catalyst会生成很多计划,并基于成本进行对比。接受一个逻辑计划作为输入,生产一个或多个物理计划。

        4 代码生成

          将Spark SQL代码编译成Java字节码。

      

  • 相关阅读:
    Kotlin 基础
    ViewPager2
    8086-debug指令
    (四)主控板改IP,升级app,boot,mac
    (三)主控板生级uboot与内核
    (四)linux网络编程
    (七)嵌入式系统异常程序远程定位
    (六)ARM状态寄存器-PSR
    (五)stm32工程代码HardFault异常查错调试方法
    (十)makefile
  • 原文地址:https://www.cnblogs.com/yszd/p/10031102.html
Copyright © 2011-2022 走看看