zoukankan      html  css  js  c++  java
  • Spark操作:Aggregate和AggregateByKey

    1. Aggregate

    Aggregate即聚合操作。直接上代码:

    import org.apache.spark.{SparkConf, SparkContext}
    
    object AggregateTest {
    
      def main(args:Array[String]) = {
    
        // 设置运行环境
        val conf = new SparkConf().setAppName("Aggregate Test").setMaster("spark://master:7077").setJars(Seq("E:\Intellij\Projects\SimpleGraphX\SimpleGraphX.jar"))
        val sc = new SparkContext(conf)
    
        var data = List(2,5,8,1,2,6,9,4,3,5)
        var res = data.par.aggregate((0,0))(
          // seqOp
          (acc, number) => (acc._1+number, acc._2+1),
          // combOp
          (par1, par2) => (par1._1+par2._1, par1._2+par2._2)
        )
    
        println(res)
    
        sc.stop
      }
    
    }

    acc即(0,0),number即data,seqOp将data的值累加到Tuple的第一个元素,将data的个数累加到Tuple的第二个元素。由于没有分区,所以combOp是不起作用的,这个例子里面即使分区了,combOp起作用了,结果也是一样的。

    运行结果:

    (45,10)

    2. AggregateByKey

    AggregateByKey和Aggregate差不多,也是聚合,不过它是根据Key的值来聚合。

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by Administrator on 2017/6/13.
      */
    object AggregateByKeyTest {
    
      def main(args:Array[String]) = {
    
        // 设置运行环境
        val conf = new SparkConf().setAppName("AggregateByKey Test").setMaster("spark://master:7077").setJars(Seq("E:\Intellij\Projects\SimpleGraphX\SimpleGraphX.jar"))
        val sc = new SparkContext(conf)
    
        val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))
        val rdd = sc.parallelize(data)
    
        val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)(
          // seqOp
          math.max(_,_),
          // combOp
          _+_
        )
    
        res.collect.foreach(println)
        sc.stop
      }
    
    }

    根据Key值的不同,可以分为3个组:

    (1)  (1,3),(1,2),(1,4);

    (2)  (2,3);

    (3)  (3,6),(3,8)。

    这3个组分别进行seqOp,也就是(K,V)里面的V和0进行math.max()运算,运算结果和下一个V继续运算,以第一个组为例,运算过程是这样的:

    0, 3 => 3

    3, 2 => 3

    3, 4 => 4

    所以最终结果是(1,4)。combOp是对把各分区的V加起来,由于这里并没有分区,所以实际上是不起作用的。

    运行结果:

    (2,3)
    (1,4)
    (3,8)

    如果生成RDD时分成3个区:

    val rdd = sc.parallelize(data,3)

    运行结果就变成了:

    (3,8)
    (1,7)
    (2,3)

    这是因为一个分区返回(1,3),另一个分区返回(1,4),combOp将这两个V加起来,就得到了(1,7)。

  • 相关阅读:
    登陆界面
    信号和槽
    线程同步
    java script简介
    css粘性定位sticky的使用
    vue中使用qrcodejs2生成二维码
    webpack基本使用
    总结一些h5出现的问题及解决方案
    srcset属性配合w宽度描述符配合sizes属性
    vw实现页面布局
  • 原文地址:https://www.cnblogs.com/mstk/p/7000509.html
Copyright © 2011-2022 走看看