zoukankan      html  css  js  c++  java
  • Spark常用RDD操作总结

    aggregate

    • 函数原型:aggregate(zeroValue, seqOp, combOp)
    • seqOp相当于Map
    • combOp相当于Reduce
    • zeroValue是seqOp每一个partion的初始值,是一个元组,默认为0。
    • 计算列表中总数:
    sc.parallelize( [[123],[4,5,6],[7,8,9]] )\
    .aggregate(0, lambda: x,y: sum(y)+x, lambda x,y: x+y)
    # [('world', 1), ('hello', 2), ('morning', 1)]
    

    seqOp的输入值为x,y,其中x为初始值或中间值(如果当前partion有多个列表那就有中间值了,即上一个seqOp返回的值),而y就是第一个输入,比如:[1,2,3],[4,5,6],[7,8,9]。。。 combOp的输入值也为x,y,其中y为初始值或中间值(超过2个partion时肯定会产生中间值),x为输入值。比如1+2+3=6,4+5+6=15,7+8+9=24。那6,15,24都会作为输入值计算。当然此处的combOp在调用过程中也并非是串行的挨个把6,15,24加起来,中间也会有先汇总再求和的过程。但对用户来说此处是透明的。 我们看到aggregate中把每一个物理上的输入行作为一个计算单位输入并输出,他比较适合计算总数,行数等类似与列中所蕴含值无关的统计维度。


    aggregateByKey

    • 函数原型:aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)
    • 参数与aggregate相同
    • 根据key进行合并

    • 上例稍加改动可以完成一个wordcounts

    sc.parallelize(["hello world", "hello morning"])\
    .flatMap(lambda line: line.split())\
    .map(lambda letter: (letter, 1)).aggregateByKey(0, lambda x,y: y+x, lambda x,y: x+y)\
    .collect()
    # [(1, 1), (1, 2), (2, 1), (2, 2)]
    

    cartesian

    • 返回两个rdd的笛卡儿积
    rdd1 = sc.parallelize([1, 2])
    rdd2 = sc.parallelize([3, 4, 5])
    rdd1.catesian(rdd2).cellect()
    # [(1, 1), (1, 2), (2, 1), (2, 2)]
    

    glom

    • 将一个一维横向列表,划分为多个块
    sc.parallelize([1,2,3,4,5], 1).collect()
    # [1, 2, 3, 4, 5]
    sc.parallelize([1,2,3,4,5], 1).glom().collect()
    # [[1, 2, 3, 4, 5]]
    sc.parallelize([1,2,3,4,5], 2).glom().collect()
    # [[1, 2], [3, 4, 5]]
    

    coalesce

    • 将多个块组合成n个大的列表
    sc.parallelize([1,2,3,4,5], 3).coalesce(2).glom().collect()
    # [[1], [2, 3, 4, 5]]
    sc.parallelize([1,2,3,4,5], 3).coalesce(2).collect()
    # [1, 2, 3, 4, 5]
    sc.parallelize([1,2,3,4,5], 3).glom().collect()
    # [[1], [2, 3], [4, 5]]
    

    cogroup

    • 函数原型:cogroup(other, numPartitions=None)

    • 按key聚合后,求两个RDD的并集。

    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2)])
    map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
    # [('a', ([1], [2])), ('b', ([4], []))]
    

    collectAsMap

    • 将rdd数据按KV对形式返回
    sc.parallelize([(1,2), (3,4)]).collectAsMap()
    # {1: 2, 3: 4}
    sc.parallelize([(1, (2, 6666)), (3, 4)]).collectAsMap()
    # {1: (2, 6666), 3: 4}
    

    combineByKey

    • 函数原型:combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)

    • 根据key进行


    count

    • 返回rdd中元素的数目
    sc.parallelize([2,3,4]).count()
    # 3
    

    countByKey

    • 按key聚合后计数
    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    rdd.countByKey().items()
    # [('a', 2), ('b', 1)]
    

    countByValue

    • 按value聚合后再计数
    sc.parallelize(["hello", "world", "hello", "china", "hello"]).countByValue().items()
    # [('world', 1), ('china', 1), ('hello', 3)]
    

    countApprox

    • countApprox(timeout, confidence=0.95) 貌似在公司版本中还未提供 count的一个升级版(实验中),当超过timeout时,返回一个未完成的结果。
    rdd = sc.parallelize(range(1000), 10)
    rdd.countApprox(1000, 1.0)
    # 1000
    

    distinct

    • distinct(numPartitions=None) 返回rdd中unique的元素
    sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()
    # [1, 2, 3]
    

    filter

    • 过滤一个RDD中,其每一行必须瞒住filter的条件
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    rdd.filter(lambda x: x%2==0).collect()
    # [2, 4]
    

    first

    • 返回rdd中的第一个元素
    sc.parallelize([2, 3, 4]).first()
    

    flatMap

    • flatMap(f, preservesPartitioning=False) 返回rdd中的所有元素,并把flatMap中返回的列表拉平。
    rdd = sc.parallelize([2, 3, 4])
    rdd.flatMap(lambda x: range(1, x)).collect()
    # [1, 1, 1, 2, 2, 3]
    

    flatMapValues

    • 同flatMap,但按照key进行flat,并最终拉平。
    x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    def f(x): return x
    x.flatMapValues(f).collect()
    # [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
    

    fold

    • fold(zeroValue, op) 聚合RDD的每一个分区,最后再合并计算,每一个函数默认值为"zeroValue"。 op(t1,t2)函数可以更改t1并且将更改后的t1作为返回值返回以减少对象内存占用。切记不可个性t2的值。
    def add(x,y): return x+y
    sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    # 15
    
  • 相关阅读:
    jquery插件layer
    获取订单的product_id 和订单的数量
    Python psutil模块
    Linuc bazaar命令
    分布式版本控制系统
    launchpad, jira, github
    C/C++ 经典面试题汇总
    Windows Cmder
    Reddit指南
    Linux xclip命令
  • 原文地址:https://www.cnblogs.com/milkcoffeesugar/p/5734072.html
Copyright © 2011-2022 走看看