zoukankan      html  css  js  c++  java
  • spark中的combineByKey函数的用法

    一、函数的源码

    /**
       * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
       * existing partitioner/parallelism level. This method is here for backward compatibility. It
       * does not provide combiner classtag information to the shuffle.
       *
       * @see `combineByKeyWithClassTag`
       */
      def combineByKey[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
      }
     /**
       * Generic function to combine the elements for each key using a custom set of aggregation
       * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
       * "combined type" C.
       *
       * Users provide three functions:
       *
       *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
       *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
       *  - `mergeCombiners`, to combine two C's into a single one.
       *
       * In addition, users can control the partitioning of the output RDD, the serializer that is use
       * for the shuffle, and whether to perform map-side aggregation (if a mapper can produce multiple
       * items with the same key).
       *
       * @note V and C can be different -- for example, one might group an RDD of type (Int, Int) into
       * an RDD of type (Int, List[Int]).
       */
      def combineByKey[C](createCombiner: JFunction[V, C],
          mergeValue: JFunction2[C, V, C],
          mergeCombiners: JFunction2[C, C, C],
          partitioner: Partitioner,
          mapSideCombine: Boolean,
          serializer: Serializer): JavaPairRDD[K, C] = {
          implicit val ctag: ClassTag[C] = fakeClassTag
        fromRDD(rdd.combineByKeyWithClassTag(
          createCombiner,
          mergeValue,
          mergeCombiners,
          partitioner,
          mapSideCombine,
          serializer
        ))
      }

      由源码中看出,该函数中主要包含的参数:

        createCombiner:V=>C

        mergeValue:(C,V)=>C

        mergeCombiners:(C,C)=>R

        partitioner:Partitioner  

        mapSideCombine:Boolean=true

        serializer:Serializer=null

      这里的每一个参数都对分别对应这聚合操作的各个阶段

    二、参数详解:  

        1、createCombiner:V=>C  分组内的创建组合的函数。通俗点将就是对读进来的数据进行初始化,其把当前的值作为参数,可以对该值做一些转换操作,转换为我们想要的数据格式

        2、mergeValue:(C,V)=>C  该函数主要是分区内的合并函数,作用在每一个分区内部。其功能主要是将V合并到之前(createCombiner)的元素C上,注意,这里的C指的是上一函数转换之后的数据格式,而这里的V指的是原始数据格式(上一函数为转换之前的)

        3、mergeCombiners:(C,C)=>R  该函数主要是进行多分取合并,此时是将两个C合并为一个C,例如两个C:(Int)进行相加之后得到一个R:(Int)

        4、partitioner:自定义分区数,默认是hashPartitioner

        5、mapSideCombine:Boolean=true  该参数是设置是否在map端进行combine操作

    三、函数工作流程

        首先明确该函数遍历的数据是(k,v)对的rdd数据

        1、combinByKey会遍历rdd中每一个(k,v)数据对,对该数据对中的k进行判断,判断该(k,v)对中的k是否在之前出现过,如果是第一次出现,则调用createCombiner函数,对该k对应的v进行初始化操作(可以做一些转换操作),也就是创建该k对应的累加其的初始值

        2、如果这是一个在处理当前分区之前遇到的k,会调用mergeCombiners函数,把该k对应的累加器的value与这个新的value进行合并操作

    四、实例解释  

    实例1:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    import scala.collection.mutable
    
    object test {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("testCombineByKey").setMaster("local[2]")
        val ssc = new SparkSession
        .Builder()
          .appName("test")
          .master("local[2]")
          .config(conf)
          .getOrCreate()
        val sc = ssc.sparkContext
        sc.setLogLevel("error")
    
        val initialScores = Array((("1", "011"), 1), (("1", "012"), 1), (("2", "011"), 1), (("2", "013"), 1), (("2", "014"), 1))
        val d1 = sc.parallelize(initialScores)
      d1.map(x => (x._1._1, (x._1._2, 1)))
        .combineByKey(
          (v: (String, Int)) => (v: (String, Int)),
          (acc: (String, Int), v: (String, Int)) => (v._1+":"+acc._1,acc._2+v._2),
                     (p1:(String,Int),p2:(String,Int)) => (p1._1 + ":" + p2._1,p1._2 + p2._2)
        ).collect().foreach(println)
      } 
    }

      从map函数开始说起:

      1、map端将数据格式化为:(,(String,Int))->("1",("011",1))

      2、接着combineByKye函数会逐个的读取map之后的每一个k,v数据对,当读取到第一个("1",("011",1)),此时回判断,“1”这个是否在之前的出现过,如果该k是第一次出现,则会调用createCombiner函数,经过转换,该实例中是对该value值么有做任何的改变原样返回,此时这个该value对应的key回被comgbineByKey函数创建一个累加其记录

      3、当读取到第二个数据("1",("012",,1))的时候,回对“1”这个key进行一个判断,发现其在之前出现过,此时怎直接调用第二个函数,mergeValues函数,对应到该实例中,acc即为上一函数产生的结果,即("1",("011",1)),v即是新读进来的数据("1",("012",1))

      4、此时执行该函数:(acc: (String, Int), v: (String, Int)) => (v._1+":"+acc._1,acc._2+v._2)

        将新v中的第一个字符串与acc中的第一个字符串进行连接,v中的第二个值,与acc中的第二个值进行相加操作

      5、当所有的分区内的数据计算完成之后,开始调用mergeCombiners函数,对每个分区的数据进行合并,该实例中p1和p2分别对应的是不同分区的计算结果,所以二者的数据格式是完全相同的,此时将第一个分区中的字符串相连接,第二个字符相加得到最终结果

      6、最终输出结果为:

         (2,(014:013:011,3))
         (1,(012:011,2))

    实例2

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    import scala.collection.mutable
    
    object test {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("testCombineByKey").setMaster("local[2]")
        val ssc = new SparkSession
        .Builder()
          .appName("test")
          .master("local[2]")
          .config(conf)
          .getOrCreate()
        val sc = ssc.sparkContext
        sc.setLogLevel("error")
    
        val initialScores = Array((("1", "011"), 1), (("1", "012"), 1), (("2", "011"), 1), (("2", "013"), 1), (("2", "014"), 1))
        val d1 = sc.parallelize(initialScores)
        d1.map(x => (x._1._1, (x._1._2, 1))) 
        //("1",("011",1)) .combineByKey( (v: (String, Int)) => (mutable.Set[String](v._1), (mutable.Set[Int](v._2))), (acc: (mutable.Set[String], mutable.Set[Int]), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2), (acc1: (mutable.Set[String], mutable.Set[Int]), acc2: (mutable.Set[String], mutable.Set[Int])) => (acc1._1 ++ acc2._1, acc1._2 ++ acc2._2) ).collect().foreach(println) } }
      

     

    运行结果为:

    (2,(Set(013, 014, 011),Set(1)))
    (1,(Set(011, 012),Set(1)))

  • 相关阅读:
    Log4net.config
    ASCII 转换帮助类
    维吉尼亚加密与解密
    nginx配置说明
    验证码
    css 设置下拉菜单
    输出一张自定义文字的图片
    mvc 自定义分页控件
    【模块化】export与export default在一个文件里共存,引入时需注意的地方
    【uniapp】兼容刘海屏底部安全区
  • 原文地址:https://www.cnblogs.com/Gxiaobai/p/10056482.html
Copyright © 2011-2022 走看看