zoukankan      html  css  js  c++  java
  • spark 常用技巧总结2

     zip拉链操作

    def zip[U](other: org.apache.spark.rdd.RDD[U])(implicit evidence$10: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]

    scala> val rdd1=sc.makeRDD(Array("apple","pear","grape","egg","elephant"))

    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at <console>:24

    scala> val rdd2=sc.makeRDD(List(20,5,8,6,3))

    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at <console>:24

    scala> rdd1.zip(rdd2).collect
    res35: Array[(String, Int)] = Array((apple,20), (pear,5), (grape,8), (egg,6), (elephant,3))

    scala> val rdd3=rdd1 zip rdd2

    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[27] at zip at <console>:28

    scala> rdd3.collect

    res36: Array[(String, Int)] = Array((apple,20), (pear,5), (grape,8), (egg,6), (elephant,3))

    -------------------------


    def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(String, C)]
    def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(String, C)]
    def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(String, C)]

     def combineByKey[C](      

    createCombiner: V => C,

    mergeValue: (C, V) => C,      

    mergeCombiners: (C, C) => C,

    numPartitions: Int): RDD[(K, C)] = self.withScope {

    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)  

    }


    scala> rdd3.collect
    res53: Array[(String, Int)] = Array((apple,2), (pear,1), (grape,2), (egg,1), (elephant,1))

    scala> val rdd4=rdd3.combineByKey(List(_),(x:List[Int],v:Int)=>x:+v,(m:List[Int],n:List[Int])=>m++n)
    rdd4: org.apache.spark.rdd.RDD[(String, List[Int])] = ShuffledRDD[35] at combineByKey at <console>:30

    scala> rdd4.collect

    res51: Array[(String, List[Int])] = Array((egg,List(1)), (elephant,List(1)), (pear,List(1)), (apple,List(2)), (grape,List(2)))

    scala> val rdd4=rdd3.map(x=>(x._2,x._1))

    rdd4: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[33] at map at <console>:30

    scala> val rdd5=rdd4.combineByKey(List(_),(x:List[String],v:String)=>x:+v,(m:List[String],n:List[String])=>m++n)
    rdd5: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[37] at combineByKey at <console>:32

    scala> rdd5.collect

    res52: Array[(Int, List[String])] = Array((1,List(pear, egg, elephant)), (2,List(apple, grape)))

    --------------------

    scala> val rdd1=sc.makeRDD(Array("apple","apple","pear","egg","hellokitty","egg","apple"))

    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at makeRDD at <console>:24

    scala> rdd1.countByValue

    res1: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)


    scala> val map1=rdd1.countByValue
    map1: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)

    scala> val rdd2=sc.makeRDD(map1.toList)

    rdd2: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[21] at makeRDD at <console>:28

    scala> rdd2.collect

    res5: Array[(String, Long)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    -------------------

    scala> val rdd1=sc.makeRDD(Array("apple","apple","pear","egg","hellokitty","egg","apple"))

    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at <console>:24

    scala> val rdd2=rdd1.map(x=>(x,1))

    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26

    scala> rdd2.collect

    res33: Array[(String, Int)] = Array((apple,1), (apple,1), (pear,1), (egg,1), (hellokitty,1), (egg,1), (apple,1))

    scala> rdd2.partitions.size
    res34: Int = 4

    scala> rdd2.reduceByKey(_+_).collect
    res36: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    scala> rdd2.reduceByKey(_+_,2).partitions.size //shuffle 时重新分为2个分区
    res37: Int = 2

    -------------------------------

    shuffle 操作可以重新分区,并且可以指定分区数(如上例中的 reduceByKey(_+_, 2))

    shuffle 操作很消耗系统资源:需要把数据写入磁盘并通过网络传输,有时还需要对数据进行排序。常见的会触发 shuffle 的 Transformation 有 repartition、join、cogroup,以及大多数 *By 或 *ByKey 类的 Transformation(如 groupByKey、reduceByKey、combineByKey)。

     --------------------------------------

    scala> val rdd2=rdd1.map(x=>(x,1))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26

    scala> rdd2.collect
    res39: Array[(String, Int)] = Array((apple,1), (apple,1), (pear,1), (egg,1), (hellokitty,1), (egg,1), (apple,1))


    scala> rdd2.combineByKey(x=>x,(c:Int,n:Int)=>c+n,(c1:Int,c2:Int)=>c1+c2).collect
    res41: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    scala> rdd1.countByValue()
    res42: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)

    scala> rdd2.reduceByKey(_+_).collect
    res44: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    -------------------------------

    scala> val rdd3=rdd1.map(x=>(1,x))

    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[40] at map at <console>:26

    scala> rdd3.collect

    res45: Array[(Int, String)] = Array((1,apple), (1,apple), (1,pear), (1,egg), (1,hellokitty), (1,egg), (1,apple))

    scala> rdd3.combineByKey(x=>List(x),(c:List[String],y:String)=>c:+y,(c1:List[String],c2:List[String])=>c1++c2).collect
    res49: Array[(Int, List[String])] = Array((1,List(apple, apple, pear, egg, hellokitty, egg, apple)))

    ---------------------------------------------

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)),2)

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[44] at makeRDD at <console>:24

    scala> val rdd3=rdd00.map(x=>(x._2,x._1))

    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[45] at map at <console>:26

    scala> rdd3.collect

    res51: Array[(Int, String)] = Array((1,a), (1,b), (3,a), (3,ba), (1,b), (10,g))

    scala> rdd3.groupByKey().collect

    res53: Array[(Int, Iterable[String])] = Array((10,CompactBuffer(g)), (1,CompactBuffer(a, b, b)), (3,CompactBuffer(a, ba)))

    scala> rdd3.combineByKey(x=>List(x),(c:List[String],y:String)=>c:+y,(c1:List[String],c2:List[String])=>c1++c2).collect

    res54: Array[(Int, List[String])] = Array((10,List(g)), (1,List(a, b, b)), (3,List(a, ba)))

    -----------------------

    distinct(numPartitions:Int) 去重的同时重新分区

    scala> val bb=sc.makeRDD(Array(1,1,2,1,8,6,8,4,5,4),2)

    bb: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[81] at makeRDD at <console>:25

    scala> bb.distinct(1).partitions.size

    res61: Int = 1

    scala> bb.distinct(3).partitions.size

    res62: Int = 3

    ----------------------

      def randomSplit(weights: Array[Double],seed: Long): Array[org.apache.spark.rdd.RDD[Int]]

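    randomSplit 操作根据 weights 权重,把一个 RDD 随机切分为多个 RDD。权重越高,元素被划分到对应子 RDD 的概率越大,因此各子 RDD 的大小只是近似符合权重比例。权重之和不必为 1:Spark 会先用权重总和对各权重做归一化。但权重必须非负,且总和必须大于 0。下例中的 aa 是之前定义的一个 RDD;由各子 RDD 的 count 之和(11+19+34+36)可知它共有 100 个元素。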

    scala> val split=aa.randomSplit(Array(0.1,0.2,0.3,0.4))

    split: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[165] at randomSplit at <console>:27, MapPartitionsRDD[166] at randomSplit at <console>:27, MapPartitionsRDD[167] at randomSplit at <console>:27, MapPartitionsRDD[168] at randomSplit at <console>:27)

    scala> split(0).count

    res94: Long = 11

    scala> split(1).count

    res95: Long = 19

    scala> split(2).count

    res96: Long = 34

    scala> split(3).count

    res97: Long = 36

    -----------------------------------------------------

        def glom(): org.apache.spark.rdd.RDD[Array[Int]]

    glom 将每个分区中的元素放进一个数组。返回的 RDD 中每个元素都是一个数组,元素个数与分区数相同(每个分区对应一个数组)。

    scala> val bb=sc.makeRDD(1 to 10,3)

    bb: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[203] at makeRDD at <console>:25

    scala> bb.glom().collect

    res127: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

  • 相关阅读:
    Oracle Merge into
    查询 null 记录
    删除 eclipse 插件
    vs 密钥
    视频网站建设
    eclipse 版本 查看
    让数据库变快的十个建议
    c# WebBrowser 操作
    svn 代码合并
    Android开发者应该深入学习的10个开源应用项目
  • 原文地址:https://www.cnblogs.com/playforever/p/9294416.html
Copyright © 2011-2022 走看看