zoukankan      html  css  js  c++  java
  • PySpark笔记

    RDD介绍

    Spark中的RDD就是一个不可变的分布式对象集合,每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set)。创建出来之后,RDD支持两种类型的操作:转化操作(transformation)和行动操作(action)。转化操作会由一个RDD生成一个新的RDD,比如 filter() 函数。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中,比如 first() 函数。如果,你对于当前的函数无法判断是转化操作还是行动操作时,你可以看该函数的返回值是什么,如果是一个RDD,那么该函数就是一个转化操作,如果是其他的数据类型,那么该函数就是一个行动操作。

    有关转化操作的API


    filter()函数

    函数例子:

    errorsRDD = inputRDD.filter(lambda x: "error" in x)
    
    # 或者
    def hasError(line):
      return 'error' in line
    
    errorsRDD = inputRDD.filter(hasError)

    这个API的作用是挑选出包含 error 的内容。注意,filter()操作不会改变已有的 inputRDD 中的数据。实际上,该操作会返回一个全新的RDD。


    union()函数

    函数例子:

    errorsRDD = inputRDD.filter(lambda line : "error" in line)
    warningRDD = inputRDD.filter(lambda line : "warning" in line)
    badLinesRDD = errorsRDD.union(warningRDD)

    这个API的作用是计算两个RDD的并集。如果两个RDD之间有重复的元素,那么在新生成的RDD中也会包含重复的元素。


    intersection()函数

    函数例子:

    inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
    a = inputRDD.filter(lambda x : x % 2 == 0) # 2,4,6,8
    b = inputRDD.filter(lambda x : x > 5) # 6,7,8,9
    c = a.intersection(b) # 8,6

    subtract()函数

    函数例子:

    inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
    a = inputRDD.filter(lambda x : x % 2 == 0) # 2,4,6,8
    b = inputRDD.filter(lambda x : x > 5) # 6,7,8,9
    c = a.subtract(b) # 2,4

    cartesian()函数

    函数例子:

    inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
    a=sc.parallelize(['a','b','c','d'])
    b = inputRDD.filter(lambda x : x > 5) # 6,7,8,9
    c = a.cartesian(b) 
    # output
    [('a', 6), ('a', 7), ('a', 8), ('a', 9), ('b', 6), ('b', 7), ('b', 8), ('b', 9), ('c', 6), ('c', 7), ('c', 8), ('c', 9), ('d', 6), ('d', 7), ('d', 8), ('d', 9)]
    这个API的作用是计算两个RDD的笛卡尔积。该API转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b则是来自另一个RDD。
    笛卡尔积在我们希望考虑所有可能的组合的相似度时比较有用,比如计算各用户对各种产品的预期兴趣程度。我们也可以求一个RDD与其自身的笛卡尔积,
    这可以用于求用户相似度的应用中。不过要特别注意的是,求大规模RDD的笛卡尔积开销巨大。

    map()函数

    函数例子:

    doubleRDD = inputRDD.map(lambda x: x * 2)

    这个API的作用是遍历inputRDD中所有的元素,然后返回的新的RDD中的元素是原来的两倍。


    flatMap()函数

    函数例子:

    inputRDD = sc.parallelize(['i love you', 'hello world'])
    outputRDD = inputRDD.flatMap(lambda x: x.split(' '))
    print outputRDD.count() # 5

    这个API的作用是被应用到输入inputRDD中每个元素上,不过返回的不是一个一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。


    distinct()函数

    函数例子:

    inputRDD = sc.parallelize([2,4,3,1,2,3,3,2,1,3,4,2,3,1,4])
    distinctRDD = inputRDD.distinct()
    dictinctRDD.collect()   # 1,2,3,4

    这个API的作用是来生成一个只包含不同元素的新RDD。不过由于该操作需要对所有数据通过网络进行混洗(shuffle),所有这个操作非常消耗时间。


    sample()函数

    函数例子:

    inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9,0])
    sampleRDD = inputRDD.sample(False, 0.5)
    # 2,3,4,7
    sampleRDD = inputRDD.sample(True, 0.5)
    # 2,2,2,5,6,6

    这个API的作用是来随机采集RDD中的数据,第一个参数表示RDD中的元素是否可以被重复采集,如果True,那么表示可以重复采集。第二个参数是元素是否被采集的概率,取值范围必须是 [0,1]


    reduceByKey()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    outputRDD = inputRDD.reduceByKey(lambda x,y: x+y)
    # output
    [(1, 2), (3, 10)]

    这个API的作用是来合并具有相同键的值。


    groupByKey()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    outputRDD = inputRDD.groupByKey(lambda x,y: x+y)
    # {(1, [1]), (3,[4,6])}
    for (i,j) in outputRDD.collect():
      for item in j:
        print item
    # output item
    <pyspark.resultiterable.ResultIterable object at 0x110a7ec90>
    2
    <pyspark.resultiterable.ResultIterable object at 0x110a7ed50>
    4
    6

    这个API的作用是对具有相同键的值进行分组。


    mapValues()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    outputRDD = inputRDD.mapValues(lambda x: x+1)
    # {(1, [1]), (3,[4,6])}
    for (i,j) in outputRDD.collect():
      for item in j:
        print item
    # output item
    <pyspark.resultiterable.ResultIterable object at 0x110a7ec90>
    2
    <pyspark.resultiterable.ResultIterable object at 0x110a7ed50>
    4
    6

    这个API的作用是对具有相同键的值进行分组。


    flatMapValues()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    outputRDD = inputRDD.flatMapValues(lambda x: range(x, 6))
    # output
    [(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]

    这个API的作用是对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化。


    keys()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    outputRDD = inputRDD.keys()
    # output
    [1,3,3]

    这个API的作用是返回一个仅包含键的RDD。


    values()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    outputRDD = inputRDD.values()
    # output
    [2,4,6]

    这个API的作用是返回一个仅包含值的RDD。


    sortByKey()函数

    函数例子:

    inputRDD = sc.parallelize([(11,2),(13,4),(3,6)])
    outputRDD = inputRDD.sortByKey()
    # output
    [(3,6),(11,2),(13,4)]

    这个API的作用是返回一个根据键排序的RDD。


    combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    outputRDD = inputRDD.combineByKey(
      (lambda x: (x, 1)),
      (lambda x, y: (x[0] + y, x[1] + 1)),
      (lambda x, y: (x[0] + y[0], x[1] + y[1]))
    )
    # output
    {[(1, (2, 1)), (3, (10, 2))]}

    要理解combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于combineByKey()会遍历分区中的所有的元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。
    如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。
    如果这是一个在处理当前分区之前已经遇到的值,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
    由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器,如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

    有关行动操作的API


    top()函数

    函数例子:

    outputdata = inputRDD.top(10)
    for line in outputdata:
      print line

    这个API的作用是返回inputRDD最前面的K个元素,返回的数据类型是一个list,长度是K。


    take()函数

    函数例子:

    outputdata = inputRDD.take(10)
    for line in outputdata:
      print line 

    这个API的作用是返回inputRDD中的K个元素,返回的数据类型是一个list,长度是K。


    first()函数

    函数例子:

    inputRDD.first()

    这个API的作用是返回inputRDD最前面的元素,返回的数据类型是一个字符串,编码是Unicode编码。


    collect()函数

    函数例子:

    outputdata = inputRDD.collect()
    for line in outputdata:
      print line

    这个API的作用是返回inputRDD中所有的元素,返回的数据类型是一个list。注意,这个API只能在小数据上面使用,如果数据量太大,非常消耗时间和内存。


    count()函数

    函数例子:

    len = inputRDD.count()
    print len

    这个API的作用是返回inputRDD中元素的个数。


    reduce()函数

    函数例子:

    inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
    output = inputRDD.reduce(lambda x,y : x+y)
    # output
    45

    这个API的作用是接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。


    takeSample(withReplacement, num, seed = None)函数

    函数例子:

    inputRDD = sc.parallelize(range(10))
    output = inputRDD.takeSample(True, 20)
    # output
    [8, 5, 5, 7, 7, 6, 3, 1, 0, 7, 5, 5, 4, 3, 3, 4, 8, 2, 7, 4]
    output = inputRDD.takeSample(False, 5)
    # output
    [2, 9, 7, 8, 0]

    这个API的作用是返回一个指定长度的子集。如果 withReplacement 是True,那么返回的元素可以是重复采集的。


    countByValue()函数

    函数例子:

    inputRDD = sc.parallelize(['a','b','c','d'])
    output = inputRDD.countByValue()
    # output
    {'a': 1, 'c': 1, 'b': 1, 'd': 1}

    这个API的作用是计算各元素在RDD中出现的次数。


    substractByKey()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    otherRDD = sc.parallelize([(3,9)])
    output = inputRDD.subtractByKey(otherRDD)
    # output
    [(1,2)]

    这个API的作用是删除inputRDD中键与otherRDD中的键相同的元素。


    join()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    otherRDD = sc.parallelize([(3,9)])
    output = inputRDD.join(otherRDD)
    # output
    [(3, (4, 9)), (3, (6, 9))]

    这个API的作用是对两个RDD进行内连接。


    cogroup()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    otherRDD = sc.parallelize([(3,9)])
    output = inputRDD.cogroup(otherRDD)
    # output
    {(1, ([2], [])), (3, ([4, 6], [9]))}

    这个API的作用是将两个RDD中拥有相同键的数据分组到一起。


    rightOuterJoin()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    otherRDD = sc.parallelize([(3,9)])
    output = inputRDD.rightOuterJoin(otherRDD)
    # output
    [(3, (4, 9)), (3, (6, 9))] 

    这个API的作用是对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)。


    leftOuterJoin()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    otherRDD = sc.parallelize([(3,9)])
    output = inputRDD.leftOuterJoin(otherRDD)
    # output
    [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]

    这个API的作用是对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接)。


    countByKey()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    output = inputRDD.countByKey()
    # output
    {1: 1, 3: 2}

    这个API的作用是对每个键对应的元素分别计数。


    collectAsMap()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    output = inputRDD.collectAsMap()
    # output
    {1: 2, 3: 6}

    这个API的作用是将结果以映射表的形式返回,以便查询。但如果RDD中,同一个key存在多个value,那么后面的value将会覆盖掉前面的value,最终得到的key就是唯一的,而且对应一个value。


    lookup()函数

    函数例子:

    inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
    output = inputRDD.lookup(3)
    # output
    [4, 6]

    这个API的作用是返回给定键对应的所有值。


    从Hive中读取数据


    from pyspark.sql import HiveContext
    
    hiveCtx = HiveContext(sc)
    rows = hiveCtx.sql("select name, age from users")
    firstRow = rows.first()
    print firstRow.name

    Python向spark传递函数有三种方法


    # 方法一
    word = rdd.filter(lambda s: "error" in s)
    
    # 方法二
    def containsError(s):
      return "error" in s
    word = rdd.filter(containsError)
    
    # 方法三
    class WordFunctions(object):
      ...
      def getMatchesNoReference(self, rdd):
        # 安全方式:只把需要的字段提取到局部变量中
        query = self.query
        return rdd.filter(lambda x: query in x)

  • 相关阅读:
    Oracle数据库的dual表的作用
    数据库中CASE函数和Oracle的DECODE函数的用法
    Oracle数据库中,通过function的方式建立自增字段
    Java学习(十三):抽象类和接口的区别,各自的优缺点
    Java学习(十八):二叉树的三种递归遍历
    Sublime工具插件安装
    sizeof和strlen
    I2C接口的EEPROM操作
    关于窗口看门狗
    关于指针传入函数
  • 原文地址:https://www.cnblogs.com/geogre123/p/12787610.html
Copyright © 2011-2022 走看看