  • The Way of Spark (Advanced Series) — Spark from Beginner to Master, Section 9: Spark SQL Execution Flow Analysis

    1. Overall Execution Flow

    We use the following code to analyze the Spark SQL execution flow, making clear the several states a LogicalPlan passes through and giving an overall picture of how Spark SQL executes a query.

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._
    
    // Define the schema using a case class.
    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface.
    case class Person(name: String, age: Int)
    
    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
    people.registerTempTable("people")
    
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
    

    (1) Inspect the schema of teenagers

    scala> teenagers.printSchema
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)

    (2) Inspect the execution flow

    scala> teenagers.queryExecution
    res3: org.apache.spark.sql.SQLContext#QueryExecution =
    == Parsed Logical Plan ==
    'Project [unresolvedalias('name),unresolvedalias('age)]
     'Filter (('age >= 13) && ('age <= 19))
      'UnresolvedRelation [people], None
    
    == Analyzed Logical Plan ==
    name: string, age: int
    Project [name#0,age#1]
     Filter ((age#1 >= 13) && (age#1 <= 19))
      Subquery people
       LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == Optimized Logical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == Physical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     Scan PhysicalRDD[name#0,age#1]
    
    Code Generation: true
    

    QueryExecution represents the overall Spark SQL execution flow. From the output above we can see that executing a SQL statement goes through the following steps:

    == (1) Parsed Logical Plan ==
    'Project [unresolvedalias('name),unresolvedalias('age)]
     'Filter (('age >= 13) && ('age <= 19))
      'UnresolvedRelation [people], None
    
    == (2) Analyzed Logical Plan ==
    name: string, age: int
    Project [name#0,age#1]
     Filter ((age#1 >= 13) && (age#1 <= 19))
      Subquery people
       LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == (3) Optimized Logical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == (4) Physical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     Scan PhysicalRDD[name#0,age#1]
    
    // dynamic bytecode generation (code generation, CG) is enabled to improve query efficiency
    Code Generation: true
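
    If you only need one of these stages, QueryExecution also exposes each stage as a separate field. A minimal sketch (field names as defined on Spark 1.5's QueryExecution; run it in the same shell session as above):

    // Each stage of the pipeline is available as a lazy val on QueryExecution.
    val qe = teenagers.queryExecution
    println(qe.logical)       // parsed (unresolved) logical plan
    println(qe.analyzed)      // analyzed logical plan, attributes resolved against the catalog
    println(qe.optimizedPlan) // logical plan after the Catalyst optimizer rules
    println(qe.sparkPlan)     // chosen physical plan
    println(qe.executedPlan)  // physical plan after preparation rules, ready to execute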

    2. Full-Table Query Execution Flow

    Execute the statement:

    val all= sqlContext.sql("SELECT * FROM people")

    Execution flow:

    scala> all.queryExecution
    res9: org.apache.spark.sql.SQLContext#QueryExecution =
    // note that * is parsed as unresolvedalias(*)
    == Parsed Logical Plan ==
    'Project [unresolvedalias(*)]
     'UnresolvedRelation [people], None
    
    == Analyzed Logical Plan ==
    // unresolvedalias(*) is analyzed into all the fields in the schema
    // UnresolvedRelation [people] is analyzed into Subquery people
    name: string, age: int
    Project [name#0,age#1]
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == Optimized Logical Plan ==
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == Physical Plan ==
    Scan PhysicalRDD[name#0,age#1]
    
    Code Generation: true
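
    As an aside, DataFrame.explain prints the same information without touching queryExecution directly; passing true includes the logical plans as well (standard DataFrame API in Spark 1.x):

    // Prints only the physical plan.
    all.explain()
    // Prints the parsed, analyzed, and optimized logical plans plus the physical plan.
    all.explain(true)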
    

    3. Filter Query Execution Flow

    Execute the statement:

    scala> val filterQuery= sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
    filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

    Execution flow:

    scala> filterQuery.queryExecution
    res0: org.apache.spark.sql.SQLContext#QueryExecution =
    == Parsed Logical Plan ==
    'Project [unresolvedalias(*)]
     'Filter (('age >= 13) && ('age <= 19))
      'UnresolvedRelation [people], None
    
    == Analyzed Logical Plan ==
    name: string, age: int
    Project [name#0,age#1]
     // an extra Filter appears here (likewise in the plans below)
     Filter ((age#1 >= 13) && (age#1 <= 19))
      Subquery people
       LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20
    
    == Optimized Logical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20
    
    == Physical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     Scan PhysicalRDD[name#0,age#1]
    
    Code Generation: true
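
    The same filter can also be expressed through the DataFrame API, and it yields an equivalent set of plans. A small sketch, assuming the people DataFrame registered in section 1:

    // Equivalent filter written with the DataFrame API; the plans Catalyst
    // produces match those of the SQL version above.
    val filterDF = people.filter($"age" >= 13 && $"age" <= 19)
    filterDF.queryExecution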

    4. Join Query Execution Flow

    Execute the statement:

    val joinQuery= sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")

    Inspect the overall execution flow:

    scala> joinQuery.queryExecution
    res0: org.apache.spark.sql.SQLContext#QueryExecution =
    // note the Filter
    // and the Join Inner in the parsed plan below
    == Parsed Logical Plan ==
    'Project [unresolvedalias(*)]
     'Filter ('a.age = 'b.age)
      'Join Inner, None
       'UnresolvedRelation [people], Some(a)
       'UnresolvedRelation [people], Some(b)
    
    == Analyzed Logical Plan ==
    name: string, age: int, name: string, age: int
    Project [name#0,age#1,name#2,age#3]
     Filter (age#1 = age#3)
      Join Inner, None
       Subquery a
        Subquery people
         LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
       Subquery b
        Subquery people
         LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == Optimized Logical Plan ==
    Project [name#0,age#1,name#2,age#3]
     Join Inner, Some((age#1 = age#3))
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...
    
    // inspect its physical plan
    scala> joinQuery.queryExecution.sparkPlan
    res16: org.apache.spark.sql.execution.SparkPlan =
    TungstenProject [name#0,age#1,name#2,age#3]
     SortMergeJoin [age#1], [age#3]
      Scan PhysicalRDD[name#0,age#1]
      Scan PhysicalRDD[name#2,age#3]

    The previous example is equivalent to the one below; only the way the join is expressed differs slightly. Execute the statement:

    scala> val innerQuery= sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
    innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]
    

    Inspect the overall execution flow:

    scala> innerQuery.queryExecution
    res2: org.apache.spark.sql.SQLContext#QueryExecution =
    // note the Join Inner
    // and that there is no Filter this time
    == Parsed Logical Plan ==
    'Project [unresolvedalias(*)]
     'Join Inner, Some(('a.age = 'b.age))
      'UnresolvedRelation [people], Some(a)
      'UnresolvedRelation [people], Some(b)
    
    == Analyzed Logical Plan ==
    name: string, age: int, name: string, age: int
    Project [name#0,age#1,name#4,age#5]
     Join Inner, Some((age#1 = age#5))
      Subquery a
       Subquery people
        LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
      Subquery b
       Subquery people
        LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    // note that the Optimized Logical Plan and the Analyzed Logical Plan
    // are essentially the same: no special optimization was applied here.
    // This is highlighted for comparison with the later subquery example,
    // where the Analyzed and Optimized plans do differ.
    == Optimized Logical Plan ==
    Project [name#0,age#1,name#4,age#5]
     Join Inner, Some((age#1 = age#5))
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...
    
    // inspect its physical plan
    scala> innerQuery.queryExecution.sparkPlan
    res14: org.apache.spark.sql.execution.SparkPlan =
    TungstenProject [name#0,age#1,name#6,age#7]
     SortMergeJoin [age#1], [age#7]
      Scan PhysicalRDD[name#0,age#1]
      Scan PhysicalRDD[name#6,age#7]
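
    For comparison, the same self-join can be written with the DataFrame API. A sketch, assuming the people DataFrame from section 1 (as and join are standard DataFrame methods in Spark 1.5):

    // Alias the same DataFrame twice and join on age; the physical plan
    // again comes out as a SortMergeJoin over two scans of the same RDD.
    val a = people.as("a")
    val b = people.as("b")
    val joinDF = a.join(b, $"a.age" === $"b.age", "inner")
    joinDF.queryExecution.sparkPlan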
    

    5. Subquery Execution Flow

    Execute the statement:

    scala> val subQuery=sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
    subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
    

    Inspect the overall execution flow:

    
    scala> subQuery.queryExecution
    res4: org.apache.spark.sql.SQLContext#QueryExecution =
    == Parsed Logical Plan ==
    'Project [unresolvedalias(*)]
     'Filter ('a.age <= 19)
      'Subquery a
       'Project [unresolvedalias(*)]
        'Filter ('age >= 13)
         'UnresolvedRelation [people], None
    
    == Analyzed Logical Plan ==
    name: string, age: int
    Project [name#0,age#1]
     Filter (age#1 <= 19)
      Subquery a
       Project [name#0,age#1]
        Filter (age#1 >= 13)
         Subquery people
          LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    // note the difference between the Optimized and the Analyzed plan here:
    // the optimizer collapsed the subquery and merged the two Filters into one
    == Optimized Logical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == Physical Plan ==
    Filter ((age#1 >= 13) && (age#1 <= 19))
     Scan PhysicalRDD[name#0,age#1]
    
    Code Generation: true
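
    The optimization seen here (the subquery collapsed and the two filters combined into a single conjunction) also applies when filters are chained through the DataFrame API. A minimal sketch:

    // Two chained filters; Catalyst optimizer rules such as CombineFilters
    // collapse them into a single Filter over the underlying RDD scan.
    val chained = people.filter($"age" >= 13).filter($"age" <= 19)
    chained.queryExecution.optimizedPlan
    // Expect a single Filter ((age >= 13) && (age <= 19)) over the LogicalRDD.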
    
    

    6. Aggregate SQL Execution Flow

    Execute the statement:

    scala> val aggregateQuery=sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
    aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]
    

    Inspect the execution flow:

    
    scala> aggregateQuery.queryExecution
    res6: org.apache.spark.sql.SQLContext#QueryExecution =
    // note 'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
    // i.e. group by a.name is parsed as unresolvedalias('a.name)
    == Parsed Logical Plan ==
    'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
     'Filter ('a.age <= 19)
      'Subquery a
       'Project [unresolvedalias(*)]
        'Filter ('age >= 13)
         'UnresolvedRelation [people], None
    
    == Analyzed Logical Plan ==
    name: string, _c1: bigint
    Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
     Filter (age#1 <= 19)
      Subquery a
       Project [name#0,age#1]
        Filter (age#1 >= 13)
         Subquery people
          LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
    
    == Optimized Logical Plan ==
    Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
     Filter ((age#1 >= 13) && (age#1 <= 19))
      LogicalRDD [name#0,age#1], MapPartitions...
    
    // inspect its physical plan
    scala> aggregateQuery.queryExecution.sparkPlan
    res10: org.apache.spark.sql.execution.SparkPlan =
    TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
     TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
      Filter ((age#1 >= 13) && (age#1 <= 19))
       Scan PhysicalRDD[name#0,age#1]
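
    The same aggregation can be written with the DataFrame API and goes through the same partial/final TungstenAggregate pair. A sketch, assuming the functions import shown:

    import org.apache.spark.sql.functions.sum

    // groupBy + agg yields the same Aggregate logical plan and a
    // Partial/Final TungstenAggregate pair in the physical plan.
    val aggDF = people
      .filter($"age" >= 13 && $"age" <= 19)
      .groupBy("name")
      .agg(sum("age"))
    aggDF.queryExecution.sparkPlan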
    

    For other SQL statements, you can inspect the execution flow with the same approach, to grasp the basic ideas behind Spark SQL's implementation.
