zoukankan      html  css  js  c++  java
  • combineByKey

     def test66: Unit = {
        // Average each student's scores with three equivalent approaches:
        //   1. combineByKey collecting a List of scores per key
        //   2. combineByKey with a running (count, sum) accumulator
        //   3. reduceByKey, with map steps before/after to work around its
        //      same-type-merge limitation
        val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
        val conf = new SparkConf().setAppName("wc").setMaster("local[2]")
        val sc = new SparkContext(conf)
        try {
          /*
          Reference example of combineByKey building per-key lists:
          val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
          val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
          val c = b.zip(a)
          val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
          d.collect
          res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
          */

          // Approach 1: gather all scores for a key into a List, then average.
          // (The original had a redundant identity `.map(t => t)` here.)
          val resrdd0 = sc.makeRDD(initialScores)
            .combineByKey(
              (score: Double) => List(score),                      // createCombiner
              (acc: List[Double], score: Double) => score :: acc,  // mergeValue
              (a: List[Double], b: List[Double]) => a ::: b)       // mergeCombiners
            .map { case (name, scores) => (name, scores.sum / scores.size) }
          println(resrdd0.collect().toBuffer)

          // Approach 2: keep only a running (count, sum) per key — avoids
          // materializing the per-key score lists of approach 1.
          type MVType = (Int, Double) // (number of scores, sum of scores)
          val resrdd = sc.makeRDD(initialScores)
            .combineByKey(
              (score: Double) => (1, score),
              (acc: MVType, score: Double) => (acc._1 + 1, acc._2 + score),
              (a: MVType, b: MVType) => (a._1 + b._1, a._2 + b._2))
            .map { case (name, (count, sum)) => (name, sum / count) }
          println(resrdd.collect().toBuffer)

          // Approach 3: reduceByKey's limitation (merge function must keep the
          // value type) is worked around with a map before and after.
          val resrdd2 = sc.makeRDD(initialScores)
            .map { case (name, score) => (name, List(score)) }
            .reduceByKey(_ ::: _)
            .map { case (name, scores) => (name, scores.sum / scores.size) }
          println(resrdd2.collect().toBuffer)
        } finally {
          sc.stop() // release local-mode resources; the original leaked the context
        }
      }
    
    }
    Spark函数讲解:combineByKey
     Spark  2015-03-19 08:03:47 16623  0评论 下载为PDF
  使用用户设置好的聚合函数对每个Key中的Value进行组合(combine)。可以将输入类型为RDD[(K, V)]转换成RDD[(K, C)]。
    
    函数原型
    
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, 
        mergeCombiners: (C, C) => C) : RDD[(K, C)]
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, 
        mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, 
        mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine:
        Boolean = true, serializer: Serializer = null): RDD[(K, C)]
      第一个和第二个函数都是基于第三个函数实现的,使用的是HashPartitioner,Serializer为null。而第三个函数我们可以指定分区,
如果需要使用Serializer的话也可以指定。combineByKey函数比较重要,我们熟悉的诸如aggregateByKey、foldByKey、reduceByKey等函数都是基于该函数实现的。默认情况会在Map端进行组合操作。
  • 相关阅读:
    hdu 5007 水题 (2014西安网赛A题)
    hdu 1698 线段树(成段替换 区间求和)
    poj 3468 线段树 成段增减 区间求和
    hdu 2795 公告板 (单点最值)
    UVaLive 6833 Miscalculation (表达式计算)
    UVaLive 6832 Bit String Reordering (模拟)
    CodeForces 124C Prime Permutation (数论+贪心)
    SPOJ BALNUM (数位DP)
    CodeForces 628D Magic Numbers (数位DP)
    POJ 3252 Round Numbers (数位DP)
  • 原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7496180.html
Copyright © 2011-2022 走看看