zoukankan      html  css  js  c++  java
  • spark-聚合算子aggregatebykey

    spark-聚合算子aggregatebykey

    Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

    使用给定的聚合函数和中性的“零值”聚合每个键的值。这个函数可以返回与这个RDD V中的值类型不同的结果类型U。

    前一个操作用于合并分区内的值,而后一个操作用于合并分区之间的值。为了避免内存分配,允许这两个函数修改并返回它们的第一个参数,而不是创建一个新的U。

     

      def aggregateByKey[U: ClassTag](zeroValue: U)(
            seqOp: (U, V) => U,
            combOp: (U, U) => U
            ): RDD[(K, U)] = self.withScope {
        aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
      }
    
      def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
            seqOp: (U, V) => U,
            combOp: (U, U) => U
            ): RDD[(K, U)] = self.withScope {
    
        // Serialize the zero value to a byte array so that we can get a new clone of it on each key
        val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
        val zeroArray = new Array[Byte](zeroBuffer.limit)
        zeroBuffer.get(zeroArray)
    
        lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
        val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
    
        // We will clean the combiner closure later in `combineByKey`
        val cleanedSeqOp = self.context.clean(seqOp)
        combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
          cleanedSeqOp, combOp, partitioner)
      }

      

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    
      combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    }
    

      

    def combineByKeyWithClassTag[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]{
    	...
      }
    

      

    /**
      * 按key聚合Demo
      */
    object AggregateByKeyDemo {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setAppName("wcDemo")
            conf.setMaster("local[4]")
            val sc = new SparkContext(conf)
            val rdd1 = sc.textFile("file:///e:/wc/1.txt" , 3)
            val rdd2 = rdd1.flatMap(_.split(" ")).mapPartitionsWithIndex((idx, it) => {
                var list: List[(String, String)] = Nil
                for (e <- it) {
                    list = (e, e + "_" + idx) :: list
                }
                list.iterator
            })
            rdd2.collect().foreach(println)
            println("=======================")
            val zeroU:String = "[]"
            def seqOp(a:String,b:String) = {
                a + b + " ,"
            }
            def comOp(a:String,b:String) = {
                a + "$" + b
            }
    
            val rdd3 = rdd2.aggregateByKey(zeroU)(seqOp,comOp)
            rdd3.collect().foreach(println)
          
        }
    
    }
    

     

    	  
    (hello,hello_0)		=>[hello_0]hello_0,hello_0,hello_0,		=>[hello_0]hello_0,hello_0,hello_0,$[hello_1]hello_1,hello_1,$[hello_2]hello_2,hello_2,
    (hello,hello_0)
    (hello,hello_0)
    (hello,hello_0)
    
    (hello,hello_1)		=>[hello_1]hello_1,hello_1,
    (hello,hello_1)
    (hello,hello_1)
    
    (hello,hello_2)		=>[hello_2]hello_2,hello_2,
    (hello,hello_2)
    (hello,hello_2)
    
    
    (hello,[]hello_0 ,hello_0 ,hello_0 ,hello_0 ,$[]hello_1 ,hello_1 ,hello_1 ,$[]hello_2 ,hello_2 ,hello_2 ,)
    
    (tom2,tom2_0)
    (world,world_0)
    (tom1,tom1_0)
    (world,world_0)
    (tom7,tom7_1)
    (world,world_1)
    (tom6,tom6_1)
    (world,world_1)
    (tom5,tom5_1)
    (world,world_1)
    (tom10,tom10_2)
    (world,world_2)
    (tom9,tom9_2)
    (world,world_2)
    (tom8,tom8_2)
    (world,world_2)
    

     

    spark PairRDDFunction聚合函数
    ------------------------------
    1.reduceByKey
    V类型不变,有map端合成。
    2.groupByKey
    按照key分组,生成的v是集合,map端不能合成。
    3.aggregateByKey
    可以改变v的类型,map端还可以合成。
    4.combineByKeyWithClassTag
    按照key合成,可以指定是否进行map端合成、任意的combiner创建函数,值合并函数以及合成器合并函数。

     

  • 相关阅读:
    解决:oracle+myBatis ResultMap 类型为 map 时,表字段类型有 Long/Blob/Clob 时报错
    总结:独立开发 jar 包组件——功能主要是支持查询数据库的所有表数据
    解决 iframe 后退不是主页面后退(浏览器 history)问题
    解决访问 jar 包里面的字体报错:OTS parsing error: incorrect file size in WOFF header
    html 如何访问 jar 包里面的静态资源(js、css、字体等)
    css3 实现打字机效果
    js 图形验证码
    input 设置 flex:1不起作用
    vue 样式加scoped不起作用
    node-mongoose开发中常见警告或问题-持续更新
  • 原文地址:https://www.cnblogs.com/wqbin/p/10163569.html
Copyright © 2011-2022 走看看