zoukankan      html  css  js  c++  java
  • Spark 开源新特性:Catalyst 优化流程裁剪

    摘要:为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst。

    本文分享自华为云社区《Spark 开源新特性:Catalyst 优化流程裁剪》,作者:hzjturbo 。

    1. 问题背景

    上图是典型的Spark Catalyst优化器的布局,一条由用户输入的SQL,到真实可调度执行的RDD DAG任务,需要经历以下五个阶段:

    • Parser: 将SQL解析成相应的抽象语法树(AST),spark也称为 Unresolved Logical Plan;
    • Analyzer: 通过查找Metadata的Catalog信息,将 Unresolved Logical Plan 变为 Resolved Logical Plan,这个过程会做表、列、数据类型等做校验;
    • Optimizer: 逻辑优化流程,通过一些优化规则对匹配上的Plan做转换,得到优化后的逻辑Plan
    • Planner:根据Optimized Logical Plan的统计信息等转换成相应的Physical Plan
    • Query Execution: 主要是执行前的一些preparations优化,比如AQE, Exchange Reuse, CodeGen stages合并等

    上述的五个阶段中,除了Parser (由Antlr实现),其他的每个阶段都是由一个个规则(Rule)构成,总共大约有200+个,对于不同的规则,还可能需要跑多次,所以对于相对比较复杂的查询,可能得到一个executed Plan都需要耗费数秒。

    Databricks内部基准测试表明,对于TPC-DS查询,每个查询平均调用树转换函数约280k次,这远远超出了必要的范围。因此,我们探索在每个树节点中嵌入BitSet,以传递自身及其子树的信息,并利用计划不变性来修剪不必要的遍历。通过原型实现验证:在TPC-DS基准测试中,我们看到优化的速度约为50%,分析的速度约为30%,整个查询编译的速度约为34%(包括Hive元存储RPC和文件列表)[1]。

    2. 设计实现

    2.1 Tree Pattern Bits and Rule Id Bits

    • Tree pattern bits

    在TreeNode 增加nodePatterns属性,所有继承该类的节点可以通过复写该属性值来标识自己的属性。

    /**
     * @return a sequence of tree pattern enums in a TreeNode T. It does not include propagated
     *         patterns in the subtree of T.
     */
    protected val nodePatterns: Seq[TreePattern] = Seq()

    TreePattern 是一个枚举类型, 对于每个节点/表达式都可以为其设置一个TreePattern方便标识,具体可见 TreePatterns.scala 。

    例如对于Join节点的nodePatterns:

    override val nodePatterns : Seq[TreePattern] = {
      var patterns = Seq(JOIN)
      joinType match {
        case _: InnerLike => patterns = patterns :+ INNER_LIKE_JOIN
        case LeftOuter | FullOuter | RightOuter => patterns = patterns :+ OUTER_JOIN
        case LeftSemiOrAnti(_) => patterns = patterns :+ LEFT_SEMI_OR_ANTI_JOIN
        case NaturalJoin(_) | UsingJoin(_, _) => patterns = patterns :+ NATURAL_LIKE_JOIN
        case _ =>
      }
      patterns
    }
    • Rule ID bits

    将规则ID的缓存BitSet嵌入到每个树/表达式节点T中,这样我们就可以跟踪规则R对于根植于T的子树是有效还是无效。这样,如果R在T上被调用,并且已知R无效,如果R再次应用于T(例如,R位于定点规则批处理中),我们可以跳过它。这个想法最初被用于Cascades optimizer,以加快探索性规划。

    Rule:

    abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging {
    
      // The integer id of a rule, for pruning unnecessary tree traversals.
      protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName)

    TreeNode:

    /**
     * A BitSet of rule ids to record ineffective rules for this TreeNode and its subtree.
     * If a rule R (which does not read a varying, external state for each invocation) is
     * ineffective in one apply call for this TreeNode and its subtree, R will still be
     * ineffective for subsequent apply calls on this tree because query plan structures are
     * immutable.
     */
    private val ineffectiveRules: BitSet = new BitSet(RuleIdCollection.NumRules)

    2.2 Changes to The Transform Function Family

    改造后的transform 方法相比之前的多了两个判断,如下所示

    def transformDownWithPruning(
      cond: TreePatternBits => Boolean, // 判断是否存在可优化的节点,由规则设计者所提供
      ruleId: RuleId = UnknownRuleId // 不会生效的规则ID,自动更新
        )(rule: PartialFunction[BaseType, BaseType]): BaseType = {
      // 如果上述两个条件存在一个不满足,直接跳过本次规则
      if (!cond.apply(this) || isRuleIneffective(ruleId)) {
        return this
      }
      // 执行rule的逻辑
      val afterRule = CurrentOrigin.withOrigin(origin) {
        rule.applyOrElse(this, identity[BaseType])
      }
    
      // Check if unchanged and then possibly return old copy to avoid gc churn.
      if (this fastEquals afterRule) {
        val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
        // 如果没生效,把规则ID加入到不生效的BitSet里
        if (this eq rewritten_plan) {
          markRuleAsIneffective(ruleId)
          this
        } else {
          rewritten_plan
        }
      } else {
        // If the transform function replaces this node with a new one, carry over the tags.
        afterRule.copyTagsFrom(this)
        afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
      }
    }

    2.3 Changes to An Individual Rule

    规则的例子:

    object OptimizeIn extends Rule[LogicalPlan] with SQLConfHelper {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform ({
       case q: LogicalPlan => q transformExpressionsDown ({
         case In(v, list) if list.isEmpty => ...
         case expr @ In(v, list) if expr.inSetConvertible => ...
       }, _.containsPattern(IN), ruleId) // 必须包含IN
     }, _.containsPattern(IN), ruleId) // 必须包含IN
    }

    3. 测试结果

    在Delta中使用TPC-DS SF10对TPC-DS查询编译时间进行了基准测试。结果如下:

    • 图1显示了查询编译速度;
    • 表1显示了几个关键树遍历函数的调用计数和CPU减少的细分。

    我简单运行了开版本的TPCDSQuerySuite,该测试会把TPCDS的语句解析优化,并且检查下生成的代码(CodeGen),平均耗时的时间为三次运行得到的最优值, 得到的结果如下:

    • 合入PR前[2], 包含156个Tpcds查询,平均总耗时~56s
    • 最新Spark开源代码,包含150个Tpcds查询,平均总耗时~19s

    之所以最新的Tpcds查询比合入PR前的条数少6条,是因为后续有个减少重复TPCDS的PR。总时长优化前是优化后的两倍多。

    参考引用

    [1]. [SPARK-34916] Tree Traversal Pruning for Catalyst Transform/Resolve Function Families. SISP

    [2]. [SPARK-35544][SQL] Add tree pattern pruning to Analyzer rules.

    [3]. Building a SIMD Supported Vectorized Native Engine for Spark SQL. link

     

    点击关注,第一时间了解华为云新鲜技术~

  • 相关阅读:
    LeetCode 1122. Relative Sort Array (数组的相对排序)
    LeetCode 46. Permutations (全排列)
    LeetCode 47. Permutations II (全排列 II)
    LeetCode 77. Combinations (组合)
    LeetCode 1005. Maximize Sum Of Array After K Negations (K 次取反后最大化的数组和)
    LeetCode 922. Sort Array By Parity II (按奇偶排序数组 II)
    LeetCode 1219. Path with Maximum Gold (黄金矿工)
    LeetCode 1029. Two City Scheduling (两地调度)
    LeetCode 392. Is Subsequence (判断子序列)
    写程序判断系统是大端序还是小端序
  • 原文地址:https://www.cnblogs.com/huaweiyun/p/15047855.html
Copyright © 2011-2022 走看看