  • Spark operator: combineByKey

    Suppose we have a set of personal records. We group them by gender, collect the names in each group, and count the number of records per group.

    scala> val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
          people: List[(String, String)] = List((male,Mobin), (male,Kpop), (female,Lucy), (male,Lufei), (female,Amy))
    
    scala> val rdd = sc.parallelize(people)
          rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23
    
    scala> val combinByKeyRDD = rdd.combineByKey(
               |   (x: String) => (List(x), 1),
               |   (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1),
               |   (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
    combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[String], Int))] = ShuffledRDD[1] at combineByKey at <console>:25
    
    scala> combinByKeyRDD.foreach(println)
          (female,(List(Lucy, Amy),2))
          (male,(List(Mobin, Kpop, Lufei),3))
    scala>
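
    For readability, the same call can be written with the three functions bound to names first; combineByKey takes them in the order createCombiner, mergeValue, mergeCombiners. A minimal sketch, equivalent to the REPL call above (createCombiner, mergeValue, mergeCombiners and combined here are just local vals chosen to match the parameter names, not part of the original session):

    // createCombiner: builds the initial combiner the first time a key is seen in a partition
    val createCombiner = (name: String) => (List(name), 1)

    // mergeValue: folds another value of an already-seen key into that partition's combiner
    val mergeValue = (acc: (List[String], Int), name: String) => (name :: acc._1, acc._2 + 1)

    // mergeCombiners: merges the combiners produced by different partitions for the same key
    val mergeCombiners = (a: (List[String], Int), b: (List[String], Int)) => (a._1 ::: b._1, a._2 + b._2)

    val combined = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
    combined.collect().foreach(println)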

    Execution steps:

    Partition 1:
    K="male"   -->  ("male","Mobin")   --> createCombiner("Mobin")  => peo1 = (List("Mobin"), 1)
    K="male"   -->  ("male","Kpop")    --> mergeValue(peo1, "Kpop") => peo2 = ("Kpop" :: peo1._1, peo1._2 + 1)    // same key within the partition: mergeValue folds the new value into the existing combiner
    K="female" -->  ("female","Lucy")  --> createCombiner("Lucy")   => peo3 = (List("Lucy"), 1)
    
    Partition 2:
    K="male"   -->  ("male","Lufei")   --> createCombiner("Lufei")  => peo4 = (List("Lufei"), 1)
    K="female" -->  ("female","Amy")   --> createCombiner("Amy")    => peo5 = (List("Amy"), 1)
    
    Merge partitions:
    K="male"   --> mergeCombiners(peo2, peo4) => (List("Lufei", "Kpop", "Mobin"), 3)
    K="female" --> mergeCombiners(peo3, peo5) => (List("Amy", "Lucy"), 2)

    In the example above each record's value is a single String. If the value is itself a tuple, it is convenient to define a type alias first:

    scala> val people = List(("male", ("Mobin",89)),("male", ("Kpop",98)),("female", ("Lucy",99)),("male", ("Lufei",77)),("female", ("Amy",97)))
    scala> val rdd = sc.parallelize(people)
    rdd: org.apache.spark.rdd.RDD[(String, (String, Int))] = ParallelCollectionRDD[2] at parallelize at <console>:23
    
    scala>   type MVType = (String, Int)
    defined type alias MVType
    
    scala>       val combinByKeyRDD = rdd.combineByKey(
         |         (x: MVType) => (List(x), 1),
         |         (peo: (List[MVType], Int), x:MVType) => (x :: peo._1, peo._2 + 1),
         |         (sex1: (List[MVType], Int), sex2: (List[MVType], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
    combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[(String, Int)], Int))] = ShuffledRDD[3] at combineByKey at <console>:27
    
    scala>    combinByKeyRDD.foreach(println)
    (male,(List((Mobin,89), (Kpop,98), (Lufei,77)),3))
    (female,(List((Lucy,99), (Amy,97)),2))
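
    The same pattern works for numeric aggregation: accumulate a (sum, count) pair per key instead of a list, then divide to get an average score per gender. A sketch building on the rdd and MVType defined above (sumCount and avgScores are illustrative names, not Spark API):

    val sumCount = rdd.combineByKey(
      (v: MVType) => (v._2, 1),                                      // createCombiner: first score seen for this key
      (acc: (Int, Int), v: MVType) => (acc._1 + v._2, acc._2 + 1),   // mergeValue: add a score within a partition
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners: merge partition results

    // Divide the per-key sum by the per-key count to get the average score per gender
    val avgScores = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
    avgScores.foreach(println)
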
  • Original post: https://www.cnblogs.com/yy3b2007com/p/7806277.html