  • Spark RDD aggregateByKey

    The aggregateByKey operation on pair RDDs is a bit fiddly, so here is a worked usage example for reference.

    Straight to the code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
      * Created by Edward on 2016/10/27.
      */
    object AggregateByKey {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey")
          .setMaster("local")
        val sc: SparkContext = new SparkContext(sparkConf)
    
        val data = List((1, 3), (1, 2), (1, 4), (2, 3))
        val rdd = sc.parallelize(data, 2) // split the data into two partitions
    
        // Merges partial results from different partitions; a and b both have the type of zeroValue
        def comb(a: String, b: String): String = {
          println("comb: " + a + "\t " + b)
          a + b
        }
        // Merges values within a single partition; a has the type of zeroValue, b has the type of the original value
        def seq(a: String, b: Int): String = {
          println("seq: " + a + "\t " + b)
          a + b
        }
    
        rdd.foreach(println)
        
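        // zeroValue: the neutral value; it fixes the type of the result values and takes part in the computation
        // seqOp: merges values within a single partition
        // combOp: merges partial results across partitions
        val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq, comb)
    
        // print the result
        aggregateByKeyRDD.foreach(println)
    
        sc.stop()
      }
    }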

    Explanation of the output:

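     /*
    The data is split into two partitions.
    
    // Data in partition 1
    (1,3)
    (1,2)
    // Data in partition 2
    (1,4)
    (2,3)
    
    // Partition 1: values with the same key are merged
    seq: 100     3   // (1,3) is merged with the zero value first, giving 1003
    seq: 1003     2   // (1,2) is merged next, giving 10032
    
    // Partition 2: values with the same key are merged (the zero value is applied once per key in each partition)
    seq: 100     4  // (1,4) is merged with the zero value, giving 1004
    seq: 100     3  // (2,3) is merged with the zero value, giving 1003
    
    The results of the two partitions are then merged.
    // Key 2 exists in only one partition, so no merge is needed: (2,1003)
    (2,1003)
    
    // Key 1 exists in both partitions and the partial results have the same type, so they are merged
    comb: 10032     1004
    (1,100321004)
    
    * */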
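The trace above can be reproduced without a cluster. The following is a minimal sketch in plain Scala that imitates the two phases on ordinary collections. It is not how Spark implements aggregateByKey; the object name `AggregateByKeyTrace` and the helper `simulate` are hypothetical, and the partition layout is hard-coded to match the example.

```scala
object AggregateByKeyTrace {
  // Hypothetical helper: imitates the two phases of aggregateByKey on plain
  // collections. This is an illustration, not Spark's implementation.
  def simulate[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Phase 1: inside each partition, fold the values of every key,
    // starting from the zero value the first time a key is seen.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Phase 2: merge the partial results of the same key across partitions.
    // A key that occurs in only one partition is passed through unchanged.
    perPartition.flatten.foldLeft(Map.empty[K, U]) { case (acc, (k, u)) =>
      acc.updated(k, acc.get(k).map(combOp(_, u)).getOrElse(u))
    }
  }

  // Same data and partition layout as in the walkthrough above.
  val partitions: Seq[Seq[(Int, Int)]] =
    Seq(Seq((1, 3), (1, 2)), Seq((1, 4), (2, 3)))

  val result: Map[Int, String] =
    simulate(partitions, "100")((a, b) => a + b, (a, b) => a + b)

  def main(args: Array[String]): Unit = println(result)
}
```

Because the zero value enters once per key per partition, key 1 picks up "100" twice, which is why a non-neutral zero value such as "100" shows up repeatedly in the final string.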

    Read the code together with the descriptions below.

    From the official documentation:

    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
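To illustrate a result type U that differs from the value type V, here is a small sketch that computes the average per key by aggregating into a (sum, count) pair. The object name `AverageByKey`, the method `averages`, and the reuse of the sample data are assumptions made for illustration; the code assumes Spark is on the classpath and runs with a local master.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AverageByKey {
  // Hypothetical helper: V is Int, U is (Int, Int) holding (sum, count).
  def averages(sc: SparkContext): Map[Int, Double] = {
    val rdd = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)), 2)

    val sumCount = rdd.aggregateByKey((0, 0))(
      // seqOp: fold one Int value into the (sum, count) accumulator
      (acc, v) => (acc._1 + v, acc._2 + 1),
      // combOp: merge two (sum, count) accumulators from different partitions
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )

    sumCount
      .mapValues { case (sum, count) => sum.toDouble / count }
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AverageByKey").setMaster("local")
    val sc = new SparkContext(conf)
    try println(averages(sc)) finally sc.stop()
  }
}
```

Here (0, 0) really is neutral for both functions, so the result does not depend on how the data happens to be partitioned, unlike the "100" string used in the walkthrough.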

    From the doc comment in the source code:

    /**
    * Aggregate the values of each key, using given combine functions and a neutral "zero value".
    * This function can return a different result type, U, than the type of the values in this RDD,
    * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
    * as in scala.TraversableOnce. The former operation is used for merging values within a
    * partition, and the latter is used for merging values between partitions. To avoid memory
    * allocation, both of these functions are allowed to modify and return their first argument
    * instead of creating a new U.
    */
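The last sentence of that comment, about modifying and returning the first argument, can be sketched with a mutable accumulator. The example below collects the distinct values per key into a `mutable.HashSet` that both functions update in place. The names `DistinctValuesByKey` and `distinctValues` and the sample data are hypothetical; the sketch also relies on my understanding that Spark hands each key its own copy of the zero value, so mutating it is safe.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object DistinctValuesByKey {
  // Hypothetical helper: U is a mutable set that is updated in place.
  def distinctValues(sc: SparkContext): Map[Int, Set[Int]] = {
    val rdd = sc.parallelize(List((1, 3), (1, 2), (1, 3), (2, 3)), 2)

    rdd.aggregateByKey(mutable.HashSet.empty[Int])(
      // seqOp: add the value to the existing set and return that same set
      (set, v) => set += v,
      // combOp: merge the second set into the first and return the first
      (a, b) => a ++= b
    ).mapValues(_.toSet) // convert to an immutable Set for the caller
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DistinctValuesByKey").setMaster("local")
    val sc = new SparkContext(conf)
    try println(distinctValues(sc)) finally sc.stop()
  }
}
```

Compared with building a new immutable set on every call, this avoids one allocation per value, which is the saving the doc comment refers to.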
  • Original article: https://www.cnblogs.com/one--way/p/6006296.html