zoukankan      html  css  js  c++  java
  • Spark计算均值

    作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处

    用spark来快速计算分组的平均值,写法很便捷,话不多说上代码

    object ColumnValueAvg extends App {
      /**
        * ID,Name,ADDRESS,AGE
        * 001,zhangsan,chaoyang,20
        * 002,zhangsa,chaoyang,27
        * 003,zhangjie,chaoyang,35
        * 004,lisi,haidian,24
        * 005,lier,haidian,40
        * 006,wangwu,chaoyang,90
        * 007,wangchao,haidian,80
        */
      val conf = new SparkConf().setAppName("test column value sum and avg").setMaster("local[1]")
      val sc = new SparkContext(conf)
    
      val textRdd = sc.textFile(args(0))
    
      //be careful the toInt here is necessary ,if no cast ,then it will be age string append
      val addressAgeMap = textRdd.map(x => (x.split(",")(2), x.split(",")(3).toInt))
    
      val sumAgeResult = addressAgeMap.reduceByKey(_ + _).collect().foreach(println)
    
      val avgAgeResult = addressAgeMap.combineByKey(
        (v) => (v, 1),
        (accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1),
        (accu1: (Int, Int), accu2: (Int, Int)) => (accu1._1 + accu2._1, accu1._2 + accu2._2)
      ).mapValues(x => (x._1 / x._2).toDouble).collect().foreach(println)
    
      println("Sum and Avg calculate successfuly")
    
      sc.stop()
    
    }

    用textFile读取数据后,以address进行分组来求age的平均值,这里用combineByKey来计算,这是一个抽象层次很高的函数.稍微总结一下自己的理解

    查看源代码会发现combineByKey定义如下

    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
        : RDD[(K, C)] = {
        combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
      }

    combineByKey函数需要传递三个函数做为参数,分别为createCombiner、mergeValue、mergeCombiner,需要理解这三个函数的意义

    结合数据来讲的话,combineByKey默认按照key来进行元素的combine,这里三个参数都是对value的一些操作

    1>第一个参数createCombiner,如代码中定义的是 : (v) => (v, 1)

    这里是创建了一个combiner,作用是当遍历rdd的分区时,遇到第一次出现的key值,那么生成一个(v,1)的combiner,比如这里key为address,当遇到第一个

    chaoyang,20 的时候,(v,1)中的v就是age的值20,1是address出现的次数
     
    2>第2个参数是mergeValue,顾名思义就是合并value,如代码中定义的是:(accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1)
    这里的作用是当处理当前分区时,遇到已经出现过的key,那么合并combiner中的value,注意这里accu: (Int, Int)对应第一个参数中出现的combiner,即(v,1),注意类型要一致
    那么(accu._1 + v, accu._2 + 1)就很好理解了,accu._1即使需要合并的age的值,而acc._2是需要合并的key值出现的次数,出现一次即加1
     
    3>第三个参数是mergeCombiners,用来合并各个分区上的累加器,因为各个分区分别运行了前2个函数后需要最后合并分区结果.
     
    ok,运行代码,结果如下,分别按照address来计算出age的平均值
     
    (haidian,48.0)
    (chaoyang,43.0)
     
    由于combineByKey抽象程度很高,可以自己custom一些函数做为计算因子,因此可以灵活的完成更多的计算功能.
  • 相关阅读:
    每日英语:Why Sit Up Straight?
    每日英语:Doctor's Orders: 20 Minutes Of Meditation Twice A Day
    每日英语:SamsungApple Patent Fight: Is It Worth It?
    每日英语:Better Than Buffett, This Investor Made Me Rich for Life
    每日英语:Coming Soon to China: AtHome Toxic Food Test Kits
    去了家新公司,技术总监不让用 IntelliJ IDEA!!想离职了。。
    IDEA 进行远程 Debug,这个太强了。。
    雷军做程序员时写的博客,太牛了。。
    Spring Boot 如何获取 Controller 方法名和注解信息?
    sql 2005/2008中直接创建web service,允许客户机通过HTTP访问
  • 原文地址:https://www.cnblogs.com/cssdongl/p/6184157.html
Copyright © 2011-2022 走看看