zoukankan      html  css  js  c++  java
  • Spark操作:Aggregate和AggregateByKey

    1. Aggregate

    Aggregate即聚合操作。直接上代码:

    import org.apache.spark.{SparkConf, SparkContext}
    
    object AggregateTest {
    
      def main(args:Array[String]) = {
    
        // 设置运行环境
        val conf = new SparkConf().setAppName("Aggregate Test").setMaster("spark://master:7077").setJars(Seq("E:\Intellij\Projects\SimpleGraphX\SimpleGraphX.jar"))
        val sc = new SparkContext(conf)
    
        var data = List(2,5,8,1,2,6,9,4,3,5)
        var res = data.par.aggregate((0,0))(
          // seqOp
          (acc, number) => (acc._1+number, acc._2+1),
          // combOp
          (par1, par2) => (par1._1+par2._1, par1._2+par2._2)
        )
    
        println(res)
    
        sc.stop
      }
    
    }

    acc即(0,0),number即data,seqOp将data的值累加到Tuple的第一个元素,将data的个数累加到Tuple的第二个元素。由于没有分区,所以combOp是不起作用的,这个例子里面即使分区了,combOp起作用了,结果也是一样的。

    运行结果:

    (45,10)

    2. AggregateByKey

    AggregateByKey和Aggregate差不多,也是聚合,不过它是根据Key的值来聚合。

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by Administrator on 2017/6/13.
      */
    object AggregateByKeyTest {
    
      def main(args:Array[String]) = {
    
        // 设置运行环境
        val conf = new SparkConf().setAppName("AggregateByKey Test").setMaster("spark://master:7077").setJars(Seq("E:\Intellij\Projects\SimpleGraphX\SimpleGraphX.jar"))
        val sc = new SparkContext(conf)
    
        val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))
        val rdd = sc.parallelize(data)
    
        val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)(
          // seqOp
          math.max(_,_),
          // combOp
          _+_
        )
    
        res.collect.foreach(println)
        sc.stop
      }
    
    }

    根据Key值的不同,可以分为3个组:

    (1)  (1,3),(1,2),(1,4);

    (2)  (2,3);

    (3)  (3,6),(3,8)。

    这3个组分别进行seqOp,也就是(K,V)里面的V和0进行math.max()运算,运算结果和下一个V继续运算,以第一个组为例,运算过程是这样的:

    0, 3 => 3

    3, 2 => 3

    3, 4 => 4

    所以最终结果是(1,4)。combOp是对把各分区的V加起来,由于这里并没有分区,所以实际上是不起作用的。

    运行结果:

    (2,3)
    (1,4)
    (3,8)

    如果生成RDD时分成3个区:

    val rdd = sc.parallelize(data,3)

    运行结果就变成了:

    (3,8)
    (1,7)
    (2,3)

    这是因为一个分区返回(1,3),另一个分区返回(1,4),combOp将这两个V加起来,就得到了(1,7)。

  • 相关阅读:
    Silverlight/Windows8/WPF/WP7/HTML5周学习导读(8月20日8月26日)
    Silverlight/Windows8/WPF/WP7/HTML5周学习导读(9月24日9月30日)
    获取免费Windows Store开发者账户方法
    QOCIDriver: unable to create environment Unable to free Error handle: 2 Unable to free Environment
    C:\workdir\dbManager\lib>c:\Qt\Qt5.9.9\5.9.9\mingw53_32\bin\windeployqt.exe ./db ManagerDll.dll
    QT5.9.932 oracle1032 驱动编译
    sqlplus
    c++const成员函数*
    C++ 函数内静态静态变量
    c++构造/拷贝构造函数初始化变量*
  • 原文地址:https://www.cnblogs.com/mstk/p/7000509.html
Copyright © 2011-2022 走看看