zoukankan      html  css  js  c++  java
  • spark python算子讲解

    1:spark的算子分类

    1.   Transformation 称为转换,是一种延迟加载的算法,会记录元数据信息,任务触发action时开始执行
    2.   Action 称为动作 出发就执行  
    sc.textFile().map  map是transformation
                     .filter  transformation
                     .collect 是action直接执行

     2:创建rdd的两种方式

    1. 通过hdfs支持的文件系统,rdd里面没有真正要计算的数据,只记录元数据
    2. 通过scala集合或者数据以并行化的方式创建rdd

     2:spark python高级算子

     1.mapPartitions

    // 传给mapPartitions的方法中 参数是partitions的迭代器对象,返回值也是一个迭代器对象
    // python实现如下
    def filterOutFromPartion(list):
        //list是partitioins的迭代器集合
        iterator = []
        //elements是具体的partition中元素的迭代器
        for elements in list:
            iterator.append([x for  x in elements if x !=2 ])
        return iter(iterator)
    data = [[1,2,3],[3,2,4],[5,2,7]] conf = SparkConf().setAppName("study") sc = SparkContext(conf=conf) partitions = sc.parallelize(data, 2).mapPartitions(filterOutFromPartion).collect() print(partitions)
    //yield为简化版,因为yield本身就是返回一个迭代器
    def filterOutFromPartion(list): # iterator = [] for elements in list: yield [x for x in elements if x !=2 ] # iterator.append([x for x in elements if x !=2 ]) # return iter(iterator)

    2.mapPartitionsWithIndex

    Similar to mapPartitions, but also provides a function with an int value to indicate the index position of the partition.

    和mapPartitions类似,但是提供了一个带有整形参数用来表明分区位置的方法

    parallel = sc.parallelize(range(1,10),4)
    //下面这个方法是传进去的方法对象,
    def show(index, iterator): yield 'index: '+str(index)+" values: "+ str(list(iterator))
    
    parallel.mapPartitionsWithIndex(show).collect()
    
    //一下为结果
    ['index: 0 values: [1, 2, 3]',
     'index: 1 values: [4, 5, 6]',
     'index: 2 values: [7, 8, 9]']

     3.sample 

    sample(withReplacement,faction,seed):抽样,withReplacement为true表示有放回;faction表示采样的比例;seed为随机种子

    parallel = sc.parallelize(range(1,10))
    //表示取50%的数据 种子随机
    parallel.sample(True,0.5).count()

     4.union

    union(ortherDataset):将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重

    one = sc.parallelize(range(1,10))
    two = sc.parallelize(range(10,21))
    one.union(two).collect()
    //output
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

    5.intersection

    intersection(otherDataset):返回两个RDD的交集

    one = sc.parallelize(range(1,10))
    two = sc.parallelize(range(5,15))
    one.intersection(two).collect()
    //output
    [5, 6, 7, 8, 9]

    6.distinct

    distinct([numTasks]):对RDD中的元素进行去重

    >>> parallel = sc.parallelize(range(1,9))
    >>> par2 = sc.parallelize(range(5,15))
    >>> parallel.union(par2).distinct().collect()
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

     7.groupByKey算子

     // 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型
     // 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable

    8.reducebykey  

    reduceByKey(func, [numTasks]) //通过key来进行reduce过程,key相同的值的集合进行reduce操作

    sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
    ... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)
    
    # Applying reduceByKey operation on x>>> y = x.reduceByKey(lambda accum, n: accum + n)
    >>> y.collect()
    [('b', 5), ('a', 3)]
    
    # Define associative function separately >>>def sumFunc(accum, n):
    ...     return accum + n
    ...
    >>> y = x.reduceByKey(sumFunc)
    >>> y.collect()
    [('b', 5), ('a', 3)]

     9.aggregatebykey

    aggregateByKey

    这个函数可用于完成对groupByKey,reduceByKey的相同的功能,用于对rdd中相同的key的值的聚合操作,主要用于返回一个指定的类型U的RDD的transform,在这个函数中,需要传入三个参数:

    参数1:用于在每个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量,

    参数2:用于在每个分区中,相同的key中V类型的值合并到参数1创建的U类型的变量中,

    参数3:用于对重新分区后两个分区中传入的U类型数据的合并的函数.

        //合并在不同partition中的值,a,b的数据类型为zeroValue的数据类型
        def comb(a: String, b: String): String = {
          println("comb: " + a + "	 " + b)
          a + b
        }
        //合并在同一个partition中的值, a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
        def seq(a: String, b: Int): String = {
          println("seq: " + a + "	 " + b)
          a + b
        }
    
        rdd.foreach(println)
        
        //zeroValue 中立值,定义返回value的类型,并参与运算
        //seqOp 用来在一个partition中合并值的
        //comb 用来在不同partition中合并值的
        val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq,comb)

     10.sortByKey

    sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序

  • 相关阅读:
    横竖屏切换
    org.apache.harmony.xml.ExpatParser$ParseException: At line 1, column 0: unknown encoding
    @Value() 使用方法
    调用第三方超时处理
    spring 配置注解定时器quartz01
    tomcat:PermGen space
    06-图3 六度空间 (30分)
    06-图2 Saving James Bond
    06-图1 列出连通集 (25分)
    05-树9 Huffman Codes (30分)
  • 原文地址:https://www.cnblogs.com/zhangweilun/p/6530092.html
Copyright © 2011-2022 走看看