zoukankan      html  css  js  c++  java
  • spark任务的下推规则

    /**
     * Optimizer rule that moves predicates closer to the inputs of a [[Join]].
     *
     * Two plan shapes are rewritten:
     *  - a `Filter` sitting directly on top of a `Join` (predicates from a `WHERE` clause), and
     *  - a bare `Join` whose own condition (the `ON` clause) contains single-sided predicates.
     *
     * Running example used in the comments below, with T1(a, b, c) and T2(d, e):
     * {{{
     *   SELECT * FROM T1 JOIN T2 ON T1.a = T2.d WHERE T1.b > 3 AND T2.e > 4
     * }}}
     */
    object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

      /**
       * Classifies the given conjuncts into three groups, returned as a tuple:
       *  1. those whose references are all contained in `left.outputSet` (e.g. `T1.b > 3`),
       *  2. of the remainder, those whose references are all contained in `right.outputSet`
       *     (e.g. `T2.e > 4`),
       *  3. everything else, i.e. predicates that neither side can evaluate alone
       *     (e.g. `T1.a = T2.d`).
       *
       * The left side is tested first, so a predicate satisfying both tests lands in group 1.
       */
      private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
        val (leftOnly, notLeftOnly) =
          condition.partition(expr => expr.references.subsetOf(left.outputSet))
        val (rightOnly, bothSides) =
          notLeftOnly.partition(expr => expr.references.subsetOf(right.outputSet))
        (leftOnly, rightOnly, bothSides)
      }

      /**
       * AND-combines `predicates` and places the resulting `Filter` on top of `child`.
       * Returns `child` itself when there are no predicates.
       */
      private def filterOver(predicates: Seq[Expression], child: LogicalPlan): LogicalPlan =
        predicates.reduceLeftOption(And) match {
          case Some(conjunction) => Filter(conjunction, child)
          case None => child
        }

      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Case 1: a Filter directly above a Join (the WHERE clause).
        // In the running example the filter condition is `T1.b > 3 AND T2.e > 4`.
        case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
          // Break the filter into its individual conjuncts and classify them by side.
          val (leftOnly, rightOnly, bothSides) =
            split(splitConjunctivePredicates(filterCondition), left, right)

          joinType match {
            case Inner =>
              // Inner join: each single-sided predicate goes below the join on its own side;
              // the remaining predicates are merged into the join condition.
              val mergedCondition = (bothSides ++ joinCondition).reduceLeftOption(And)
              Join(filterOver(leftOnly, left), filterOver(rightOnly, right), Inner, mergedCondition)

            case RightOuter =>
              // Only right-side predicates are pushed down. The left child is the side that
              // gets null-padded, so WHERE predicates on it must stay above the join.
              // The join condition is left untouched here; the Join case below handles it.
              val pushedJoin = Join(left, filterOver(rightOnly, right), RightOuter, joinCondition)
              // Whatever could not be pushed is kept as a Filter above the new join
              // (or the join is returned as-is when nothing remains).
              filterOver(leftOnly ++ bothSides, pushedJoin)

            case LeftOuter | LeftSemi =>
              // Mirror image of RightOuter: only left-side predicates are pushed down.
              val pushedJoin = Join(filterOver(leftOnly, left), right, joinType, joinCondition)
              filterOver(rightOnly ++ bothSides, pushedJoin)

            case FullOuter =>
              // Nothing is pushed for a full outer join; the plan is returned unchanged.
              f
          }

        // Case 2: the Join's own condition (the ON clause).
        // Single-sided conjuncts are pushed into the matching child where the join type allows.
        case f @ Join(left, right, joinType, joinCondition) =>
          // A missing join condition is treated as an empty list of conjuncts.
          val conjuncts = joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
          val (leftOnly, rightOnly, bothSides) = split(conjuncts, left, right)

          joinType match {
            case Inner =>
              // Inner join: both sides accept their own predicates; only the predicates
              // needing both sides remain as the join condition.
              Join(
                filterOver(leftOnly, left),
                filterOver(rightOnly, right),
                Inner,
                bothSides.reduceLeftOption(And))

            case RightOuter =>
              // Left-side conjuncts are pushed into the left child. Right-side conjuncts are
              // not pushed: they stay in the join condition together with the two-sided ones.
              val keptCondition = (rightOnly ++ bothSides).reduceLeftOption(And)
              Join(filterOver(leftOnly, left), right, RightOuter, keptCondition)

            case LeftOuter | LeftSemi =>
              // Mirror image of RightOuter: right-side conjuncts are pushed into the right
              // child, left-side conjuncts stay in the join condition.
              val keptCondition = (leftOnly ++ bothSides).reduceLeftOption(And)
              Join(left, filterOver(rightOnly, right), joinType, keptCondition)

            case FullOuter =>
              // Nothing is pushed for a full outer join; the plan is returned unchanged.
              f
          }
      }
    }
  • 相关阅读:
    Oracle SQL FAQ
    miniasp(no encode)
    请看用javascript设置和读取cookie的简单例子
    asp流下载(Stream)
    (企业公司)网站开发方案
    asp发消息并代多个附件上传(多对多关系)
    tabpage1
    crystal report (asp调用水晶报表实例)
    上海万千文化传播有限公司(网站项目策划书)
    访问和更新Cookies集合
  • 原文地址:https://www.cnblogs.com/txfsheng/p/9101366.html
Copyright © 2011-2022 走看看