  • [Original] Uncle's Experience Sharing (15): How Spark SQL Implements limit

    We previously discussed how Hive implements limit; see https://www.cnblogs.com/barneywill/p/10109217.html for details.
    Now let's look at how Spark SQL implements limit, starting with the execution plan:

    spark-sql> explain select * from test1 limit 10;
    == Physical Plan ==
    CollectLimit 10
    +- HiveTableScan [id#35], MetastoreRelation temp, test1
    Time taken: 0.201 seconds, Fetched 1 row(s)
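
    For reference, the same physical plan can be printed from the Scala API. This assumes an existing SparkSession named spark with access to the same test1 table:

    // Dataset.explain() prints the physical plan, like EXPLAIN in spark-sql.
    spark.sql("select * from test1 limit 10").explain()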

    The CollectLimit node in the plan corresponds to the limit clause, and its implementing class is

    org.apache.spark.sql.execution.CollectLimitExec

    case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
      ...
      protected override def doExecute(): RDD[InternalRow] = {
        // Local limit: take at most `limit` rows from each partition of the child's RDD.
        val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
        // Shuffle all the locally limited rows into a single partition.
        val shuffled = new ShuffledRowRDD(
          ShuffleExchange.prepareShuffleDependency(
            locallyLimited, child.output, SinglePartition, serializer))
        // Global limit: take the first `limit` rows from that single partition.
        shuffled.mapPartitionsInternal(_.take(limit))
      }
      ...
    }

    As you can see, the implementation is quite simple: first call SparkPlan.execute on the child to get the result RDD, then take the first limit rows from each partition to produce a new RDD, then shuffle that RDD into a single partition, and finally take the first limit rows of that partition to get the final result.
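
    The same two-phase pattern (a per-partition local limit followed by a single-partition global limit) can be reproduced with the public RDD API. The following is a minimal sketch, not the actual CollectLimitExec code; the session setup and sample data are illustrative assumptions:

    import org.apache.spark.sql.SparkSession

    object LimitSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative local session; in a real job the session already exists.
        val spark = SparkSession.builder().appName("limit-sketch").master("local[*]").getOrCreate()
        val sc = spark.sparkContext
        val n = 10

        val rdd = sc.parallelize(1 to 1000, numSlices = 8)

        // Phase 1 (local limit): each partition emits at most n rows.
        val locallyLimited = rdd.mapPartitions(_.take(n))

        // Phase 2 (global limit): shuffle the at most n * numPartitions surviving rows
        // into one partition and take n again. coalesce(1, shuffle = true) stands in
        // for the ShuffledRowRDD/SinglePartition exchange used by CollectLimitExec.
        val globallyLimited = locallyLimited.coalesce(1, shuffle = true).mapPartitions(_.take(n))

        globallyLimited.collect().foreach(println)
        spark.stop()
      }
    }

    Note the efficiency of this design: the shuffle moves at most limit rows per partition rather than the whole dataset, so the single-partition stage stays cheap even for large tables.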

  • Original article: https://www.cnblogs.com/barneywill/p/10155597.html