zoukankan      html  css  js  c++  java
  • 理解Spark RDD中的aggregate函数(转)

    针对Spark的RDD,API中有一个aggregate函数,本人理解起来费了很大劲,明白之后,mark一下,供以后参考。

    首先,Spark文档中aggregate函数定义如下

    def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
    Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.   seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue.   seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

    zeroValue
    the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)

    seqOp
    an operator used to accumulate results within a partition

    combOp
    an associative operator used to combine results from different partitions

    举个例子。假如List(1,2,3,4,5,6,7,8,9,10),对List求平均数,使用aggregate可以这样操作。
    C:WindowsSystem32>scala
    Welcome to Scala 2.11.8 (Java HotSpot(TM) Client VM, Java 1.8.0_91).
    Type in expressions for evaluation. Or try :help.

    scala> val rdd = List(1,2,3,4,5,6,7,8,9)
    rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

    scala> rdd.par.aggregate((0,0))(

    (acc,number) => (acc._1 + number, acc._2 + 1),

    (par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)

    )
    res0: (Int, Int) = (45,9)

    scala> res0._1 / res0._2
    res1: Int = 5

    过程大概这样:

    首先,初始值是(0,0),这个值在后面2步会用到。

    然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函数定义中的T,这里即是List中的元素。所以acc._1 + number, acc._2 + 1的过程如下。

    1.   0+1,  0+1

    2.  1+2,  1+1

    3.  3+3,  2+1

    4.  6+4,  3+1

    5.  10+5,  4+1

    6.  15+6,  5+1

    7.  21+7,  6+1

    8.  28+8,  7+1

    9.  36+9,  8+1

    结果即是(45,9)。这里演示的是单线程计算过程,实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),经过计算各分区的的结果(10,4),(26,4),(9,1),这样,执行(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9).再求平均值就简单了。
    ————————————————
    版权声明:本文为CSDN博主「飞鸿踏雪Ben归来」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qingyang0320/article/details/51603243

  • 相关阅读:
    如何在Windows Forms应用程序中实现可组装式(Composite)的架构以及松耦合事件机制
    SQL Server 2008中的CDC(Change Data Capture)功能使用及释疑
    【VSTO】Office开发中遇到的兼容性检查问题
    SQL Server 2008 R2的StreamInsight 【文章转载】
    WCF技术的不同应用场景及其实现分析
    如何利用Interception简化MVVM中的Model和ViewModel的设计
    有关在SharePoint Server中Infopath表单无法呈现的问题及解决方案
    再谈谈ADO.NET Data Service 数据格式(xml和json)
    自定义Domain Service时遇到实体不能更新的问题及其解决方案
    如何在RIA Service中启用身份验证
  • 原文地址:https://www.cnblogs.com/sandea/p/12020955.html
Copyright © 2011-2022 走看看