zoukankan      html  css  js  c++  java
  • Spark学习笔记之RDD中的Transformation和Action函数

      总算可以开始写第一篇技术博客了,就从学习Spark开始吧。之前阅读了很多关于Spark的文章,对Spark的工作机制及编程模型有了一定了解,下面把Spark中对RDD的常用操作函数做一下总结,以pyspark库为例。

      RDD 的操作函数(operation)主要分为2种类型 Transformation 和 Action,如下图:

      

      Transformation 操作不是马上提交 Spark 集群执行的,Spark 在遇到 Transformation 操作时只会记录需要这样的操作,并不会去执行,需要等到有 Action 操作的时候才会真正启动计算过程进行计算.针对每个 Action,Spark 会生成一个 Job, 从数据的创建开始,经过 Transformation, 结尾是 Action 操作.这些操作对应形成一个有向无环图(DAG),形成 DAG 的先决条件是最后的函数操作是一个Action.
      

     

    Transformation:

    map(f, preservesPartitioning=False):将一个函数应用到这个RDD的每个element上,返回一个新的RDD。下面例子将rdd中每个element复制两遍:

    1 from pyspark import SparkContext
    2 
    3 sc = SparkContext('local', 'test')
    4 
    5 rdd = sc.parallelize(['a', 'b', 'c'])
    6 
    7 rdd.map(lambda x: x*2).collect()
    8 
    9 Out: ['aa', 'bb', 'cc']

    filter(f):返回仅包含满足应用到element函数的新RDD。下面例子将过滤出rdd中的偶数:

    1 rdd = sc.parallelize([1, 2, 3, 4])
    2 
    3 rdd.filter(lambda x: x%2 == 0).collect()
    4 
    5 Out: [2, 4]

    flatMap(f, preservesPartitioning=False):返回一个新的RDD,首先将一个函数应用到这个RDD的所有element上,注意返回的是多个结果。

    1 rdd.flatMap(lambda x: range(1, x)).collect()
    2 
    3 Out: [1, 1, 2, 1, 2, 3]

    mapPartitions(f, preservesPartitioning=False):通过将一个函数应用到这个RDD的每个partition上,返回一个新的RDD。

    1 rdd = sc.parallelize([1, 2, 3, 4], 2)
    2 
    3 def f(iterator): yield sum(iterator)
    4 
    5 rdd.mapPartitions(f).collect()
    6 Out:[3, 7]

    mapPartitionsWithIndex(f, preservesPartitioning=False):通过在RDD的每个partition上应用一个函数来返回一个新的RDD,同时跟踪原始partition的索引。下面例子返回索引和:

    1 rdd = sc.parallelize([1, 2, 3, 4], 4)
    2 
    3 def f(splitIndex, iterator): yield splitIndex
    4 
    5 rdd.mapPartitionsWithIndex(f).sum()
    6 
    7 Out:6

    sample(withReplacement, fraction, seed=None)根据给定的随机种子seed,随机抽样出数量为frac的数据,返回RDD。

    1 rdd = sc.parallelize(range(100), 4)
    2 
    3 rdd.sample(False, 0.2, 10).count()
    4 
    5 Out: 21

    union(other):返回两个RDD的并集。

    1 rdd = sc.parallelize([1, 1, 2, 3])
    2 
    3 rdd.union(rdd).collect()
    4 
    5 Out: [1, 1, 2, 3, 1, 1, 2, 3]

    distinct(numPartitions=None):类似于python中的set(),返回不重复的元素集合。

    1 sc.parallelize([1, 1, 2, 3]).distinct().collect()
    2 
    3 Out:[1, 2, 3]

    groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task。

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.groupByKey().mapValues(len).collect())
    [('a', 2), ('b', 1)]
    >>> sorted(rdd.groupByKey().mapValues(list).collect())
    [('a', [1, 1]), ('b', [1])]

    reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]

    sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>):按照key来进行排序,是升序还是降序,ascending是boolean类型

     1 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
     2 >>> sc.parallelize(tmp).sortByKey().first()
     3 ('1', 3)
     4 >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
     5 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
     6 >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
     7 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
     8 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
     9 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    10 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    11 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]

    join(other, numPartitions=None)在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集。默认为inner join

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]

    cogroup(other, numPartitions=None):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,即outer join

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
    [('a', ([1], [2])), ('b', ([4], []))]

    cartesian(other)笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。

    >>> rdd = sc.parallelize([1, 2])
    >>> sorted(rdd.cartesian(rdd).collect())
    [(1, 1), (1, 2), (2, 1), (2, 2)]

    Action:

    reduce(f):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
    15
    >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
    10
    >>> sc.parallelize([]).reduce(add)
    Traceback (most recent call last):
        ...
    ValueError: Can not reduce() empty RDD

    collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

    count():返回的是dataset中的element的个数

    first():返回的是dataset中的第一个元素 

    take(n):返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)

    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
    [2, 3]
    >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
    [2, 3, 4, 5, 6]
    >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
    [91, 92, 93]

    takeSample(withReplacement, num, seed=None):抽样返回一个dataset中的num个元素,随机种子seed

    >>> rdd = sc.parallelize(range(0, 10))
    >>> len(rdd.takeSample(True, 20, 1))
    20
    >>> len(rdd.takeSample(False, 5, 2))
    5
    >>> len(rdd.takeSample(False, 15, 3))
    10

    saveAsTextFile(path, compressionCodecClass=None):将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本

    saveAsSequenceFile(path, compressionCodecClass=None):将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)

    countByKey():返回的是key对应的个数的一个map,作用于一个RDD

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.countByKey().items())
    [('a', 2), ('b', 1)]

    foreach(f):在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

    >>> def f(x): print(x)
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
  • 相关阅读:
    数据结构算法C语言实现(十五)---4.3 串的模式匹配算法
    数据结构算法C语言实现(十四)--- 4.1&4.2串的类型定义、表示及实现
    数据结构算法C语言实现(十三)--- 3.5离散事件模拟
    数据结构算法C语言实现(十二)--- 3.4循环队列&队列的顺序表示和实现
    数据结构算法C语言实现(十一)--- 3.4队列的链式表示和实现
    数据结构算法C语言实现(十)--- 3.3栈与递归的实现
    数据结构算法C语言实现(九)--- 拓展:由迷宫问题引申的AI贪吃蛇
    数据结构算法C语言实现(八)--- 3.2栈的应用举例:迷宫求解与表达式求值
    数据结构算法C语言实现(七)--- 3.1栈的线性实现及应用举例
    数据结构算法C语言实现(六)---2.4一元多项式的表示及相加
  • 原文地址:https://www.cnblogs.com/wmx24/p/8563514.html
Copyright © 2011-2022 走看看