zoukankan      html  css  js  c++  java
  • Spark中的 aggregate 方法详解

    函数解析:

    1. 参数:(zeroValue: U) (seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
    2. 作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。
    这个函数最终返回的类型不需要和RDD中元素类型一致。

    首先来看第 1 部分:即上面蓝色加粗的 " (zeroValue: U) " 。这个表示它接受一个任意类型的输入参数,变量名为 zeroValue 。
    这个值就是初值,至于这个初值的作用,姑且不用理会,等到下一小节通过实例来讲解会更明了,在这里只需要记住它是一个 “只使用一次” 的值就好了。

    第 2 部分:我们还可以再把它拆分一下,因为它里面其实有两个参数。笔者认为 Scala 语法在定义多个参数时,辨识度比较弱,不睁大眼睛仔细看,很难确定它到底有几个参数。

    首先是第 1 个参数 " seqOp: (U, T) => U " 它是一个函数类型,以一个输入为任意两个类型 U, T 而输出为 U 类型的函数作为参数。
    这个函数会先被执行。这个参数函数的作用是为每一个分片( slice )中的数据遍历应用一次函数。换句话说就是假设我们的输入数据集( RDD )有 1 个分片,
    则只有一个 seqOp 函数在运行,假设有 3 个分片,则有三个 seqOp 函数在运行。初始值会在每个分区(分片)中当做初始值进行计算。

    另一个参数 " combOp: (U, U) => U " 接受的也是一个函数类型,以输入为任意类型的两个输入参数而输出为一个与输入同类型的值的函数作为参数。
    这个函数会在上面那个函数执行以后再执行。这个参数函数的输入数据来自于第一个参数函数的输出结果,这个函数仅会执行 1 次,它是用来最终聚合结果用的。
    这个函数可以理解为把上面各分区的结果值的输出当做此函数的输入进行聚合操作,当然初始值也会在此处也会再进行一次计算操作。
    (假如上面的初始值为0并且分区为1,则第一个函数计算完成后,整个 aggregate 过程也计算完毕了);

    示例一:创建一个rdd,并计算List集合中的平均值;

    scala> val rdd = List(1,2,3,4,5,6,7,8,9)
    rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    
    scala> rdd.par.aggregate((0,0))(
    (acc,number) => (acc._1 + number, acc._2 + 1),
    (par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)
    )
    res0: (Int, Int) = (45,9)
    
    scala> res0._1 / res0._2
    res1: Int = 5
    
    过程大概这样:
    首先,初始值是(0,0),这个值在后面2步会用到。
    而后,(acc,number) => (acc._1 + number, acc._2 + 1),number便是函数定义中的T,这里便是List中的元素。因此acc._1 + number,acc._2 + 1的过程以下。
    
    1.  0+1,  0+1
    2.  1+2,  1+1
    3.  3+3,  2+1
    4.  6+4,  3+1
    5.  10+5,  4+1
    6.  15+6,  5+1
    7.  21+7,  6+1
    8.  28+8,  7+1
    9.  36+9,  8+1
    结果便是(45,9)。这里演示的是单线程计算过程,实际Spark执行中是分布式计算,可能会把List分红多个分区,假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),
    通过计算各分区的的结果(10,4),(26,4),(9,1),这样,执行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)
    就是(10+26+9,4+4+1)即(45,9),再求平均值就简单了。
    

      

    示例二:创建一个rdd,将所有的元素相加;

    (1)创建一个RDD
    scala> var rdd1 = sc.makeRDD(1 to 10,2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
    (2)将该RDD所有元素相加得到结果
    scala> rdd.aggregate(0)(_+_,_+_)
    res22: Int = 55
    

      

    示例三:创建一个rdd,将所有的元素相加,并把每个分区的结果+1,聚合的结果也+1 

    scala> var rdd1 = sc.makeRDD(1 to 10,2)  --此处分区为2
    scala> rdd.aggregate(1)(_+_,_+_)
    res22: Int = 58
    
    " seqOp: (U, T) => U " 函数两个分区对初始值各进行+1 ,
    " combOp: (U, U) => U " 聚合函数对上面的分区结果进行相加,然后对初始值再进行+1 ,
    得到结果:58
    

      

    拓展:上面的两个函数可以自定义函数,根据需求自行变更

    然后我们来应用一下 aggregate 方法。

    在使用 aggregate 之前,我们还是先定义两个要给 aggregate 当作输入参数的函数吧。

    首先来定义第 1 个函数,即等下要被当成 seqOp 的形参使用的函数。在上一小节我们知道 seqOp 函数是一个输入类型为 U, T 类型而输出为 U 类型的函数。
    但是在这里,因为我们的 RDD 只包含一个 Int 类型数据,所以这里的 seqOp 的两个输入参数都是 Int 类型的,这是没毛病的哦!然后这个函数的返回类型也为 Int 。
    我们这个函数的作用就是将输入的参数 p1 , p2 求积以后返回。

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    def pfun1(p1: Int, p2: Int): Int = {
        
        p1 * p2
        
    }
    
    // Exiting paste mode, now interpreting.
    
    pfun1: (p1: Int, p2: Int)Int
    
    scala>
    

      接着是第 2 个函数,直接相加。就不再解释什么了。

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    def pfun2(p3: Int, p4: Int): Int = {
    
        p3 + p4
        
    }
    
    // Exiting paste mode, now interpreting.
    
    pfun2: (p3: Int, p4: Int)Int
    
    scala> 
    

      使用aggregate方法(下面操作为单个分区):

    scala> val rdd1 = sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
    scala> rdd1 collectwarning: there was one feature warning; re-run with -feature for detailsres24: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> rdd1.aggregate(3)(pfun1, pfun2)
    res25: Int = 363
    
    scala> 
    

      

    参考:

    轻松理解 Spark 的 aggregate 方法

    Spark操做—aggregate、aggregateByKey详解

  • 相关阅读:
    [FAQ] Smart Contract: xxx has not been deployed to detected network (network/artifact mismatch)
    [Contract] Solidity address payable 转换与数组地址
    [Contract] Solidity 变量类型的默认值
    [Contract] Solidity 遍历 mapping 的一种方式
    [Contract] Solidity 判断 mapping 值的存在
    [Contract] public、external, private、internal 在 Solidity 中如何选择
    [FAQ] Member "address" not found or not visible after argument-dependent lookup in address payable.
    [Contract] Truffle 使用流程
    移动端微信应用开发总结(function ajax meta)
    windows sever2003安装Wamp 2.5不成功——VC 11不支持Windows Server 2003和win XP?
  • 原文地址:https://www.cnblogs.com/-courage/p/15207190.html
Copyright © 2011-2022 走看看