zoukankan      html  css  js  c++  java
  • Spark SQL dropDuplicates

    spark sql 数据去重

    在对spark sql 中的dataframe数据表去除重复数据的时候可以使用dropDuplicates()方法

    dropDuplicates()有4个重载方法

    • 第一个def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)

    这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。

    /**
       * Returns a new Dataset that contains only the unique rows from this Dataset.
       * This is an alias for `distinct`.
       *
       * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
       * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
       * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
       * the state. In addition, too late data older than watermark will be dropped to avoid any
       * possibility of duplicates.
       *
       * @group typedrel
       * @since 2.0.0
       */
      def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
    
    
    • 第二个def dropDuplicates(colNames: Seq[String])

    传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条

    /**
       * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
       * the subset of columns.
       *
       * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
       * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
       * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
       * the state. In addition, too late data older than watermark will be dropped to avoid any
       * possibility of duplicates.
       *
       * @group typedrel
       * @since 2.0.0
       */
      def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
        val resolver = sparkSession.sessionState.analyzer.resolver
        val allColumns = queryExecution.analyzed.output
        val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
          // It is possibly there are more than one columns with the same name,
          // so we call filter instead of find.
          val cols = allColumns.filter(col => resolver(col.name, colName))
          if (cols.isEmpty) {
            throw new AnalysisException(
              s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
          }
          cols
        }
        Deduplicate(groupCols, planWithBarrier)
      }
    
    • 第三个def dropDuplicates(colNames: Array[String])

    传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法。

    /**
       * Returns a new Dataset with duplicate rows removed, considering only
       * the subset of columns.
       *
       * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
       * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
       * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
       * the state. In addition, too late data older than watermark will be dropped to avoid any
       * possibility of duplicates.
       *
       * @group typedrel
       * @since 2.0.0
       */
      def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
    
    • 第四个def dropDuplicates(col1: String, cols: String*)

    传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。

    /**
       * Returns a new [[Dataset]] with duplicate rows removed, considering only
       * the subset of columns.
       *
       * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
       * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
       * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
       * the state. In addition, too late data older than watermark will be dropped to avoid any
       * possibility of duplicates.
       *
       * @group typedrel
       * @since 2.0.0
       */
      @scala.annotation.varargs
      def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
        val colNames: Seq[String] = col1 +: cols
        dropDuplicates(colNames)
      }
    

    第三和第四个本质上还是调用了第二个方法,所以我们在使用的时候如果需要根据指定的列进行数据去重,可以直接传入一个Seq。

    第一个方法默认根据所有列去重,实际上也是调用了第二个方法,然后传入参数this.columns,即所有的列组成的Seq。

    所以各位想深究dropDuplicate()去重的核心代码,只需要研究第二个去重方法即可。等我有时间我也会把去重的核心源码讲解继续补充。

    dropDuplicates()的坑!

    在使用dropDuplicates() 在去重的时候,我发现有时候还是会出现重复数据的情况。

    我分析了一下还出现重复数据的原因:

    1. 数据存在多个excuter中

    因为spark是分布式计算的,数据在计算的时候会分布在不同的excutor上,使用dropDuplicate去重的时候,可能只是一个excutor内的数据进行了去重,别的excutor上可能还会有重复的数据。

    1. 数据是存放在不同分区的,

    因为spark是分布式计算的,数据在计算的时候会分散在不同的分区中,使用dropDuplicate去重的时候,不同的区分可能还会存在相同的数据。

    我试了只启动一个excutor多分区的情况下进行计算,没有出现重复的数据,然后多个excutor将数据先合并到一个分区在去重还是有重复的数据。所以觉得可能是第一种猜测的情况比较大,但是如果只使用一个excutor就失去了分布式计算的意义和优势,所以还是得想想其它办法。

    各位有什么好的解决办法也可以在评论区交流!

  • 相关阅读:
    OpenGL——旋转的六边形(动画)
    OpenGL——三维多面体实现
    OpenGL——二维几何变换
    OpenGL——圆公式相关变化的绘制
    OpenGL——折线图柱状图饼图绘制
    图片相似原理--Java实现
    Service 保活法之二
    Service 保活法之一
    Android仿腾讯手机管家实现桌面悬浮窗小火箭发射的动画效果
    让应用在息屏后保活
  • 原文地址:https://www.cnblogs.com/Jaryer/p/13558701.html
Copyright © 2011-2022 走看看