  Spark: Common Techniques Summary, Part 2

     zip: the pairwise zip operation

    def zip[U](other: org.apache.spark.rdd.RDD[U])(implicit evidence$10: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]

    scala> val rdd1=sc.makeRDD(Array("apple","pear","grape","egg","elephant"))

    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at <console>:24

    scala> val rdd2=sc.makeRDD(List(20,5,8,6,3))

    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at <console>:24

    scala> rdd1.zip(rdd2).collect
    res35: Array[(String, Int)] = Array((apple,20), (pear,5), (grape,8), (egg,6), (elephant,3))

    scala> val rdd3=rdd1 zip rdd2

    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[27] at zip at <console>:28

    scala> rdd3.collect

    res36: Array[(String, Int)] = Array((apple,20), (pear,5), (grape,8), (egg,6), (elephant,3))
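    One thing to note about zip: both RDDs must have the same number of partitions and the same number of elements in each partition, otherwise the job fails at runtime. A minimal sketch (the RDD names below are just for illustration):

    // Sketch: zip only works when both sides share the same partition layout.
    val fruits = sc.makeRDD(Array("apple", "pear", "grape"), 2)   // 2 partitions
    val prices = sc.makeRDD(Array(20, 5, 8), 2)                   // same size, same partitioning
    fruits.zip(prices).collect                                     // Array((apple,20), (pear,5), (grape,8))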

    -------------------------


    def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(String, C)]
    def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(String, C)]
    def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(String, C)]

     def combineByKey[C](
         createCombiner: V => C,
         mergeValue: (C, V) => C,
         mergeCombiners: (C, C) => C,
         numPartitions: Int): RDD[(K, C)] = self.withScope {
       combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
     }
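    The three functions fit together like this: createCombiner turns the first value seen for a key into an accumulator of type C, mergeValue folds later values from the same partition into that accumulator, and mergeCombiners merges accumulators produced on different partitions. A small sketch (the data and names are illustrative), computing a per-key average:

    // Sketch: per-key average with combineByKey.
    val scores = sc.makeRDD(Seq(("a", 10), ("b", 4), ("a", 20), ("b", 6)))
    val avg = scores.combineByKey(
        (v: Int) => (v, 1),                                            // createCombiner: first value -> (sum, count)
        (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold a value within a partition
        (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners: merge across partitions
      .mapValues { case (sum, cnt) => sum.toDouble / cnt }
    avg.collect   // (a,15.0), (b,5.0) in some order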


    scala> rdd3.collect
    res53: Array[(String, Int)] = Array((apple,2), (pear,1), (grape,2), (egg,1), (elephant,1))

    scala> val rdd4=rdd3.combineByKey(List(_),(x:List[Int],v:Int)=>x:+v,(m:List[Int],n:List[Int])=>m++n)
    rdd4: org.apache.spark.rdd.RDD[(String, List[Int])] = ShuffledRDD[35] at combineByKey at <console>:30

    scala> rdd4.collect

    res51: Array[(String, List[Int])] = Array((egg,List(1)), (elephant,List(1)), (pear,List(1)), (apple,List(2)), (grape,List(2)))

    scala> val rdd4=rdd3.map(x=>(x._2,x._1))

    rdd4: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[33] at map at <console>:30

    scala> val rdd5=rdd4.combineByKey(List(_),(x:List[String],v:String)=>x:+v,(m:List[String],n:List[String])=>m++n)
    rdd5: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[37] at combineByKey at <console>:32

    scala> rdd5.collect

    res52: Array[(Int, List[String])] = Array((1,List(pear, egg, elephant)), (2,List(apple, grape)))

    --------------------

    scala> val rdd1=sc.makeRDD(Array("apple","apple","pear","egg","hellokitty","egg","apple"))

    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at makeRDD at <console>:24

    scala> rdd1.countByValue

    res1: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)


    scala> val map1=rdd1.countByValue
    map1: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)

    scala> val rdd2=sc.makeRDD(map1.toList)

    rdd2: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[21] at makeRDD at <console>:28

    scala> rdd2.collect

    res5: Array[(String, Long)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    -------------------

    scala> val rdd1=sc.makeRDD(Array("apple","apple","pear","egg","hellokitty","egg","apple"))

    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at <console>:24

    scala> val rdd2=rdd1.map(x=>(x,1))

    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26

    scala> rdd2.collect

    res33: Array[(String, Int)] = Array((apple,1), (apple,1), (pear,1), (egg,1), (hellokitty,1), (egg,1), (apple,1))

    scala> rdd2.partitions.size
    res34: Int = 4

    scala> rdd2.reduceByKey(_+_).collect
    res36: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    scala> rdd2.reduceByKey(_+_,2).partitions.size // the shuffle repartitions into 2 partitions
    res37: Int = 2

    -------------------------------

    A shuffle can repartition the data, and you can specify the target number of partitions explicitly.

    Shuffle operations are expensive: data must be written to disk and transferred over the network, and sometimes it must be sorted as well. Common transformations that require a shuffle include repartition, join, cogroup, and any *By or *ByKey transformation.
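    As a quick sketch of the point above: repartition always shuffles, and the *ByKey transformations both shuffle and accept an explicit partition count:

    // Sketch: shuffling transformations can also set the downstream partition count.
    val words = sc.makeRDD(Seq("apple", "apple", "egg"), 4)
    words.repartition(2).partitions.size                        // 2 (full shuffle)
    words.map(w => (w, 1)).reduceByKey(_ + _, 3).partitions.size // 3 (shuffle into 3 partitions)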

     --------------------------------------

    scala> val rdd2=rdd1.map(x=>(x,1))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26

    scala> rdd2.collect
    res39: Array[(String, Int)] = Array((apple,1), (apple,1), (pear,1), (egg,1), (hellokitty,1), (egg,1), (apple,1))


    scala> rdd2.combineByKey(x=>x,(c:Int,n:Int)=>c+n,(c1:Int,c2:Int)=>c1+c2).collect
    res41: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    scala> rdd1.countByValue()
    res42: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)

    scala> rdd2.reduceByKey(_+_).collect
    res44: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

    -------------------------------

    scala> val rdd3=rdd1.map(x=>(1,x))

    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[40] at map at <console>:26

    scala> rdd3.collect

    res45: Array[(Int, String)] = Array((1,apple), (1,apple), (1,pear), (1,egg), (1,hellokitty), (1,egg), (1,apple))

    scala> rdd3.combineByKey(x=>List(x),(c:List[String],y:String)=>c:+y,(c1:List[String],c2:List[String])=>c1++c2).collect
    res49: Array[(Int, List[String])] = Array((1,List(apple, apple, pear, egg, hellokitty, egg, apple)))

    ---------------------------------------------

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)),2)

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[44] at makeRDD at <console>:24

    scala> val rdd3=rdd00.map(x=>(x._2,x._1))

    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[45] at map at <console>:26

    scala> rdd3.collect

    res51: Array[(Int, String)] = Array((1,a), (1,b), (3,a), (3,ba), (1,b), (10,g))

    scala> rdd3.groupByKey().collect

    res53: Array[(Int, Iterable[String])] = Array((10,CompactBuffer(g)), (1,CompactBuffer(a, b, b)), (3,CompactBuffer(a, ba)))

    scala> rdd3.combineByKey(x=>List(x),(c:List[String],y:String)=>c:+y,(c1:List[String],c2:List[String])=>c1++c2).collect

    res54: Array[(Int, List[String])] = Array((10,List(g)), (1,List(a, b, b)), (3,List(a, ba)))

    -----------------------

    distinct(numPartitions: Int) removes duplicates and repartitions at the same time.

    scala> val bb=sc.makeRDD(Array(1,1,2,1,8,6,8,4,5,4),2)

    bb: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[81] at makeRDD at <console>:25

    scala> bb.distinct(1).partitions.size

    res61: Int = 1

    scala> bb.distinct(3).partitions.size

    res62: Int = 3
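    Under the hood distinct itself relies on a shuffle; a rough equivalent (a sketch, not the exact library source) is:

    // Sketch: distinct(n) behaves roughly like map -> reduceByKey -> map.
    val dedup = bb.map(x => (x, null))
                  .reduceByKey((x, _) => x, 3)   // shuffle into 3 partitions, keep one value per key
                  .map(_._1)
    dedup.partitions.size   // 3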

    ----------------------

      def randomSplit(weights: Array[Double],seed: Long): Array[org.apache.spark.rdd.RDD[Int]]

    randomSplit splits one RDD into several RDDs according to the weights array: the larger an entry's share of the total weight, the more likely an element lands in that split. The weights do not have to sum to 1; Spark normalizes them if they don't.

    scala> val split=aa.randomSplit(Array(0.1,0.2,0.3,0.4))

    split: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[165] at randomSplit at <console>:27, MapPartitionsRDD[166] at randomSplit at <console>:27, MapPartitionsRDD[167] at randomSplit at <console>:27, MapPartitionsRDD[168] at randomSplit at <console>:27)

    scala> split(0).count

    res94: Long = 11

    scala> split(1).count

    res95: Long = 19

    scala> split(2).count

    res96: Long = 34

    scala> split(3).count

    res97: Long = 36
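    For reproducible splits you can pass an explicit seed; a typical train/test split looks like this sketch (the RDD name is illustrative):

    // Sketch: weights are normalized, and a fixed seed makes the split reproducible.
    val data = sc.makeRDD(1 to 100)
    val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)
    train.count + test.count   // 100 in total, roughly an 80/20 split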

    -----------------------------------------------------

        def glom(): org.apache.spark.rdd.RDD[Array[Int]]

    glom puts the elements of each partition into a single array, yielding exactly one array per partition.

    scala> val bb=sc.makeRDD(1 to 10,3)

    bb: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[203] at makeRDD at <console>:25

    scala> bb.glom().collect

    res127: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
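    glom is handy for quick per-partition aggregates without writing a mapPartitions by hand, e.g. (continuing with bb above):

    // Sketch: per-partition maximum via glom.
    bb.glom().map(_.max).collect   // Array(3, 6, 10), one maximum per partition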

Original post: https://www.cnblogs.com/playforever/p/9294416.html