zoukankan      html  css  js  c++  java
  • Spark RDD 操作(三)

    1. 创建 RDD

    主要两种方式:

    • sc.textFile 加载本地或集群文件系统中的数据,或者从 HDFS 文件系统、HBase、Cassandra、Amazon S3等外部数据源中加载数据集。Spark可以支持文本文件、SequenceFile文件(Hadoop提供的 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件)和其他符合Hadoop InputFormat格式的文件
    • parallelize 方法将 Driver 中数据结构化并行成 RDD
    >>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
    >>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
    >>> lines = sc.textFile("/user/hadoop/word.txt")
    >>> lines = sc.textFile("word.txt")
    
    # 并行化
    nums = [1, 2, 3, 5, 6]
    rdd = sc.parallelize(nums)
    

    注意

    • 使用本地文件系统路径,须保证在所有 worker 节点上都能采用相同路径能够访问该文件(可将文件包括到每个 worker 节点上,或采用网络挂载共享文件系统)
    • textFile() 参数可以是文件、目录、压缩文件
    • textFile() 接收第二个参数(可选),用于指定分区数,默认 sparkHDFSblock 创建一个分区,(HDFS中每个block默认是128MB),可以提供一个比 block 更大的值作为分区数目,但是不能比它小

    2. RDD 操作

    RDD 创建后,在后续过程中会有两种操作:

    • 转换 transformation 操作:基于现有数据集创建一个新的数据集,转换得到的 RDD 是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作,不会触发计算
    • 行动 action 操作:在数据集上进行运算,返回计算值,会触发计算

    2.1 常用Transformation操作

    2.1.1 map

    将分区中的每份数据都作用到一个 function 中,生成一个新的分布式的数据集并返回,类似于 Python 内置的 map 方法:

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
        sc = SparkContext(conf=conf)
    
    
        def my_app():
            """
            data 中每个元素都乘以 2
            """
            data = [1, 2, 3, 4, 5, 6]
            rdd = sc.parallelize(data).map(lambda x: x * 2)
    
            print(rdd.collect())
    
    
        my_app()
    
        sc.stop()	# 记得关闭
    

    image-20201220223223341

    image-20201220223534810

    2.1.2 filter

    选出所有 function 返回值为 True 的元素,生成一个新的分布式的数据集返回:

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
        sc = SparkContext(conf=conf)
    
        def my_filter():
            data = [1, 2, 3, 4, 5, 6]
            rdd1 = sc.parallelize(data).map(lambda x: x * 2)
            filer_rdd = rdd1.filter(lambda x: x > 4)
    
            print(filer_rdd.collect())
    
        my_filter()
    
        sc.stop()
    

    2.1.3 flatMap

    将函数应用于 rdd 之中的每一个元素,将返回的迭代器的所有内容构成新的 rdd,通常用来切分单词:

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
        sc = SparkContext(conf=conf)
    
        def my_flat_map():
            data = ["hello spark", "hello python", "hello world"]
            rdd = sc.parallelize(data).flatMap(lambda line: line.split(" "))
            print(rdd.collect())
    
        my_flat_map()
    
        sc.stop()
    

    运行结果:

    ['hello', 'spark', 'hello', 'python', 'hello', 'world']
    

    2.1.4 union

    连接、合并多个 rdd

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0109")
        sc = SparkContext(conf=conf)
        
        def my_union():
            """连接"""
            rdd1 = sc.parallelize([1, 2, 3])
            rdd2 = sc.parallelize(['a', 'b', 'c'])
            rdd_union = rdd1.union(rdd2)
    
            print(rdd_union.collect())
    
    
        my_union()
        sc.stop()
    

    运行结果:

    [1, 2, 3, 'a', 'b', 'c']
    

    2.1.5 distinct 去重

    rdd 中相同元素进行去重:

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0109")
        sc = SparkContext(conf=conf)
        
        def my_distinct():
            rdd1 = sc.parallelize([1, 2, 3])
            rdd2 = sc.parallelize([1, 'a', '2', 'b'])
            rdd_distinct = rdd1.union(rdd2).distinct()
    
            print(rdd_distinct.collect())
    
    
        my_distinct()
    sc.stop()
    

    运行结果:

    ['b', 1, 'a', 2, 3, '2']
    

    2.1.6 join 连接

    类似于 SQLjoin,包括:

    • inner join:内连接
    • outer joinleft/right/full join 外连接(左外、右外、全连接)
    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0109")
        sc = SparkContext(conf=conf)
        
        def my_join():
            a = sc.parallelize([('A', 'a1'), ('C', 'c1'), ('D', 'd1'), ('F', 'f1'), ('F', 'f2')])
            b = sc.parallelize([('A', 'a2'), ('C', 'c2'), ('C', 'c3'), ('E', 'e1')])
            
            join_res = a.join(b).collect()
            left_join_res = a.leftOuterJoin(b).collect()	# 只关心左边有的数据,左边没有的为 None
            right_join_res = a.rightOuterJoin(b).collect()	# 只关心右边有的数据,右边没有的为 None
            full_join_res = a.fullOuterJoin(b).collect()
    
            print('a join b >>>', join_res)
            print('left_join_res >>>', left_join_res)
            print('right_join_res >>>', right_join_res)
            print('full_join_res >>>', full_join_res)
    
    
        my_join()
    sc.stop()
    

    运行结果:

    a join b >>> [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3'))]
    
    left_join_res >>> [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None))]
    
    right_join_res >>> [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('E', (None, 'e1'))]
    
    full_join_res >>> [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None)), ('E', (None, 'e1'))]
    

    其他操作

    # subtract找到属于前一个rdd而不属于后一个rdd的元素
    >>> a = sc.parallelize(range(10))
    >>> b = sc.parallelize(range(5,15))
    >>> a.subtract(b).collect()
    [0, 1, 2, 3, 4]            
    
    # 求交集 
    >>> a.intersection(b).collect()
    [6, 7, 8, 9, 5]          
    
    # cartesian笛卡尔积
    >>> boys = sc.parallelize(["LiLei","Tom"])
    >>> girls = sc.parallelize(["HanMeiMei","Lily"])
    >>> boys.cartesian(girls).collect()
    [('LiLei', 'HanMeiMei'), ('LiLei', 'Lily'), ('Tom', 'HanMeiMei'), ('Tom', 'Lily')]
    
    # 按照某种方式排序,这里从小到大排序
    >>> c = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
    >>> c.sortBy(lambda x: x[2]).collect()
    [(4, 1, 1), (3, 2, 2), (1, 2, 3)]
    
    # 按照拉链方式连接两个RDD,效果类似python的zip函数
    # 需要两个RDD具有相同的分区,每个分区元素数量相同
    >>> rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
    >>> rdd_age = sc.parallelize([19,18,20])
    >>> rdd_name.zip(rdd_age).collect()
    [('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]
    >>> rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
    
    # 将RDD和一个从0开始的递增序列按照拉链方式连接。
    >>> rdd_name.zipWithIndex().collect()
    [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
    

    2.2. 常用 Action 操作

    常用 action 算子:

    • collect:收集获取全部元素
    • count:统计数目
    • take:取几个元素,如:take(5)
    • reduce:累计计算
    • saveAsTextFile:保存到文件系统,可以保存到本地或 HDFS
    • foreach:循环元素,对每一个元素执行某种操作,不生成新的 RDD
    • takeSample(False, 10, 0):可以随机取若干个到Driver,第一个参数设置是否放回抽样
    • first():获取第一个数据

    示例:

    >>> data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    >>> rdd = sc.parallelize(data)
    >>> rdd.collect()
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    >>> rdd.count()
    10
    
    >>> rdd.max()
    10
    
    >>> rdd.min()
    1
    
    >>> rdd.sum()
    55
    
    >>> rdd.reduce(lambda x, y: x+y)
    55
    
    >>> rdd.foreach(lambda x: print(x))
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    

    2.2.1 排序 sortBy

    topN

    students = [("HanMeiMei", 16, 77), ("DaChui", 16, 66), ("Jim", 18, 77), ("LiLei", 18, 87), ("RuHua", 18, 50)]
    rdd = sc.parallelize(students)
    rdd.sortBy(lambda x: x[2], ascending=False)
    
    print(rdd.take(3))
    
    [('LiLei', 18, 87), ('HanMeiMei', 16, 77), ('DaChui', 16, 66)]
    

    2.2.2 countByKey

    Pair RDDkey 统计数量:

    pairRdd = sc.parallelize([(1, 1), (1, 4), (3, 9), (2, 16)])
    rdd2 = pairRdd.countByKey()
    print(rdd2)     # defaultdict(<class 'int'>, {1: 2, 3: 1, 2: 1})
    

    2.3 常用PairRDD的转换操作

    PairRDD 指的是数据为长度为2 的 tuple 类似 (k,v) 结构的数据类型的 RDD,其每个数据的第一个元素被当做key,第二个元素被当做 value

    2.1.4 groupByKey

    将相同的 key 分组,key-value 形式:

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
        sc = SparkContext(conf=conf)
        
            def my_group_by_key():
            data = ["hello spark", "hello python", "hello world"]
            rdd1 = sc.parallelize(data). 
                flatMap(lambda x: x.split(" ")). 
                map(lambda x: (x, 1))
    		
            rdd1 = rdd1.groupByKey().mapValues(len)
    
            # 返回的是一个 key-value 形式的键值对,键为具体单词,值为可迭代对象,需要 list 进行转换
            # [{'python': [<pyspark.resultiterable.ResultIterable object at 0x0000027029F17048>]},
            # {'world': [<pyspark.resultiterable.ResultIterable object at 0x0000027029F17080>]},
            # {'hello': [<pyspark.resultiterable.ResultIterable object at 0x0000027029F170F0>]},
            # {'spark': [<pyspark.resultiterable.ResultIterable object at 0x0000027029F17160>]}]
            rdd2 = rdd1.groupByKey().map(lambda x: {x[0]: list(x[1])})
    
            print(rdd2.collect())
    
            print(rdd1.collect())
    
        my_group_by_key()
    
        sc.stop()
    

    运行分析

    • flatMap:经过空格切分后变成:['hello', 'spark', 'hello', 'python', 'hello', 'world']
    • map(lambda x: (x, 1)):给每个单词都计数为 1,变为:[('hello', 1), ('spark', 1), ('hello', 1), ('python', 1), ('hello', 1), ('world', 1)]

    运行结果:

    # rdd2 result
    [{'python': [1]}, {'world': [1]}, {'hello': [3]}, {'spark': [1]}]
    
    # rdd1 result,采用的是 groupByKey().mapValues(len) 方式
    [('python', 1), ('world', 1), ('hello', 3), ('spark', 1)]
    

    2.1.5 reduceByKey

    把相同的 key 的数据分发到一起并进行相应的计算(如:累加,累乘等),类似于 Pythonreduce 方法,需要传入两个参数:

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
        sc = SparkContext(conf=conf)
    
        def my_reduce_by_key():
            data = ["hello spark", "hello python", "hello world"]
            rdd1 = sc.parallelize(data). 
                flatMap(lambda x: x.split(" ")). 
                map(lambda x: (x, 1))
    
            reduce_rdd = rdd1.reduceByKey(lambda x, y: x + y)
    
            print(reduce_rdd.collect())
    
        my_reduce_by_key()
    
        sc.stop()
    

    运行结果:

    [('python', 1), ('world', 1), ('hello', 3), ('spark', 1)]
    

    2.1.6 sortByKey

    对由 key-value 组成的 RDD 进行排序,默认升序 key 排序:

    需求:

    # 对 wc 结果中出现的次数降序/升序排序
    [('hello', 3), ('world', 1), ('spark', 2)]
    

    value 排序:

    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
        conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
        sc = SparkContext(conf=conf)
    
        def sort_by_key():
            """
            排序,升序排序设置 sortByKey(True) 或不设置即可
            """
            data = [('hello', 3), ('world', 1), ('spark', 2)]
            sort_rdd = sc.parallelize(data)
            # 变为 [(3, 'hello'), (1, 'world'), (2, 'spark')]
            sort_rdd = sort_rdd.map(lambda x: (x[1], x[0])). 
                sortByKey(False). 
                map(lambda x: (x[1], x[0]))
    
            print(sort_rdd.collect())
    
        sort_by_key()
    
        sc.stop()
    

    运行结果:

    [('hello', 3), ('spark', 2), ('world', 1)]
    

    wordcountvalue 排序:

    def my_sort():
        data = ["hello world", "hello spark", "hello python"]
        sort_rdd = sc.parallelize(data)
        sort_rdd = sort_rdd.flatMap(lambda x: x.split(" ")). 
            map(lambda x: (x, 1)). 
            reduceByKey(lambda x, y: x + y). 
            map(lambda x: (x[1], x[0])). 
            sortByKey(False).
            map(lambda x: (x[1], x[0]))
    
        print(sort_rdd.collect())
    

    运行结果:

    [('hello', 3), ('world', 1), ('python', 1), ('spark', 1)]
    

    2.1.7 foldByKey

    foldByKey 的操作和 reduceByKey 类似,但是要提供一个初始值:

    >>> x = sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
    
    >>> x.foldByKey(1, lambda x,y:x+y).collect()
    [('a', 5), ('b', 8)]
    

    2.1.8 subtractByKey

    去除 x 中那些 key 也在 y 中的元素

    x = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
    y = sc.parallelize([("a", 2), ("b", (1, 2))])
    
    print(x.subtractByKey(y).collect())		# [('c', 3)]
    

    3. Spark RDD 案例实战

    3.1 词频统计 wordcount

    步骤:

    • 将文本内容每一行转成一个个单词:flatMap
    • 单词 === > (单词, 1):map
    • 把所有相同单词的计数相加得到最终结果:reduceByKey

    1、wordcount.py

    # coding=utf-8
    import sys
    from pyspark import SparkConf, SparkContext
    
    if __name__ == '__main__':
        conf = SparkConf().setAppName('fengjun').setMaster('local[2]')
        sc = SparkContext(conf=conf)
    
        # wc_rdd = sc.textFile(sys.argv[1])
        wc_rdd = sc.textFile("wc.txt")
    
        wc_rdd = wc_rdd.flatMap(lambda row: row.split(" ")). 
            map(lambda x: (x, 1)). 
            reduceByKey(lambda x, y: x + y)
    
        print(wc_rdd.collect())
    

    2、wc.txt

    hello world
    hello spark
    hello python
    

    也可以将结果写入文件系统:

    rdd.saveAsTextFile('xxxx')
    

    3.2 TOPN

    students = [("HanMeiMei", 16, 77), ("DaChui", 16, 66), ("Jim", 18, 77), ("LiLei", 18, 87), ("RuHua", 18, 50)]
    rdd = sc.parallelize(students)
    
    # 方法一
    rdd.sortBy(lambda x: x[2], ascending=False)
    
    [('LiLei', 18, 87), ('HanMeiMei', 16, 77), ('DaChui', 16, 66)]
    
    # 方法二
    rdd = rdd.map(lambda x: (x[2], x)). 
    	sortByKey(False). 
    	map(lambda x: x[1])
    
    print(rdd.take(3))	# [('LiLei', 18, 87), ('HanMeiMei', 16, 77), ('Jim', 18, 77)]
    

    3.3 平均数

    students = [("HanMeiMei", 16, 77), ("DaChui", 16, 66), ("Jim", 18, 77), ("LiLei", 18, 87), ("RuHua", 18, 50)]
    rdd = sc.parallelize(students)
    
    avg_rdd = rdd.map(lambda line: line[2]).
    reduce(lambda x, y: x + y)
    
    avg = avg_rdd / rdd.count()
    
    print(avg)  # 71.4
    

    3.4 使用 spark-submit提交任务

    [root@bogon bin]# cd /home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/bin
    
    ./spark-submit --master local[2] --name fengjun /home/hj/app/projects/wordcount.py /home/hj/app/projects/wc.txt 
    

    提交成功后,可访问:http://192.168.131.131:4041,只有当任务运行时才可以访问,运行结束就不可以访问了。

  • 相关阅读:
    FortiGate 硬件加速
    RSA modulus too small: 512 < minimum 768 bits
    VMXNET3 vs E1000E and E1000
    BZOJ 1432: [ZJOI2009]Function(新生必做的水题)
    BZOJ 2456: mode(新生必做的水题)
    BZOJ 1968: [Ahoi2005]COMMON 约数研究(新生必做的水题)
    BZOJ 2463: [中山市选2009]谁能赢呢?(新生必做的水题)
    海量数据处理算法总结【超详解】
    POJ 1659 Frogs' Neighborhood(可图性判定—Havel-Hakimi定理)【超详解】
    图的存储结构之邻接表(详解)
  • 原文地址:https://www.cnblogs.com/midworld/p/14645992.html
Copyright © 2011-2022 走看看