
    Spark SQL (7) - physicalPlan: the Physical Plan

    Spark's physical planning takes the optimized logical operator tree and turns it into a concrete set of operations on RDDs; the resulting plan is then submitted as a job to the Spark cluster.

    Generating a physical plan in Spark mainly goes through the following steps:

    1. Apply the strategies defined in SparkPlanner. SparkPlanner is a subclass of SparkStrategies, and the concrete strategies it applies are defined there. There is also a PlanLater node, a leaf SparkPlan whose sole purpose is deferred planning; it comes into play when the strategies are applied (see the sketch after this list);
    2. Select one physical plan; currently Spark simply takes the first element of the iterator returned by the planner;
    3. Prepare the chosen physical plan. In this step the plan is adjusted for partitioning, ordering, and related concerns, again by applying a set of predefined rules.
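
    For reference, PlanLater is roughly the following: a leaf node that only wraps the not-yet-planned logical subtree and can never be executed directly (simplified from the Spark 2.x source):

      case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
        // Expose the logical plan's output so the placeholder fits into the physical tree.
        override def output: Seq[Attribute] = plan.output

        // A placeholder must be replaced before execution, so executing it is an error.
        protected override def doExecute(): RDD[InternalRow] =
          throw new UnsupportedOperationException()
      }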

    A quick look at SparkPlan

    A SparkPlan node can be understood as the physical plan's description of an operation on RDDs. The nodes generally fall into a few categories:

    1. LeafExecNode: leaf nodes, mainly tied to data sources and used to create the initial RDDs

    2. UnaryExecNode: unary nodes, mainly transformations applied to a single child RDD

    3. BinaryExecNode: binary nodes; join operators belong to this category

    4. Other node types
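
     The three main categories differ only in their number of children. Simplified from the Spark source, they look roughly like this:

      trait LeafExecNode extends SparkPlan {
        override def children: Seq[SparkPlan] = Nil             // no children: scans, PlanLater, ...
      }

      trait UnaryExecNode extends SparkPlan {
        def child: SparkPlan
        override def children: Seq[SparkPlan] = child :: Nil    // one child: Project, Filter, Sort, ...
      }

      trait BinaryExecNode extends SparkPlan {
        def left: SparkPlan
        def right: SparkPlan
        override def children: Seq[SparkPlan] = Seq(left, right)  // two children: joins
      }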

     SparkPlan has a few important members and methods:

    1. metrics: the metric information reported by the node;
    2. outputPartitioning: Partitioning and outputOrdering: Seq[SortOrder], which describe the node's output partitioning and ordering; each subclass provides its own implementation;

    3. execute and doExecute: SparkPlan exposes a single public method, execute, as the entry point that triggers the RDD computation, while the actual work is implemented by each subclass in doExecute.
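
    In other words, execute/doExecute is a template-method pattern. A condensed sketch of SparkPlan (the real class also handles execution contexts, whole-stage code generation, and more):

      abstract class SparkPlan extends QueryPlan[SparkPlan] {
        // Public entry point: callers always go through execute(), never doExecute().
        final def execute(): RDD[InternalRow] = doExecute()

        // Each concrete operator implements its RDD logic here.
        protected def doExecute(): RDD[InternalRow]

        // Defaults; operators override these to describe their output.
        def outputPartitioning: Partitioning = UnknownPartitioning(0)
        def outputOrdering: Seq[SortOrder] = Nil
        def metrics: Map[String, SQLMetric] = Map.empty
      }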

    Generating the SparkPlan

    The SparkPlan is produced in QueryExecution.scala:

    lazy val sparkPlan: SparkPlan = {
        SparkSession.setActiveSession(sparkSession)
        // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
        //       but we will implement to choose the best plan.
        planner.plan(ReturnAnswer(optimizedPlan)).next()
      }
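
      Both plans can be inspected directly from a Dataset/DataFrame (an illustrative snippet; the expressions are made up):

      val df = spark.range(10).selectExpr("id % 3 AS k").groupBy("k").count()
      // The plan picked by the planner, before preparation:
      println(df.queryExecution.sparkPlan)
      // The plan after prepareForExecution (described below), which is what actually runs:
      println(df.queryExecution.executedPlan)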
    

      The planner.plan(...) call above lands in QueryPlanner.plan:

      def strategies: Seq[GenericStrategy[PhysicalPlan]]
    
      def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
        // Obviously a lot to do here still...
    
        // Collect physical plan candidates.
        val candidates = strategies.iterator.flatMap(_(plan))
    
        // The candidates may contain placeholders marked as [[planLater]],
        // so try to replace them by their child plans.
        val plans = candidates.flatMap { candidate =>
          val placeholders = collectPlaceholders(candidate)
    
          if (placeholders.isEmpty) {
            // Take the candidate as is because it does not contain placeholders.
            Iterator(candidate)
          } else {
            // Plan the logical plan marked as [[planLater]] and replace the placeholders.
            placeholders.iterator.foldLeft(Iterator(candidate)) {
              case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
                // Plan the logical plan for the placeholder.
                val childPlans = this.plan(logicalPlan)
    
                candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
                  childPlans.map { childPlan =>
                    // Replace the placeholder by the child plan
                    candidateWithPlaceholders.transformUp {
                      case p if p == placeholder => childPlan
                    }
                  }
                }
            }
          }
        }
    
        val pruned = prunePlans(plans)
        assert(pruned.hasNext, s"No plan for $plan")
        pruned
      }
    

      QueryPlanner plays a role here similar to RuleExecutor in the logical-plan phase: it defines the framework for converting a logical plan into a physical plan via strategies, while the concrete strategies are supplied by its subclass, SparkPlanner. So, just as in the logical-plan phase, what we mainly need to look at are the strategies defined in SparkPlanner. Before that, the logic of QueryPlanner.plan above is worth walking through, since it makes the later rules easier to follow. plan applies the predefined strategies, but a single application does not necessarily resolve the whole tree; that is exactly what the PlanLater node mentioned earlier is for, and it is left in the tree as a placeholder. After each pass, the PlanLater placeholders are collected and replaced by recursively planning their child logical plans with plan, until every node has been resolved, at which point the conversion to a physical plan is complete.
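      For completeness, the placeholder collection used above is implemented by SparkPlanner roughly as follows (close to the Spark 2.x source):

      override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
        plan.collect {
          case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
        }
      }

      With that in mind, here are the strategies SparkPlanner defines: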

     override def strategies: Seq[Strategy] =
        experimentalMethods.extraStrategies ++
          extraPlanningStrategies ++ (
          DataSourceV2Strategy :: 
          FileSourceStrategy :: 
          DataSourceStrategy(conf) :: 
          SpecialLimits ::
          Aggregation ::
          JoinSelection ::
          InMemoryScans ::
          BasicOperators :: Nil)
    

      The names are mostly self-explanatory. Data sources: DataSourceV2Strategy and DataSourceStrategy(conf) (readers interested in the difference between the two can dig into it on their own); file data sources: FileSourceStrategy; aggregation: Aggregation; joins: JoinSelection; and so on.
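      Note that the list is prefixed with experimentalMethods.extraStrategies and extraPlanningStrategies, which is the hook for injecting custom planning rules. A minimal sketch (MyLogicalNode and MyExecNode are hypothetical nodes, used only for illustration):

      object MyStrategy extends Strategy {
        override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
          // planLater(...) wraps the child in the PlanLater placeholder described earlier.
          case MyLogicalNode(child) => MyExecNode(planLater(child)) :: Nil
          case _ => Nil   // returning Nil means "this strategy does not apply"
        }
      }

      // Strategies registered this way are tried before the built-in ones:
      spark.experimental.extraStrategies = Seq(MyStrategy)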

          After the steps above the candidate physical plans have been generated, and taking the first one gives the plan that will be used for the subsequent computation. Before it is actually submitted, however, there is one more step, prepareForExecution. Its main purpose is to adjust the physical plan so that it satisfies shuffle (distribution and ordering) requirements and the internal row format.

     /**
       * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
       * row format conversions as needed.
       */
      protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
        preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
      }
    
      /** A sequence of rules that will be applied in order to the physical plan before execution. */
      protected def preparations: Seq[Rule[SparkPlan]] = Seq(
        python.ExtractPythonUDFs,
        PlanSubqueries(sparkSession),
        EnsureRequirements(sparkSession.sessionState.conf),
        CollapseCodegenStages(sparkSession.sessionState.conf),
        ReuseExchange(sparkSession.sessionState.conf),
        ReuseSubquery(sparkSession.sessionState.conf))
    

      The preparations sequence above defines the rules of the preparation phase: extracting Python UDFs, planning subqueries, ensuring correct partitioning and ordering, whole-stage code generation, and reuse of exchanges and subqueries. The one worth looking at here is EnsureRequirements, which guarantees correct distribution and ordering mainly by inserting Exchange (and Sort) nodes.

    private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
        val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
        val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
        var children: Seq[SparkPlan] = operator.children
        assert(requiredChildDistributions.length == children.length)
        assert(requiredChildOrderings.length == children.length)
    
        // Ensure that the operator's children satisfy their output distribution requirements.
        children = children.zip(requiredChildDistributions).map {
      // 1. The child's output partitioning already satisfies the required distribution
      //    (e.g. UnspecifiedDistribution, or AllTuples with a single partition)
          case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
            child
          case (child, BroadcastDistribution(mode)) =>
            BroadcastExchangeExec(mode, child)
          case (child, distribution) =>
            val numPartitions = distribution.requiredNumPartitions
              .getOrElse(defaultNumPreShufflePartitions)
            ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
        }
    
        // Get the indexes of children which have specified distribution requirements and need to have
        // same number of partitions.
        val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
          case (UnspecifiedDistribution, _) => false
          case (_: BroadcastDistribution, _) => false
          case _ => true
        }.map(_._2)
    
        val childrenNumPartitions =
          childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
    
        if (childrenNumPartitions.size > 1) {
          // Get the number of partitions which is explicitly required by the distributions.
          val requiredNumPartitions = {
            val numPartitionsSet = childrenIndexes.flatMap {
              index => requiredChildDistributions(index).requiredNumPartitions
            }.toSet
            assert(numPartitionsSet.size <= 1,
              s"$operator have incompatible requirements of the number of partitions for its children")
            numPartitionsSet.headOption
          }
    
          val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
    
          children = children.zip(requiredChildDistributions).zipWithIndex.map {
            case ((child, distribution), index) if childrenIndexes.contains(index) =>
              if (child.outputPartitioning.numPartitions == targetNumPartitions) {
                child
              } else {
                val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
                child match {
                  // If child is an exchange, we replace it with a new one having defaultPartitioning.
                  case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
                  case _ => ShuffleExchangeExec(defaultPartitioning, child)
                }
              }
    
            case ((child, _), _) => child
          }
        }
    
        // Now, we need to add ExchangeCoordinator if necessary.
        // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
        // However, with the way that we plan the query, we do not have a place where we have a
        // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
        // at here for now.
        // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
        // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
        children = withExchangeCoordinator(children, requiredChildDistributions)
    
        // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
        children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
          // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
          if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
            child
          } else {
            SortExec(requiredOrdering, global = false, child = child)
          }
        }
    
        operator.withNewChildren(children)
      }
    

      The work above can be broken down into the following steps:

    1. Add Exchange nodes. For each child, check whether its output partitioning satisfies the required distribution; if not, see whether a broadcast (BroadcastExchangeExec) can satisfy it, and otherwise add a ShuffleExchangeExec node. Then, for the children whose requiredChildDistributions impose specific requirements and must share the same number of partitions, compare each such child's number of output partitions: children that already match the target are left unchanged, the others get a (new) ShuffleExchangeExec (an example follows this list).
    2. Add an ExchangeCoordinator node if necessary, which performs adaptive query optimization of the shuffle while the SQL is executing (see the note below);
    3. For children whose requiredChildOrderings impose an ordering that is not already satisfied, add a SortExec node.
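
    To see EnsureRequirements in action, a small illustrative example (broadcast joins are disabled so that a sort-merge join is chosen; the exact operator tree depends on the Spark version and configuration):

      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      val left  = spark.range(100).selectExpr("id AS k", "id AS a")
      val right = spark.range(100).selectExpr("id AS k", "id AS b")
      left.join(right, "k").explain()
      // Simplified expected shape of the executed plan:
      //   SortMergeJoin [k], [k]
      //   :- Sort [k ASC]                          <- SortExec added for requiredChildOrderings
      //   :  +- Exchange hashpartitioning(k, 200)  <- ShuffleExchangeExec added for the distribution
      //   ...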

           A short note on ExchangeCoordinator: step 2 above has already been removed from the latest Spark source, so interested readers can download the newest code and study it. Taking ShuffleExchangeExec as an example, once an ExchangeCoordinator is attached, doPrepare registers the exchange with the coordinator; later, when doExecute is called, it calls exchangeCoordinator.postShuffleRDD(this) to obtain the corresponding shuffle RDD. If that RDD is not available yet, the coordinator asks all registered exchanges to report their pre-shuffle statistics and uses them to decide how the post-shuffle shuffle RDD's partitions are laid out.
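           In Spark 2.x this coordinator path is only taken when adaptive execution is turned on; the relevant settings are roughly the following (names from Spark 2.x, replaced by a different AQE framework in 3.x):

      spark.conf.set("spark.sql.adaptive.enabled", "true")
      // Target size of a post-shuffle partition; the coordinator merges small
      // pre-shuffle partitions until this size is reached (default 64 MB in 2.x).
      spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")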

           After the preparation phase, the physical plan is ready to be submitted to the cluster and run.

          Summary: at this point the overall flow of Spark SQL, from parsing to the logical plan to the physical plan, has been sorted out. The finer details are something to dig into at work or whenever the opportunity arises. Readers interested in Spark SQL may want to read the book 《Spark SQL 内核剖析》, which is quite good, but it should be read alongside the source code, otherwise some parts are hard to follow. Later posts will draw on that book and my own understanding to work through aggregation and joins.
