Spark RDD Action: Simple Examples (Part 1)

    collectAsMap(): Map[K, V]

    Return the RDD's key-value pairs as a Map. Keys are unique: if several elements in the RDD share the same key, only one value per key is kept.
    /**
    * Return the key-value pairs in this RDD to the master as a Map.
    *
    * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
    * one value per key is preserved in the map returned)
    *
    * @note this method should only be used if the resulting data is expected to be small, as
    * all the data is loaded into the driver's memory.
    */
    def collectAsMap(): Map[K, V]
    scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> rdd.collectAsMap
    res0: scala.collection.Map[String,Int] = Map(A -> 3, C -> 3, B -> 2)   
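
    If every value per key is actually needed, one option (a sketch, not part of the original example) is to group before collecting:

    // each key keeps all of its values, e.g. A -> (1, 2, 3)
    val allValues = rdd.groupByKey().collectAsMap()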
    
    

    countByKey(): Map[K, Long]

    Count the number of elements for each key, returning the result as a local Map.
    /**
    * Count the number of elements for each key, collecting the results to a local Map.
    *
    * Note that this method should only be used if the resulting map is expected to be small, as
    * the whole thing is loaded into the driver's memory.
    * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
    * returns an RDD[T, Long] instead of a map.
    */
    def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
    }

    scala> val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24
    
    scala> rdd.countByKey
    res5: scala.collection.Map[Int,Long] = Map(1 -> 3, 2 -> 3)
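
    As the doc comment suggests, if the result would be too large for the driver, keep the counts distributed instead of collecting them into a Map; a minimal sketch:

    // same counts, but left on the cluster as an RDD[(Int, Long)]
    val counts = rdd.mapValues(_ => 1L).reduceByKey(_ + _)
    counts.collect()   // e.g. Array((1,3), (2,3)); collect only if the result is known to be small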

     countByValue()

    Count the occurrences of each distinct value. Internally, each element is first mapped to a (value, null) pair and then countByKey does the counting.


    /**
    * Return the count of each unique value in this RDD as a local map of (value, count) pairs.
    *
    * Note that this method should only be used if the resulting map is expected to be small, as
    * the whole thing is loaded into the driver's memory.
    * To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which
    * returns an RDD[T, Long] instead of a map.
    */
    def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
    map(value => (value, null)).countByKey()
    }
    scala> val rdd = sc.parallelize(List(1,2,3,4,5,4,4,3,2,1))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
    
    scala> rdd.countByValue
    res12: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 2, 2 -> 2, 3 -> 2, 4 -> 3)
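
    The same trick from the doc comment applies here when there are many distinct values; a minimal sketch:

    // distributed equivalent of countByValue for large results
    val valueCounts = rdd.map(x => (x, 1L)).reduceByKey(_ + _)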

    lookup(key: K)

    Return all values associated with the given key.
    /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
    */
    def lookup(key: K): Seq[V]
    scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.lookup("A")
    res2: Seq[Int] = WrappedArray(1, 2, 3)
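
    The doc comment notes that lookup only scans a single partition when the RDD has a known partitioner; a minimal sketch (the HashPartitioner and partition count are chosen here just for illustration):

    import org.apache.spark.HashPartitioner

    // after partitionBy, lookup("A") reads only the partition "A" hashes to
    val partitioned = rdd.partitionBy(new HashPartitioner(3)).cache()
    partitioned.lookup("A")   // Seq(1, 2, 3)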

    checkpoint()

    Save the RDD's data to disk under the configured checkpoint directory.

    /**
    * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
    * directory set with `SparkContext#setCheckpointDir` and all references to its parent
    * RDDs will be removed. This function must be called before any job has been
    * executed on this RDD. It is strongly recommended that this RDD is persisted in
    * memory, otherwise saving it on a file will require recomputation.
    */
    def checkpoint(): Unit
    /* After creating the /home/check directory (via the Linux shell), set it as the checkpoint directory */
    scala> sc.setCheckpointDir("/home/check")
    
    scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
    /*
     * Running the line below merely creates an empty directory under /home/check,
     * e.g. /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49
     */
    scala> rdd.checkpoint
    
    /*
     * After count runs, an rdd directory (rdd-6 here) appears under that directory,
     * containing the data files
     */
    scala> rdd.count
    res5: Long = 6           
    [root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
    total 8
    drwxr-xr-x. 2 root root 4096 Sep  4 10:29 .
    drwxr-xr-x. 3 root root 4096 Sep  4 10:29 ..
    [root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
    total 12
    drwxr-xr-x. 3 root root 4096 Sep  4 10:30 .
    drwxr-xr-x. 3 root root 4096 Sep  4 10:29 ..
    drwxr-xr-x. 2 root root 4096 Sep  4 10:30 rdd-6
    [root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/rdd-6/
    total 32
    drwxr-xr-x. 2 root root 4096 Sep  4 10:30 .
    drwxr-xr-x. 3 root root 4096 Sep  4 10:30 ..
    -rw-r--r--. 1 root root  171 Sep  4 10:30 part-00000
    -rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00000.crc
    -rw-r--r--. 1 root root  170 Sep  4 10:30 part-00001
    -rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00001.crc
    -rw-r--r--. 1 root root  170 Sep  4 10:30 part-00002
    -rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00002.crc
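
    As the comment above recommends, persist the RDD before checkpointing so the checkpoint job does not recompute it; a minimal sketch of the usual pattern (isCheckpointed and getCheckpointFile are standard RDD methods for verifying the result):

    rdd.cache()             // avoid recomputing the lineage during checkpointing
    rdd.checkpoint()        // only marks the RDD; nothing is written yet
    rdd.count()             // the first action triggers the checkpoint job
    rdd.isCheckpointed      // true once the files have been written
    rdd.getCheckpointFile   // Some(path under the checkpoint directory)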

    collect()

    Return all elements of the RDD as an array.
    /**
    * Return an array that contains all of the elements in this RDD.
    *
    * @note this method should only be used if the resulting array is expected to be small, as
    * all the data is loaded into the driver's memory.
    */
    def collect(): Array[T]
    scala> val rdd = sc.parallelize(1 to 10,3)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
    
    scala> rdd.collect
    res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
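
    When the data is too large to collect safely, a couple of common alternatives (a sketch, not from the original post):

    // pull back only a preview instead of the whole RDD
    rdd.take(3)                  // Array(1, 2, 3)
    rdd.takeSample(false, 3)     // 3 randomly sampled elements, without replacement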

    toLocalIterator: Iterator[T]

    Return an iterator over all elements of the RDD.
    /**
    * Return an iterator that contains all of the elements in this RDD.
    *
    * The iterator will consume as much memory as the largest partition in this RDD.
    *
    * Note: this results in multiple Spark jobs, and if the input RDD is the result
    * of a wide transformation (e.g. join with different partitioners), to avoid
    * recomputing the input RDD should be cached first.
    */
    def toLocalIterator: Iterator[T]
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val it = rdd.toLocalIterator
    it: Iterator[Int] = non-empty iterator
    
    scala> while(it.hasNext){
         | println(it.next)
         | }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
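
    Because each partition is fetched by its own Spark job, the note above recommends caching an expensively-computed input first; a minimal sketch:

    // cache so that partitions produced by costly transformations are not recomputed
    rdd.cache()
    rdd.toLocalIterator.foreach(println)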

    count()

    Return the number of elements in the RDD.
    /**
    * Return the number of elements in the RDD.
    */
    def count(): Long
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    scala> rdd.count
    res1: Long = 10

    dependencies

    Return the list of this RDD's dependencies (the Dependency objects wrapping its parent RDDs).
    /**
    * Get the list of dependencies of this RDD, taking into account whether the
    * RDD is checkpointed or not.
    */
    final def dependencies: Seq[Dependency[_]]
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    scala> val rdd1 = rdd.filter(_>3)
    rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:26
    
    scala> val rdd2 = rdd1.filter(_<6)
    rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28
    
    scala> rdd2.dependencies
    res2: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@21c882b5)
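
    To see the full lineage rather than just the immediate dependencies, toDebugString is handy:

    // prints the whole chain (ParallelCollectionRDD -> filter -> filter), one line per RDD
    println(rdd2.toDebugString)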

    partitions

    Return this RDD's partitions as an array of Partition objects.
    /**
    * Get the array of partitions of this RDD, taking into account whether the
    * RDD is checkpointed or not.
    */
    final def partitions: Array[Partition]
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.partitions
    res4: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@70c, org.apache.spark.rdd.ParallelCollectionPartition@70d)
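
    Most of the time only the number of partitions matters; a short sketch:

    rdd.partitions.length   // 2
    rdd.getNumPartitions    // 2, convenience accessor for the same value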

    first()

    Return the first element of the RDD.
    /**
    * Return the first element in this RDD.
    */
    def first(): T
    scala> val rdd = sc.parallelize(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    scala> rdd.first
    res5: Int = 1

    fold(zeroValue: T)(op: (T, T) => T)

    Aggregate the elements within each partition starting from zeroValue, then merge the per-partition results, again starting from zeroValue.
    /**
    * @param zeroValue the initial value for the accumulated result of each partition for the `op`
    * operator, and also the initial value for the combine results from different
    * partitions for the `op` operator - this will typically be the neutral
    * element (e.g. `Nil` for list concatenation or `0` for summation)
    * @param op an operator used to both accumulate results within a partition and combine results
    * from different partitions
    */
    def fold(zeroValue: T)(op: (T, T) => T): T
    scala> val rdd = sc.parallelize(1 to 5)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
    scala> rdd.fold(10)(_+_)
    res13: Int = 35
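
    Note that zeroValue participates once per partition and once more when the partition results are merged, so the result depends on the partition count; 35 above corresponds to a single partition (1+2+3+4+5 + 10 + 10). A sketch of the same call with the partition count fixed explicitly:

    // zeroValue is applied once per partition and once more in the final merge
    sc.parallelize(1 to 5, 1).fold(10)(_ + _)   // 15 + 10*1 + 10 = 35
    sc.parallelize(1 to 5, 2).fold(10)(_ + _)   // 15 + 10*2 + 10 = 45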


