zoukankan      html  css  js  c++  java
  • 对spark算子aggregateByKey的理解

    案例

    aggregateByKey算子其实相当于是针对不同“key”数据做一个map+reduce规约的操作。

    举一个简单的在生产环境中的一段代码
    有一些整理好的日志字段,经过处理得到了RDD类型为(String,(String,String))的List格式结果,其中各个String代表的是:(用户名,(访问时间,访问页面url))
    同一个用户可能在不同的时间访问了不同或相同的页面,为了合并同一个用户的访问行为,写了下面这段代码,用到aggregateByKey。

    val data = sc.parallelize(
    List(
    ("13909029812",("20170507","http://www.baidu.com")),("18089376778",("20170401","http://www.google.com")),("18089376778",("20170508","http://www.taobao.com")),("13909029812",("20170507","http://www.51cto.com"))
    )
    )
            data.aggregateByKey(scala.collection.mutable.Set[(String, String)](), 200)((set, item) => {
              set += item
            }, (set1, set2) => set1 union set2).mapValues(x => x.toIterable).collect
    

    结果:

    res12: Array[(String, Iterable[(String, String)])] = Array((18089376778,Set((20170401,http://www.google.com), (20170508,http://www.taobao.com))), (13909029812,Set((20170507,http://www.51cto.com), (20170507,http://www.baidu.com))))
    

    分解分析:##

    aggregateByKey(参数1)(参数2,参数3)

    过程:对于data的某个key,参数1为初始化值,在参数2的函数中,初始值和该key的每一个value传入函数进行操作,所有返回的结果在参数3中进行规约。

    • 参数1
      scala.collection.mutable.Set[(String, String)]()
    

    new 了一个空的set集合,做为初始值

    • 参数2
      (set, item) => {
      set += item
      }
      一个类似于map的映射函数,将该key的每一个value(在本案例之是(访问时间,访问url))作为item,将其放入set中并返回。
      可知某个key的所有value都会返回一个含有该value的set

    • 参数3
      (set1, set2) => set1 union set2
      该key的所有value得到的set进行union规约。并返回

    最终结果:得到了每一个用户在所有时间的访问url的行为信息。



    作者:Entry_1
    链接:https://www.jianshu.com/p/09912beb1350
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    C# Dev PropertyGrid
    C# PropertyGrid控件应用心得
    FileWriter不覆盖
    FileWriter
    java试题
    Java线程池
    java自带线程池和队列详细讲解
    HashMap练习题
    Map集合
    java指定
  • 原文地址:https://www.cnblogs.com/cxhfuujust/p/9776353.html
Copyright © 2011-2022 走看看