  • Spark: The Definitive Guide, Chapter 7 notes

Types of grouping:

    The simplest grouping is to just summarize a complete DataFrame by performing an aggregation in a select statement.

    A “group by” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.

    A “window” gives you the ability to specify one or more keys as well as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.

    A “grouping set,” which you can use to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.

    A “rollup” makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.

    A “cube” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized across all combinations of columns.

Each grouping returns a RelationalGroupedDataset.

For interactive queries and ad hoc analysis, Spark lets you trade exact precision for speed by computing approximate results to a specified accuracy.

count() is the simplest aggregation; called directly on a DataFrame it is an action, not a transformation.

    
// Read the retail dataset, coalesce to fewer partitions, cache it,
// and register it as a temp view for SQL queries
val dataPath = "data/retail-data/all/*.csv"
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(dataPath)
  .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
df.count() // an action; it also materializes the cache
    
    

    Aggregation Functions

• The transformation version of count()

We can do one of two things: specify a specific column to count, or all the columns by using count(*) or count(1) to represent that we want to count every row as the literal one.

    
import org.apache.spark.sql.functions.count
df.select(count("StockCode")).show()
    
    

When you use count(*), Spark counts rows even when they contain null values; when you count an individual column, rows whose value is null in that column are not counted.
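To see the difference, you can count all rows and a single column side by side. A minimal sketch, assuming the Description column contains some null values:

import org.apache.spark.sql.functions.count

// count("*") counts every row, nulls included; count("Description") skips
// rows whose Description is null (assumption: Description has some nulls)
df.select(count("*"), count("Description")).show()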

    • countDistinct
    
// in Scala
import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("StockCode")).show() // 4070
    
    
    • approx_count_distinct
    
// in Scala
import org.apache.spark.sql.functions.approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() // 3364
    
    

You will notice that approx_count_distinct takes another parameter with which you can specify the maximum estimation error allowed. This buys a large performance improvement, especially on very large datasets.
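As a rough sketch (the 0.01 error bound is an arbitrary choice, not from the book), tightening the allowed error brings the estimate closer to the exact distinct count, at the cost of more computation:

import org.apache.spark.sql.functions.approx_count_distinct

// stricter maximum estimation error: slower, but closer to the exact 4070
df.select(approx_count_distinct("StockCode", 0.01)).show()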

• first and last

    This will be based on the rows in the DataFrame, not on the values in the DataFrame

    
// in Scala
import org.apache.spark.sql.functions.{first, last}
df.select(first("StockCode"), last("StockCode")).show()
    
    
• min and max

• sum

• sumDistinct

• avg and mean (a combined example for these follows below)
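A minimal combined sketch (not from the book) running these basic aggregations over the Quantity column in a single select:

import org.apache.spark.sql.functions.{min, max, sum, sumDistinct, avg, mean}

// basic numerical aggregations over the Quantity column
df.select(
  min("Quantity"), max("Quantity"),
  sum("Quantity"), sumDistinct("Quantity"),
  avg("Quantity"), mean("Quantity")).show()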

    • Variance and Standard Deviation

    By default, Spark performs the formula for the sample standard deviation or variance if you use the variance or stddev functions.

    
// in Scala
import org.apache.spark.sql.functions.{var_pop, stddev_pop}
import org.apache.spark.sql.functions.{var_samp, stddev_samp}
df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()
    
    
    • skewness and kurtosis

    Skewness and kurtosis are both measurements of extreme points in your data. Skewness measures the asymmetry of the values in your data around the mean, whereas kurtosis is a measure of the tail of data.

    
import org.apache.spark.sql.functions.{skewness, kurtosis}
df.select(skewness("Quantity"), kurtosis("Quantity")).show()
    
    
    • Covariance and Correlation

covar_pop() and covar_samp(): covariance (population and sample)

corr(): correlation

    
import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity")).show()
    
    
    • Aggregating to Complex Types

    collect_set

    collect_list

    agg

    
import org.apache.spark.sql.functions.{collect_set, collect_list}
df.agg(collect_set("Country"), collect_list("Country")).show()
    
    

    Grouping

First we specify the column(s) on which we would like to group, and then we specify the aggregation(s). The first step returns a RelationalGroupedDataset, and the second step returns a DataFrame.
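For example, a simple grouped count (a minimal sketch): groupBy returns the RelationalGroupedDataset, and calling count() on it returns a DataFrame:

// step 1: groupBy gives a RelationalGroupedDataset
// step 2: count() turns it back into a DataFrame
df.groupBy("InvoiceNo", "CustomerId").count().show()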

    • Grouping with Expressions

Rather than passing that function as an expression into a select statement, we specify it within agg. This makes it possible for you to pass in arbitrary expressions that just need to have some aggregation specified.

    
import org.apache.spark.sql.functions.{count, expr}
df.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"), expr("count(Quantity)")).show()
    
    
    • Grouping with Maps

    Sometimes, it can be easier to specify your transformations as a series of Maps for which the key is the column, and the value is the aggregation function (as a string) that you would like to perform. You can reuse multiple column names if you specify them inline, as well.

    
// in Scala
df.groupBy("InvoiceNo").agg("Quantity" -> "avg", "Quantity" -> "stddev_pop").show()
    
    

    Window Functions

    Spark supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.
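Ranking and aggregate window functions are demonstrated in the examples that follow; as a hedged sketch of an analytic function (not from these notes' source), lag returns a value from the previous row within each partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// previous Quantity for each customer, ordered by invoice date
val lagWindow = Window.partitionBy("CustomerId").orderBy(col("InvoiceDate"))
df.select(col("CustomerId"), col("InvoiceDate"), col("Quantity"),
  lag("Quantity", 1).over(lagWindow).alias("prevQuantity")).show()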

    
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),
  "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
    
    

The first step to a window function is to create a window specification. Note that the partition by is unrelated to the partitioning scheme concept that we have covered thus far. It’s just a similar concept that describes how we will be breaking up our group. The ordering determines the ordering within a given partition, and, finally, the frame specification (the rowsBetween statement) states which rows will be included in the frame based on its reference to the current input row. In the following example, we look at all previous rows up to the current row:

    
// in Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val windowSpec = Window
  .partitionBy("CustomerId", "date")
  .orderBy(col("Quantity").desc)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    

    Now we want to use an aggregation function to learn more about each specific customer. An example might be establishing the maximum purchase quantity over all time. To answer this, we use the same aggregation functions that we saw earlier by passing a column name or expression. In addition, we indicate the window specification that defines to which frames of data this function will apply:

    
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
    
    

You will notice that this returns a column (or expressions). We can now use this in a DataFrame select statement. Before doing so, though, we will create the purchase quantity rank. To do that we use the dense_rank function to determine which date had the maximum purchase quantity for every customer. We use dense_rank as opposed to rank to avoid gaps in the ranking sequence when there are tied values (or in our case, duplicate rows):

    
// in Scala
import org.apache.spark.sql.functions.{dense_rank, rank}

val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
    
    

    Now we can perform a select to view the calculated window values:

    
// in Scala
import org.apache.spark.sql.functions.col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show(100)
    
    

    Grouping Sets

Sometimes we want something a bit more complete: an aggregation across multiple groups. We achieve this by using grouping sets. Grouping sets are a low-level tool for combining sets of aggregations together. They give you the ability to create arbitrary aggregations in their group-by statements.

    
// in Scala
// drop rows containing nulls so that they are not confused with the null
// "totals" rows produced by grouping sets, rollups, and cubes
val dfNoNull = dfWithDate.na.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

// the conventional group-by
spark.sql("""SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC""").show(100)

// the same aggregation expressed with GROUPING SETS
spark.sql("""SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC""").show()
    
    

    警告:Grouping sets depend on null values for aggregation levels. If you do not filter-out null values, you will get incorrect results. This applies to cubes, rollups, and grouping sets.

What if you also want to include the total number of items, regardless of customer or stock code? With a conventional group-by statement, this would be impossible. But it's simple with grouping sets: we simply specify that we would like to aggregate at that level, as well, in our grouping set. This is, effectively, the union of several different groupings together; that is, the result set is the union of the rows produced by each grouping in the set:

    
spark.sql("""
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
""").show()
    
// Note: unlike the conventional GROUP BY above, the empty grouping set ()
// adds a grand-total row (with null customerId and stockCode) to the result.

The GROUPING SETS operator is only available in SQL. To perform the same in DataFrames, you use the rollup and cube operators, which allow us to get the same results.

Rollups

When we set our grouping keys of multiple columns, Spark looks at those as well as the actual combinations that are visible in the dataset. A rollup is a multidimensional aggregation that performs a variety of group-by style calculations for us: the rollup below returns the grand total over all dates, the total for each date, and the total for each country on each date.

// in Scala
import org.apache.spark.sql.functions.sum

val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
  .orderBy("Date")
rolledUpDF.show()
    
    

    A null in both rollup columns specifies the grand total across both of those columns:

    
rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL").show()
    
    

    Cube

    A cube takes the rollup to a level deeper. Rather than treating elements hierarchically, a cube does the same thing across all dimensions. This means that it won’t just go by date over the entire time period, but also the country.

    
// in Scala
import org.apache.spark.sql.functions.{col, sum}

dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()
    
    

    Grouping Metadata

Sometimes when using cubes and rollups, you want to be able to query the aggregation levels so that you can easily filter them down accordingly. We can do this by using the grouping_id, which gives us a column specifying the level of aggregation that we have in our result set. In the query below, grouping_id() is 3 for the grand-total row, 2 for rows aggregated per stock code across all customers, 1 for rows aggregated per customer across all stock codes, and 0 for rows grouped by an individual customerId and stockCode combination.

    
// in Scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}

dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
  .orderBy(expr("grouping_id()").desc)
  .show()
    
    
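As a small hedged sketch (not from the book) of filtering on the aggregation level, you can alias the grouping_id column and keep only the grand-total row:

import org.apache.spark.sql.functions.{grouping_id, sum}

// keep only the level-3 row: the grand total across all customers and stock codes
dfNoNull.cube("customerId", "stockCode")
  .agg(grouping_id().alias("gid"), sum("Quantity"))
  .where("gid = 3")
  .show()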

    Pivot

    Pivots make it possible for you to convert a row into a column.

    
// in Scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
    
    

This DataFrame will now have a column for every combination of country and numeric variable, and a column specifying the date.

    
pivoted.where("date > '2011-12-05'").select("date", "`USA_sum(Quantity)`").show()
    
    

    User-Defined Aggregation Functions

    User-defined aggregation functions (UDAFs) are a way for users to define their own aggregation functions based on custom formulae or business rules. You can use UDAFs to compute custom calculations over groups of input data (as opposed to single rows). Spark maintains a single AggregationBuffer to store intermediate results for every group of input data.

    To create a UDAF, you must inherit from the UserDefinedAggregateFunction base class and implement the following methods:

    • inputSchema represents input arguments as a StructType

    • bufferSchema represents intermediate UDAF results as a StructType

    • dataType represents the return DataType

    • deterministic is a Boolean value that specifies whether this UDAF will return the same result for a given input

    • initialize allows you to initialize values of an aggregation buffer

    • update describes how you should update the internal buffer based on a given row

    • merge describes how two aggregation buffers should be merged

    • evaluate will generate the final result of the aggregation

Defining the UDAF:

    
// in Scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// A UDAF that computes the logical AND of all Boolean values in a group
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}
    
    

Registering and using it:

    
// in Scala
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._

spark.range(1)
  .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
  .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
  .select(ba(col("t")), expr("booland(f)"))
  .show()
    
    

In Spark 2.3, you will also be able to call Scala or Java UDFs and UDAFs by registering the function just as we showed in the UDF section.
