  • How Spark SQL Chooses a Join Strategy

    Preface

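    As is well known, the Catalyst Optimizer is the core of Spark SQL. It is mainly responsible for turning a SQL statement into the final physical execution plan, so to a large extent it determines how well the SQL performs.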

    When Catalyst generates the Physical Plan from the Optimized Logical Plan, it relies on

    abstract class SparkStrategies extends QueryPlanner[SparkPlan]
    

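    and specifically on its JoinSelection strategy, which pattern-matches the join against a series of rules, in order, to decide the final join strategy. The candidate strategies are prioritized by execution efficiency, from most to least efficient.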

    Before looking at how the join strategy is chosen, let us first go over a few prerequisites:

    1. Choosing the build table

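    The first step of a Hash Join is to build a hash table from the smaller of the two tables. This smaller table is called the build table, and the larger one is called the probe table, because its rows are used to "probe" the hash table built from the small table. The source code is as follows: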

    /* Conditions for the left table to be the build table; the join type must be:
       1. InnerLike: currently implemented by inner join and cross join
       2. RightOuter: right outer join
    */      
    private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | RightOuter => true
      case _ => false
    }
    
    /* Conditions for the right table to be the build table; the join type must be one of the following
       (the first group is the one that everyday business SQL mostly hits):
       1. InnerLike, LeftOuter (left outer join), LeftSemi (left semi join), LeftAnti (left anti join)
       2. ExistenceJoin: only used in the end of optimizer and physical plans, we will not generate SQL for this join type
    */
    private def canBuildRight(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
      case _ => false
    }
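    To make the two predicates concrete, here is a minimal standalone sketch that re-implements them over a simplified, hypothetical JoinType model rather than Spark's real Catalyst classes (for instance, the real ExistenceJoin carries an attribute, which is dropped here). It shows that for an inner join either side may be the build table, for a left outer join only the right side may be, and for a full outer join neither side qualifies under these rules.

```scala
// Simplified, hypothetical model of Catalyst's join types (not the Spark API).
sealed trait JoinType
sealed trait InnerLike extends JoinType
case object Inner extends InnerLike
case object Cross extends InnerLike
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType // the real ExistenceJoin wraps an attribute

// The left side may be the build table only for inner-like and right outer joins.
def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}

// The right side may be the build table for inner-like, left outer/semi/anti and existence joins.
def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
  case _ => false
}

// Print the eligible build sides for every join type in the model.
val all = Seq(Inner, Cross, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti, ExistenceJoin)
all.foreach(jt => println(s"$jt: left=${canBuildLeft(jt)} right=${canBuildRight(jt)}"))
```

    Keeping the rule as a plain pattern match on the join type means it can be decided before any size statistics are consulted.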
    

    2. Which tables can be broadcast

    A table can be broadcast if its size is less than or equal to the value of spark.sql.autoBroadcastJoinThreshold (10 MB by default). The source code is as follows:

    private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
      val buildRight = canBuildRight(joinType) && canBroadcast(right)
      buildLeft || buildRight
    }
    
    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
    }
    
    private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : BuildSide = {
      val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
      val buildRight = canBuildRight(joinType) && canBroadcast(right)
      
      // Eventually delegates to broadcastSide
      broadcastSide(buildLeft, buildRight, left, right)
    }
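    The size check is simple arithmetic, so it can be sketched without Spark at all. The version below is a hypothetical stand-in: sizes are plain Long values (in Spark, plan.stats.sizeInBytes is a BigInt), and the join-type eligibility is passed in as Booleans. It also illustrates that a threshold of -1 makes the check fail for every size, and that in a LEFT OUTER JOIN a tiny left table does not help, because only the right side may be built.

```scala
// Hypothetical stand-in for the broadcast-by-size checks, using Longs instead of LogicalPlan stats.
val defaultThreshold: Long = 10L * 1024 * 1024 // 10 MB = 10485760 bytes

// Mirrors canBroadcast: a non-negative size at or below the threshold.
def canBroadcast(sizeInBytes: Long, threshold: Long): Boolean =
  sizeInBytes >= 0 && sizeInBytes <= threshold

// Mirrors canBroadcastBySizes: a side qualifies only if the join type allows it
// to be the build table AND it passes the size check.
def canBroadcastBySizes(leftBuildable: Boolean, rightBuildable: Boolean,
                        leftSize: Long, rightSize: Long, threshold: Long): Boolean = {
  val buildLeft = leftBuildable && canBroadcast(leftSize, threshold)
  val buildRight = rightBuildable && canBroadcast(rightSize, threshold)
  buildLeft || buildRight
}

println(canBroadcast(5L * 1024 * 1024, defaultThreshold))  // true
println(canBroadcast(50L * 1024 * 1024, defaultThreshold)) // false
println(canBroadcast(0L, -1L))                             // false: -1 disables size-based broadcast
// LEFT OUTER JOIN: only the right side may be built, so a 1 KB left table
// does not make the join broadcastable when the right table is 1 GB.
println(canBroadcastBySizes(leftBuildable = false, rightBuildable = true,
                            leftSize = 1024L, rightSize = 1L << 30,
                            threshold = defaultThreshold)) // false
```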
    

    Besides qualifying by size as above, a table can also be marked for broadcast explicitly with a hint in the Spark SQL statement (/*+ BROADCAST(small_table) */). The source code is as follows:

    private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : Boolean = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      buildLeft || buildRight
    }
    
    private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
      : BuildSide = {
      val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
      val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
      
      // Eventually delegates to broadcastSide
      broadcastSide(buildLeft, buildRight, left, right)
    }
    

    Whether broadcasting is driven by table size or by a hint, broadcastSide is eventually called to decide which table to broadcast:

    private def broadcastSide(
        canBuildLeft: Boolean,
        canBuildRight: Boolean,
        left: LogicalPlan,
        right: LogicalPlan): BuildSide = {
    
      def smallerSide =
        if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
    
      if (canBuildRight && canBuildLeft) {
        // If both sides can be the build table, the one with the smaller physical size
        // (according to the table statistics) is chosen, even if both sides have a hint
        smallerSide
      } else if (canBuildRight) {
        // Otherwise the right side is checked first and broadcast if it qualifies;
        // failing that, the left side is checked
        BuildRight
      } else if (canBuildLeft) {
        BuildLeft
      } else {
        // If neither side can be the build table, the side with the smaller physical size
        // is chosen. This is currently used mainly by broadcast nested loop join
        smallerSide
      }
    }
    

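    As the code shows, even when the user specifies a broadcast hint, the hinted table is not necessarily the one that gets broadcast at execution time.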
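    Two cases make this concrete. The sketch below is a simplified, hypothetical model: BuildSide is redefined locally, plan sizes are Longs, and the join-type eligibility and hint flags are passed in as Booleans instead of being read from LogicalPlan. When both sides of an inner join carry a BROADCAST hint, the size statistics decide; and for a LEFT OUTER JOIN with a hint only on the left table, the hint check fails outright because the left side cannot be the build table, so the hint-driven rule does not apply.

```scala
// Hypothetical, simplified model of the hint-driven build-side choice.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Mirrors broadcastSide: prefer the smaller side when both (or neither) sides qualify.
def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean,
                  leftSize: Long, rightSize: Long): BuildSide = {
  def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft
  if (canBuildRight && canBuildLeft) smallerSide
  else if (canBuildRight) BuildRight
  else if (canBuildLeft) BuildLeft
  else smallerSide
}

// Mirrors canBroadcastByHints: a hint only counts on a side the join type allows to be built.
def canBroadcastByHints(leftBuildable: Boolean, rightBuildable: Boolean,
                        leftHinted: Boolean, rightHinted: Boolean): Boolean =
  (leftBuildable && leftHinted) || (rightBuildable && rightHinted)

// Inner join, both sides hinted: the smaller side (left, 100 bytes) wins.
val both = broadcastSide(canBuildLeft = true, canBuildRight = true, leftSize = 100L, rightSize = 500L)
println(both) // BuildLeft

// LEFT OUTER JOIN with a hint only on the left table: the hint rule does not match.
println(canBroadcastByHints(leftBuildable = false, rightBuildable = true,
                            leftHinted = true, rightHinted = false)) // false
```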

    3. Whether a local HashMap can be built

    This check is used by Shuffle Hash Join. The source code is as follows:

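    // A single partition of the logical plan is small enough to build a hash table
    // Note: this assumes the number of partitions is fixed; if it is dynamic, additional conditions are required
    private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
      // A local HashMap can be built when the plan's physical size is below
      // spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions (200 by default)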
      plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
    }
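    With the default settings the bound works out as follows. This is a hypothetical recreation with plain Long values rather than Spark's SQLConf; the intuition is that if the data were spread evenly over the shuffle partitions, each partition would hold less than one broadcast threshold's worth of data. Even spreading is an assumption: the check only looks at the total size, so a skewed partition can still be much larger.

```scala
// Hypothetical recreation of canBuildLocalHashMap's arithmetic with default values.
val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024 // default 10 MB
val numShufflePartitions: Int = 200                      // default spark.sql.shuffle.partitions

def canBuildLocalHashMap(sizeInBytes: Long): Boolean =
  sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions

// Upper bound on the total size: 10485760 * 200 bytes.
val limit = autoBroadcastJoinThreshold * numShufflePartitions
println(limit) // 2097152000

// A 1 GB table spread evenly over 200 partitions averages about 5 MB per partition.
val oneGb = 1024L * 1024 * 1024
println(oneGb / numShufflePartitions) // 5368709
println(canBuildLocalHashMap(oneGb))  // true
```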
    

    Spark SQL currently implements three main equi-join strategies: Broadcast Hash Join, Shuffle Hash Join and Sort Merge Join. So what rules does Catalyst follow when choosing among them for a SQL statement?

    1. Broadcast Hash Join

    Whether it applies is decided by hints and by table size.

    // broadcast hints were specified
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
       if canBroadcastByHints(joinType, left, right) =>
       val buildSide = broadcastSideByHints(joinType, left, right)
       Seq(joins.BroadcastHashJoinExec(
         leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
    
    // broadcast hints were not specified, so need to infer it from size and configuration.
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
       if canBroadcastBySizes(joinType, left, right) =>
       val buildSide = broadcastSideBySizes(joinType, left, right)
       Seq(joins.BroadcastHashJoinExec(
         leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
    

    2. Shuffle Hash Join

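    Shuffle Hash Join is chosen only when all of the following conditions hold:

    1. spark.sql.join.preferSortMergeJoin is false, i.e. Shuffle Hash Join takes priority over Sort Merge Join

    2. The right (or left) table can be the build table

    3. A local HashMap can be built from that table

    4. Taking the right table as an example, its logical plan must be much smaller than the left table's (at least 3 times smaller; the factor 3 is hard-coded in muchSmaller)

    These conditions are checked for the right table first.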

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
          && muchSmaller(right, left) ||
          !RowOrdering.isOrderable(leftKeys) =>
       Seq(joins.ShuffledHashJoinExec(
         leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
    
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
          && muchSmaller(left, right) ||
          !RowOrdering.isOrderable(leftKeys) =>
       Seq(joins.ShuffledHashJoinExec(
         leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
          
    private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
      a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
    }
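    Note the operator precedence in the guards above: && binds tighter than ||, so each guard reads as (all four conditions) || (keys are not orderable). The sketch below models the right-side guard with hypothetical Boolean and Long parameters in place of LogicalPlan. It shows the 3x rule at its boundary and that non-orderable keys select Shuffle Hash Join even when spark.sql.join.preferSortMergeJoin keeps its default value of true.

```scala
// Mirrors muchSmaller: a is at least 3 times smaller than b.
def muchSmaller(a: Long, b: Long): Boolean = a * 3 <= b

// Hypothetical model of the right-side Shuffle Hash Join guard.
def pickShuffleHashBuildRight(preferSortMergeJoin: Boolean,
                              rightBuildable: Boolean,
                              rightFitsLocalHashMap: Boolean,
                              rightSize: Long, leftSize: Long,
                              keysOrderable: Boolean): Boolean =
  // Same shape as the Spark guard, parsed as (four conditions) || (keys not orderable)
  !preferSortMergeJoin && rightBuildable && rightFitsLocalHashMap &&
    muchSmaller(rightSize, leftSize) || !keysOrderable

println(muchSmaller(100L, 300L)) // true: exactly 3 times smaller
println(muchSmaller(101L, 300L)) // false
// Keys that cannot be ordered force Shuffle Hash Join regardless of the other flags.
println(pickShuffleHashBuildRight(preferSortMergeJoin = true, rightBuildable = false,
  rightFitsLocalHashMap = false, rightSize = 1000L, leftSize = 1L, keysOrderable = false)) // true
```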
    

    Even if these conditions are not met, Shuffle Hash Join is still chosen when the join keys cannot be sorted, because Sort Merge Join is then impossible:

    !RowOrdering.isOrderable(leftKeys)
    
    def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
    

    3. Sort Merge Join

    If neither of the two strategies above (Broadcast Hash Join and Shuffle Hash Join) qualifies and the join keys are sortable, Sort Merge Join is chosen.

    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
       if RowOrdering.isOrderable(leftKeys) =>
       joins.SortMergeJoinExec(
         leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
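    Putting the equi-join cases together, the order in which JoinSelection tries them can be sketched as a single function. Everything here is hypothetical: the EquiJoin case class and selectEquiJoin are illustrative names, and each field stands for the outcome of one guard quoted above (the two Shuffle Hash Join fields exclude the orderability clause, which is modeled separately).

```scala
// Hypothetical labels for the three equi-join strategies.
sealed trait Strategy
case object BroadcastHashJoin extends Strategy
case object ShuffledHashJoin extends Strategy
case object SortMergeJoin extends Strategy

// Each field is the precomputed result of one guard from JoinSelection.
final case class EquiJoin(
  hintBroadcastable: Boolean, // canBroadcastByHints
  sizeBroadcastable: Boolean, // canBroadcastBySizes
  shuffleHashRight: Boolean,  // right-side SHJ conditions, without the orderability clause
  shuffleHashLeft: Boolean,   // left-side SHJ conditions, without the orderability clause
  keysOrderable: Boolean)     // RowOrdering.isOrderable(leftKeys)

// Cases are tried top to bottom, like the pattern-match order in JoinSelection.
def selectEquiJoin(j: EquiJoin): Strategy =
  if (j.hintBroadcastable || j.sizeBroadcastable) BroadcastHashJoin
  else if (j.shuffleHashRight || j.shuffleHashLeft || !j.keysOrderable) ShuffledHashJoin
  else SortMergeJoin

println(selectEquiJoin(EquiJoin(false, false, false, false, true))) // SortMergeJoin
```

    Because non-orderable keys always lead to Shuffle Hash Join, and orderable keys always allow Sort Merge Join, an equi-join in this model always ends up with one of the three strategies.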
    

    4. Without joining keys

    Broadcast Hash Join, Shuffle Hash Join and Sort Merge Join all match the classic ExtractEquiJoinKeys pattern (equi-join conditions).

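    For joins that do not match ExtractEquiJoinKeys, Spark first checks whether a table can be broadcast (by hint or by size). If it can, BroadcastNestedLoopJoin (BNLJ for short) is used. BNLJ is easy to understand if you know Nested Loop Join: the main difference is that BNLJ broadcasts one of the tables.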

    The source code is as follows:

    // Pick BroadcastNestedLoopJoin if one side could be broadcast
    case j @ logical.Join(left, right, joinType, condition)
        if canBroadcastByHints(joinType, left, right) =>
      val buildSide = broadcastSideByHints(joinType, left, right)
      joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
    
    case j @ logical.Join(left, right, joinType, condition)
        if canBroadcastBySizes(joinType, left, right) =>
      val buildSide = broadcastSideBySizes(joinType, left, right)
      joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
    

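    If no table can be broadcast, there are two further cases:

    1. If the join type is InnerLike (introduced above), a Cartesian product of the two tables is computed directly

    2. Otherwise, as a last resort, one table is broadcast anyway and the strategy is still BNLJ: broadcastSide picks the hinted side if exactly one side has a hint, and otherwise the table with the smaller physical size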

    The source code is as follows:

    // Pick CartesianProduct for InnerJoin
    case logical.Join(left, right, _: InnerLike, condition) =>
      joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
    
    case logical.Join(left, right, joinType, condition) =>
      val buildSide = broadcastSide(
        left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
      // This join could be very slow or OOM
      joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
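    The fallback path for joins without equi-join keys can be sketched the same way. This is a simplified, hypothetical model: NonEquiJoin, selectNonEquiJoin and the plan labels are illustrative names, and the broadcastable field stands for the outcome of the two broadcast cases above (Some(side) if either matched). The last case reuses the broadcastSide rule, so a hint on a side the join type cannot build still decides which table is broadcast.

```scala
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Hypothetical labels for the physical plans chosen on this path.
sealed trait NonEquiPlan
final case class BroadcastNestedLoopJoin(buildSide: BuildSide) extends NonEquiPlan
case object CartesianProduct extends NonEquiPlan

// Same rule as broadcastSide: smaller side when both or neither flag is set.
def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean,
                  leftSize: Long, rightSize: Long): BuildSide = {
  def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft
  if (canBuildRight && canBuildLeft) smallerSide
  else if (canBuildRight) BuildRight
  else if (canBuildLeft) BuildLeft
  else smallerSide
}

final case class NonEquiJoin(broadcastable: Option[BuildSide], innerLike: Boolean,
                             leftHinted: Boolean, rightHinted: Boolean,
                             leftSize: Long, rightSize: Long)

def selectNonEquiJoin(j: NonEquiJoin): NonEquiPlan = j.broadcastable match {
  case Some(side) => BroadcastNestedLoopJoin(side)
  case None if j.innerLike => CartesianProduct
  case None =>
    // Last resort: broadcast anyway; Spark's own comment warns this may be very slow or OOM.
    BroadcastNestedLoopJoin(broadcastSide(j.leftHinted, j.rightHinted, j.leftSize, j.rightSize))
}

// Non-inner join, nothing broadcastable, no hints: the smaller (right) side is broadcast.
println(selectNonEquiJoin(NonEquiJoin(None, false, false, false, 10L, 1L)))
```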
    

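    Clearly, whether the final strategy is a Cartesian product or BNLJ, execution is very inefficient, so such joins should be avoided in practice as far as possible.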

  • Original article: https://www.cnblogs.com/bigdatalearnshare/p/14259884.html