Spark RDD Transformation: Simple Examples (Part 3)

    cache and persist

    Stores the RDD's data. persist(newLevel: StorageLevel) lets you choose a storage level; cache() and the no-argument persist() are equivalent, and both use MEMORY_ONLY. Because RDD transformations are lazy, only an action triggers their actual execution. If an RDD is used by several actions, it is best to cache it with cache or persist: only the first action then triggers the transformation chain, and later actions read the cached data instead of recomputing it. A usage sketch follows the API excerpts below.

    class StorageLevel private(
        private var _useDisk: Boolean,
        private var _useMemory: Boolean,
        private var _useOffHeap: Boolean,
        private var _deserialized: Boolean,
        private var _replication: Int = 1) /* number of replicas, default 1 */
    extends Externalizable 
        
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
    
    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
    
    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
    def cache(): this.type = persist()
    /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. This can only be used to assign a new storage level if the RDD does not
    * have a storage level set yet. Local checkpointing is an exception.
    */
    def persist(newLevel: StorageLevel): this.type

    /**
    * Mark this RDD for persisting using the specified level.
    *
    * @param newLevel the target storage level
    * @param allowOverride whether to override any existing level with the new one
    */
    private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type
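
    A minimal usage sketch (not from the original post; the data and numbers are arbitrary): cache an RDD that feeds several actions, and persist another one with an explicitly chosen storage level.

    import org.apache.spark.storage.StorageLevel

    val nums = sc.parallelize(1 to 1000000, 4)
    val squares = nums.map(x => x.toLong * x)   // lazy transformation, nothing runs yet

    squares.cache()        // same as persist(StorageLevel.MEMORY_ONLY)
    squares.count()        // first action: runs the map and fills the cache
    squares.sum()          // later actions read the cached partitions
    squares.take(5)

    // pick a different level explicitly, e.g. allow spilling serialized blocks to disk
    val doubled = nums.map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK_SER)
    doubled.count()

    squares.unpersist()    // release the cached blocks when they are no longer needed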
    
    

    mapValues(func)

    For an RDD of key-value pairs, applies func to each element's value (keys unchanged) to build a new RDD.
    /**
    * Pass each value in the key-value pair RDD through a map function without changing the keys;
    * this also retains the original RDD's partitioning.
    */
    def mapValues[U](f: V => U): RDD[(K, U)]
    val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
    val rdd1 = rdd.mapValues(x=>1L)
    rdd1.foreachPartition(it=>{
         while(it.hasNext){
           println(it.next)
         }
       println("================")
     }
    )
    scala> val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    scala> val rdd1 = rdd.mapValues(x=>1L)
    rdd1: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[2] at mapValues at <console>:26
    
    scala> rdd1.foreachPartition(it=>{
         | while(it.hasNext){
         | println(it.next)
         | }
         | println("================")
         | }
         | )
    (1,1)
    (1,1)
    ================
    (1,1)
    (2,1)
    ================
    (2,1)
    (2,1)
    ================
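
    The doc comment above says mapValues also retains the original RDD's partitioning. A quick way to see this (a sketch, not part of the original session) is to compare the partitioner after mapValues with the partitioner after a plain map:

    // reduceByKey hash-partitions its result, so the output carries a partitioner
    val byKey = sc.parallelize(List((1, 1), (1, 2), (2, 3)), 3).reduceByKey(_ + _)
    byKey.partitioner                                     // Some(HashPartitioner)

    // mapValues leaves the keys (and therefore the partitioner) untouched
    byKey.mapValues(_ * 10).partitioner                   // still Some(HashPartitioner)

    // a plain map could change the keys, so Spark drops the partitioner
    byKey.map { case (k, v) => (k, v * 10) }.partitioner  // None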

    The mapValues call above reassigns every value of (1,1),(1,2),(1,3),(2,1),(2,2),(2,3) to 1, producing (1,1),(1,1),(1,1),(2,1),(2,1),(2,1). reduceByKey can then count how many times each key appears.

    scala> val rdd1 = rdd.mapValues(x=>1L).reduceByKey(_ + _)
    rdd1: org.apache.spark.rdd.RDD[(Int, Long)] = ShuffledRDD[4] at reduceByKey at <console>:26
    
    scala> rdd1.collect.toMap
    res4: scala.collection.immutable.Map[Int,Long] = Map(1 -> 3, 2 -> 3)

    In fact, this mapValues + reduceByKey combination is essentially how the action countByKey() is implemented.
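
    For comparison, calling the action directly should return the same counts (this call was not part of the original session):

    rdd.countByKey()   // expected: Map(1 -> 3, 2 -> 3), matching the mapValues/reduceByKey result above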

    collect(f: PartialFunction[T, U])

    /**
    * Return an RDD that contains all matching values by applying `f`.
    */
    def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

     So what exactly is a PartialFunction[T, U]? Looking at PartialFunction's apply helper below: you supply a function f(x) whose input type is A and whose output type is B, and it is wrapped as a partial function.

    /** Converts ordinary function to partial one
     *  @since   2.10
     */
    def apply[A, B](f: A => B): PartialFunction[A, B] = { case x => f(x) }
    val f : PartialFunction[Int,String] = {case 0 => "Sunday"
        case 1 => "Monday"
        case 2 => "Tuesday"
        case 3 => "Wednesday"
        case 4 => "Thursday"
        case 5 => "Friday"
        case 6 => "Saturday"
        case _ => "Unknown"
    }
    
    val rdd = sc.parallelize(0 to 9)
    rdd.collect(f).collect
    
    res3: Array[String] = Array(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Unknown, Unknown, Unknown)
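
    Because the final wildcard case makes f defined for every Int, no element is dropped above. A sketch (not from the original post) with a partial function that covers only some inputs shows that collect(f) silently filters out the elements f is not defined for, instead of failing:

    // only even numbers are covered; odd numbers are filtered out, not errors
    val evensOnly: PartialFunction[Int, String] = {
      case x if x % 2 == 0 => s"even:$x"
    }

    sc.parallelize(0 to 9).collect(evensOnly).collect
    // expected: Array(even:0, even:2, even:4, even:6, even:8)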

    glom()

    Gathers the elements of each partition into a single array, yielding an RDD of arrays.
    /**
    * Return an RDD created by coalescing all elements within each partition into an array.
    */
    def glom(): RDD[Array[T]]
    scala> val rdd = sc.parallelize(1 to 9,3)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
    
    scala> rdd.glom
    res7: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[10] at glom at <console>:27
    
    scala> rdd.glom.collect
    res8: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))

    subtract(other: RDD[T])

    Returns a new RDD with the elements of this RDD that do not appear in other. If no partitioning is specified, the result keeps the original RDD's partitioning.
    /**
    * Return an RDD with the elements from `this` that are not in `other`.
    *
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
    def subtract(other: RDD[T]): RDD[T]
    val rdd1 = sc.parallelize(1 to 10,2)
    val rdd2 = sc.parallelize(5 to 20,3)
    val rdd = rdd1.subtract(rdd2)
    rdd.collect
    rdd.partitions.length
    scala> val rdd1 = sc.parallelize(1 to 10,2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(5 to 20,3)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    scala> val rdd = rdd1.subtract(rdd2)
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at subtract at <console>:28
    
    scala> rdd.collect
    res0: Array[Int] = Array(2, 4, 1, 3)                                            
    
    scala> rdd.partitions.length
    res2: Int = 2

    Specifying the number of partitions

    /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
    def subtract(other: RDD[T], numPartitions: Int): RDD[T]
    scala> val rdd = rdd1.subtract(rdd2,5)
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at subtract at <console>:28
    
    scala> rdd.partitions.length
    res3: Int = 5

    Supplying a custom Partitioner

    /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
    def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

    class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
      override def numPartitions: Int = numParts
      override def getPartition(key: Any): Int = {
        key.toString.toInt%numPartitions
      }
    }
    
    val rdd = rdd1.subtract(rdd2,new MyPartitioner(3))
    
    rdd.foreachPartition(
     x=>{
       while(x.hasNext){
         println(x.next)
       }
       println("============")
     }
     )
    scala> class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
         |   override def numPartitions: Int = numParts
         |   override def getPartition(key: Any): Int = {
         |     key.toString.toInt%numPartitions
         |   }
         | }
    defined class MyPartitioner
    
    scala> val rdd = rdd1.subtract(rdd2,new MyPartitioner(3))
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at subtract at <console>:29
    
    scala> rdd.foreachPartition(
         |  x=>{
         |    while(x.hasNext){
         |      println(x.next)
         |    }
         |    println("============")
         |  }
         |  )
    3
    ============
    1
    4
    ============
    2
    ============

    zip

    Creates a new RDD by pairing the elements of the two RDDs by position into key-value pairs. The two RDDs must have the same number of partitions and the same number of elements in each partition.
    /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
    * partitions* and the *same number of elements in each partition* (e.g. one was made through
    * a map on the other).
    */
    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
    val rdd1 = sc.parallelize(1 to 5,2)
    val rdd2 = sc.parallelize(List("one","two","three","four","five"),2)
    val rdd = rdd1.zip(rdd2)
    rdd.collect
    scala> val rdd1 = sc.parallelize(1 to 5,2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(List("one","two","three","four","five"),2)
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:24
    
    scala> val rdd = rdd1.zip(rdd2)
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[16] at zip at <console>:28
    
    scala> rdd.collect
    res5: Array[(Int, String)] = Array((1,one), (2,two), (3,three), (4,four), (5,five))
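
    The "same number of elements in each partition" requirement is enforced at runtime. A sketch (not from the original post): with equal partition counts but unequal partition sizes, the zip itself succeeds, but the first action on it should fail with a Spark error about mismatched element counts per partition.

    val a = sc.parallelize(1 to 5, 2)   // partitions of size 2 and 3
    val b = sc.parallelize(1 to 6, 2)   // partitions of size 3 and 3

    val bad = a.zip(b)
    // bad.collect   // expected to fail at runtime: the first partitions hold 2 and 3 elements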

    combineByKey

    /**
    * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level. This method is here for backward compatibility. It
    * does not provide combiner classtag information to the shuffle.
    *
    * @see [[combineByKeyWithClassTag]]
    */
    def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]

    var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd.combineByKey(
      (v : Int) => v + "$",   
      (c : String, v : Int) => c + "@" + v,  
      (c1 : String, c2 : String) => c1 + "||" + c2
    ).collect
    scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:24
    scala> rdd.combineByKey(
         |   (v : Int) => v + "$",   
         |   (c : String, v : Int) => c + "@" + v,  
         |   (c1 : String, c2 : String) => c1 + "||" + c2
         | ).collect
    res20: Array[(String, String)] = Array((B,1$@2), (A,1$@2@3), (C,1$))

    Reading the output: createCombiner runs the first time a key is seen within a partition and turns that value into the initial combiner (here v + "$"); mergeValue folds each further value for the same key in that partition into the combiner (appending "@" + v); mergeCombiners joins the per-partition combiners for a key (with "||"). In this particular run every key's values landed in a single partition, so mergeCombiners never fired and no "||" appears. A more typical numeric use is sketched below.
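
    A sketch (not from the original post) of combineByKey on the same data, computing a per-key average by carrying a (sum, count) pair as the combiner:

    val scores = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))

    val sumCount = scores.combineByKey(
      (v: Int) => (v, 1),                                          // createCombiner: first value of a key in a partition
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold another value into the partition-local combiner
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge combiners from different partitions
    )

    sumCount.mapValues { case (sum, count) => sum.toDouble / count }.collect
    // expected averages: A -> 2.0, B -> 1.5, C -> 1.0 (order may vary)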

    flatMapValues

    Similar to flatMap, except that only the value is passed through f; each element f produces is paired with the original key to form the new RDD, so one value can expand into zero or more pairs.
    /**
    * Pass each value in the key-value pair RDD through a flatMap function without changing the
    * keys; this also retains the original RDD's partitioning.
    */
    def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]

    var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd.flatMapValues(x => { x to 3}).collect
    scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24
    
    
    scala> rdd.flatMapValues(x => { x to 3})
    res24: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at flatMapValues at <console>:27
    
    scala> rdd.flatMapValues(x => { x to 3}).collect
    res25: Array[(String, Int)] = Array((A,1), (A,2), (A,3), (A,2), (A,3), (A,3), (B,1), (B,2), (B,3), (B,2), (B,3), (C,1), (C,2), (C,3))

    foldByKey

    Aggregates the values of each key with func, starting from zeroValue. The zeroValue is folded in once per partition that contains the key, so it should normally be a neutral element (0 for addition, Nil for list concatenation); the sketch after the examples below shows what happens when it is not.
    /**
    * Merge the values for each key using an associative function and a neutral "zero value" which
    * may be added to the result an arbitrary number of times, and must not change the result
    * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd.foldByKey(0)(_+_)
    rdd.foldByKey(0)(_+_).collect
    rdd.foldByKey(1)(_+_)
    rdd.foldByKey(1)(_+_).collect
    scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at parallelize at <console>:24
    
    scala> rdd.foldByKey(0)(_+_)
    res26: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[36] at foldByKey at <console>:27
    
    scala> rdd.foldByKey(0)(_+_).collect
    res27: Array[(String, Int)] = Array((B,3), (A,6), (C,1))
    
    scala> rdd.foldByKey(1)(_+_)
    res28: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[38] at foldByKey at <console>:27
    
    scala> rdd.foldByKey(1)(_+_).collect
    res29: Array[(String, Int)] = Array((B,4), (A,7), (C,2))
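
    A sketch (not from the original post) of why the zeroValue should be neutral: when a key's values are spread over several partitions, the zeroValue is folded in once per partition, so a non-neutral value is added more than once.

    // three elements in three partitions, so each ("A", v) typically sits in its own partition
    val spread = sc.parallelize(Array(("A",1),("A",2),("A",3)), 3)

    spread.foldByKey(0)(_+_).collect   // expected: Array((A,6))
    spread.foldByKey(1)(_+_).collect   // the zero 1 is added once per partition,
                                       // so this typically yields (A,9) rather than (A,7)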

    keys

    Returns an RDD containing the key of each pair.
    /**
    * Return an RDD with the keys of each tuple.
    */
    def keys: RDD[K] = self.map(_._1)
    scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24
    
    scala> rdd.keys.collect
    res32: Array[String] = Array(A, A, A, B, B, C)

    values

    Returns an RDD containing the value of each pair.

    /**
    * Return an RDD with the values of each tuple.
    */
    def values: RDD[V] = self.map(_._2)
    scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[44] at parallelize at <console>:24
    
    scala> rdd.values
    res33: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[45] at values at <console>:27
    
    scala> rdd.values.collect
    res34: Array[Int] = Array(1, 2, 3, 1, 2, 1)
Original source: https://www.cnblogs.com/alianbog/p/5837240.html