zoukankan      html  css  js  c++  java
  • [Pyspark]RDD常用方法总结

    aggregate(zeroValue, seqOp, combOp)

    • 入参:

      • zeroValue表示一组初值 Tuple
      • seqOp表示在各个分区partition中进行 什么样的聚合操作,支持不同类型的聚合 Func
      • combOp表示将不同分区partition聚合后的结果再进行聚合,只能进行同类型聚合 Func
    • 返回:

      • 聚合后的结果,不是RDD,是一个python对象

    下面是对一组数进行累加,并计算数据的长度的例子

        # sum, sum1, sum2 的数据类型跟zeroValue一样, 是一个tuple(int, int)
        seqOp = (lambda sum, item: (sum[0] + item, sum[1] + 1))
        combOp = (lambda sum1, sum2: (sum1[0] + sum2[0], sum1[1] + sum2[1]))
        result = sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    
        print(result) # (10, 4)
    

    aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions, partitionFunc)

    基本跟aggregate类似,在相同的key下进行聚合操作

    • 入参:

      • zeroValue表示一组初值 Tuple
      • seqFunc表示在各个分区partition中进行 什么样的聚合操作,支持不同类型的聚合 Func
      • combFunc表示将不同分区partition聚合后的结果再进行聚合,只能进行同类型聚合 Func
      • numPartitions表示需要将此操作分割成多少个分区
      • partitionFunc自定义分区方法
    • 返回:

      • 聚合后的结果是一个RDD,不再是一个python对象, 需要调用collect()方法取回

    下面是对一队成员的成绩进行累加,并计算成员的总分和参加科目的总数

        seqFunc = (lambda sum, item: (sum[0] + item, sum[1] + 1))
        combFunc = (lambda sum1, sum2: (sum1[0] + sum2[0], sum1[1] + sum2[1]))
        result = sc.parallelize(
            [("A", 83), ("A", 74), ("A", 91), ("A", 82),
             ("B", 69), ("B", 62), ("B", 97), ("B", 80), ("B", 60),
             ("C", 78), ("C", 73), ("C", 68)]) 
            .aggregateByKey((0, 0), seqFunc, combFunc)
    
        print(result.collect()) # [('B', (368, 5)), ('C', (219, 3)), ('A', (330, 4))]
    

    cache()

    将RDD结果存储在内存中,以便再次利用

    以下两条语句相等

        result = sc.parallelize([1, 2, 3, 4]).cache()
        result2 = sc.parallelize([1, 2, 3, 4]) 
            .persist(storageLevel=StorageLevel.MEMORY_ONLY)
    

    cartesian(rdd)

    返回自己与传入rdd的笛卡尔积

    • 入参:

      • rdd表示一个rdd对象,可以存储不同数据类型 RDD
    • 返回:

      • 返回的结果是一个RDD
        num_rdd = sc.parallelize([1, 2])
        str_rdd = sc.parallelize(['a', 'y'])
        result = num_rdd.cartesian(str_rdd)
    
        print(result.collect()) # [(1, 'a'), (1, 'y'), (2, 'a'), (2, 'y')]
    

    coalesce(numPartitions, shuffle)

    常用于压缩任务,当分区过多时,将造成并行计算效率降低,调度器在不同分区中频繁切换,没有充分时间去完成计算任务。

    • 入参:

      • numPartitions表示需要将此操作压缩成多少个分区 Int
      • shuffle表示是否平均分给每个分区,并不是对数据进行打乱 Boolean
        (因为数据的偏斜, 也将影响并行计算的效率, 简单理解=> 木桶效应)
    • 返回:

      • 返回的结果是一个RDD
    
        num_rdd = sc.parallelize([i for i in range(0, 12)], 5)
        print(num_rdd.glom().collect()) # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9, 10, 11]]
    
        new_rdd = num_rdd.coalesce(2, shuffle=True)
        print(new_rdd.glom().collect()) # [[0, 1, 4, 5, 6, 7], [2, 3, 8, 9, 10, 11]]
    
        new_rdd2 = num_rdd.coalesce(2, shuffle=False)
        print(new_rdd2.glom().collect()) # [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9, 10, 11]]
    

    cogroup(rdd, numPartitions)

    将两个RDD中相同key进行合并,

    • 入参:

      • rdd表示一个rdd对象,可以存储不同数据类型 RDD
      • numPartitions表示需要将此操作压缩成多少个分区 Int
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize([("a", 1), ("b", 4)])
        y = sc.parallelize([("a", 2), ("y", 4)])
        z = x.cogroup(y) 
            .map(lambda item: (item[0], list(item[1][0]), list(item[1][1])))
    
        print(z.collect()) # [('b', [4], []), ('y', [], [4]), ('a', [1], [2])]
    
    

    collect()

    将数据以List取回本地
    官网提示,建议只在任务结束时在调用collect方法,否则很容易OOM

    • 返回:
      • 返回的结果是一个List

    collectAsMap()

    将数据以key-value对的形式取回本地

    • 返回:
      • 返回的结果是一个Dict

    combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, partitionFunc)

    基本跟aggregate类似,在相同的key下进行聚合操作, 计算过程发生在Driver端

    • 入参:
      • createCombiner表示一个处理初值的函数 Func
      • mergeValue表示在所在的Map节点上进行什么样的聚合操作,支持不同类型的聚合 Func
      • mergeCombiners表示将不同Map节点上相同key聚合后的结果再进行聚合,只能进行同类型聚合 Func
      • numPartitions表示需要将此操作分割成多少个分区
      • partitionFunc自定义分区方法
        init = (lambda val: [val])
        seqFunc = (lambda sum_list, item: sum_list + [item])
        combFunc = (lambda sum_list1, sum_list2: sum_list1 + sum_list2)
        result = sc.parallelize(
            [("A", 83), ("A", 74), ("A", 91), ("A", 82),
             ("B", 69), ("B", 62), ("B", 97), ("B", 80), ("B", 60),
             ("C", 78), ("C", 73), ("C", 68)]) 
            .combineByKey(init, seqFunc, combFunc)
    
        print(result.collect()) 
        # [('B', [69, 62, 97, 80, 60]), ('C', [78, 73, 68]), ('A', [83, 74, 91, 82])]
    
    

    count()

    返回RDD内存储的数据长度(List形式)

    • 返回:
      • 返回的结果是一个Int

    countApprox(timeout, confidence)

    计算结果的估计数量;返回在timeout时间内完成的计算任务 的数据长度(List形式)

    • 入参:

      • timeout表示一个最大的计算时间 (毫秒) Int
      • confidence表示置信区间 Float
    • 返回:

      • 返回的结果是一个Int
        rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        print(rdd.countApprox(100)) # 3
    

    countByKey()

    返回每个key对应的元素数量

    • 返回:
      • 返回的结果是一个Dict
        rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        print(rdd.countByKey()) # defaultdict(<class 'int'>, {'a': 2, 'b': 1})
    

    countByValue()

    返回每个value出现的次数

    • 返回:
      • 返回的结果是一个Dict
        rdd2 = sc.parallelize([1, 2, 1, 2, 2], 2)
        print(rdd2.countByValue())  # defaultdict(<class 'int'>, {1: 2, 2: 3})
    

    distinct()

    遍历全部元素,并返回包含的不同元素的总数

    • 入参:

      • numPartitions表示需要将此操作分割成多少个分区
    • 返回:

      • 返回的结果是一个Int

    filter(func)

    遍历全部元素,筛选符合传入方法的元素

    • 入参:

      • func表示需要应用到每个元素的筛选方法
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([1, 2, 3, 4, 5])
        rdd.filter(lambda x: x % 2 == 0)
        print(rdd.collect()) # [2, 4]
    

    flatMap(func, preservesPartitioning)

    遍历全部元素,将传入方法应用到每个元素上,并将最后结果展平(压成一个List)

    • 入参:

      • func表示需要应用到每个元素的方法
      • preservesPartitioning是否保持当前分区方式,默认重新分区
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([2, 3, 4])
        sorted(rdd.flatMap(lambda x: range(1, x)).collect()) #[1, 1, 1, 2, 2, 3]
        sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) #[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
    

    flatMapValues(func)

    遍历某个元素的元素值,将传入方法应用到每个元素值上,并将最后结果展平(压成一个List)

    • 入参:

      • func表示需要应用到每个元素值的方法
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
        x.flatMapValues(lambda val: val).collect() # [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
    

    fold(zeroValue, func)

    fold()与reduce()类似,接收与reduce接收的函数签名相同的函数,另外再加上一个初始值作为第一次调用的结果。(例如,加法初始值应为0,乘法初始值应为1)

    • 入参:

      • zeroValue表示一组初值 Tuple
      • func表示需要应用到每个元素上的方法 Func
    • 返回:

      • 聚合后的结果,不是RDD,是一个python对象
        x = sc.parallelize([1, 2, 3, 4, 5])
        x.fold(0, add) # 15
    

    foldByKey(zeroValue, func, numPartitions, partitionFunc)

    基本跟fold()类似,在相同的key下进行聚合操作

    • 入参:

      • zeroValue表示一组初值 Tuple
      • func表示需要应用到每个元素上的方法 Func
      • numPartitions表示需要将此操作分割成多少个分区
      • partitionFunc自定义分区方法
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        x.foldByKey(0, add).collect() # [('a', 2), ('b', 1)]
    

    foreach(func)

    用于遍历RDD中的元素,将函数func应用于每一个元素。

    • 入参:

      • func表示需要应用到每个元素的方法, 但这个方法不会在客户端执行
    • 返回:

      • 返回的结果是一个RDD
        def f(x): print(x)
        sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
    

    foreachPartition(func)

    遍历某个分区下的全部元素,将函数func应用于每一个元素。

    • 入参:

      • func表示需要应用到每个元素的方法, 但这个方法不会在客户端执行
    • 返回:

      • 返回的结果是一个RDD
        def f(iterator):
          for x in iterator:
               print(x)
        sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
    

    glom()

    按分区对元素进行聚合, 返回一个二维列表

    • 返回:
      • 返回的结果是一个RDD
        rdd = sc.parallelize([1, 2, 3, 4], 2)
        sorted(rdd.glom().collect()) # [[1, 2], [3, 4]]
    

    groupyBy(func, numPartitions, partitionFunc)

    这个算子接收一个Func,应用函数后的返回值作为key,然后通过这个key来对里面的元素进行分组。

    • 入参:

      • func表示需要应用到每个元素上的方法 Func
      • numPartitions表示需要将此操作分割成多少个分区
      • partitionFunc自定义分区方法
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        result = rdd.groupBy(lambda x: x % 2).collect()
        sorted([(x, sorted(y)) for (x, y) in result]) # [(0, [2, 8]), (1, [1, 1, 3, 5])]
    

    groupByKey(numPartitions, partitionFunc)

    与groupBy类似,不需要再传入func

    groupWith(rdd, *rdd)

    cogroup的加强版,可以用于多于两个的RDD合并

    • 入参:

      • rdd表示一个rdd对象,可以存储不同数据类型 RDD
      • *rdd表示一个rdd可变列表对象,可以是多个RDD对象 RDD
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize([("a", 1), ("b", 4)])
        y = sc.parallelize([("a", 2), ("y", 4)])
        w = sc.parallelize([("c", 3), ("a", 6)])
        z = x.groupWith(y, w) 
            .map(lambda item: (item[0], list(item[1][0]), list(item[1][1])))
        print(z.collect()) # [('b', [4], []), ('y', [], [4]), ('a', [1], [2]), ('c', [], [])]
    

    join(rdd, numPartitions)

    内连接,将两个RDD中具有相同的key时进行连接

    • 入参:

      • rdd表示一个rdd对象,可以存储不同数据类型 RDD
      • numPartitions表示需要将此操作分割成多少个分区
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize([("a", 1), ("b", 4)])
        y = sc.parallelize([("a", 2), ("a", 3)])
        sorted(x.join(y).collect()) # [('a', (1, 2)), ('a', (1, 3))]
    

    leftOuterJoin(other, numPartitions=None)

    左外连接, 与join类似

    lookup(key)

    • 入参:

      • key表示需要查找元素的key
    • 返回:

      • 返回的结果是一个List
        rdd = sc.parallelize(list(zip(range(100), range(100, 200))), 10) #[(0,100), (1,101), ...]
        result = rdd.lookup(6)
        print(result) # 106
    

    map(func, preservesPartitioning)

    对于每个元素都应用这个func

    • 入参:

      • func表示需要应用到每个元素的方法
      • preservesPartitioning是否保持当前分区方式,默认重新分区
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize(["b", "a", "c"])
        sorted(rdd.map(lambda x: (x, 1)).collect()) #[('a', 1), ('b', 1), ('c', 1)]
    

    mapPartitions(func, preservesPartitioning)

    对于每个分区应用这个func

    • 入参:

      • func表示需要应用到每个元素的方法
      • preservesPartitioning是否保持当前分区方式,默认重新分区
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([1, 2, 3, 4], 2)
        def f(iterator): yield sum(iterator)
        rdd.mapPartitions(f).collect() # [3, 7]
        
    

    mapPartitionsWithIndex(func, preservesPartitioning)

    对于每个分区应用这个func,但同时会被传入分区的index

    • 入参:

      • func表示需要应用到每个元素的方法
      • preservesPartitioning是否保持当前分区方式,默认重新分区
    • 返回:

      • 返回的结果是一个RDD
      rdd = sc.parallelize([1, 2, 3, 4], 4)
      def f(splitIndex, iterator): yield splitIndex
      rdd.mapPartitionsWithIndex(f).sum() # 6 =>  0 + 1 + 2 + 3
        
    

    mapValues(func)

    对键值对中每个value都应用这个func,并保持key不变

    • 入参:

      • func表示需要应用到每个元素值上的方法
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
        def f(x): return len(x)
        x.mapValues(f).collect() # [('a', 3), ('b', 1)]
    

    partitionBy(numPartitions, partitionFunc)

    返回一个原始RDD经过自定义分区方法的拷贝

    • 入参:

      • numPartitions表示需要将此操作分割成多少个分区
      • partitionFunc自定义分区方法
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([(0, 1), (1, 2), (1, 3), (0, 2), (3, 5), (5, 6)])
        new_rdd = rdd.partitionBy(numPartitions=3, partitionFunc=lambda x: hash(x))
        print(rdd.glom().collect()) # [[], [(0, 1)], [(1, 2)], [(1, 3)], [], [(0, 2)], [(3, 5)], [(5, 6)]]
        print(new_rdd.glom().collect()) # [[(0, 1), (0, 2), (3, 5)], [(1, 2), (1, 3)], [(5, 6)]]
    
    

    persist(storageLevel)

    当RDD第一次计算完成之后,保存起来,具体保存在什么位置根据storageLevel来决定

    • 入参:
      • storageLevel表示RDD的计算结果保存的位置, 内存或者硬盘中(以文件形式)
        rdd = sc.parallelize(["b", "a", "c"])
        rdd.persist().is_cached # True
    

    pipe(command, env=None, checkCode=False)

    将由管道命令创建的数据以RDD形式拉取到内存中

    • 入参:

      • command表示需要执行的命令
      • env前置环境配置参数
      • checkCode是否检查shell命令的返回值
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
        y = x.pipe('grep -i "A"') 
        print(x.collect()) # ['A', 'Ba', 'C', 'AD']
        print(y.collect()) # ['A', 'Ba', 'AD']
        
    

    reduce(func)

    对于每个元素值都应用这个func

    • 入参:

      • func表示需要应用到每个元素的方法
    • 返回:

      • 返回的结果是一个Python obj, 与元素值得数据类型一致
        x = sc.parallelize([1, 2, 3])
        y = x.reduce(lambda a, b : a + b )
        print(x.collect()) # [1, 2, 3]
        print(y) # 6
    

    reduceByKey(func, numPartitions=None, partitionFunc)

    对于这个key对应的元素值都应用这个func

    • 入参:

      • func表示需要应用到每个元素值的方法
      • numPartitions表示需要将此操作分割成多少个分区
      • partitionFunc自定义分区方法
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        sorted(rdd.reduceByKey(add).collect()) # [('a', 2), ('b', 1)]
    

    reduceByKeyLocally(func)

    功能跟reduceByKey相同,但是计算发生在mapper节点中,计算结果直接传回主节点,类似combiner

    • 入参:

      • func表示需要应用到每个元素值的方法 Func
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        sorted(rdd.reduceByKey(add).collect()) # [('a', 2), ('b', 1)]
    

    repartition(numPartitions)

    对RDD按指定分区数量进行重新分区

    • 入参:

      • numPartitions表示需要将此操作分割成多少个分区 Int
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        sorted(rdd.glom().collect()) # [[1], [2, 3], [4, 5], [6, 7]]
        len(rdd.repartition(2).glom().collect()) # 2
        len(rdd.repartition(10).glom().collect()) # 10
        
    

    rightOuterJoin(other, numPartitions)

    右外连接, 与join类似

    sample(withReplacement, fraction, seed)

    返回RDD数据的一个子集

    • 入参:

      • withReplacement表示是否试放回抽样 Boolean
      • fraction表示抽样比例 Float
      • seed随机种子
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize(range(100), 4)
        sample = rdd.sample(False, 0.1, 666)
        print(sample.count()) # 11
    

    sortBy(keyfunc, ascending, numPartitions)

    根据keyfunc对RDD进行排序

    • 入参:

      • keyfunc表示需要被排序的key Func
      • ascending表示升序或者降序 Boolean 默认升序
      • numPartitions表示需要将此操作分割成多少个分区 Int
    • 返回:

      • 返回的结果是一个RDD
        tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        sc.parallelize(tmp).sortBy(lambda x: x[0]).collect() # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    

    sortByKey(ascending, numPartitions, keyfunc)

    对RDD进行排序,默认RDD内的数据是tuple(key,value)形式

    • 入参:

      • ascending表示升序或者降序 Boolean 默认升序
      • numPartitions表示需要将此操作分割成多少个分区 Int
      • keyfunc表示需要被排序的key Func
    • 返回:

      • 返回的结果是一个RDD
        tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        sc.parallelize(tmp).sortByKey(True, 1).collect() # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    

    takeOrdered(num, key)

    返回排序后的前num个数据

    • 入参:

      • num表示需要返回元素的数量
      • key表示需要被排序的key Func
    • 返回:

      • 返回的结果是一个List
        sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) # [1, 2, 3, 4, 5, 6]
        sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) # [10, 9, 7, 6, 5, 4]
    

    takeSample(withReplacement, num, seed)

    返回RDD数据的一个子集

    • 入参:

      • withReplacement表示是否试放回抽样 Boolean
      • num表示需要返回元素的数量
      • seed随机种子
    • 返回:

      • 返回的结果是一个RDD
        rdd = sc.parallelize(range(0, 10))
        len(rdd.takeSample(True, 20, 1)) # 20
    

    zip(rdd)

    参考python的内置方法zip

    • 入参:

      • rdd表示一个rdd对象,可以存储不同数据类型,但数量需要相同 RDD
    • 返回:

      • 返回的结果是一个RDD
        x = sc.parallelize(range(0,5))
        y = sc.parallelize(range(1000, 1005))
        x.zip(y).collect() # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
    

    zipWithIndex()

    与元素本身的index进行zip操作

    • 返回:
      • 返回的结果是一个RDD
        rdd = sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
        print(rdd.collect()) #[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
    
  • 相关阅读:
    Net使用RdKafka引发异常RdKafka.Internal.LibRdKafka 的类型初始值设定项引发异常
    mysql数据与Hadoop之间导入导出之Sqoop实例
    如何将mysql数据导入Hadoop之Sqoop安装
    常见的几种Flume日志收集场景实战
    Restful服务应不应该在URI中加入版本号
    sudo
    shell实现SSH自动登陆
    使用465端口加密发邮件
    linux下c++如何输入不回显
    tmp
  • 原文地址:https://www.cnblogs.com/sight-tech/p/12990579.html
Copyright © 2011-2022 走看看