[Original] Uncle's Troubleshooting Notes (11): Why does adding LIMIT to a subquery on a large table in Spark cause a broadcast timeout error?

    When joining two tables, one large and one small, the normal map-reduce flow requires a shuffle, which sends the large table's data across the network between nodes. A common optimization is to load the small table into memory and broadcast it to the nodes processing the large table, avoiding the shuffle and reduce steps.

    In Hive this is called a mapjoin (map-side join), enabled by the setting hive.auto.convert.join;

    in Spark it is called BroadcastHashJoin (broadcast hash join).

    Spark SQL uses broadcast join (aka broadcast hash join) instead of hash join to optimize join queries when the size of one side data is below spark.sql.autoBroadcastJoinThreshold.

    Broadcast join can be very efficient for joins between a large table (fact) with relatively small tables (dimensions) that could then be used to perform a star-schema join. It can avoid sending all data of the large table over the network.
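    As a quick illustration, here is one way to inspect and adjust that threshold at runtime; this is a minimal sketch, assuming an existing SparkSession, and the 50 MB value is only an example, not a recommendation:

        import org.apache.spark.sql.SparkSession

        // Minimal sketch: read and adjust the auto-broadcast threshold at runtime.
        val spark = SparkSession.builder().getOrCreate()
        // Defaults to 10485760 bytes (10 MB).
        println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
        // Example only: raise the threshold to 50 MB.
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)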

    There are several ways to trigger it; a usage sketch follows each item below:

    1) SQL hint (supported since Spark 2.3)

    SELECT /*+ MAPJOIN(b) */ ...
    SELECT /*+ BROADCASTJOIN(b) */ ...
    SELECT /*+ BROADCAST(b) */ ...
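    For instance, applied to a concrete join (a sketch; big_table1 and small_table are hypothetical table names, and spark is an existing SparkSession):

        // Hypothetical tables: the hint asks the planner to broadcast side b
        // regardless of its size statistics.
        val joined = spark.sql(
          """SELECT /*+ BROADCAST(b) */ a.*
            |FROM big_table1 a
            |JOIN small_table b ON a.id = b.id""".stripMargin)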

    2) the broadcast function: org.apache.spark.sql.functions.broadcast

    import org.apache.spark.sql.functions.broadcast

    val testTable3 = testTable1.join(broadcast(testTable2), Seq("id"), "right_outer")

    3) automatic optimization, based on the plan's size statistics:

    org.apache.spark.sql.execution.SparkStrategies.JoinSelection

    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.statistics.isBroadcastable ||
        (plan.statistics.sizeInBytes >= 0 &&
          plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
    }
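    This is exactly why a LIMIT subquery over a large table can be chosen for broadcast: the limit shrinks the optimizer's sizeInBytes estimate below the threshold. A sketch of how to inspect that estimate (assuming the Spark 2.3+ API, where statistics was renamed to stats; the accessor differs on older 2.x versions, and big_table2 is the table from the example below):

        // Inspect the size estimate that the join strategy compares
        // against spark.sql.autoBroadcastJoinThreshold.
        val sub = spark.sql("SELECT * FROM big_table2 LIMIT 10")
        println(sub.queryExecution.optimizedPlan.stats.sizeInBytes)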

    For example:

    spark-sql> explain select * from big_table1 a, (select * from big_table2 limit 10) b where a.id = b.id;

    18/09/17 18:14:09 339 WARN Utils66: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

    == Physical Plan ==
    BroadcastHashJoin [id#5], [id#14], Inner, BuildRight
    :- *Filter isnotnull(id#5)
    :  +- HiveTableScan [name#4, id#5], MetastoreRelation big_table1
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[6, string, false]))
       +- Filter isnotnull(id#14)
          +- GlobalLimit 10
             +- Exchange SinglePartition
                +- LocalLimit 10
                   +- HiveTableScan [id#14, ... 187 more fields], MetastoreRelation big_table2
    Time taken: 4.216 seconds, Fetched 1 row(s)
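    Note that the LIMIT subquery landed on the build side (under BroadcastExchange) purely because of its size estimate. If computing it is expensive, automatic broadcast can be disabled by setting the threshold to -1 (a sketch):

        // -1 disables automatic broadcast joins; the planner falls back to a
        // shuffle-based join (e.g. SortMergeJoin) instead.
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)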

    The execution of BroadcastExchange is implemented in

    org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

    override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
      ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
        .asInstanceOf[broadcast.Broadcast[T]]
    }

    Here timeout is spark.sql.broadcastTimeout, which defaults to 300s:

    private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
      // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
      val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
      Future {
        // This will run in another thread. Set the execution id so that we can connect these jobs
        // with the correct execution.
        SQLExecution.withExecutionId(sparkContext, executionId) {
          try {
            val beforeCollect = System.nanoTime()
            // Note that we use .executeCollect() because we don't want to convert data to Scala types
            val input: Array[InternalRow] = child.executeCollect()
            if (input.length >= 512000000) {
              throw new SparkException(
                s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
            }
            val beforeBuild = System.nanoTime()
            longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
            val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
            longMetric("dataSize") += dataSize
            if (dataSize >= (8L << 30)) {
              throw new SparkException(
                s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
            }

            // Construct and broadcast the relation.
            val relation = mode.transform(input)
            val beforeBroadcast = System.nanoTime()
            longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000

            val broadcasted = sparkContext.broadcast(relation)
            longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000

            SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
            broadcasted
            // ... (exception handling and the rest of the method elided)

    So broadcasting a table means first computing the child plan and collecting the rows to the driver, then broadcasting them via SparkContext; this runs asynchronously in a separate thread, bounded by spark.sql.broadcastTimeout. In the plan above, the LIMIT 10 makes the subquery's estimated size fall below autoBroadcastJoinThreshold, so it is chosen as the broadcast side; but producing those rows still requires scanning big_table2 and exchanging to a single partition, and when that takes longer than 300 seconds the broadcast future times out, which is exactly the reported error.
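    Accordingly, when the broadcast side is legitimately slow to compute, one mitigation is to raise the timeout (a sketch; the value is in seconds, and 600 is just an example):

        // Allow the broadcast future up to 10 minutes instead of the default 300s.
        spark.conf.set("spark.sql.broadcastTimeout", 600)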
