zoukankan      html  css  js  c++  java
  • RDD转换算子(transformantion)

    RDD算子

    作用于RDD上的Operation分为转换(transformantion)和动作(action)。 Spark中的所有“转换”都是惰性的,在执行“转换”操作,并不会提交Job,只有在执行“动作”操作,所有operation才会被提交到cluster中真正的被执行。这样可以大大提升系统的性能。

    RDD的转换算子

    • map(func)

      返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

        rdd1 = sc.parallelize(['b','a','c'])
        rdd1.map(lambda x: (x,1)).collect()
        #结果
        [('b',1),('a',1),('c',1)]
      
    • filter(func)

      返回包含指定过滤条件的元素。RDD是一个分布式数据集,filter转换操作针对RDD所有分区的每一个元素进行过滤,filter方法将满足条件的元素返回,不满足条件的元素被忽略

        def fun(x):
            return x % 2 == 0
        rdd1 = sc.parallelize([1,2,3,4,5,6,7])
        rdd2 = rdd1.filter(fun)
        rdd2.collect()
        #结果
        [2,4,6]
      
    • flatMap(func)

      类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

        rdd1 = sc.parallelize(['hello world','hello china'])
        rdd2 = rdd1.flatMap(lambda x:x.split(' '))
        rdd2.collect()
        #结果
        ['hello','world','hello','china']
      
    • glom

      该函数是将RDD中每一个分区中元素转为数组

        rdd1 = sc.parallelize([1,2,3,4,5,6],3)
        rdd2 = rdd1.glom()
        rdd2.collect()
        #结果
        [[1, 2], [3, 4], [5, 6]]
      
    • mapPartitions(funs)

      类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

        x = sc.parallelize([1,2,3,4,5,6], 2)
        def f(x):
        	li = list(x)
        	result = []
        	for i in li:
        		result.append(i+10)
        	return result
        y = x.mapPartitions(f)
        print('x原来分区信息:{}'.format(x.glom().collect()))
        print('x经过f计算后的结果:{}'.format(y.glom().collect()))
        #结果
        x原来分区信息:[[1, 2, 3], [4, 5, 6]]
        x经过f计算后的结果:[[11, 12, 13], [14, 15, 16]]
      
    • mapPartitionsWithIndex(func)

      类似于mapPartitions,但func带有一个整数参数表示分片的索引值

        x = sc.parallelize([1,2,3,4,5,6], 2)
        def f(idx,x):
        	li = list(x)
        	result = []
        	for i in li:
        		result.append(i+10)
        	return (idx,result)
        	y = x.mapPartitionsWithIndex(f)
      
        print('x原来分区信息:{}'.format(x.glom().collect()))
        print('x经过f计算后的结果:{}'.format(y.glom().collect())
      
        #结果
        x原来分区信息:[[1, 2, 3], [4, 5, 6]]
        x经过f计算后的结果:[[0, [11, 12, 13]], [1, [14, 15, 16]]]
      
    • distinct()

      返回包含不同元素的新RDD

        rdd = sc.parallelize([1,1,1,2,3,4,5,5,6])
        rdd.distinct().collect()
        #结果
        [1, 2, 3, 4, 5, 6]   
      
    • union() (并集)

      源RDD和参数RDD求并集后返回一个新的RDD

        rdd = sc.parallelize([1, 1, 2, 3])
        rdd1 = sc.parallelize([5, 3, 4, 6])
        rdd.union(rdd1).collect()
        #结果
        [1,1,2,3,5,3,4,6]
      
    • intersection() (交集)

      对源RDD和参数RDD求交集后返回一个新的RDD

        rdd1 = sc.parallelize([1, 1, 2, 3])
        rdd2 = sc.parallelize([5, 3, 4, 6])
        rdd1.intersection(rdd2).collect()
        #结果
        [3]
      
    • groupBy

      通过函数f对RDD进行分组。

        rdd1 = sc.parallelize([1, 1, 2, 3, 5, 8])
        rdd2 = rdd1.groupBy(lambda x: x % 2)
        result = rdd2.collect()
        for (key,value) in result:
        	li = list(value)
        	print("key:{},数据:{}".format(key,li))
        #结果
        key:0,数据:[2, 8]
        key:1,数据:[1, 1, 3, 5]
      
    • sortBy(keyfunc, ascending=True, numPartitions=None)

      按给定的函数,对RDD进行排序

        x = sc.parallelize(['wills', 'kris', 'april', 'chang'])
        def sortByFirstLetter(s): return s[0]
        def sortBySecondLetter(s): return s[1]
      
        y = x.sortBy(sortByFirstLetter).collect()
        yy = x.sortBy(sortBySecondLetter).collect()
        
        print ('按第一个字母排序结果: {}'.format(y))
        print ('按第二个字母排序结果:{}'.format(yy))
        #结果
        按第一个字母排序结果: ['april', 'chang', 'kris', 'wills']
        按第二个字母排序结果:['chang', 'wills', 'april', 'kris']
      
    • cogroup(other, numPartitions=None)

      在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

        x = sc.parallelize([("a", 1), ("b", 4)])
        y = sc.parallelize([("a", 2)])
        result = x.cogroup(y).collect()
      
        for (x,y) in result:
        	tmp = []
        	for i in y:
        		tmp.append(list(i))
        	print('key是{},value是:{}'.format(x,tmp))
        #结果
        key是a,value是:[[1], [2]]
        key是b,value是:[[4], []]
      
    • partitionBy

      该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区

        pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        sets = pairs.partitionBy(2).glom().collect()
        #结果
        sets
        [[(2,2),(4,4),(2,2),(4,4)],[(1,1),(3,3),(1,1)]]
      
    • coalesce(numPartitions)

      cartesian 返回一个RDD和另一个RDD的笛卡尔乘积,即所有元素对(a,b)的RDD,其中a在自身中,b在另一个中。

      假设集合A={a, b},集合B={0, 1, 2},则两个集合的笛卡尔积为{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。

        rdd = sc.parallelize([2,3])
        sorted(rdd.cartesian(rdd).collect())
        #结果
        [(2, 2), (2, 3), (3, 2), (3, 3)]
  • 相关阅读:
    Overloaded的方法是否可以改变返回值的类型
    parseXXX的用法
    java的类型转换问题。int a = 123456;short b = (short)a;System.out.println(b);为什么结果是-7616?
    UVA 10405 Longest Common Subsequence(简单DP)
    POJ 1001 Exponentiation(大数处理)
    POJ 2318 TOYS(计算几何)(二分)
    POJ 1265 Area (计算几何)(Pick定理)
    POJ 3371 Flesch Reading Ease (模拟题)
    POJ 3687 Labeling Balls(拓扑序列)
    POJ 1094 Sorting It All Out(拓扑序列)
  • 原文地址:https://www.cnblogs.com/jiajiaba/p/10621775.html
Copyright © 2011-2022 走看看