zoukankan      html  css  js  c++  java
  • Spark SQL 的谓词下推规则(PushPredicateThroughJoin)

    /**
     * Optimizer rule that moves predicates closer to the inputs of a join.
     *
     * Two plan shapes are rewritten:
     *  - a `Filter` sitting directly on top of a `Join` (the `where` clause), and
     *  - a `Join` carrying its own condition (the `on` clause).
     *
     * Running example used in the comments below, with T1(a, b, c) and T2(d, e):
     * {{{
     *   select * from T1 join T2 on T1.a = T2.d where T1.b > 3 and T2.e > 4
     * }}}
     */
    object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

      /**
       * Sorts a list of predicates into three groups:
       * (left-only, right-only, everything else).
       *
       * A predicate is "left-only" when all attributes it references are produced by `left`;
       * among the remaining ones, it is "right-only" when all attributes it references are
       * produced by `right`. The left side is tested first, so a predicate that would qualify
       * for both sides ends up in the left group. Whatever is left over (e.g. a comparison
       * between a T1 column and a T2 column) forms the third group.
       *
       * For the running example's `where` clause: `T1.b > 3` is left-only and `T2.e > 4`
       * is right-only.
       */
      private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
        val (leftOnly, notLeft) = condition.partition(e => e.references.subsetOf(left.outputSet))
        val (rightOnly, bothSides) = notLeft.partition(e => e.references.subsetOf(right.outputSet))
        (leftOnly, rightOnly, bothSides)
      }

      /**
       * AND-combines `predicates` and places them in a `Filter` above `child`;
       * hands back `child` untouched when there is nothing to filter on.
       */
      private def filterIfAny(predicates: Seq[Expression], child: LogicalPlan): LogicalPlan =
        predicates.reduceLeftOption(And) match {
          case Some(combined) => Filter(combined, child)
          case None => child
        }

      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Case 1: a `where` filter directly above a join.
        // In the running example the filter condition is `T1.b > 3 and T2.e > 4`.
        case filter @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
          // splitConjunctivePredicates breaks the filter condition into its AND-ed parts.
          val (leftOnly, rightOnly, bothSides) =
            split(splitConjunctivePredicates(filterCondition), left, right)

          joinType match {
            case Inner =>
              // Inner join: single-side predicates go below the join on their own side,
              // and the remaining predicates are merged into the join condition.
              val mergedCondition = (bothSides ++ joinCondition).reduceLeftOption(And)
              Join(filterIfAny(leftOnly, left), filterIfAny(rightOnly, right), Inner, mergedCondition)

            case RightOuter =>
              // Only the right child receives pushed-down `where` predicates; the left child
              // is the null-supplying side, so its predicates must stay above the join.
              // The join condition itself is left alone here (handled by case 2).
              val pushedJoin = Join(left, filterIfAny(rightOnly, right), RightOuter, joinCondition)
              // Left-only and two-sided predicates remain in a filter on top of the new join,
              // or the join is returned bare when there are none.
              filterIfAny(leftOnly ++ bothSides, pushedJoin)

            case LeftOuter | LeftSemi =>
              // Mirror image: only the left child receives pushed-down `where` predicates.
              val pushedJoin = Join(filterIfAny(leftOnly, left), right, joinType, joinCondition)
              filterIfAny(rightOnly ++ bothSides, pushedJoin)

            case FullOuter =>
              // Nothing is pushed through a full outer join.
              filter
          }

        // Case 2: predicates inside the join condition itself (the `on` clause),
        // pushed into the scanned sub-queries where applicable.
        case join @ Join(left, right, joinType, joinCondition) =>
          // Classify the AND-ed parts of the join condition the same way as above;
          // a join without a condition yields three empty groups.
          val (leftOnly, rightOnly, bothSides) =
            split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

          joinType match {
            case Inner =>
              // Inner join: single-side predicates move below the join on both sides.
              Join(
                filterIfAny(leftOnly, left),
                filterIfAny(rightOnly, right),
                Inner,
                bothSides.reduceLeftOption(And))

            case RightOuter =>
              // Left-only join predicates are pushed into the left child. Right-only ones
              // are not pushed (the right side is the preserved side) and stay in the join
              // condition together with the two-sided predicates.
              Join(
                filterIfAny(leftOnly, left),
                right,
                RightOuter,
                (rightOnly ++ bothSides).reduceLeftOption(And))

            case LeftOuter | LeftSemi =>
              // Mirror image: right-only join predicates are pushed into the right child,
              // left-only ones stay in the join condition.
              Join(
                left,
                filterIfAny(rightOnly, right),
                joinType,
                (leftOnly ++ bothSides).reduceLeftOption(And))

            case FullOuter =>
              join
          }
      }
    }
  • 相关阅读:
    Exchange 2013学习笔记七:邮箱导入导出
    Exchange 2013学习笔记六:邮件归档
    Exchange 2013学习笔记五:资源邮箱
    Exchange 2013学习笔记四:新建用户邮箱
    Exchange 2013学习笔记三:创建邮箱数据库
    Exchange 2013学习笔记二:邮箱访问
    Exchange 2013学习笔记一:Exchange Server 2013安装
    域学习笔记十五:在活动目录中清除彻底失败的域控制器信息
    域学习笔记十四:迁移操作主控
    域学习笔记十三:将域控制器迁移到新买的服务器
  • 原文地址:https://www.cnblogs.com/txfsheng/p/9101366.html
Copyright © 2011-2022 走看看