zoukankan      html  css  js  c++  java
  • Spark RDD Action 简单用例(一)

    collectAsMap(): Map[K, V]

    返回key-value对,key是唯一的,如果rdd元素中同一个key对应多个value,则只会保留一个。
    /**

    * Return the key-value pairs in this RDD to the master as a Map.
    *
    * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
    * one value per key is preserved in the map returned)
    *
    * @note this method should only be used if the resulting data is expected to be small, as
    * all the data is loaded into the driver's memory.
    */
    def collectAsMap(): Map[K, V]
    scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> rdd.collectAsMap
    res0: scala.collection.Map[String,Int] = Map(A -> 3, C -> 3, B -> 2)   
    
    

    countByKey(): Map[K, Long]

    计算有多少个不同的key.
    /**
    * Count the number of elements for each key, collecting the results to a local Map.
    *
    * Note that this method should only be used if the resulting map is expected to be small, as
    * the whole thing is loaded into the driver's memory.
    * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
    * returns an RDD[T, Long] instead of a map.
    */
    def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
    }

    scala> val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24
    
    scala> rdd.countByKey
    res5: scala.collection.Map[Int,Long] = Map(1 -> 3, 2 -> 3)

     countByValue()

    计算不同的value个数,该函数首先通过map将每个元素转成(value,null)的key-value(value为null)对,
    然后调用countByKey进行统计。


    /**
    * Return the count of each unique value in this RDD as a local map of (value, count) pairs.
    *
    * Note that this method should only be used if the resulting map is expected to be small, as
    * the whole thing is loaded into the driver's memory.
    * To handle very large results, consider using rdd.map(x =&gt; (x, 1L)).reduceByKey(_ + _), which
    * returns an RDD[T, Long] instead of a map.
    */
    def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
    map(value => (value, null)).countByKey()
    }
    scala> val rdd = sc.parallelize(List(1,2,3,4,5,4,4,3,2,1))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
    
    scala> rdd.countByValue
    res12: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 2, 2 -> 2, 3 -> 2, 4 -> 3)

    lookup(key: K)

    根据key值搜索所有的value.
    /**

    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
    */
    def lookup(key: K): Seq[V]
    scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.lookup("A")
    res2: Seq[Int] = WrappedArray(1, 2, 3)

    checkpoint()

     将RDD数据根据设置的checkpoint目录保存至硬盘中。

    /**
    * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
    * directory set with `SparkContext#setCheckpointDir` and all references to its parent
    * RDDs will be removed. This function must be called before any job has been
    * executed on this RDD. It is strongly recommended that this RDD is persisted in
    * memory, otherwise saving it on a file will require recomputation.
    */
    def checkpoint(): Unit
    /*通过linux命令创建/home/check目录后,设置checkpoint directory*/
    scala> sc.setCheckpointDir("/home/check")
    
    scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
    /*
    *执行下面的代码会在/home/check目录下创建一个空的目录/home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49
    */
    scala> rdd.checkpoint
    
    /*
    执行count后会在上述目录下创建一个rdd目录,rdd目录下是数据文件
    */
    scala> rdd.count
    res5: Long = 6           
    [root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
    total 8
    drwxr-xr-x. 2 root root 4096 Sep  4 10:29 .
    drwxr-xr-x. 3 root root 4096 Sep  4 10:29 ..
    [root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
    total 12
    drwxr-xr-x. 3 root root 4096 Sep  4 10:30 .
    drwxr-xr-x. 3 root root 4096 Sep  4 10:29 ..
    drwxr-xr-x. 2 root root 4096 Sep  4 10:30 rdd-6
    [root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/rdd-6/
    total 32
    drwxr-xr-x. 2 root root 4096 Sep  4 10:30 .
    drwxr-xr-x. 3 root root 4096 Sep  4 10:30 ..
    -rw-r--r--. 1 root root  171 Sep  4 10:30 part-00000
    -rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00000.crc
    -rw-r--r--. 1 root root  170 Sep  4 10:30 part-00001
    -rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00001.crc
    -rw-r--r--. 1 root root  170 Sep  4 10:30 part-00002
    -rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00002.crc

    collect()

    返回RDD所有元素的数组。
    /**

    * Return an array that contains all of the elements in this RDD.
    *
    * @note this method should only be used if the resulting array is expected to be small, as
    * all the data is loaded into the driver's memory.
    */
    def collect(): Array[T]
    scala> val rdd = sc.parallelize(1 to 10,3)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
    
    scala> rdd.collect
    res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    toLocalIterator: Iterator[T]

    返回一个包含所有算的迭代器。
    /**
    * Return an iterator that contains all of the elements in this RDD.
    *
    * The iterator will consume as much memory as the largest partition in this RDD.
    *
    * Note: this results in multiple Spark jobs, and if the input RDD is the result
    * of a wide transformation (e.g. join with different partitioners), to avoid
    * recomputing the input RDD should be cached first.
    */
    def toLocalIterator: Iterator[T]
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val it = rdd.toLocalIterator
    it: Iterator[Int] = non-empty iterator
    
    scala> while(it.hasNext){
         | println(it.next)
         | }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    count()

    返回RDD中元素的数量。
    /**
    * Return the number of elements in the RDD.
    */
    def count(): Long
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    scala> rdd.count
    res1: Long = 10

    dependencies

    返回该RDD的依赖RDD的地址。
    /**
    * Get the list of dependencies of this RDD, taking into account whether the
    * RDD is checkpointed or not.
    */
    final def dependencies: Seq[Dependency[_]]
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    scala> val rdd1 = rdd.filter(_>3)
    rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:26
    
    scala> val rdd2 = rdd1.filter(_<6)
    rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28
    
    scala> rdd2.dependencies
    res2: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@21c882b5)

    partitions

    以数组形式返回RDD各分区地址
    /**
    * Get the array of partitions of this RDD, taking into account whether the
    * RDD is checkpointed or not.
    */
    final def partitions: Array[Partition]
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.partitions
    res4: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@70c, org.apache.spark.rdd.ParallelCollectionPartition@70d)

    first()

    返回RDD的第一个元素。
    /**
    * Return the first element in this RDD.
    */
    def first(): T
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    scala> rdd.first
    res5: Int = 1

    fold(zeroValue: T)(op: (T, T) => T)

    使用zeroValue和每个分区的元素进行聚合运算,最后各分区结果和zeroValue再进行一次聚合运算。
    /**
    * @param zeroValue the initial value for the accumulated result of each partition for the `op`
    * operator, and also the initial value for the combine results from different
    * partitions for the `op` operator - this will typically be the neutral
    * element (e.g. `Nil` for list concatenation or `0` for summation)
    * @param op an operator used to both accumulate results within a partition and combine results
    * from different partitions
    */
    def fold(zeroValue: T)(op: (T, T) => T): T
    scala> val rdd = sc.parallelize(1 to 5)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
    scala> rdd.fold(10)(_+_)
    res13: Int = 35



  • 相关阅读:
    NFS服务简单配置
    TCP/IP网络
    python文件问题
    明明apple id 密码 是对的,登陆 iTunes还总提示密码错误
    台式电脑怎么使用iPhone热点进行上网
    Svn总是提示输入账号密码
    unity的相关技巧
    正则表达式标记
    NGUI用UIGrid加载Item会有部分空出来的解决办法
    win10 硬盘占用率经常100%的可能有用的解决方案
  • 原文地址:https://www.cnblogs.com/alianbog/p/5837396.html
Copyright © 2011-2022 走看看