zoukankan      html  css  js  c++  java
  • Spark聚合操作:combineByKey()

    Spark中对键值对RDD(pairRDD)基于键的聚合函数中,都是通过combineByKey()实现的。

    它可以让用户返回与输入数据类型不同的返回值(可以自己配置返回的参数,返回的类型)

    首先理解:combineByKey是一个聚合函数,实际使用场景比如,对2个同学的3门考试科目成绩,分别求出他们的平均值。

    (也就是对3门考试成绩进行聚合,用一个平均数来表示)

    combineByKey是通过3个内部函数来解决这个问题的:

    具体处理过程为:遍历分区中的所有元素,因此每一个元素的键要么没有遇到过,要么就和之前的键相等。

    它的参数形式为:combineByKey(1.createCombiner,2.mergeValue,3.mergeCombiners,4.partioner)

    比如,我有一个数组{1,2,1,2,4}  

    具体流程为:第一次遇到1,调用createCombiner()函数。

    2.第一次遇到2,调用createCombiner()函数。

    3.第二次遇到1,调用mergeValue()函数。

    4.第二次遇到2,调用mergeValue()函数。

    5.第一次遇到4,调用mergeValue()函数。

    接下来解释每一个函数的作用

    1.createCombiner():在遍历过程中,遇到新的键,就会调用createCombiner()函数。这个过程会发生在每一个分区内,因为RDD中有不同的分区,也就有同一个键调用多次createCombiner的情况。

    2.mergeValue() 遇到已经重复的键,调用mergeValue()函数。

    3.mergeCombiners() 如果有2个或者更多的分区,会把分区的结果合并。

    4.pationer  分区函数()

    举例:

    准备数据:

    val scores =sc.parallelize(Array(
    ("jack",89.0),
    ("jack",82.0),
    ("jack",92.0),
    ("tom",88.0),
    ("tom",89.0),
    ("tom",98.0)
    ))
    

      数据为jack和tom的3门科目成绩,要对jack和tom的平均成绩进行输出。

    1.遍历过程中,统计课程的数目,同时计算总分。

    val score2=scores.combineByKey(x =>(1,x) ,
    (c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),
    (c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))

    详解:

    x =>(1,x)   将scores的value转化为(1,value)的格式
    (c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore)  遇到重复的key:我们对value的处理过程为:
    之前计算的结果定义为newScore,对c1:(c1._1,c2._2)处理过程为:(c1._1+1,c2._2+newScore)  
    实际意义为:再次遍历到jack时,我们将科目数量+1,将统计的总分再加上遍历到的分数。
    (c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2)) 对2个不同的分区c1,c2(这2个分区,他的键相同,都是Jack)
    最后我们将不同分区的结果相加。
    比如我们还有另一个分区("jack",45) 代表c2。我们要将Jack的科目数+1,总分+45. 获得最终结果

    统计得到的结果:得到姓名:科目+总分

    scala> score2.foreach(println)
    (tom,(3,275.0))
    (jack,(3,263.0))
    

      

    2.求平均值:

    val average=score2.map{case(name, (num,score) )=>(name,score/num) }
    结果: average.foreach(println)
    (tom,91.66666666666667)
    (jack,87.66666666666667)
  • 相关阅读:
    chrome Network 过滤和高级过滤
    python3 在webelement对象里面获取元素路径的方法
    Robot frawork关键字使用报错原因
    robotframework-autoitlibrary离线安装
    网络基础之网络协议篇
    eclipse查看jar包源代码
    对链接服务器进行查询
    数据库还原失败System.Data.SqlClient.SqlError: 无法执行 BACKUP LOG,因为当前没有数据库备份
    sqlserver创建链接服务器
    5.0jemter(英文版)录制脚本,进行压力测试
  • 原文地址:https://www.cnblogs.com/patatoforsyj/p/10584368.html
Copyright © 2011-2022 走看看