zoukankan      html  css  js  c++  java
  • Spark中的RDD操作简介

    map(func)

    对数据集中的元素逐一处理,变为新的元素,但一个输入元素只能有一个输出元素

    scala> pairData.collect()
    res6: Array[Int] = Array(1, 2, 3, 4, 5)
    scala> val pairData = distData.map(a=>(a,1)).collect()
    res2: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
    

    flatMap(func)

    类似与map,对数据集中的元素逐一处理,变为新的元素,但一个输入元素可以被映射为0或多个输出元素

    scala> val file = sc.textFile("/tmp/input")
    scala> file.collect()
    res11: Array[String] = Array("1 2 3 4 ", test hello world, 123 8997 876, hai bai du) // 每一行为一个元素
    scala> file.flatMap(a => a.split(" ")).collect()
    res12: Array[String] = Array(1, 2, 3, 4, test, hello, world, 123, 8997, 876, hai, bai, du)
    

    filter(func)

    对数据集中的元素注意处理,返回经过func函数计算后返回值为true的输入元素组成

    scala> pairData.collect()
    res7: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
    scala> pairData.filter(r => r._1 == r._2).collect()
    res8: Array[(Int, Int)] = Array((1,1))
    

    mapValues(func)

    对数据集中的value进行逐个处理, 如RRD.mapValues(v => 1.0/20),将所有的value变为1.0/20

    scala> pairData.collect()
    res2: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
    scala> pairData.mapValues( v=> 1.0/5 ).collect()
    res5: Array[(Int, Double)] = Array((1,0.2), (2,0.2), (3,0.2), (4,0.2), (5,0.2))
    

    distinct()

    返回一个包含源数据集中所有不重复元素的新数据集

    scala> val a = Array(1,1,3,3,4)
    a: Array[Int] = Array(1, 1, 3, 3, 4)
    scala> sc.parallelize(a).distinct().collect()
    res6: Array[Int] = Array(1, 3, 4)
    

    groupByKey()

    对相同key的数据进行group操作,在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集

    scala> pairData.collect()
    res7: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.groupByKey().collect()
    res9: Array[(Int, Seq[Int])] = Array((1,ArrayBuffer(1, 1)), (3,ArrayBuffer(1, 1)), (4,ArrayBuffer(1)))
    

    reduceByKey(func)

    使用指定的reduce函数,将相同key的值聚合到一起,并执行函数

    scala> pairData.collect()
    res7: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.reduceByKey(_+_).collect()
    res10: Array[(Int, Int)] = Array((1,2), (3,2), (4,1))
    

    sortByKey([ascending], [numTasks])

    scala> pairData.collect()
    res7: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.sortByKey(false).collect
    res12: Array[(Int, Int)] = Array((4,1), (3,1), (3,1), (1,1), (1,1))
    

    union(otherDataSet)

    返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData2.collect()
    res14: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1))
    scala> pairData.union(pairData2).collect()
    res15: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1), (1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1))
    

    join(otherDataSet)

    在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData2.collect()
    res14: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1))
    scala> pairData.distinct().join(pairData2.distinct()).collect()
    res18: Array[(Int, (Int, Int))] = Array((1,(1,1)), (3,(1,1)), (4,(1,1)))
    

    cogroup(otherDataSet)

    在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData2.collect()
    res14: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1))
    scala> pairData.distinct().cogroup(pairData2.distinct()).collect()
    res19: Array[(Int, (Seq[Int], Seq[Int]))] = Array((5,(ArrayBuffer(),ArrayBuffer(1))), (6,(ArrayBuffer(),ArrayBuffer(1))), (1,(ArrayBuffer(1),ArrayBuffer(1))), (2,(ArrayBuffer(),ArrayBuffer(1))), (7,(ArrayBuffer(),ArrayBuffer(1))), (3,(ArrayBuffer(1),ArrayBuffer(1))), (4,(ArrayBuffer(1),ArrayBuffer(1))))
    

    cartesian(otherDataSet)

    笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对)

    scala> pairData.distinct().collect()
    res16: Array[(Int, Int)] = Array((1,1), (3,1), (4,1))
    scala> pairData2.collect()
    res14: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1))
    scala> pairData.distinct().cartesian(pairData2).collect()
    res20: Array[((Int, Int), (Int, Int))] = Array(((3,1),(1,1)), ((3,1),(2,1)), ((3,1),(3,1)), ((3,1),(4,1)), ((3,1),(5,1)), ((3,1),(6,1)), ((3,1),(7,1)), ((4,1),(1,1)), ((4,1),(2,1)), ((4,1),(3,1)), ((4,1),(4,1)), ((4,1),(5,1)), ((4,1),(6,1)), ((4,1),(7,1)), ((1,1),(1,1)), ((1,1),(2,1)), ((1,1),(3,1)), ((1,1),(4,1)), ((1,1),(5,1)), ((1,1),(6,1)), ((1,1),(7,1)))
    

    sample(withReplacement,fraction, seed)

    返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.sample(false, 0.2, 1)
    res34: Array[(Int, Int)] = Array((4,1))
    

    reduce(func)

    通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。

    scala> val c = Array(1, 2, 3, 4, 5)
    c: Array[Int] = Array(1, 2, 3, 4, 5)
    scala> sc.parallelize(c).reduce(_+_)
    res24: Int = 15
    

    collect()

    以数组的形式,返回数据集的所有元素

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    

    count()

    返回数据集的元素的个数

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.count()
    res29: Long = 5
    

    first()

    返回数据集中的第一个元素

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.first()
    res30: (Int, Int) = (1,1)
    

    take(n)

    返回一个由数据集的前n个元素组成的数组。

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.take(3)
    res31: Array[(Int, Int)] = Array((1,1), (1,1), (3,1))
    

    takeSample(withReplacement,num, seed)

    返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子

    scala> pairData.collect()
    res16: Array[(Int, Int)] = Array((1,1), (1,1), (3,1), (3,1), (4,1))
    scala> pairData.takeSample(false, 2, 1)
    res36: Array[(Int, Int)] = Array((3,1), (3,1))
    

    countByKey()

    返回一个(K,Int)对的Map,表示每一个key对应的元素个数

    scala> pairData.countByKey()
    res37: scala.collection.Map[Int,Long] = Map(3 -> 2, 4 -> 1, 1 -> 2)
    

    saveAsTextFile(path)

    将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。

    saveAsSequenceFile(path)

    将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。

    foreach()

    在数据集的每一个元素上,运行函数func进行更新。这通常用于边缘效果,例如更新一个累加器

    scala> val accum = sc.accumulator(0)
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    scala> accum.value
    res47: Int = 10
  • 相关阅读:
    python--脚本传参与shell脚本传参(位置参数)
    python--一起来盖个时间戳!!
    python--多线程的应用
    python-局域网内实现web页面用户端下载文件,easy!
    java初始化顺序
    java List<String>的初始化 的一个小问题
    java.util.Queue用法
    C#中的struct(结构)为值类型,struct类型全接触
    Java 实例
    git还原某个特定的文件到之前的版本
  • 原文地址:https://www.cnblogs.com/Cherise/p/4358965.html
Copyright © 2011-2022 走看看