
    Spark SQL (7) - physicalPlan (Physical Plan)

    Spark's physical planning takes the logical operator tree produced earlier and turns it into a set of operations over RDDs; the resulting plan is then submitted as a job to the Spark cluster.

    Generating a physical plan in Spark mainly goes through the following steps:

    1. Apply the strategies defined in SparkPlanner. SparkPlanner is a subclass of SparkStrategies, and its concrete strategies are defined there. Among them appears PlanLater, a leaf SparkPlan node whose only job is to defer planning; it comes into play later, when the strategies are applied.
    2. Select one physical plan; currently the planner simply takes the first element of the returned iterator.
    3. Prepare the chosen physical plan: a set of predefined rules post-processes it to handle partitioning, ordering and similar requirements. (A small sketch of how these steps surface in QueryExecution follows this list.)
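
    These steps surface directly as lazy fields on QueryExecution, so they are easy to observe from a shell or notebook. Below is a small usage sketch, assuming a running SparkSession named spark (the exact operators you see depend on the Spark version):

    // A usage sketch, assuming a SparkSession called `spark` already exists.
    val df = spark.range(100).selectExpr("id % 10 AS bucket").groupBy("bucket").count()

    val qe = df.queryExecution
    qe.optimizedPlan   // the optimized logical plan that physical planning starts from
    qe.sparkPlan       // steps 1 and 2: the first plan produced by the planner's strategies
    qe.executedPlan    // step 3: the plan after prepareForExecution (exchanges, sorts, codegen)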

    A brief look at SparkPlan

    SparkPlan can be thought of as the physical plan's description of the operations to perform on RDDs. Its nodes generally fall into a few broad categories:

    1. LeafExecNode, leaf nodes: mostly tied to data sources; they are used to create the initial RDDs

    2. UnaryExecNode, unary nodes: mainly transformations applied to a single child's RDD

    3. BinaryExecNode, binary nodes: operators with two children; joins fall into this category

    4. Other node types (a small sketch that classifies a plan's nodes by these categories follows this list)
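
    As a quick illustration of the categories, here is a small, hypothetical helper (not part of Spark) that walks a physical plan and labels each node by its arity; SparkPlan inherits collect and nodeName from TreeNode:

    import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode, SparkPlan, UnaryExecNode}

    // Hypothetical helper: tag every node of a physical plan with its category.
    def describeArity(plan: SparkPlan): Seq[String] = plan.collect {
      case p: LeafExecNode   => s"leaf:   ${p.nodeName}"   // scans / data sources
      case p: UnaryExecNode  => s"unary:  ${p.nodeName}"   // single-child transformations
      case p: BinaryExecNode => s"binary: ${p.nodeName}"   // e.g. joins
      case p                 => s"other:  ${p.nodeName}"
    }

    // e.g. describeArity(df.queryExecution.executedPlan).foreach(println)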

    A SparkPlan has several important member variables and methods:

    1. metrics, the node's metric information;
    2. outputPartitioning: Partitioning and outputOrdering: Seq[SortOrder], which describe the node's output partitioning and ordering; each subclass provides its own implementation;

    3. the execute and doExecute methods: SparkPlan exposes execute as the single entry point that triggers the RDD computation, but the actual work is implemented by subclasses in doExecute (see the sketch below).
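
    To make the execute/doExecute contract concrete, here is a minimal, hypothetical Spark 2.x-style operator (not part of Spark): callers only ever invoke execute(), while the subclass supplies output, the partitioning/ordering descriptions and doExecute():

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
    import org.apache.spark.sql.catalyst.plans.physical.Partitioning
    import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

    // Hypothetical pass-through operator: it reports the same output, partitioning and
    // ordering as its child, so EnsureRequirements will not insert anything above it.
    case class PassThroughExec(child: SparkPlan) extends UnaryExecNode {
      override def output: Seq[Attribute] = child.output
      override def outputPartitioning: Partitioning = child.outputPartitioning
      override def outputOrdering: Seq[SortOrder] = child.outputOrdering

      // Only doExecute is overridden; the final execute() defined on SparkPlan calls it.
      override protected def doExecute(): RDD[InternalRow] =
        child.execute() // a real operator would transform the child's rows here
    }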

    Generating the SparkPlan physical plan

    The SparkPlan is generated in QueryExecution.scala:

    lazy val sparkPlan: SparkPlan = {
        SparkSession.setActiveSession(sparkSession)
        // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
        //       but we will implement to choose the best plan.
        planner.plan(ReturnAnswer(optimizedPlan)).next()
      }
    

    This calls QueryPlanner.plan:

      def strategies: Seq[GenericStrategy[PhysicalPlan]]
    
      def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
        // Obviously a lot to do here still...
    
        // Collect physical plan candidates.
        val candidates = strategies.iterator.flatMap(_(plan))
    
        // The candidates may contain placeholders marked as [[planLater]],
        // so try to replace them by their child plans.
        val plans = candidates.flatMap { candidate =>
          val placeholders = collectPlaceholders(candidate)
    
          if (placeholders.isEmpty) {
            // Take the candidate as is because it does not contain placeholders.
            Iterator(candidate)
          } else {
            // Plan the logical plan marked as [[planLater]] and replace the placeholders.
            placeholders.iterator.foldLeft(Iterator(candidate)) {
              case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
                // Plan the logical plan for the placeholder.
                val childPlans = this.plan(logicalPlan)
    
                candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
                  childPlans.map { childPlan =>
                    // Replace the placeholder by the child plan
                    candidateWithPlaceholders.transformUp {
                      case p if p == placeholder => childPlan
                    }
                  }
                }
            }
          }
        }
    
        val pruned = prunePlans(plans)
        assert(pruned.hasNext, s"No plan for $plan")
        pruned
      }
    

    QueryPlanner plays a role much like RuleExecutor does in the logical-plan phase: it defines how strategies are applied to turn a logical plan into a physical plan, while the concrete strategies themselves are supplied by its subclass, SparkPlanner. So, just as in the logical-plan phase, we mainly need to look at the strategies defined in SparkPlanner. Before doing that, it is worth walking through QueryPlanner.plan above, since it makes the strategies easier to follow. plan applies the predefined strategies to the logical plan, but a single strategy does not necessarily plan the whole tree in one go, so unplanned subtrees are marked with the PlanLater placeholder mentioned earlier. After each pass, the PlanLater placeholders are collected and replaced by recursively calling plan on their logical subtrees, until no placeholders remain, at which point the physical plan is complete. The strategies defined in SparkPlanner are listed below:

     override def strategies: Seq[Strategy] =
        experimentalMethods.extraStrategies ++
          extraPlanningStrategies ++ (
          DataSourceV2Strategy :: 
          FileSourceStrategy :: 
          DataSourceStrategy(conf) :: 
          SpecialLimits ::
          Aggregation ::
          JoinSelection ::
          InMemoryScans ::
          BasicOperators :: Nil)
    

    The names give a rough idea of what each strategy does: DataSourceV2Strategy and DataSourceStrategy(conf) deal with data sources (readers interested in the difference between the two can dig into it on their own), FileSourceStrategy handles file-based sources, Aggregation plans aggregations, JoinSelection picks a join implementation, and so on.
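
    To see where PlanLater fits in, here is a minimal sketch of a strategy of the same shape: a hypothetical FilterOnlyStrategy (not one of the built-ins above), modeled on how BasicOperators plans a Filter. It plans only the node it recognizes and wraps the child in planLater, producing the placeholder that QueryPlanner.plan later replaces:

    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.execution.{FilterExec, SparkPlan}

    // Hypothetical strategy: plan Filter nodes only, defer everything else.
    object FilterOnlyStrategy extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case Filter(condition, child) =>
          // The child is not planned here; planLater(child) inserts a PlanLater
          // placeholder that a later pass of QueryPlanner.plan will resolve.
          FilterExec(condition, planLater(child)) :: Nil
        case _ => Nil // no match: let the other strategies try
      }
    }

    A strategy written this way could also be plugged in from outside through experimentalMethods.extraStrategies, the hook that appears at the top of the strategies sequence above.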

    At this point the physical plan has been generated, and the first one returned is the plan used for the computation that follows. Before the job is actually submitted, however, there is one more step, prepareForExecution, whose purpose is to refine the physical plan so that its shuffles and internal row format are handled correctly.

     /**
       * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
       * row format conversions as needed.
       */
      protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
        preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
      }
    
      /** A sequence of rules that will be applied in order to the physical plan before execution. */
      protected def preparations: Seq[Rule[SparkPlan]] = Seq(
        python.ExtractPythonUDFs,
        PlanSubqueries(sparkSession),
        EnsureRequirements(sparkSession.sessionState.conf),
        CollapseCodegenStages(sparkSession.sessionState.conf),
        ReuseExchange(sparkSession.sessionState.conf),
        ReuseSubquery(sparkSession.sessionState.conf))
    

    preparations above defines the rules applied in the preparation phase: extracting Python UDFs, planning subqueries, ensuring the required partitioning and ordering, collapsing whole-stage code generation stages, and reusing exchanges and subqueries. Here we focus on EnsureRequirements, which guarantees the required distribution and ordering by inserting Exchange nodes.

    private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
        val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
        val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
        var children: Seq[SparkPlan] = operator.children
        assert(requiredChildDistributions.length == children.length)
        assert(requiredChildOrderings.length == children.length)
    
        // Ensure that the operator's children satisfy their output distribution requirements.
        children = children.zip(requiredChildDistributions).map {
          // Case 1: the child's output partitioning already satisfies the required distribution
          // (e.g. an UnspecifiedDistribution, or an AllTuples requirement with a single partition)
          case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
            child
          case (child, BroadcastDistribution(mode)) =>
            BroadcastExchangeExec(mode, child)
          case (child, distribution) =>
            val numPartitions = distribution.requiredNumPartitions
              .getOrElse(defaultNumPreShufflePartitions)
            ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
        }
    
        // Get the indexes of children which have specified distribution requirements and need to have
        // same number of partitions.
        val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
          case (UnspecifiedDistribution, _) => false
          case (_: BroadcastDistribution, _) => false
          case _ => true
        }.map(_._2)
    
        val childrenNumPartitions =
          childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
    
        if (childrenNumPartitions.size > 1) {
          // Get the number of partitions which is explicitly required by the distributions.
          val requiredNumPartitions = {
            val numPartitionsSet = childrenIndexes.flatMap {
              index => requiredChildDistributions(index).requiredNumPartitions
            }.toSet
            assert(numPartitionsSet.size <= 1,
              s"$operator have incompatible requirements of the number of partitions for its children")
            numPartitionsSet.headOption
          }
    
          val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
    
          children = children.zip(requiredChildDistributions).zipWithIndex.map {
            case ((child, distribution), index) if childrenIndexes.contains(index) =>
              if (child.outputPartitioning.numPartitions == targetNumPartitions) {
                child
              } else {
                val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
                child match {
                  // If child is an exchange, we replace it with a new one having defaultPartitioning.
                  case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
                  case _ => ShuffleExchangeExec(defaultPartitioning, child)
                }
              }
    
            case ((child, _), _) => child
          }
        }
    
        // Now, we need to add ExchangeCoordinator if necessary.
        // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
        // However, with the way that we plan the query, we do not have a place where we have a
        // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
        // at here for now.
        // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
        // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
        children = withExchangeCoordinator(children, requiredChildDistributions)
    
        // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
        children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
          // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
          if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
            child
          } else {
            SortExec(requiredOrdering, global = false, child = child)
          }
        }
    
        operator.withNewChildren(children)
      }
    

    The logic above can be broken down into the following steps (a small inspection sketch follows the list):

    1. Add Exchange nodes. Each child is checked to see whether its output partitioning satisfies the required distribution; if not, a broadcast requirement is met with a BroadcastExchangeExec, otherwise a ShuffleExchangeExec is added. Next, for the children whose required distributions impose a concrete requirement and must share the same number of partitions, each child's number of output partitions is compared against the target: children that already match are left untouched, the others get a (new) ShuffleExchangeExec.
    2. Add an ExchangeCoordinator where necessary, which adaptively tunes the shuffle while the SQL query runs.
    3. For children whose requiredChildOrderings are not already satisfied, add a SortExec node.
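
    A simple way to observe steps 1 and 3 is to compare the plan before and after preparation for a sort-merge join, whose children must be hash-partitioned and sorted on the join key. This is only a sketch, assuming a SparkSession named spark; the exact operators depend on the Spark version and configuration:

    // Force a sort-merge join so the distribution requirement cannot be met by a broadcast.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val left  = spark.range(0, 100000).withColumnRenamed("id", "k")
    val right = spark.range(0, 100000).withColumnRenamed("id", "k")
    val joined = left.join(right, "k")

    println(joined.queryExecution.sparkPlan)    // SortMergeJoin chosen, but no exchanges or sorts yet
    println(joined.queryExecution.executedPlan) // ShuffleExchange and Sort inserted by EnsureRequirements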

    A quick note on ExchangeCoordinator: step 2 above has been removed from the latest Spark source, so readers who are interested can download the newer code and study it. Taking ShuffleExchangeExec as an example, once an ExchangeCoordinator is attached, doPrepare registers the exchange with the coordinator; later, when doExecute runs, it calls exchangeCoordinator.postShuffleRDD(this) to obtain the corresponding shuffle RDD. If that RDD is not available yet, the coordinator asks the registered exchanges to report their pre-shuffle statistics and uses them to decide how the post-shuffle RDD's partitions are laid out.

    After the preparation phase, the physical plan is ready to be submitted to the cluster for execution.

    Summary: this covers the overall flow from parsing to the logical plan to the physical plan. The finer details are better studied as they come up in day-to-day work or when time allows. Readers interested in Spark SQL may enjoy the book 《Spark SQL 内核剖析》 (Spark SQL Internals), which is quite good, though it should be read together with the source code or some parts will be hard to follow. A later post will build on that book and my own understanding to walk through aggregation and joins.
