zoukankan      html  css  js  c++  java
  • sparkSQL1.1入门之四:深入了解sparkSQL执行计划

          前面两章花了不少篇幅介绍了SparkSQL的执行过程,非常多读者还是认为当中的概念非常抽象。比方Unresolved LogicPlan、LogicPlan、PhysicalPlan是长得什么样子,没点印象。仅仅知道名词,感觉非常缥缈。

    本章就着重介绍一个工具hive/console,来加深读者对sparkSQL的执行计划的理解。


    1:hive/console安装
          sparkSQL从1.0.0開始提供了一个sparkSQL的调试工具hive/console。

    该工具是给开发人员使用,在编译生成的安装部署包中并没有;该工具须要使用sbt编译执行。要使用该工具,须要具备下面条件:

    • spark1.1.0源代码
    • hive0.12源代码并编译
    • 配置环境变量

    1.1:安装hive/cosole
    以下是笔者安装过程:
    A:下载spark1.1.0源代码,安装在/app/hadoop/spark110_sql文件夹
    B:下载hive0.12源代码,安装在/app/hadoop/hive012文件夹,进入src文件夹后,使用以下命令进行编译:
    ant clean package -Dhadoop.version=2.2.0 -Dhadoop-0.23.version=2.2.0 -Dhadoop.mr.rev=23
    C:配置环境变量文件~/.bashrc后,source ~/.bashrc使环境变量生效。
    export HIVE_HOME=/app/hadoop/hive012/src/build/dist
    export HIVE_DEV_HOME=/app/hadoop/hive012/src
    export HADOOP_HOME=/app/hadoop/hadoop220
    D:启动
    切换到spark安装文件夹/app/hadoop/spark110_sql,执行命令:
    sbt/sbt hive/console
    经过一段漫长的sbt编译过程。最后出现例如以下界面:

    在控制台的scala提示符下,输入:help能够获取帮助,输入Tab键会陈列出当前可用的方法、函数、及变量。下图为按Tab键时显示的方法和函数。随着用户不断使用该控制态,用户定义或使用过的变量也会陈列出来。

     
    1.2:hive/console原理
          hive/console的调试原理非常easy。就是在scala控制台装载了catalyst中几个关键的class,当中的TestHive提前定义了表结构并装载命令。这些数据是hive0.12源代码中带有的測试数据,装载这些数据是按需运行的。这些数据位于/app/hadoop/hive012/src/data中。也就是$HIVE_DEV_HOME/data中。

     /*源自 sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */
      // The test tables that are defined in the Hive QTestUtil.
      // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
      val hiveQTestUtilTables = Seq(
        TestTable("src",
          "CREATE TABLE src (key INT, value STRING)".cmd,
          s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
        TestTable("src1",
          "CREATE TABLE src1 (key INT, value STRING)".cmd,
          s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
        TestTable("srcpart", () => {
          runSqlHive(
            "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
          for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
            runSqlHive(
              s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
                 |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
               """.stripMargin)
          }
        }),
    ......
    )
    由于要使用hive0.12的測试数据。所以须要定义两个环境变量:HIVE_HOME和HIVE_DEV_HOME。假设使用hive0.13的话。用户须要更改到对应文件夹:
     /*源自 sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */
      /** The location of the compiled hive distribution */
      lazy val hiveHome = envVarToFile("HIVE_HOME")
      /** The location of the hive source code. */
      lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")
    另外,假设用户想在hive/console启动的时候。预载很多其它的class。能够改动spark源代码下的 project/SparkBuild.scala文件
     /* 源自 project/SparkBuild.scala */
    object Hive {
      lazy val settings = Seq(
        javaOptions += "-XX:MaxPermSize=1g",
        // Multiple queries rely on the TestHive singleton. See comments there for more details.
        parallelExecution in Test := false,
        // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings
        // only for this subproject.
        scalacOptions <<= scalacOptions map { currentOpts: Seq[String] =>
          currentOpts.filterNot(_ == "-deprecation")
        },
        initialCommands in console :=
          """
            |import org.apache.spark.sql.catalyst.analysis._
            |import org.apache.spark.sql.catalyst.dsl._
            |import org.apache.spark.sql.catalyst.errors._
            |import org.apache.spark.sql.catalyst.expressions._
            |import org.apache.spark.sql.catalyst.plans.logical._
            |import org.apache.spark.sql.catalyst.rules._
            |import org.apache.spark.sql.catalyst.types._
            |import org.apache.spark.sql.catalyst.util._
            |import org.apache.spark.sql.execution
            |import org.apache.spark.sql.hive._
            |import org.apache.spark.sql.hive.test.TestHive._
            |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
      )
    }

    2:经常使用操作
          以下介绍一下hive/console的经常使用操作,主要是和执行计划相关的经常使用操作。在操作前,首先定义一个表people和查询query:
    //在控制台逐行执行
    case class Person(name:String, age:Int, state:String)
    sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")
    val query= sql("select * from people")
    2.1 查看查询的schema
    query.printSchema


    2.2 查看查询的整个执行计划
    query.queryExecution


    2.3 查看查询的Unresolved LogicalPlan
    query.queryExecution.logical


    2.4 查看查询的analyzed LogicalPlan
    query.queryExecution.analyzed


     2.5 查看优化后的LogicalPlan
    query.queryExecution.optimizedPlan


    2.6 查看物理计划
    query.queryExecution.sparkPlan


    2.7 查看RDD的转换过程
    query.toDebugString


    2.8 很多其它的操作
          很多其它的操作能够通过Tab键陈列出来。也能够參开sparkSQL的API,也能够參看源码中的方法和函数。


    3:不同数据源的执行计划
          上面经常使用操作里介绍了源自RDD的数据。我们都知道。sparkSQL能够源自多个数据源:jsonFile、parquetFile、hive。

    以下看看这些数据源的schema:

    3.1 json文件
          json文件支持嵌套表,sparkSQL也能够读入嵌套表,如以下形式的json数据,经修整(去空格和换行符)保存后,能够使用jsonFile读入sparkSQL。
    {  
       "fullname": "Sean Kelly",     
       "org": "SK Consulting",     
       "emailaddrs": [     
          {"type": "work", "value": "kelly@seankelly.biz"},     
          {"type": "home", "pref": 1, "value": "kelly@seankelly.tv"}     
       ],     
        "telephones": [     
          {"type": "work", "pref": 1, "value": "+1 214 555 1212"},     
          {"type": "fax", "value": "+1 214 555 1213"},     
          {"type": "mobile", "value": "+1 214 555 1214"}     
       ],     
       "addresses": [     
          {"type": "work", "format": "us",     
           "value": "1234 Main StnSpringfield, TX 78080-1216"},     
          {"type": "home", "format": "us",     
           "value": "5678 Main StnSpringfield, TX 78080-1316"}     
       ],     
        "urls": [     
          {"type": "work", "value": "http://seankelly.biz/"},     
          {"type": "home", "value": "http://seankelly.tv/"}     
       ]     
    }
    去空格和换行符后保存为/home/mmicky/data/nestjson.json,使用jsonFile读入并注冊成表jsonPerson,然后定义一个查询jsonQuery:
    jsonFile("/home/mmicky/data/nestjson.json").registerTempTable("jsonPerson")
    val jsonQuery = sql("select * from jsonPerson")
    查看jsonQuery的schema:
    jsonQuery.printSchema

    查看jsonQuery的整个执行计划:
    jsonQuery.queryExecution


    3.2 parquet文件
          parquet文件读入并注冊成表parquetWiki,然后定义一个查询parquetQuery:
    parquetFile("/home/mmicky/data/spark/wiki_parquet").registerTempTable("parquetWiki")
    val parquetQuery = sql("select * from parquetWiki")
    查询parquetQuery的schema:
    parquetQuery.printSchema

    查询parquetQuery的整个执行计划:
    parquetQuery.queryExecution


    3.3 hive数据
          之前说了,TestHive类中已经定义了大量的hive0.12的測试数据的表格式,如src、sales等等,在hive/console里能够直接使用;第一次使用的时候,hive/console会装载一次。

    以下我们使用sales表看看其schema和整个执行计划。首先定义一个查询hiveQuery:

    val hiveQuery = sql("select * from sales")
    查看hiveQuery的schema:
    hiveQuery.printSchema

    查看hiveQuery的整个执行计划:
    hiveQuery.queryExecution

    从上面能够看出,来自jsonFile、parquetFile、hive数据的物理计划还有有非常大差别的。

    4:不同查询的执行计划
          为了加深理解,我们列几个经常使用查询的执行计划和RDD转换过程。
    4.1 聚合查询
    sql("select state,avg(age) from people group by state").queryExecution

    sql("select state,avg(age) from people group by state").toDebugString

     
    4.2 join操作
    sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution

    sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString


    4.3 Distinct操作
    sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution

    sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString

     
    5:查询的优化
          上面的查询比較简单。看不出优化的过程,以下看几个样例,能够理解sparkSQL的优化过程。

    5.1 CombineFilters
          CombineFilters就是合并Filter,在含有多个Filter时发生。例如以下查询:
    sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution

    上面的查询,在Optimized的过程中。将age>=19和age<30这两个Filter合并了,合并成((age>=19) && (age<30))。事实上上面还做了一个其它的优化,就是project的下推,子查询使用了表的全部列,而主查询使用了列name。在查询数据的时候子查询优化成仅仅查列name。

    5.2 PushPredicateThroughProject
          PushPredicateThroughProject就是project下推。和上面样例中的project一样。
    sql("select name from (select name,state as location from people) a where location='CA'").queryExecution

     
    5.3 ConstantFolding
          ConstantFolding是常量叠加,用于表达式。如以下的样例:
    sql("select name,1+2 from people").queryExecution

    在Optimized的过程中,将常量表达式直接累加在一起。用新的列名来表示。

    5.4 自己定义优化
          在sparkSQL中的Optimizer中定义了3类12中优化方法,这里不再一一陈列。对于用于自己定义的优化,在hive/console也能够非常方便的调试。仅仅要先定义一个LogicalPlan,然后使用自己定义的优化函数进行測试就能够了。以下就举个和CombineFilters一样的样例,首先定义一个函数:
    object CombineFilters extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Filter(c1, Filter(c2, grandChild)) =>
          Filter(And(c1,c2),grandChild)
      }
    }
    然后定义一个query,并使用query.queryExecution.analyzed查看优化前的LogicPlan:
    val query= sql("select * from people").where('age >=19).where('age <30)
    query.queryExecution.analyzed

    最后。使用自己定义优化函数进行优化:
    CombineFilters(query.queryExecution.analyzed)

    能够看到两个Filter合并在一起了。

    甚至,在hive/console里直接使用transform对LogicPlan应用定义好的rule,以下定义了一个query,并使用query.queryExecution.analyzed查看应用rule前的LogicPlan:
    val hiveQuery = sql("SELECT * FROM (SELECT * FROM src) a")
    hiveQuery.queryExecution.analyzed

    然后,直接用transform将自己定义的rule:
    hiveQuery.queryExecution.analyzed transform {
       case Project(projectList, child) if projectList == child.output => child
     }

    该transform在LogicPlan的主查询和子查询的project同样时合并project。

          经过上面的样例。加上自己的理解。相信大部分的读者对sparkSQL中的执行计划应该有了比較明白的了解。


  • 相关阅读:
    Windows开发,关于通过写代码加载PDB的那些事
    从FreeBSD里面看到的网络协议列表,感觉可以保存一下
    听了几段《双投唐》
    ReactOS 无法显示中文的问题
    ReactOS 代码更新后的编译安装
    都什么年代了,怎么还那种德行
    假冒不伪劣
    ollvm 使用——“Cannot open /dev/random”错误的解决方法
    ollvm 编译
    C/C++ 吐槽第一期:你最讨厌的C/C++里面的数据类型是什么
  • 原文地址:https://www.cnblogs.com/slgkaifa/p/6748815.html
Copyright © 2011-2022 走看看