zoukankan      html  css  js  c++  java
  • spark中的聚合操作和分组操作

    聚合操作

    注意:任何的聚合操作都有默认的分组,聚合是在分组的基础上进行的。比如,对整体进行求和,那么分组就是整体。所以,在做聚合操作之前,一定要明确是在哪个分组上进行聚合操作
    注意:聚合操作,本质上是一个多对一(一对一是多对一的特殊情况)的操作。特别注意的是这个’一‘,可以是一个值(mean, sum等),同样也可以是一个对象(list, set等对象)

    聚合函数

    除了DataFrame的某些操作或者通过.stat访问方法,所有的聚合操作都是以函数的方式出现的。大多数聚合函数可以在org.apache.spark.sql.functions中找到

    • count函数
      使用的方向:
      • 对指定列进行计数
      • 使用count(*)或者count(1)对所有列进行计数
    • countDistinct(统计不同的值得数量)
    • approx_count_distinct
      对统计的精度要求不高使用它,注意:approx_count_distinct带了另一个参数,该参数指定可容忍的最大误差。本例中我们指定了一个相当大的误差率,因此得到的答案与正确值差距很大,但执行速度更快,比countDistinct函数执行耗时更少。档处理更大的数据集的时候,这种提升会更加明显。

    聚合输出复杂类型

    spark的聚合还可以将某列上的数值聚合到一个list中,或者将唯一值聚合到set集合中。
    案例:将国家列直接生成list列和set列

        val path="/Volumes/Data/BigData_code/data/retail-data/all/*.csv"
        //读取数据
        val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
          .load(path).coalesce(5)
        df.cache()
        df.createOrReplaceTempView("dfTable")
        df.show()
        //将Country聚合成set列和list列
        df.agg(collect_set("Country").as("CountrySet"), collect_list("Country").as("CountryList")).show()
    

    分组操作

    • 使用表达式分组
    • 使用Map进行分组
        //使用表达式分组
        df.groupBy("InvoiceNo").agg(
          count("Quantity").as("quan"),     //使用函数方式
          expr("count(Quantity)")     //使用字符串表达式
        ).show()
        //使用Map进行分组
        df.groupBy("InvoiceNo").agg("Quantity"->"count", "Quantity"->"stddev_pop").show()
      
    • window函数
      window函数的使用,请看这篇博客:https://blog.csdn.net/weixin_38653290/article/details/83962789

    分组集---(挖个坑)P133

    用户自定义的聚合函数

    使用UDAF来计算输入数据组(与单行相对)的自定义计算。
    若要创建UDAF,必须继承UserDefinedAggregateFunction基类并实现以下方法:

    • inputSchema用于指定输入参数,输入参数类型为StructType
    • bufferSchema用于指定UDAF中间结果,中间结果类型为StructType。
    • dataType用于指定返回结果,返回结果的类型为DataType。
    • deterministic是一个布尔值,它指定此UDAF对于某个输入是否会返回相同的结果。
    • initialize初始化聚合缓冲区的初始值
    • update描述应如何根据给定行更新内部缓冲区。
    • merge描述应如何合并两个缓冲区
    • evaluate将生成聚合最终结果
      例子:实现自定义聚合函数BoolAnd,它将返回所有行是否为true
    class BoolAnd extends UserDefinedAggregateFunction{
      //指定输入参数
      override def inputSchema: StructType = StructType(
        StructField("Value", BooleanType)::Nil
      )
      //用于指定UDAF中间结果,中间结果使用StructType
      override def bufferSchema: StructType = StructType(
        StructField("value", BooleanType)::Nil
      )
      //用于指定返回结果,返回结果为DataType
      override def dataType: DataType = BooleanType
      //此UDAF对某个输入是否会返回相同的结果
      override def deterministic: Boolean = true
      //初始化聚合缓冲区的初始值
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0)=true
      }
      //描述如何根据给定行更新内部缓冲区
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0)=buffer.getAs[Boolean](0)&&input.getAs[Boolean](0)
      }
      //描述如何聚合两个内部缓冲区
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0)=buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
      }
      //生成聚合的最终结果
      override def evaluate(buffer: Row): Any = {
        buffer(0)
      }
    }
    

    实例化BoolAnd类,并将其注册为一个函数:

        //准备数据
        val df = spark.range(1).selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
          .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
        df.show()
        //实例化类,注册为udaf
        val ba = new BoolAnd
        spark.udf.register("booland", ba)
        df.select(ba(col("t")), expr("booland(f)")).show()
    
  • 相关阅读:
    tomcat8.5.57源码阅读笔记2
    tomcat8.5.57源码阅读笔记1
    KVM openstack
    爬虫进阶版
    react 之setChild子组件传值父组件
    Linux找死锁、cpu100%
    Java定时任务
    Java工具类HttpUtil
    Java后台远程下载url文件并远程上传文件
    jQuery上传文件
  • 原文地址:https://www.cnblogs.com/ALINGMAOMAO/p/14452057.html
Copyright © 2011-2022 走看看