zoukankan      html  css  js  c++  java
  • SparkRDD函数详解

    1RDD操作详解

    启动spark-shell

    spark-shell --master spark://hdp-node-01:7077

    1.1 基本转换

    1) map

    map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

    举例:

    scala> val a = sc.parallelize(1 to 9, 3)

    scala> val b = a.map(x => x*2)

    scala> a.collect

    res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

    scala> b.collect

    res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

    上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

    2) filter

    filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

    val rdd = sc.parallelize(List(1,2,3,4,5,6)) 

    val filterRdd = rdd.filter(_ > 5)

    filterRdd.collect() //返回所有大于5的数据的一个Array, Array(6,8,10,12)

    3) flatMap

    与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

    scala> val a = sc.parallelize(1 to 4, 2)

    scala> val b = a.flatMap(x => 1 to x)

    scala> b.collect

    res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

    4) mapPartitions

    mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为:

    def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

    f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

    举例:

    scala> val a = sc.parallelize(1 to 9, 3)

    scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {

      var res = List[(T, T)]()

      var pre = iter.next

    while (iter.hasNext) {

        val cur = iter.next

        res.::=(pre, cur)

          pre = cur  }

      res.iterator

    }

    scala> a.mapPartitions(myfunc).collect

    res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

    上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

    5) mapPartitionsWithIndex

    def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

    函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

    var rdd1 = sc.makeRDD(1 to 5,2)

    //rdd1有两个分区

    var rdd2 = rdd1.mapPartitionsWithIndex{

            (x,iter) => {

              var result = List[String]()

                var i = 0

                while(iter.hasNext){

                  i += iter.next()

                }

                result.::(x + "|" + i).iterator

              

            }

          }

    //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引

    scala> rdd2.collect

    res13: Array[String] = Array(0|3, 1|12)

    6) mapWith

    mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

    def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

    第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;

    第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

    举例:把partition index 乘以10加2,作为新的RDD的元素。

    val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)

    x.mapWith(a => a * 10)((b, a) => (b,a + 2)).collect

    结果:

    (1,2)

    (2,2)

    (3,2)

    (4,12)

    (5,12)

    (6,12)

    (7,22)

    (8,22)

    (9,22)

    (10,22)

    7) flatMapWith

    flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

    def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

    举例:

    scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)

    scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect

    res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,

    8, 2, 9)

    8) coalesce

    def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

    该函数用于将RDD进行重分区,使用HashPartitioner。

    第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;

    以下面的例子来看:

    scala> var data = sc.parallelize(1 to 12, 3)

    scala> data.collect

    scala> data.partitions.size

    scala> var rdd1 = data.coalesce(1)

    scala> rdd1.partitions.size

    scala> var rdd1 = data.coalesce(4)

    scala> rdd1.partitions.size

    res2: Int = 1   //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便

    scala> var rdd1 = data.coalesce(4,true)

    scala> rdd1.partitions.size

    res3: Int = 4

    9) repartition

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    该函数其实就是coalesce函数第二个参数为true的实现

    scala> var data = sc.parallelize(1 to 12, 3)

    scala> data.collect

    scala> data.partitions.size

    scala> var rdd1 = data. repartition(1)

    scala> rdd1.partitions.size

    scala> var rdd1 = data. repartition(4)

    scala> rdd1.partitions.size

    res3: Int = 4

    10) randomSplit

    def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

    该函数根据weights权重,将一个RDD切分成多个RDD。

    该权重参数为一个Double数组

    第二个参数为random的种子,基本可忽略。

    scala> var rdd = sc.makeRDD(1 to 12,12)

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21

    scala> rdd.collect

    res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

    scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))

    splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23,

    MapPartitionsRDD[18] at randomSplit at :23,

    MapPartitionsRDD[19] at randomSplit at :23,

    MapPartitionsRDD[20] at randomSplit at :23)

    //这里注意:randomSplit的结果是一个RDD数组

    scala> splitRDD.size

    res8: Int = 4

    //由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD,

    //把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。

    //注意,权重的总和加起来为1,否则会不正常

    scala> splitRDD(0).collect

    res10: Array[Int] = Array(1, 4)

    scala> splitRDD(1).collect

    res11: Array[Int] = Array(3)                                                   

    scala> splitRDD(2).collect

    res12: Array[Int] = Array(5, 9)

    scala> splitRDD(3).collect

    res13: Array[Int] = Array(2, 6, 7, 8, 10)

    11) glom

    def glom(): RDD[Array[T]]

    该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。

    scala> var rdd = sc.makeRDD(1 to 10,3)

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21

    scala> rdd.partitions.size

    res33: Int = 3  //该RDD有3个分区

    scala> rdd.glom().collect

    res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

    //glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组

    12) union并集

    val rdd1 = sc.parallelize(List(5, 6, 4, 3))

    val rdd2 = sc.parallelize(List(1, 2, 3, 4))

    //求并集

    val rdd3 = rdd1.union(rdd2)

    rdd3.collect

    13) distinct

    去重

    val rdd1 = sc.parallelize(List(5, 6, 4, 3))

    val rdd2 = sc.parallelize(List(1, 2, 3, 4))

    //求并集

    val rdd3 = rdd1.union(rdd2)

    //去重输出

    rdd3.distinct.collect

    14) intersection交集

    val rdd1 = sc.parallelize(List(5, 6, 4, 3))

    val rdd2 = sc.parallelize(List(1, 2, 3, 4))

    //求交集

    val rdd4 = rdd1.intersection(rdd2)

    rdd4.collect

    15)subtract

    def subtract(other: RDD[T]): RDD[T]

    def subtract(other: RDD[T], numPartitions: Int): RDD[T]

    def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

    该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。

    val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))

        val rdd2 = sc.parallelize(List(1, 2, 3, 4))

        //求差集

        val rdd4 = rdd1.subtract(rdd2)

    rdd4.collect

    16)subtractByKey

    def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]

    def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]

    def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

    subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。

    参数numPartitions用于指定结果的分区数

    参数partitioner用于指定分区函数

    var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

    var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)

    scala> rdd1.subtractByKey(rdd2).collect

    res13: Array[(String, String)] = Array((B,2))

    17) groupbyKey

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

        val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

        //求并集

        val rdd4 = rdd1 union rdd2

        //按key进行分组

        val rdd5 = rdd4.groupByKey

    rdd5.collect

    18)reduceByKey

    顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

    举例:

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

        val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

        //求并集

        val rdd4 = rdd1 union rdd2

        //按key进行分组

        val rdd6 = rdd4.reduceByKey(_ + _)

        rdd6.collect()

    19)sortByKey

    将List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按名称排序

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))

        val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

        val rdd3 = rdd1.union(rdd2)

        //按key进行聚合

        val rdd4 = rdd3.reduceByKey(_ + _)

        //false降序

        val rdd5 = rdd4.sortByKey(false)

    rdd5.collect

    20)sortBy

    将List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按数值排序

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))

        val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

        val rdd3 = rdd1.union(rdd2)

        //按key进行聚合

        val rdd4 = rdd3.reduceByKey(_ + _)

        //false降序

        val rdd5 = rdd4.sortBy(_._2, false)

        rdd5.collect

    21)zip

    def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

    zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

    scala> var rdd1 = sc.makeRDD(1 to 5,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21

    scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)

    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21

    scala> rdd1.zip(rdd2).collect

    res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))          

    scala> rdd2.zip(rdd1).collect

    res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))

    scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)

    rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21

    scala> rdd1.zip(rdd3).collect

    java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

    //如果两个RDD分区数不同,则抛出异常

    22) zipPartitions

    zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

    该函数有好几种实现,可分为三类:

    参数是一个RDD

    def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

    def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

    这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

    映射方法f参数为两个RDD的迭代器。

    scala> var rdd1 = sc.makeRDD(1 to 5,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21

    scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)

    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21

    //rdd1两个分区中元素分布:

    scala> rdd1.mapPartitionsWithIndex{

         |         (x,iter) => {

         |           var result = List[String]()

         |             while(iter.hasNext){

         |               result ::= ("part_" + x + "|" + iter.next())

         |             }

         |             result.iterator

         |           

         |         }

         |       }.collect

    res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)

    //rdd2两个分区中元素分布

    scala> rdd2.mapPartitionsWithIndex{

         |         (x,iter) => {

         |           var result = List[String]()

         |             while(iter.hasNext){

         |               result ::= ("part_" + x + "|" + iter.next())

         |             }

         |             result.iterator

         |           

         |         }

         |       }.collect

    res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)

    //rdd1和rdd2做zipPartition

    scala> rdd1.zipPartitions(rdd2){

         |       (rdd1Iter,rdd2Iter) => {

         |         var result = List[String]()

         |         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {

         |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())

         |         }

         |         result.iterator

         |       }

         |     }.collect

    res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)

    参数是两个RDD

    def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

    def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

    用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

    scala> var rdd1 = sc.makeRDD(1 to 5,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21

    scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)

    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21

    scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)

    rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21

    //rdd3中个分区元素分布

    scala> rdd3.mapPartitionsWithIndex{

         |         (x,iter) => {

         |           var result = List[String]()

         |             while(iter.hasNext){

         |               result ::= ("part_" + x + "|" + iter.next())

         |             }

         |             result.iterator

         |           

         |         }

         |       }.collect

    res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)

    //三个RDD做zipPartitions

    scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){

         |       (rdd1Iter,rdd2Iter,rdd3Iter) => {

         |         var result = List[String]()

         |         while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {

         |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())

         |         }

         |         result.iterator

         |       }

         |     }

    rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27

    scala> rdd4.collect

    res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

    参数是三个RDD

    def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

    def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

    用法同上面,只不过这里又多了个一个RDD而已。

    23) zipWithIndex

    def zipWithIndex(): RDD[(T, Long)]

    该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

    scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)

    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21

    scala> rdd2.zipWithIndex().collect

    res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

    24) zipWithUniqueId

    def zipWithUniqueId(): RDD[(T, Long)]

    该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:

    每个分区中第一个元素的唯一ID值为:该分区索引号,

    每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)

    看下面的例子:

    scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)

    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21

    //rdd1有两个分区,

    scala> rdd1.zipWithUniqueId().collect

    res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))

    //总分区数为2

    //第一个分区第一个元素ID为0,第二个分区第一个元素ID为1

    //第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4

    //第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5

    键值转换

    25) partitionBy

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]

    该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。

    scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)

    rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21

    scala> rdd1.partitions.size

    res20: Int = 2

    //查看rdd1中每个分区的元素

    scala> rdd1.mapPartitionsWithIndex{

         |         (partIdx,iter) => {

         |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()

         |             while(iter.hasNext){

         |               var part_name = "part_" + partIdx;

         |               var elem = iter.next()

         |               if(part_map.contains(part_name)) {

         |                 var elems = part_map(part_name)

         |                 elems ::= elem

         |                 part_map(part_name) = elems

         |               } else {

         |                 part_map(part_name) = List[(Int,String)]{elem}

         |               }

         |             }

         |             part_map.iterator

         |           

         |         }

         |       }.collect

    res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))

    //(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中

    //使用partitionBy重分区

    scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))

    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23

    scala> rdd2.partitions.size

    res23: Int = 2

    //查看rdd2中每个分区的元素

    scala> rdd2.mapPartitionsWithIndex{

         |         (partIdx,iter) => {

         |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()

         |             while(iter.hasNext){

         |               var part_name = "part_" + partIdx;

         |               var elem = iter.next()

         |               if(part_map.contains(part_name)) {

         |                 var elems = part_map(part_name)

         |                 elems ::= elem

         |                 part_map(part_name) = elems

         |               } else {

         |                 part_map(part_name) = List[(Int,String)]{elem}

         |               }

         |             }

         |             part_map.iterator

         |         }

         |       }.collect

    res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))

    //(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中

    26)mapValues

    mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

    举例:

    scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)

    scala> val b = a.map(x => (x.length, x))

    scala> b.mapValues("x" + _ + "x").collect

    res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

    27) flatMapValues

    flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

    举例

    val a = sc.parallelize(List((1, 2), (3, 4), (5, 6)))

        val b = a.flatMapValues(x => 1.to(x))

        b.collect.foreach(println)

    28) combineByKey

    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

    def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

    该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。

    其中的参数:

    createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次

    mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做

    mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做

    numPartitions:结果RDD分区数,默认保持原有的分区数

    partitioner:分区函数,默认为HashPartitioner

    mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

    看下面例子:

    scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))

    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21

    scala> rdd1.combineByKey(

         |       (v : Int) => v + "_",  

         |       (c : String, v : Int) => c + "@" + v, 

         |       (c1 : String, c2 : String) => c1 + "$" + c2

         |     ).collect

    res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

    其中三个映射函数分别为:

    createCombiner: (V) => C

    (v : Int) => v + “_” //在每一个V值后面加上字符_,返回C类型(String)

    mergeValue: (C, V) => C

    (c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)

    mergeCombiners: (C, C) => C

    (c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String)

    其他参数为默认值。

    最终,将RDD[String,Int]转换为RDD[String,String]。

    再看例子:

    rdd1.combineByKey(

          (v : Int) => List(v),

          (c : List[Int], v : Int) => v :: c,

          (c1 : List[Int], c2 : List[Int]) => c1 ::: c2

    ).collect

    res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

    最终将RDD[String,Int]转换为RDD[String,List[Int]]。

    29)foldByKey

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

    def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

    该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.

    例子:

    scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

    scala> rdd1.foldByKey(0)(_+_).collect

    res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))

    //将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操

    //作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:

    //("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)

    再看:

    scala> rdd1.foldByKey(2)(_+_).collect

    res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))

    //先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函

    //数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)

    再看乘法操作:

    scala> rdd1.foldByKey(0)(_*_).collect

    res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))

    //先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),

    //即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)

    //其他K也一样,最终都得到了V=0

    scala> rdd1.foldByKey(1)(_*_).collect

    res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))

    //映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。

    在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。

    30) reduceByKeyLocally

    def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

    该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。

    scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21

    scala> rdd1.reduceByKeyLocally((x,y) => x + y)

    res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)

    31)cogroupgroupByKey的区别

             val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

        val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

        //cogroup

        val rdd3 = rdd1.cogroup(rdd2)

        //groupbykey

        val rdd4 = rdd1.union(rdd2).groupByKey

        //注意cogroup与groupByKey的区别

        rdd3.foreach(println)

        rdd4.foreach(println)

    32) join

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

    //求jion

    val rdd3 = rdd1.join(rdd2)

    rdd3.collect

    33)leftOuterJoin

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

    def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

    leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

    参数numPartitions用于指定结果的分区数

    参数partitioner用于指定分区函数

    var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

    var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)

    scala> rdd1.leftOuterJoin(rdd2).collect

    res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))

    34)  rightOuterJoin

    def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

    def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

    def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

    rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

    参数numPartitions用于指定结果的分区数

    参数partitioner用于指定分区函数

    var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

    var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)

    scala> rdd1.rightOuterJoin(rdd2).collect

    res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))

    Action操作

    35)first

    def first(): T

    first返回RDD中的第一个元素,不排序。

    scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

    rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21

    scala> rdd1.first

    res14: (String, String) = (A,1)

    scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21

    scala> rdd1.first

    res8: Int = 10

    36) count

    def count(): Long

    count返回RDD中的元素数量。

    scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

    rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21

    scala> rdd1.count

    res15: Long = 3

    37) reduce

    def reduce(f: (T, T) ⇒ T): T

    根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。

    scala> var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21

    scala> rdd1.reduce(_ + _)

    res18: Int = 55

    scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21

    scala> rdd2.reduce((x,y) => {

         |       (x._1 + y._1,x._2 + y._2)

         |     })

    res21: (String, Int) = (CBBAA,6)

    collect

    def collect(): Array[T]

    collect用于将一个RDD转换成数组。

    scala> var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21

    scala> rdd1.collect

    res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    38) take 

    def take(num: Int): Array[T]

    take用于获取RDD中从0到num-1下标的元素,不排序。

    scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

    scala> rdd1.take(1)

    res0: Array[Int] = Array(10)                                                    

    scala> rdd1.take(2)

    res1: Array[Int] = Array(10, 4)

    39)top

    def top(num: Int)(implicit ord: Ordering[T]): Array[T]

    top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。

    scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

    scala> rdd1.top(1)

    res2: Array[Int] = Array(12)

    scala> rdd1.top(2)

    res3: Array[Int] = Array(12, 10)

    //指定排序规则

    scala> implicit val myOrd = implicitly[Ordering[Int]].reverse

    myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef

    scala> rdd1.top(1)

    res4: Array[Int] = Array(2)

    scala> rdd1.top(2)

    res5: Array[Int] = Array(2, 3)

    40)takeOrdered

    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

    takeOrdered和top类似,只不过以和top相反的顺序返回元素。

    scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

    scala> rdd1.top(1)

    res4: Array[Int] = Array(12)

    scala> rdd1.top(2)

    res5: Array[Int] = Array(12, 10)

    scala> rdd1.takeOrdered(1)

    res6: Array[Int] = Array(2)

    scala> rdd1.takeOrdered(2)

    res7: Array[Int] = Array(2, 3)

    41) aggregate

    def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

    aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。

    var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1.mapPartitionsWithIndex{

            (partIdx,iter) => {

              var part_map = scala.collection.mutable.Map[String,List[Int]]()

                while(iter.hasNext){

                  var part_name = "part_" + partIdx;

                  var elem = iter.next()

                  if(part_map.contains(part_name)) {

                    var elems = part_map(part_name)

                    elems ::= elem

                    part_map(part_name) = elems

                  } else {

                    part_map(part_name) = List[Int]{elem}

                  }

                }

                part_map.iterator

              

            }

          }.collect

    res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))

    ##第一个分区中包含5,4,3,2,1

    ##第二个分区中包含10,9,8,7,6

    scala> rdd1.aggregate(1)(

         |           {(x : Int,y : Int) => x + y},

         |           {(a : Int,b : Int) => a + b}

         |     )

    res17: Int = 58

    结果为什么是58,看下面的计算过程:

    ##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1

    ##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16

    ## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41

    ##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1

    ##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

    再比如:

    scala> rdd1.aggregate(2)(

         |           {(x : Int,y : Int) => x + y},

         |           {(a : Int,b : Int) => a * b}

         |     )

    res18: Int = 1428

    ##这次zeroValue=2

    ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17

    ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42

    ##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428

    因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。

    42) fold

    def fold(zeroValue: T)(op: (T, T) ⇒ T): T

    fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

    var rdd1 = sc.makeRDD(1 to 10, 2)

    scala> rdd1.fold(1)(

         |       (x,y) => x + y   

         |     )

    res19: Int = 58

    ##结果同上面使用aggregate的第一个例子一样,即:

    scala> rdd1.aggregate(1)(

         |           {(x,y) => x + y},

         |           {(a,b) => a + b}

         |     )

    res20: Int = 58

    43) lookup

    def lookup(key: K): Seq[V]

    lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。

    scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21

    scala> rdd1.lookup("A")

    res0: Seq[Int] = WrappedArray(0, 2)

    scala> rdd1.lookup("B")

    res1: Seq[Int] = WrappedArray(1, 2)

    44) countByKey

    def countByKey(): Map[K, Long]

    countByKey用于统计RDD[K,V]中每个K的数量。

    scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))

    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21

    scala> rdd1.countByKey

    res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)

    45) foreach

    def foreach(f: (T) ⇒ Unit): Unit

    foreach用于遍历RDD,将函数f应用于每一个元素。

    但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。

    比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。

    我在Spark1.4中是这样,不知道是否真如此。

    这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。

    scala> var cnt = sc.accumulator(0)

    cnt: org.apache.spark.Accumulator[Int] = 0

    scala> var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

    scala> rdd1.foreach(x => cnt += x)

    scala> cnt.value

    res51: Int = 55

    scala> rdd1.collect.foreach(println)

    46)  foreachPartition

    def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit

    foreachPartition和foreach类似,只不过是对每一个分区使用f。

    scala> var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

    scala> var allsize = sc.accumulator(0)

    size: org.apache.spark.Accumulator[Int] = 0

    scala>     rdd1.foreachPartition { x => {

         |       allsize += x.size

         |     }}

    scala> println(allsize.value)

    10

    47)sortBy

    def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

    sortBy根据给定的排序k函数将RDD中的元素进行排序。

    scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)

    scala> rdd1.sortBy(x => x).collect

    res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序

    scala> rdd1.sortBy(x => x,false).collect

    res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序

    //RDD[K,V]类型

    scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))

    scala> rdd1.sortBy(x => x).collect

    res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))

    //按照V进行降序排序

    scala> rdd1.sortBy(x => x._2,false).collect

    res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))

    48) saveAsTextFile

    def saveAsTextFile(path: String): Unit

    def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

    saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。

    codec参数可以指定压缩的类名。

    var rdd1 = sc.makeRDD(1 to 10,2)

    scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS

    hadoop fs -ls /tmp/lxw1234.com

    Found 2 items

    -rw-r--r--   2 lxw1234 supergroup        0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS

    -rw-r--r--   2 lxw1234 supergroup        21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000

    hadoop fs -cat /tmp/lxw1234.com/part-00000

    注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。

    //指定压缩格式保存

    rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec])

    hadoop fs -ls /tmp/lxw1234.com

    -rw-r--r--   2 lxw1234 supergroup    0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS

    -rw-r--r--   2 lxw1234 supergroup    71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo

    hadoop fs -text /tmp/lxw1234.com/part-00000.lzo

    49)  saveAsSequenceFile

    saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。

    用法同saveAsTextFile。

    50) saveAsObjectFile

    def saveAsObjectFile(path: String): Unit

    saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。

    对于HDFS,默认采用SequenceFile保存。

    var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")

    hadoop fs -cat /tmp/lxw1234.com/part-00000

    SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT

    51)saveAsHadoopFile

    def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit

    def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

    saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。

    可以指定outputKeyClass、outputValueClass以及压缩格式。

    每个分区输出一个文件。

    var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))

    import org.apache.hadoop.mapred.TextOutputFormat

    import org.apache.hadoop.io.Text

    import org.apache.hadoop.io.IntWritable

    rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

    rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],

                          classOf[com.hadoop.compression.lzo.LzopCodec])

    52) saveAsHadoopDataset

    def saveAsHadoopDataset(conf: JobConf): Unit

    saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。

    在JobConf中,通常需要关注或者设置五个参数:

    文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。

    ##使用saveAsHadoopDataset将RDD保存到HDFS中

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import SparkContext._

    import org.apache.hadoop.mapred.TextOutputFormat

    import org.apache.hadoop.io.Text

    import org.apache.hadoop.io.IntWritable

    import org.apache.hadoop.mapred.JobConf

    var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))

    var jobConf = new JobConf()

    jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])

    jobConf.setOutputKeyClass(classOf[Text])

    jobConf.setOutputValueClass(classOf[IntWritable])

    jobConf.set("mapred.output.dir","/tmp/lxw1234/")

    rdd1.saveAsHadoopDataset(jobConf)

    结果:

    hadoop fs -cat /tmp/lxw1234/part-00000

    A       2

    A       1

    hadoop fs -cat /tmp/lxw1234/part-00001

    B       6

    B       3

    B       7

    ##保存数据到HBASE

    HBase建表:

    create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import SparkContext._

    import org.apache.hadoop.mapred.TextOutputFormat

    import org.apache.hadoop.io.Text

    import org.apache.hadoop.io.IntWritable

    import org.apache.hadoop.mapred.JobConf

    import org.apache.hadoop.hbase.HBaseConfiguration

    import org.apache.hadoop.hbase.mapred.TableOutputFormat

    import org.apache.hadoop.hbase.client.Put

    import org.apache.hadoop.hbase.util.Bytes

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    var conf = HBaseConfiguration.create()

        var jobConf = new JobConf(conf)

        jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")

        jobConf.set("zookeeper.znode.parent","/hbase")

        jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")

        jobConf.setOutputFormat(classOf[TableOutputFormat])

       

        var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))

        rdd1.map(x =>

          {

            var put = new Put(Bytes.toBytes(x._1))

            put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))

            (new ImmutableBytesWritable,put)

          }

        ).saveAsHadoopDataset(jobConf)

    ##结果:

    hbase(main):005:0> scan 'lxw1234'

    ROW     COLUMN+CELL                                                                                               

     A       column=f1:c1, timestamp=1436504941187, value=x00x00x00x02                                             

     B       column=f1:c1, timestamp=1436504941187, value=x00x00x00x06                                             

     C       column=f1:c1, timestamp=1436504941187, value=x00x00x00x07                                             

    3 row(s) in 0.0550 seconds

    注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

    53)saveAsNewAPIHadoopFile 

    def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

    def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit

    saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。

    用法基本同saveAsHadoopFile。

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import SparkContext._

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    import org.apache.hadoop.io.Text

    import org.apache.hadoop.io.IntWritable

    var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))

    rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

    54) saveAsNewAPIHadoopDataset

    def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

    作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。

    以写入HBase为例:

    HBase建表:

    create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

    完整的Spark应用程序:

    package com.lxw1234.test

    import org.apache.spark.SparkConf

    import org.apache.spark.SparkContext

    import SparkContext._

    import org.apache.hadoop.hbase.HBaseConfiguration

    import org.apache.hadoop.mapreduce.Job

    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    import org.apache.hadoop.hbase.client.Result

    import org.apache.hadoop.hbase.util.Bytes

    import org.apache.hadoop.hbase.client.Put

    object Test {

      def main(args : Array[String]) {

       val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")

       val sc = new SparkContext(sparkConf);

       var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))

      

        sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")

        sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")

        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")

        var job = new Job(sc.hadoopConfiguration)

        job.setOutputKeyClass(classOf[ImmutableBytesWritable])

        job.setOutputValueClass(classOf[Result])

        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

       

        rdd1.map(

          x => {

            var put = new Put(Bytes.toBytes(x._1))

            put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))

            (new ImmutableBytesWritable,put)

          }   

        ).saveAsNewAPIHadoopDataset(job.getConfiguration)

       

        sc.stop()  

      }

    }

    注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

  • 相关阅读:
    docker OCI runtime
    docker 非root用户修改mount到容器的文件出现“Operation not permitted
    清除canvas画布内容--点擦除+线擦除
    js实现存储对象的数据结构hashTable和list
    学习ES6的全部特性
    页面重绘与重排版的性能影响
    测试css3的动画效果在display:none的时候不耗费性能
    stroke和fill顺序对绘图的影响
    css样式学习小知识
    统一事件监听
  • 原文地址:https://www.cnblogs.com/jifengblog/p/9369258.html
Copyright © 2011-2022 走看看