  • Spark RDD operations (26 common functions with examples)

    (1) Start the Spark shell

    ./bin/spark-shell
    

    (2) Create an RDD

    val rdd=sc.parallelize(Array(1,2,3,4,5,6,8))
    

    or

    val rdd1=sc.makeRDD(Array(1,2,3,4,5,6,8))
    

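    (3) map example

    1. Purpose: returns a new RDD formed by passing each input element through the function func.

    2. Task: create an RDD from the range 1 to 10, then multiply every element by 2 to form a new RDD.

    val source = sc.parallelize(1 to 10) // create the RDD
    
    source.collect() // fetch the elements to the driver; the shell prints them
    
    val mapadd = source.map(_ * 2) // multiply each element by 2
    
    mapadd.collect() // fetch and print the result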
    

     

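    (4) mapPartitions(func) example

    1. Purpose: similar to map, but runs separately on each partition of the RDD, so when run on an RDD of type T, func must have the type Iterator[T] => Iterator[U]. Given N elements in M partitions, the function passed to map is called N times, while the function passed to mapPartitions is called M times; each call processes one whole partition.

    2. Task: create an RDD and multiply every element by 2 to form a new RDD.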

    scala> val add=sc.parallelize(Array(1,2,3,4))
    add: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

    scala> add.mapPartitions(x=>x.map(_*2))
    res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at mapPartitions at <console>:26

    scala> res3.collect
    res4: Array[Int] = Array(2, 4, 6, 8)
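
To make the call count concrete, here is a minimal sketch that is not part of the original session. It assumes Spark is on the classpath; `SparkContext.getOrCreate` reuses the `sc` of spark-shell if one exists and otherwise starts a local context, and the names `nums` and `partitionSums` are made up for illustration. Because the function receives a whole partition as one iterator, it can emit a single value per partition:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if present, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("mapPartitions-sketch").setMaster("local[2]"))

// 8 elements in 2 partitions: (1, 2, 3, 4) and (5, 6, 7, 8).
val nums = sc.parallelize(1 to 8, 2)

// The function is invoked once per partition and gets that partition's
// iterator, so it yields exactly one sum per partition.
val partitionSums = nums.mapPartitions(iter => Iterator(iter.sum)).collect()
// partitionSums: Array(10, 26)
```

With map the same job would call the function 8 times; here it is called twice, which is why mapPartitions is often preferred when each call has a fixed setup cost.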

    (5) mapPartitionsWithIndex(func) example

    1. Purpose: similar to mapPartitions, but func takes an extra integer argument holding the index of the partition, so when run on an RDD of type T, func must have the type (Int, Iterator[T]) => Iterator[U].

    2. Task: create an RDD and pair each element with the index of its partition to form a new RDD of tuples.

    scala> val rdd=sc.parallelize(Array(1,2,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
    scala> val indexRdd=rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))) // index is the partition index
    indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:25
    
    scala> indexRdd.collect
    res5: Array[(Int, Int)] = Array((0,1), (1,2), (2,3), (3,4))

    scala> val indexRdd=rdd.mapPartitionsWithIndex((index,items)=>(items.map((3,_))))
    indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[8] at mapPartitionsWithIndex at <console>:25

    scala> indexRdd.collect
    res6: Array[(Int, Int)] = Array((3,1), (3,2), (3,3), (3,4))

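    (6) flatMap(func) example

    1. Purpose: similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element).

    2. Task: create an RDD with the elements 1 to 5 and use flatMap to build a new RDD from it.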

    scala> val sourceFlat=sc.parallelize(1 to 5)
    sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
    
    scala> sourceFlat.collect()
    res8: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> val flatMap=sourceFlat.flatMap(1 to _) // build a new RDD from the previous one
    flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at flatMap at <console>:25
    
    scala> flatMap.collect
    res9: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
    

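    (7) glom example

    1. Purpose: turns each partition into an array, producing a new RDD of type RDD[Array[T]].

    2. Task: create an RDD with 4 partitions and put the data of each partition into an array.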

    scala> val rdd=sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
    
    scala> rdd.glom.collect()
    res10: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
    

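    (8) groupBy(func) example

    1. Purpose: groups the elements by the return value of the given function. The values that share a key are collected into one iterable.

    2. Task: create an RDD and group its elements by their value modulo 2.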

    scala> val rdd=sc.parallelize(1 to 15)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24
    
    scala> val group=rdd.groupBy(_%2) // group all numbers by their remainder when divided by 2
    group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[19] at groupBy at <console>:25
    
    scala> group.collect
    res13: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8, 10, 12, 14)), (1,CompactBuffer(1, 3, 5, 7, 9, 11, 13, 15)))
    

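    (9) filter(func) example

    1. Purpose: filtering. Returns a new RDD made of the input elements for which func returns true.

    2. Task: create an RDD of strings and filter it into a new RDD that keeps the strings containing the substring "xiao".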

    scala> val sourceFilter=sc.parallelize(Array("xiaoming","xiaohu","mxlg","uzi"))
    sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:24
    
    scala> sourceFilter.collect
    res14: Array[String] = Array(xiaoming, xiaohu, mxlg, uzi)
                                             
    
    scala> val filter=sourceFilter.filter(_.contains("xiao"))
    filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at filter at <console>:25
    
    scala> filter.collect
    res15: Array[String] = Array(xiaoming, xiaohu)
    

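    (10) sample(withReplacement, fraction, seed) example

    1. Purpose: randomly samples the data using the given random seed. fraction is the expected share of the data to sample, not an exact count: without replacement it is the probability that each element is selected; with replacement it is the expected number of times each element is chosen. withReplacement says whether sampled elements are put back: true samples with replacement, false without. seed sets the seed of the random number generator.

    2. Task: create an RDD (1 to 10) and sample from it with and without replacement.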

    scala> val rdd=sc.parallelize(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> rdd.collect
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> val sample1=rdd.sample(true,0.4,2) // with replacement
    sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[1] at sample at <console>:25
    
    scala> sample1.collect
    res1: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
    
    scala> var sample2=rdd.sample(false,0.2,3) // without replacement
    sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[2] at sample at <console>:25
    
    scala> sample2.collect
    res2: Array[Int] = Array(1, 9)
    
    scala> var sample2=rdd.sample(false,0.4,2)
    sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[3] at sample at <console>:25
    
    scala> sample2.collect
    res3: Array[Int] = Array(1, 7, 8)
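
The role of seed and fraction can be checked with a short sketch. This is an illustration rather than part of the original session: it assumes Spark is on the classpath, reuses the spark-shell `sc` through `SparkContext.getOrCreate` (or starts a local context), and the names `data`, `first`, and `second` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if present, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("sample-sketch").setMaster("local[2]"))

val data = sc.parallelize(1 to 100, 4)

// Same RDD, same fraction, same seed: both samples contain the same elements.
val first = data.sample(withReplacement = false, fraction = 0.3, seed = 7L).collect()
val second = data.sample(withReplacement = false, fraction = 0.3, seed = 7L).collect()
val reproducible = first.sameElements(second)

// Without replacement, no element can be drawn twice.
val noDuplicates = first.distinct.length == first.length

// fraction is a per-element probability, so the size is only close to 30.
val sampledSize = first.length
```

This is also why the transcript above returns 2 or 3 elements for fractions of 0.2 and 0.4 on ten elements instead of an exact count.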
    

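    (11) distinct([numTasks]) example

    1. Purpose: removes duplicates from the source RDD and returns a new RDD. By default the result keeps the number of partitions of the source RDD; pass the optional numTasks argument to change it.

    2. Task: create an RDD and remove its duplicates with distinct().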

    scala> val distinctRdd=sc.parallelize(List(1,2,1,5,3,5,4,8,6,4))
    distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
    
    scala> val unionRDD =distinctRdd.distinct()
    unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at distinct at <console>:25
    
    scala> unionRDD.collect
    res5: Array[Int] = Array(4, 8, 1, 5, 6, 2, 3)
    
    scala> val unionRDD =distinctRdd.distinct(2) // set the parallelism to 2
    unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at distinct at <console>:25
    
    scala> unionRDD.collect
    res6: Array[Int] = Array(4, 6, 8, 2, 1, 3, 5)
    

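    (12) coalesce(numPartitions) example

    1. Purpose: reduces the number of partitions; useful after filtering a large dataset, so that the remaining smaller dataset runs more efficiently.

    2. Task: create an RDD with 4 partitions and reduce its number of partitions.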

    scala> val rdd=sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
    
    scala> rdd.partitions.size
    res7: Int = 4
    
    scala> val coalRDD=rdd.coalesce(3)
    coalRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[12] at coalesce at <console>:25
    
    scala> coalRDD.partitions.size
    res9: Int = 3
    

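    (13) repartition(numPartitions) example

    1. Purpose: reshuffles all the data randomly across the network into the given number of partitions.

    2. Task: create an RDD with 4 partitions and repartition it.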

    scala> val rdd=sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24
    
    scala> val rerdd=rdd.repartition(2)
    rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[17] at repartition at <console>:25
    
    scala> rerdd.partitions.size
    res11: Int = 2
    

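    Difference between coalesce and repartition

    1. coalesce repartitions with an optional shuffle, controlled by the parameter shuffle: Boolean (default false).

    2. repartition simply calls coalesce with shuffle = true, so it always shuffles.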
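
The difference shows up most clearly when asking for more partitions. The following sketch is an illustration, not part of the original session; it assumes Spark is on the classpath, reuses the spark-shell `sc` through `SparkContext.getOrCreate` (or starts a local context), and uses made-up value names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if present, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("coalesce-sketch").setMaster("local[2]"))

val rdd = sc.parallelize(1 to 16, 4)

// Shrinking works without a shuffle: existing partitions are merged.
val narrowed = rdd.coalesce(2).partitions.length

// Growing without a shuffle is not possible: the count stays at 4.
val notWidened = rdd.coalesce(8).partitions.length

// Growing needs a shuffle, either explicitly or through repartition.
val widened = rdd.coalesce(8, shuffle = true).partitions.length
val repartitioned = rdd.repartition(8).partitions.length
```

Here narrowed is 2, notWidened stays at 4, and both widened and repartitioned are 8.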

    (14) sortBy(func,[ascending], [numTasks]) example

    1. Purpose: applies func to each element first, then sorts by comparing the results; ascending by default.

    2. Task: create an RDD and sort it by different rules.

    scala> val rdd=sc.parallelize(List(2,1,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
    
    scala> rdd.sortBy(x=>x).collect // sort by the element itself, ascending by default
    res12: Array[Int] = Array(1, 2, 3, 4)
    
    scala> rdd.sortBy(x=>x%3).collect // sort by the remainder modulo 3
    res13: Array[Int] = Array(3, 1, 4, 2)
    
    scala> rdd.sortBy(p=>p%3).collect // the parameter name does not have to be x
    res14: Array[Int] = Array(3, 1, 4, 2)
    

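    (15) union(otherDataset) example

    1. Purpose: returns a new RDD holding the union of the source RDD and the argument RDD. Duplicates are kept.

    2. Task: create two RDDs and compute their union.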

    scala> val rdd1=sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
    
    scala> val rdd2=sc.parallelize(3 to 7)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[35] at parallelize at <console>:24
    
    scala> val rdd3=rdd1.union(rdd2)
    rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[36] at union at <console>:27
    
    scala> rdd3.collect
    res15: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)
    

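    (16) subtract(otherDataset) example

    1. Purpose: computes the difference: elements of the source RDD that also appear in the argument RDD are removed, and the remaining elements are kept.

    2. Task: create two RDDs and compute the difference between the first and the second.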

    scala> val rdd1=sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
    
    scala> val rdd2=sc.parallelize(3 to 7)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[35] at parallelize at <console>:24

    scala> rdd1.subtract(rdd2).collect
    res17: Array[Int] = Array(1, 2)

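    (17) intersection(otherDataset) example

    1. Purpose: returns a new RDD holding the intersection of the source RDD and the argument RDD.

    2. Task: create two RDDs and compute their intersection.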

    scala> val rdd1=sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val rdd2=sc.parallelize(3 to 7)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    scala> val rdd3=rdd1.intersection(rdd2)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at intersection at <console>:27
    
    scala> rdd3.collect
    res0: Array[Int] = Array(4, 5, 3)
    

    (18) cartesian(otherDataset) example

    1. Purpose: Cartesian product (avoid it whenever possible).

    2. Task: create two RDDs and compute their Cartesian product.

    scala> val rdd1=sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val rdd2=sc.parallelize(3 to 7)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

    scala> rdd1.cartesian(rdd2).collect
    res1: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (1,7), (2,3), (2,4), (2,5), (2,6), (2,7), (3,3), (3,4), (3,5), (3,6), (3,7), (4,3), (5,3), (4,4), (5,4), (4,5), (5,5), (4,6), (4,7), (5,6), (5,7))

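    (19) zip(otherDataset) example

    1. Purpose: combines two RDDs into one RDD of key/value pairs. Both RDDs must have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.

    2. Task: create two RDDs and combine them into one (k,v) RDD.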

    scala> val rdd1=sc.parallelize(Array(1,2,3),3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
    
    scala> val rdd2=sc.parallelize(Array("a","b","c"),3)
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24
    
    scala> rdd1.zip(rdd2).collect
    res2: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
    

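    Key-Value types

    (20) partitionBy example

    1. Purpose: repartitions a pair RDD. If the RDD's current partitioner equals the requested one, nothing is repartitioned; otherwise a ShuffledRDD is produced, which means a shuffle takes place.

    2. Task: create an RDD with 4 partitions and repartition it.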

    scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[13] at parallelize at <console>:24
    
    scala> rdd.partitions.size
    res3: Int = 4
    
    scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at partitionBy at <console>:25
    
    scala> rdd2.partitions.size
    res4: Int = 2
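
Both halves of that rule can be observed directly. The sketch below is an illustration, not part of the original session; it assumes Spark is on the classpath, reuses the spark-shell `sc` through `SparkContext.getOrCreate` (or starts a local context), and the names `pairs`, `byHash`, `layout`, and `again` are hypothetical.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuse the spark-shell context if present, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitionBy-sketch").setMaster("local[2]"))

val pairs = sc.parallelize(Array((1, "aaa"), (2, "bbb"), (3, "ccc"), (4, "ddd")), 4)

// No partitioner yet, so this produces a ShuffledRDD with 2 partitions.
val byHash = pairs.partitionBy(new HashPartitioner(2))

// Record which partition each key landed in. For Int keys the hash code is
// the value itself, so the partition index is key % 2.
val layout = byHash.mapPartitionsWithIndex((idx, it) => it.map { case (k, _) => (k, idx) }).collect().toMap
// layout: Map(1 -> 1, 2 -> 0, 3 -> 1, 4 -> 0)

// Asking for an equal partitioner again returns the same RDD, with no new shuffle.
val again = byHash.partitionBy(new HashPartitioner(2))
val unchanged = again eq byHash
```

Two HashPartitioner instances compare equal when they have the same number of partitions, which is why the second call is a no-op.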
    

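    (21) groupByKey example

    1. Purpose: groupByKey also operates per key, but it only gathers the values of each key into one sequence.

    2. Task: create a pair RDD, gather the values of each key into one sequence, and compute the sum of the values per key.

    scala> val words=Array("one","two","three","three","three" ) // source words for the pair RDD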
    words: Array[String] = Array(one, two, three, three, three)
    
    scala> val wordPairRDD=sc.parallelize(words).map(word=>(word,1))
    wordPairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at map at <console>:26
    
    scala> val group=wordPairRDD.groupByKey() // gather the values of each key into one sequence
    group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[17] at groupByKey at <console>:25
    
    scala> group.collect // print
    res5: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
    
    scala> group.map(t=>(t._1,t._2.sum)) // aggregate
    res6: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:26
    
    scala> res6.collect
    res7: Array[(String, Int)] = Array((two,1), (one,1), (three,3))

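    (22) reduceByKey(func, [numTasks]) example

    1. Purpose: called on an RDD of (K,V) pairs, returns an RDD of (K,V) pairs in which the values of each key are aggregated with the given reduce function; the number of reduce tasks can be set through the optional second argument.

    2. Task: create a pair RDD and compute the sum of the values per key.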

    scala> val rdd=sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[19] at parallelize at <console>:24
    
    scala> val reduce=rdd.reduceByKey((x,y)=>x+y)
    reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25
    
    scala> reduce.collect();
    res8: Array[(String, Int)] = Array((female,6), (male,7))
    

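    Difference between reduceByKey and groupByKey

    1. reduceByKey: aggregates by key, with a combine (pre-aggregation) step before the shuffle; the result is an RDD[(K, V)].

    2. groupByKey: groups by key and shuffles directly, with no pre-aggregation.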
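
Both routes give the same totals; they differ in how much data crosses the shuffle. The sketch below is an illustration, not part of the original session; it assumes Spark is on the classpath, reuses the spark-shell `sc` through `SparkContext.getOrCreate` (or starts a local context), and the names `pairs`, `viaReduce`, and `viaGroup` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if present, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("reduce-vs-group-sketch").setMaster("local[2]"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)), 2)

// reduceByKey: partial sums are computed inside each partition first,
// so at most one record per key and partition is shuffled.
val viaReduce = pairs.reduceByKey(_ + _).collect().toMap

// groupByKey: every (key, value) record is shuffled, then summed afterwards.
val viaGroup = pairs.groupByKey().mapValues(_.sum).collect().toMap

// Both are Map(a -> 9, b -> 6)
```

When the goal is an aggregate such as a sum or a count, reduceByKey is usually the better choice; groupByKey fits cases where the full list of values per key is really needed.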

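    (23) foldByKey example

    Parameters: (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    1. Purpose: a simplified aggregateByKey in which seqOp and combOp are the same function.
    2. Task: create a pair RDD and compute the sum of the values per key.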

    scala> val rdd=sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)))
    
    scala> rdd.collect
    res11: Array[(Int, Int)] = Array((1,3), (1,2), (1,4), (2,3), (3,6), (3,8))
    
    scala> val agg=rdd.foldByKey(0)(_+_) // 0 is the initial value
    agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[24] at foldByKey at <console>:25
    
    scala> agg.collect
    res12: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
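
The relation to aggregateByKey can be sketched as follows. This is an illustration, not part of the original session: it assumes Spark is on the classpath, reuses the spark-shell `sc` through `SparkContext.getOrCreate` (or starts a local context), and fixes the partition count at 3 (an assumption) so that the last result is deterministic.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if present, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("foldByKey-sketch").setMaster("local[2]"))

// 3 partitions: [(1,3),(1,2)], [(1,4),(2,3)], [(3,6),(3,8)]
val rdd = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3), (3, 6), (3, 8)), 3)

// foldByKey uses one function both inside and across partitions ...
val folded = rdd.foldByKey(0)(_ + _).collect().toMap

// ... which matches aggregateByKey with seqOp and combOp set to the same function.
val aggregated = rdd.aggregateByKey(0)(_ + _, _ + _).collect().toMap

// aggregateByKey also allows two different functions:
// take the max per key inside each partition, then add the maxima up.
val maxThenSum = rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect().toMap
// maxThenSum: Map(1 -> 7, 2 -> 3, 3 -> 8)
```

For key 1 the per-partition maxima are 3 and 4, which add up to 7; foldByKey cannot express this because it has only one function.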
    

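    (24) combineByKey[C] example

    Parameters: (createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C)

    1. Purpose: for each key K, merges its values V into a single combined value of type C.
    2. Parameter description:

    (1) createCombiner: combineByKey() walks through all elements of a partition, so the key of each element either has not been seen yet or equals the key of an earlier element. For a new key, combineByKey() calls createCombiner() to create the initial value of that key's accumulator.

    (2) mergeValue: for a key already seen while processing the current partition, mergeValue() merges the accumulator's current value for that key with the new value.

    (3) mergeCombiners: because each partition is processed independently, the same key can have several accumulators. When two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners() merges the per-partition results.

    3. Task: create a pair RDD and compute the mean of the values per key (first compute, for each key, the number of occurrences and the sum of its values, then divide).
    scala> val input=sc.parallelize(Array(("a",88),("b",95),("a",91),("b",93),("a",95),("b",98)),2) // (key, score) pairs in 2 partitions
    input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:24
    
    scala> val combine=input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)) // per key: sum the values and count the occurrences, kept as a (sum, count) pair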
    
    combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[27] at combineByKey at <console>:25
    
    scala> combine.collect // print
    res15: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
    
    scala> val result=combine.map{case(key,value)=>(key,value._1/value._2.toDouble)} // compute the mean
    result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[28] at map at <console>:25
    
    scala> result.collect // print
    res16: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
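
To see when each of the three functions fires, the same computation can be traced with plain Scala collections. This is only a model of the semantics described above, not Spark's implementation: the two inner sequences stand in for the two partitions of the input, and every name in it is made up for illustration.

```scala
// Two "partitions" holding the same (key, score) pairs as the input above.
val partitions = Seq(
  Seq(("a", 88), ("b", 95), ("a", 91)),
  Seq(("b", 93), ("a", 95), ("b", 98)))

// The three functions, with C = (sum, count).
val createCombiner: Int => (Int, Int) = v => (v, 1)
val mergeValue: ((Int, Int), Int) => (Int, Int) = (acc, v) => (acc._1 + v, acc._2 + 1)
val mergeCombiners: ((Int, Int), (Int, Int)) => (Int, Int) = (x, y) => (x._1 + y._1, x._2 + y._2)

// Step 1, inside each partition: first sight of a key calls createCombiner,
// every later sight of that key calls mergeValue.
val perPartition = partitions.map { part =>
  part.foldLeft(Map.empty[String, (Int, Int)]) { case (accs, (k, v)) =>
    accs.get(k) match {
      case None      => accs + (k -> createCombiner(v))
      case Some(acc) => accs + (k -> mergeValue(acc, v))
    }
  }
}

// Step 2, across partitions: accumulators of the same key are merged
// with mergeCombiners.
val combined = perPartition.reduce { (left, right) =>
  right.foldLeft(left) { case (accs, (k, c)) =>
    accs + (k -> accs.get(k).map(mergeCombiners(_, c)).getOrElse(c))
  }
}
// combined: Map(a -> (274,3), b -> (286,3))
```

The result matches the (sum, count) pairs printed by combine.collect, and dividing sum by count gives the same means.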
    

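    (25) sortByKey([ascending], [numTasks]) example

    1. Purpose: called on an RDD of (K,V) pairs whose key type K has an ordering (for example, by implementing Ordered); returns an RDD of (K,V) pairs sorted by key.

    2. Task: create a pair RDD and sort it by key in ascending and in descending order.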

    scala> var rdd=sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[29] at parallelize at <console>:24
    
    scala> rdd.sortByKey(true).collect // ascending
    res17: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
    
    scala> rdd.sortByKey(false).collect // descending
    res18: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
    

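    (26) mapValues example

    1. Purpose: for (K,V) pairs, applies the function to the value V only and leaves the key unchanged.

    2. Task: create a pair RDD and append the string "|||" to each value.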

    scala> val rdd3=sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[36] at parallelize at <console>:24
    
    scala> rdd3.mapValues(_+"|||").collect
    res19: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
    

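    (27) join(otherDataset, [numTasks]) example

    1. Purpose: called on RDDs of type (K,V) and (K,W); returns an RDD of (K,(V,W)) pairs containing all pairs of elements with matching keys.

    2. Task: create two pair RDDs and gather the values that share a key into one tuple.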

    scala> val rdd=sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:24
    
    scala> val rdd1=sc.parallelize(Array((1,4),(2,5),(3,6)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[39] at parallelize at <console>:24
    
    scala> rdd.join(rdd1).collect
    res20: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
    

    (28) cogroup(otherDataset, [numTasks]) example

    1. Purpose: called on RDDs of type (K,V) and (K,W); returns an RDD of type (K,(Iterable<V>,Iterable<W>)).

    2. Task: create two pair RDDs and gather the values that share a key into iterables.

    scala> val rdd =sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[43] at parallelize at <console>:24
    
    scala> val rdd1=sc.parallelize(Array((1,4),(2,5),(3,6)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[44] at parallelize at <console>:24
    
    scala> rdd.cogroup(rdd1).collect
    res21: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
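
In the examples above every key exists on both sides, so join and cogroup look alike. The sketch below uses keys that do not all match to show where they differ. It is an illustration, not part of the original session: it assumes Spark is on the classpath, reuses the spark-shell `sc` through `SparkContext.getOrCreate` (or starts a local context), and the names `left`, `right`, `joined`, and `grouped` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if present, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("join-vs-cogroup-sketch").setMaster("local[2]"))

val left = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), 2)
val right = sc.parallelize(Seq((1, 4), (2, 5), (4, 6)), 2)

// join is an inner join: only keys present on both sides survive.
val joined = left.join(right).collect().toMap
// joined: Map(1 -> (a,4), 2 -> (b,5))

// cogroup keeps every key from either side; a missing side is an empty Iterable.
val grouped = left.cogroup(right).mapValues { case (vs, ws) => (vs.toList, ws.toList) }.collect().toMap
// grouped(3) is (List(c), List()) and grouped(4) is (List(), List(6))
```

Keys 3 and 4 are dropped by join but kept by cogroup, each with one empty side.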
    
  • Original article: https://www.cnblogs.com/837634902why/p/11475787.html