zoukankan      html  css  js  c++  java
  • Spark 算子

    RDD算子分类,大致可以分为两类,即:

    1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

    2. Action:行动算子,这类算子会触发SparkContext提交Job作业。

    一:Transformation:转换算子

    1.map 
    map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

    举例:

    scala> val a = sc.parallelize(1 to 9, 3)
    scala> val b = a.map(x => x*2)
    scala> a.collect
    res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    scala> b.collect
    res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

    2.mapPartitions 
    mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 
    它的函数定义为:

    def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 
    f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

    举例:

    scala> val a = sc.parallelize(1 to 9, 3)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :27
    
    scala> a.collect
    res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)                            
    
    scala> var c = a.mapPartitions( a=>a.filter(_>=7) )
    c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at mapPartitions at :29
    
    scala> c.collect
    res12: Array[Int] = Array(7, 8, 9) 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    上述例子是通过函数filter对分区中所有数据进行过滤 
    mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

    3.mapValues 
    mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

    举例:

    scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
    scala> val b = a.map(x => (x.length, x))
    scala> b.mapValues("x" + _ + "x").collect
    res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))
    • 1
    • 2
    • 3
    • 4

    4.mapWith 
    mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

    def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U] 
    第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A; 
    第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。 
    举例:把partition index 乘以10,然后加上2作为新的RDD的元素。

    val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) 
    x.mapWith(a => a * 10)((a, b) => (b + 2)).collect 
    res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)
    • 1
    • 2
    • 3

    5.flatMap 
    与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 
    举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

    scala> val a = sc.parallelize(1 to 4, 2)
    scala> val b = a.flatMap(x => 1 to x)
    scala> b.collect
    res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)
    • 1
    • 2
    • 3
    • 4

    6.flatMapWith 
    flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

    def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U] 
    举例:

    scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
    scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
    res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
    8, 2, 9)
    • 1
    • 2
    • 3
    • 4

    7.flatMapValues 
    flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

    举例

    scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
    scala> val b = a.flatMapValues(x=>x.to(5))
    scala> b.collect
    res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))
    • 1
    • 2
    • 3
    • 4

    上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

    8.reduce 
    reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

    举例

    scala> val c = sc.parallelize(1 to 10)
    scala> c.reduce((x, y) => x + y)
    res4: Int = 55
    • 1
    • 2
    • 3

    上述例子对RDD中的元素求和。

    9.reduceByKey 
    顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

    举例:

    scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
    scala> a.reduceByKey((x,y) => x + y).collect
    res7: Array[(Int, Int)] = Array((1,2), (3,10))
    • 1
    • 2
    • 3

    上述例子中,对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。

    10.cartesian:

    对两个RDD内的所有元素进行笛卡尔积操作(耗内存),内部实现返回CartesianRDD。

    scala> val a = sc.parallelize(List(1,2,3))
    
    scala> val b = sc.parallelize(List(4,5,6))
    
    scala> val c = a.cartesian(b)
    
    scala> c.collect
    res15: Array[(Int, Int)] = Array((1,4), (1,5), (1,6), (2,4), (3,4), (2,5), (2,6), (3,5), (3,6))
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    11 Sample: 
    sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有有放回的抽样,百分比,随机种子,进而决定采样方式。

    内部实现: SampledRDD(withReplacement,fraction,seed)。 
    函数参数设置: 
    withReplacement=true,表示有放回的抽样。 
    withReplacement=false,表示无放回的抽样。

    根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子。

    随机函数产生的是一种伪随机数,它实际是一种序列发生器,有固定的算法,只有当种子不同时,序列才不同,所以不应该把种子固定在程序中,应该用随机产生的数做种子,如程序运行时的时间等。 
    以c++为例,应先用srand()设置不同种子,否则每次调用rand()得到的值是一样的。

    scala> val a = sc.parallelize(1 to 100,3)
    
    scala> a.sample(false,0.1,0).count
    res16: Long = 12
    
    scala> a.sample(false,0.1,0).collect
    res17: Array[Int] = Array(10, 47, 55, 73, 76, 84, 87, 88, 91, 92, 95, 98)
    
    scala> a.sample(true,0.7,scala.util.Random.nextInt(10000)).count
    res19: Long = 75
    
    scala> a.sample(true,0.7,scala.util.Random.nextInt(10000)).collect
    res20: Array[Int] = Array(1, 3, 3, 3, 5, 6, 9, 9, 9, 9, 10, 10, 15, 17, 20, 23, 23, 27, 28, 31, 32, 32, 34, 35, 36, 36, 36, 36, 38, 39, 41, 42, 42, 43, 45, 47, 49, 49, 50, 50, 51, 51, 54, 55, 55, 57, 57, 57, 57, 57, 59, 59, 61, 61, 63, 67, 72, 74, 76, 76, 80, 80, 81, 81, 81, 82, 83, 85, 87, 88, 90, 93, 95, 96, 97, 97, 99, 100)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    12 union:

    使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。并不进行去重操作,保存所有的元素,如果想去重,可以使用distinct()。同时,spark还提供更为简洁的使用union的API,即通过++符号相当于union函数操作。

    eg: a 与 b 的联合

    scala> val a = sc.parallelize(List(("A",1),("B",2),("c",3),("A",4),("C",5) ))
    
    scala> val b = sc.parallelize(List(("A",5),("B",6),("A",4),("C",9) ))
    
    scala> a.union(b).collect
    res22: Array[(String, Int)] = Array((A,1), (B,2), (c,3), (A,4), (C,5), (A,5), (B,6), (A,4), (C,9))
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    去重复:

    scala> val d = sc.parallelize(List(("A",5),("B",6),("A",5) ))
    
    scala> d.distinct.collect
    res25: Array[(String, Int)] = Array((B,6), (A,5))
    • 1
    • 2
    • 3
    • 4

    13 groupByKey:

    将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。

    eg:根据数据集中的每个元素的K值对数据分组

    scala> val a = sc.parallelize(List(("A",1),("B",2),("c",3),("A",4),("C",5) ))
    
    scala> a.groupByKey().collect
    res21: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2)), (A,CompactBuffer(1, 4)), (C,CompactBuffer(5)), (c,CompactBuffer(3)))
    • 1
    • 2
    • 3
    • 4

    14 join:

    join对两个需要连接的RDD进行cogroup函数操作,将相同key的数据能偶放到一个分区,在cgroup操作之后形成新RDD对每个key下的元素进行笛卡尔积的操作,返回的结果在展平,对应key下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。

    eg:a与b两个数据连接,相当于表的关联

    scala> val a = sc.parallelize(List(("A",1),("B",2),("c",3),("A",4),("C",5) ))
    
    scala> val b = sc.parallelize(List(("A",5),("B",6),("A",4),("C",9) ))
    
    scala> a.join(b).collect
    res23: Array[(String, (Int, Int))] = Array((B,(2,6)), (A,(1,5)), (A,(1,4)), (A,(4,5)), (A,(4,4)), (C,(5,9)))
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    15 cache:

    cache将RDD元素从磁盘缓存到内存。相当于 persist(MEMORY_ONLY) 函数的 
    功能。

    16 persist:

    persist函数对RDD进行缓存操作,数据缓存在哪里,由StorageLevel这个枚举类型进行确定。DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进行序列化存储。

    函数定义: persist(newLevel:StorageLevel) 
    StorageLevel 是枚举类型,代表存储模式。

    MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。


    二:Action:行动算子

    1.foreach: 
    foreach对RDD中的每个元素都应用f函数操作,不返回 RDD 和 Array, 而是返回Uint。

    2.saveAsTextFile:

    函数将数据输出,存储到 HDFS 的指定目录。 
    函数的内 部实现,其内部通过调用 saveAsHadoopFile 进行实现:

    this.map(x => (NullWritable.get(), new Text(x.toString))) .saveAsHadoopFileTextOutputFormat[NullWritable, Text]

    将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS。

    3 collect:

    collect相当于toArray,不过已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数据,在这个数组上运用 scala 的函数式操作。

    4.count:

    count返回整个RDD的元素个数。

    scala> val a = sc.parallelize(1 to 10 )
    
    scala> a.collect
    res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)                         
    
    scala> a.count
    res10: Long = 10
  • 相关阅读:
    城市的划入划出效果
    文本溢出省略解决笔记css
    长串英文数字强制折行解决办法css
    Poj 2352 Star
    树状数组(Binary Indexed Trees,二分索引树)
    二叉树的层次遍历
    Uva 107 The Cat in the Hat
    Uva 10336 Rank the Languages
    Uva 536 Tree Recovery
    Uva10701 Pre, in and post
  • 原文地址:https://www.cnblogs.com/csguo/p/7803651.html
Copyright © 2011-2022 走看看