zoukankan      html  css  js  c++  java
  • Spark-Core RDD行动算子

    1、reduce(func)

    通过func函数聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。

    scala> val rdd1 = sc.parallelize(1 to 100)
    scala> rdd1.reduce(_ + _)
    res0: Int = 5050
    
    scala> val rdd2 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
    scala> rdd2.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
    res2: (String, Int) = (abc,6)
    

    2、collect

    数组的形式返回 RDD 中的所有元素.

    所有的数据都会被拉到 driver 端, 所以要慎用

    3、count

    返回 RDD 中元素的个数.

    4、take(n)

    返回 RDD 中前 n 个元素组成的数组.

    take 的数据也会拉到 driver 端, 应该只对小数据集使用

    5、first

    返回 RDD 中的第一个元素. 类似于take(1).

    6、takeOrdered(n,[ordering])

    返回排序后的前 n 个元素, 默认是升序排列.

    数据也会拉到 driver 端

    scala> val rdd1 = sc.makeRDD(Array(100, 20, 130, 500, 60))
    scala> rdd1.takeOrdered(2)
    res6: Array[Int] = Array(20, 60)
        
    scala> rdd1.takeOrdered(2)(Ordering.Int.reverse)
    res7: Array[Int] = Array(500, 130)
    

    7、aggregate

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    

    aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数每个分区的结果初始值(zeroValue)进行combine操作

    这个函数最终返回的类型不需要和RDD中元素类型一致

    zeroValue分区内聚合和分区间聚合的时候各会使用一次
    scala> val rdd1 = sc.makeRDD(Array(100, 30, 10, 30, 1, 50, 1, 60, 1), 2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
    
    scala> rdd1.aggregate(0)(_ + _, _ + _)
    res12: Int = 283
    
    scala> val rdd1 = sc.makeRDD(Array("a", "b", "c", "d"), 2)
    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at makeRDD at <console>:24
    
    scala> rdd1.aggregate("-")(_ + _, _ + _)
    res13: String = --ab-cd
    

    8、fold

    折叠操作,aggregate的简化操作,seqop和combop一样的时候,可以使用fold

    scala> val rdd1 = sc.makeRDD(Array(100, 30, 10, 30, 1, 50, 1, 60, 1), 2)
    scala> rdd1.fold(0)(_ + _)
    scala> val rdd1 = sc.makeRDD(Array("a", "b", "c", "d"), 2)
    scala> rdd1.fold("-")(_ + _)
    res17: String = --ab-cd
    

    9、saveAsTextFile(path)

    作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用toString方法,将它装换为文件中的文本

    10、saveAsSequenceFile(path)

    作用:将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。

    11、saveAsObjectFile(path)

    作用:用于将 RDD 中的元素序列化成对象,存储到文件中。

    12、countByKey()

    作用:针对(K,V)类型的 RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

    应用: 可以用来查看数据是否倾斜

    scala> val rdd1 = sc.parallelize(Array(("a", 10), ("a", 20), ("b", 100), ("c", 200)))
    
    scala> rdd1.countByKey()
    res19: scala.collection.Map[String,Long] = Map(b -> 1, a -> 2, c -> 1)
    
    

    13、foreach(func)

    作用: 针对 RDD 中的每个元素都执行一次func

    每个函数是在 Executor 上执行的, 不是在 driver 端执行的.

  • 相关阅读:
    团队项目-需求分析报告
    自动化测试框架指南
    一起吐槽接口文档
    居家费拖鞋【FunTester居家日记】
    HTTP接口测试基础【FunTester框架教程】
    Java NIO在接口自动化中应用
    JAVA 可变参数
    HashSet 和 LinkedSet 数据结构
    简单数据结构
    JAVA 迭代器的简单使用
  • 原文地址:https://www.cnblogs.com/hyunbar/p/12048488.html
Copyright © 2011-2022 走看看