zoukankan      html  css  js  c++  java
  • Spark RDD Action 简单用例(二)

    foreach(f: T => Unit)

    对RDD的所有元素应用f函数进行处理,f无返回值。
    /**

    * Applies a function f to all elements of this RDD.
    */
    def foreach(f: T => Unit): Unit
    scala> val rdd = sc.parallelize(1 to 9, 2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    scala> rdd.foreach(x=>{println(x)})
    [Stage 0:>                                                          (0 + 0) / 2]
    1 2 3 4 5 6 7 8 9
    
    

    foreachPartition(f: Iterator[T] => Unit)

    遍历所有的分区进行f函数操作
    /**

    * Applies a function f to each partition of this RDD.
    */
    def foreachPartition(f: Iterator[T] => Unit): Unit
    
    
    scala> val rdd = sc.parallelize(1 to 9, 2)
    scala> rdd.foreachPartition(x=>{
         | while(x.hasNext){
         | println(x.next)
         | }
         | println("===========")
         | }
         | )
    1
    2
    3
    4
    ===========
    5
    6
    7
    8
    9
    ===========
    
    

    getCheckpointFile

    获取RDD checkpoint的目录.
    /**
    * Gets the name of the directory to which this RDD was checkpointed.
    * This is not defined if the RDD is checkpointed locally.
    */
    def getCheckpointFile: Option[String]
    
    
    scala> val rdd = sc.parallelize(1 to 9,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    scala> rdd.checkpoint
    
    /*
    checkpoint操作后直接查询得到None,说明checkpoint是lazy的
    */
    scala> rdd.getCheckpointFile
    res6: Option[String] = None
    
    scala> rdd.count
    res7: Long = 9                                                                  
    
    scala> rdd.getCheckpointFile
    res8: Option[String] = Some(file:/home/check/ca771099-b1bf-46c8-9404-68b4ace7feeb/rdd-1)
    
    

    getNumPartitions

    获取分区数量
    /**
    * Returns the number of partitions of this RDD.
    */
    @Since("1.6.0")
    final def getNumPartitions: Int = partitions.length
    scala> val rdd = sc.parallelize(1 to 9,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.getNumPartitions
    res9: Int = 2

    getStorageLevel

    获取当前RDD的存储级别
    /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
    def getStorageLevel: StorageLevel = storageLevel
    
    
    scala> val rdd = sc.parallelize(1 to 9,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.getStorageLevel
    res10: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
    
    scala> rdd.cache
    res11: rdd.type = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.getStorageLevel
    res12: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
    
    

    isCheckpointed

    判断该RDD是否已完成checkpoint(调用checkpoint后,需要经过action触发物化才会返回true)
    /**

    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    */
    def isCheckpointed: Boolean
    scala> val rdd = sc.parallelize(1 to 9,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> rdd.isCheckpointed
    res13: Boolean = false
    
    scala> rdd.checkpoint
    
    scala> rdd.isCheckpointed
    res15: Boolean = false
    
    scala> rdd.count
    res16: Long = 9
    
    scala> rdd.isCheckpointed
    res17: Boolean = true

    isEmpty()

    判断RDD是否为空。如果RDD的元素类型为Nothing或Null(例如由parallelize(Seq())得到的RDD[Nothing]),调用时会抛出异常
    /**
    * @note due to complications in the internal implementation, this method will raise an
    * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
    * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
    * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
    * @return true if and only if the RDD contains no elements at all. Note that an RDD
    * may be empty even when it has at least 1 partition.
    */
    def isEmpty(): Boolean
    
    
    scala> val rdd = sc.parallelize(Seq())
    rdd: org.apache.spark.rdd.RDD[Nothing] = ParallelCollectionRDD[5] at parallelize at <console>:24
    
    scala> rdd.isEmpty
    org.apache.spark.SparkDriverExecutionException: Execution error
      at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1187)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1656)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
      at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1305)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
      at org.apache.spark.rdd.RDD.take(RDD.scala:1279)
      at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1413)
      at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1413)
      at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1413)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
      at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1412)
      ... 48 elided
    Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object;
      at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90)
      at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1884)
      at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1884)
      at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:59)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1656)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    
    scala> val rdd = sc.parallelize(Seq(1 to 9))
    rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
    scala> rdd.isEmpty
    res19: Boolean = false
    
    
    

    max()

    /**
    * Returns the max of this RDD as defined by the implicit Ordering[T].
    * @return the maximum element of the RDD
    * */
    def max()(implicit ord: Ordering[T]): T

    min()

    /**
    * Returns the min of this RDD as defined by the implicit Ordering[T].
    * @return the minimum element of the RDD
    * */
    def min()(implicit ord: Ordering[T]): T
    
    
    scala> val rdd = sc.parallelize(1 to 9)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
    
    scala> rdd.max
    res21: Int = 9
    
    scala> rdd.min
    res22: Int = 1
    
    

    reduce(f: (T, T) => T)

    对RDD所有元素进行聚合运算
    /**
    * Reduces the elements of this RDD using the specified commutative and
    * associative binary operator.
    */
    def reduce(f: (T, T) => T): T
    
    
    scala> val rdd = sc.parallelize(1 to 9)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
    scala> def func(x:Int, y:Int):Int={
         | if(x >= y){
         | x
         | }else{
         | y}
         | }
    func: (x: Int, y: Int)Int
    
    scala> rdd.reduce(func(_,_))
    res23: Int = 9
    
    
    scala> rdd.reduce((x,y)=>{
         | if(x>=y){
         | x
         | }else{
         | y
         | }
         | }
         | )
    res24: Int = 9
    
    

    saveAsObjectFile(path: String)

    将RDD以序列化对象的SequenceFile形式保存到指定目录
    /**

    * Save this RDD as a SequenceFile of serialized objects.
    */
    def saveAsObjectFile(path: String): Unit
    
    
    scala> val rdd = sc.parallelize(1 to 9)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
    scala> rdd.saveAsObjectFile("/home/check/object")
    
    
    [root@localhost ~]# ls /home/check/object/
    part-00000  _SUCCESS
    
    

    saveAsTextFile(path: String)

    将RDD保存至文本文件
    /**
    * Save this RDD as a text file, using string representations of elements.
    */
    def saveAsTextFile(path: String): Unit
    
    
    scala> val rdd = sc.parallelize(1 to 9)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
    scala> rdd.saveAsTextFile("/home/check/text")
    [root@localhost ~]# ls /home/check/text/part-00000 
    /home/check/text/part-00000
    [root@localhost ~]# more /home/check/text/part-00000 
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    

    take(num: Int)

    返回前num个元素。
    /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
    * the limit.
    *
    * @note this method should only be used if the resulting array is expected to be small, as
    * all the data is loaded into the driver's memory.
    *
    * @note due to complications in the internal implementation, this method will raise
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
    def take(num: Int): Array[T]
    
    
    scala> val rdd = sc.parallelize(1 to 9)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24
    
    scala> rdd.take(5)
    res28: Array[Int] = Array(1, 2, 3, 4, 5)


    takeOrdered(num: Int)

    按升序排序后返回前num个元素
    scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24
    
    scala> rdd.takeOrdered(3)
    res30: Array[Int] = Array(1, 2, 3)
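    

    takeSample(withReplacement: Boolean, num: Int, seed: Long)

    随机抽样返回num个元素;withReplacement指定是否为有放回抽样,seed为随机数种子
    def takeSample(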
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T]
    
    
    scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
    
    scala> rdd.takeSample(true,6,8)
    res34: Array[Int] = Array(5, 2, 2, 5, 3, 2)
    
    scala> rdd.takeSample(false,6,8)
    res35: Array[Int] = Array(9, 3, 2, 6, 1, 5)
    
    

    top(num: Int)

    按降序排列后返回前num个元素
    /*
    * @param num k, the number of top elements to return
    * @param ord the implicit ordering for T
    * @return an array of top elements
    */
    def top(num: Int)(implicit ord: Ordering[T]): Array[T]
    
    
    scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
    scala> rdd.top(3)
    res37: Array[Int] = Array(9, 6, 5)
    
    
    



  • 相关阅读:
    基于kafka-net实现的可以长链接的消息生产者
    Windows服务安装、卸载、启动和关闭的管理器
    基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型
    [转]C#中HttpClient使用注意:预热与长连接
    基于Confluent.Kafka实现的Kafka客户端操作类使用详解
    [转载]RabbitMQ消息可靠性分析
    ASP.NET Core3.1 MVC 添加验证规则
    asp.net core 3.1 webapi接口参数有时间类型取不到值得问题
    asp.net core 3.1 引用的元包dll版本兼容性问题解决方案
    Python安装和环境配置
  • 原文地址:https://www.cnblogs.com/alianbog/p/5839680.html
Copyright © 2011-2022 走看看