Spark SQL (7) - physicalPlan: the Physical Plan
Spark's physical planning takes the logical operator tree produced earlier and turns it into a concrete set of operations over RDDs; the resulting job is then submitted to the Spark cluster.
Generating a physical plan in Spark mainly goes through the following steps:
- Apply the strategies defined in SparkPlanner. SparkPlanner is a subclass of SparkStrategies, and the individual strategies are defined in SparkStrategies. There is also a leaf SparkPlan called PlanLater whose job is to defer planning; it comes into play when the strategies are applied, as described later.
- Select one physical plan; currently the code simply takes the first element of the returned iterator.
- Prepare the physical plan: the chosen plan is adjusted for partitioning, ordering, and so on, again driven by a set of predefined rules (a quick way to observe the selected and prepared plans is sketched below).
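As a small, hedged illustration of the last two steps (the app name and toy DataFrame are made up for this sketch), both the selected plan and the prepared plan can be inspected through queryExecution:

```scala
import org.apache.spark.sql.SparkSession

object PhysicalPlanDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan-demo").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name").groupBy($"name").count()

    // The plan picked by the planner (the first candidate), before the preparation rules run.
    println(df.queryExecution.sparkPlan)
    // The plan after prepareForExecution: Exchange / Sort / WholeStageCodegen nodes inserted.
    println(df.queryExecution.executedPlan)

    spark.stop()
  }
}
```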
An overview of SparkPlan
A SparkPlan node can be understood as the description of one RDD-level operation in the physical plan. Its subclasses fall into a few broad categories:
- LeafExecNode: leaf nodes, mainly related to data sources; they are used to create the initial RDDs.
- UnaryExecNode: unary nodes, mostly transformations applied to a single RDD.
- BinaryExecNode: binary nodes; join operators fall into this category.
- Other node types (see the small sketch after this list for concrete examples of each category).
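A minimal sketch, assuming the Spark 2.x traits in org.apache.spark.sql.execution, that walks an already-planned tree and buckets its operators into the categories above; describeNodes is an illustrative helper (e.g. for spark-shell), not a Spark API:

```scala
import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode, SparkPlan, UnaryExecNode}

// Walks a physical plan and labels each operator by its arity.
def describeNodes(plan: SparkPlan): Seq[String] = plan.collect {
  case l: LeafExecNode   => s"leaf   : ${l.nodeName}"  // e.g. FileSourceScanExec, RangeExec
  case u: UnaryExecNode  => s"unary  : ${u.nodeName}"  // e.g. FilterExec, ProjectExec, SortExec
  case b: BinaryExecNode => s"binary : ${b.nodeName}"  // e.g. SortMergeJoinExec
  case other             => s"other  : ${other.nodeName}"
}

// Usage: describeNodes(df.queryExecution.executedPlan).foreach(println)
```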
SparkPlan has several important members:
- metrics: the metric information for the operator;
- outputPartitioning: Partitioning and outputOrdering: Seq[SortOrder], which describe the node's output partitioning and ordering; each subclass provides its own implementation;
- execute and doExecute: SparkPlan exposes execute as the unified entry point that triggers the RDD computation, while the actual work is implemented by subclasses in doExecute (a simplified sketch of this pattern follows this list).
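A minimal sketch of the execute/doExecute template-method pattern described above; MiniPlan and MiniScan are made-up names rather than Spark classes, and the real SparkPlan.execute() additionally handles preparation, metrics, and code generation:

```scala
// Illustrative only: plain Scala, no Spark dependency.
abstract class MiniPlan {
  type Row = Seq[Any]

  // Public, final entry point: callers always trigger computation through execute().
  final def execute(): Iterator[Row] = {
    // The real SparkPlan.execute() wraps doExecute() with preparation, metrics, etc.
    doExecute()
  }

  // Each concrete operator only implements doExecute().
  protected def doExecute(): Iterator[Row]
}

// A leaf operator producing rows from an in-memory collection.
class MiniScan(data: Seq[Seq[Any]]) extends MiniPlan {
  protected def doExecute(): Iterator[Row] = data.iterator
}
```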
Generating the SparkPlan physical plan
The SparkPlan is produced in QueryExecution.scala:
```scala
lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}
```
This in turn calls QueryPlanner.plan:
```scala
def strategies: Seq[GenericStrategy[PhysicalPlan]]

def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...

  // Collect physical plan candidates.
  val candidates = strategies.iterator.flatMap(_(plan))

  // The candidates may contain placeholders marked as [[planLater]],
  // so try to replace them by their child plans.
  val plans = candidates.flatMap { candidate =>
    val placeholders = collectPlaceholders(candidate)

    if (placeholders.isEmpty) {
      // Take the candidate as is because it does not contain placeholders.
      Iterator(candidate)
    } else {
      // Plan the logical plan marked as [[planLater]] and replace the placeholders.
      placeholders.iterator.foldLeft(Iterator(candidate)) {
        case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
          // Plan the logical plan for the placeholder.
          val childPlans = this.plan(logicalPlan)

          candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
            childPlans.map { childPlan =>
              // Replace the placeholder by the child plan
              candidateWithPlaceholders.transformUp {
                case p if p == placeholder => childPlan
              }
            }
          }
      }
    }
  }

  val pruned = prunePlans(plans)
  assert(pruned.hasNext, s"No plan for $plan")
  pruned
}
```
QueryPlanner plays a role similar to RuleExecutor in the logical-plan phase: it defines how a logical plan is converted into physical plans by applying strategies, while the concrete strategies are supplied by its subclass, SparkPlanner. So, just as in the logical-plan phase, we mainly need to look at the strategies defined in SparkPlanner. Before that, it is worth walking through the QueryPlanner.plan logic above, since it helps in understanding the strategies later. plan applies the predefined strategies, but a single pass does not necessarily plan the whole tree, which is where the PlanLater node mentioned earlier comes in as a placeholder. After an iteration, the PlanLater placeholders are collected and replaced by planning their child logical plans with further calls to plan, until every node has been planned; at that point the conversion to a physical plan is complete. Let's take a quick look at the strategies defined in SparkPlanner:
```scala
override def strategies: Seq[Strategy] =
  experimentalMethods.extraStrategies ++
    extraPlanningStrategies ++ (
    DataSourceV2Strategy ::
    FileSourceStrategy ::
    DataSourceStrategy(conf) ::
    SpecialLimits ::
    Aggregation ::
    JoinSelection ::
    InMemoryScans ::
    BasicOperators :: Nil)
```
The names roughly reveal what each strategy does. DataSourceV2Strategy and DataSourceStrategy(conf) handle data sources (readers interested in the difference between the two can dig into it on their own), FileSourceStrategy handles file-based sources, Aggregation plans aggregations, JoinSelection chooses join implementations, and so on.
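To make the planLater mechanism concrete, here is a hedged sketch of what a strategy looks like, assuming Spark 2.x APIs. MyLimitStrategy is a made-up example, not one of the built-in strategies above, but it follows the same pattern: match one logical node, emit its physical counterpart, and leave the child as a placeholder.

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan}
import org.apache.spark.sql.execution.{LocalLimitExec, SparkPlan}

object MyLimitStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case LocalLimit(IntegerLiteral(n), child) =>
      // planLater(child) returns a PlanLater leaf; QueryPlanner.plan later replaces it
      // with a fully planned child in a subsequent pass.
      LocalLimitExec(n, planLater(child)) :: Nil
    case _ => Nil
  }
}

// Such a custom strategy could be plugged in via spark.experimental.extraStrategies.
```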
At this point the physical plan candidates have been generated, and the first one returned is the plan used for the subsequent computation. Before the plan is actually submitted, however, there is one more step, prepareForExecution, whose main purpose is to optimize the physical plan so that shuffles and the internal row format are handled correctly.
```scala
/**
 * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
 * row format conversions as needed.
 */
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  python.ExtractPythonUDFs,
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))
```
The preparations sequence above defines the rules applied in the preparation phase: extracting Python UDFs, planning subqueries, ensuring partitioning and ordering requirements, whole-stage code generation, and exchange/subquery reuse. The one worth a closer look is EnsureRequirements, which guarantees correct partitioning and ordering by inserting Exchange nodes.
```scala
private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
  val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
  val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
  var children: Seq[SparkPlan] = operator.children
  assert(requiredChildDistributions.length == children.length)
  assert(requiredChildOrderings.length == children.length)

  // Ensure that the operator's children satisfy their output distribution requirements.
  children = children.zip(requiredChildDistributions).map {
    // 1. The child's output partitioning already satisfies the required distribution
    //    (e.g. UnspecifiedDistribution, or AllTuples with a single partition).
    case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
      child
    case (child, BroadcastDistribution(mode)) =>
      BroadcastExchangeExec(mode, child)
    case (child, distribution) =>
      val numPartitions = distribution.requiredNumPartitions
        .getOrElse(defaultNumPreShufflePartitions)
      ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
  }

  // Get the indexes of children which have specified distribution requirements and need to have
  // same number of partitions.
  val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
    case (UnspecifiedDistribution, _) => false
    case (_: BroadcastDistribution, _) => false
    case _ => true
  }.map(_._2)

  val childrenNumPartitions =
    childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet

  if (childrenNumPartitions.size > 1) {
    // Get the number of partitions which is explicitly required by the distributions.
    val requiredNumPartitions = {
      val numPartitionsSet = childrenIndexes.flatMap { index =>
        requiredChildDistributions(index).requiredNumPartitions
      }.toSet
      assert(numPartitionsSet.size <= 1,
        s"$operator have incompatible requirements of the number of partitions for its children")
      numPartitionsSet.headOption
    }

    val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)

    children = children.zip(requiredChildDistributions).zipWithIndex.map {
      case ((child, distribution), index) if childrenIndexes.contains(index) =>
        if (child.outputPartitioning.numPartitions == targetNumPartitions) {
          child
        } else {
          val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
          child match {
            // If child is an exchange, we replace it with a new one having defaultPartitioning.
            case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
            case _ => ShuffleExchangeExec(defaultPartitioning, child)
          }
        }

      case ((child, _), _) => child
    }
  }

  // Now, we need to add ExchangeCoordinator if necessary.
  // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
  // However, with the way that we plan the query, we do not have a place where we have a
  // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
  // at here for now.
  // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
  // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
  children = withExchangeCoordinator(children, requiredChildDistributions)

  // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
  children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
    // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
    if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
      child
    } else {
      SortExec(requiredOrdering, global = false, child = child)
    }
  }

  operator.withNewChildren(children)
}
```
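As a tiny, hedged illustration of the satisfies() check in the first branch of the code above (assuming Spark 2.x catalyst classes; the attribute k and the partition count are made up for this example):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

// A child already hash-partitioned on the required clustering key satisfies
// ClusteredDistribution, so no extra ShuffleExchangeExec is needed for it.
val key = AttributeReference("k", IntegerType)()
val childPartitioning = HashPartitioning(Seq(key), numPartitions = 200)
val required = ClusteredDistribution(Seq(key))

println(childPartitioning.satisfies(required)) // true: this child is left unchanged
```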
The logic above can be broken into the following steps:
- Add Exchange nodes. Each child is checked to see whether its output partitioning satisfies the required distribution; if not, a BroadcastExchangeExec is used when a broadcast distribution is required, otherwise a ShuffleExchangeExec is inserted. Then, among the children whose requiredChildDistributions impose real requirements and must therefore share the same number of partitions, each child's output partition count is compared against the target; a matching child is left unchanged, otherwise another ShuffleExchangeExec is inserted.
- Add the ExchangeCoordinator, which enables adaptive optimization of the shuffle while the SQL statement is executing;
- For children whose requiredChildOrderings impose ordering requirements, add SortExec nodes (a small way to observe the inserted nodes is sketched right after this list).
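A small, hedged way to observe EnsureRequirements at work, assuming a SparkSession named spark (for example in spark-shell): a sort-merge join requires its inputs to be hash-partitioned and sorted on the join key, so the executed plan shows the Exchange and Sort nodes the rule inserted.

```scala
// Disable broadcast joins so the planner picks a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val left  = spark.range(0, 1000).selectExpr("id", "id % 10 as k")
val right = spark.range(0, 1000).selectExpr("id as rid", "id % 10 as k")
val joined = left.join(right, "k")

// Expect ShuffleExchangeExec (for the required distribution) and SortExec
// (for the required ordering) beneath the SortMergeJoinExec.
joined.queryExecution.executedPlan.foreach(p => println(p.nodeName))
```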
A quick note on ExchangeCoordinator: step 2 above has been removed from the latest Spark source, so readers who are interested can check out the newest code. Taking ShuffleExchangeExec as an example: once an ExchangeCoordinator is attached, doPrepare registers the exchange with the coordinator; later, when doExecute is called, exchangeCoordinator.postShuffleRDD(this) is invoked to obtain the corresponding shuffle RDD. If that RDD is not available yet, the coordinator asks all registered exchanges to report their pre-shuffle statistics and uses them to decide how the post-shuffle ShuffledRDD partitions are laid out.
After the preparation phase, the physical plan can be submitted to the cluster for execution.
Summary: with this, the overall flow from parsing to the logical plan to the physical plan has been laid out; the finer details will have to wait for closer study at work or whenever the opportunity arises. Readers interested in Spark SQL may want to read 《Spark SQL 内核剖析》, which is quite good, though it is best read together with the source code, otherwise some parts are hard to follow. In later posts I will draw on that book and my own understanding to walk through aggregation and join.