zoukankan      html  css  js  c++  java
  • sparkRDD相关操作

    RDD(弹性分布式数据集)。RDD以分区中的每一行进行分布式计算。父子依赖关系。

    一、RDD创建操作

    1)数据集合

    Val data=Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

    Val distData = sc.parallelize(data, 3) #分区,生成RDD数据集

    Val distData =sc.parallelize(1 to 10, 2) #2是并行程度,指定多少线程同时执行。

    distData.collect

    distData.take(1)

    sc.makeRDD(1 to 10, 4).map(e=> {val tname=Thread.currentThread().getName; println(tname + ":" +e)}).collect

    2)外部读取

    Val distFile1 = sc.textFile(“data.txt”) /#本地当前目录下文件或指定目录下文件

    Val distFile2 = sc.textFile(“hdfs://192.168.1.100:9000/input/data.txt”)#HDFS文件

                   textFile(“/input/001.txt, /input/002.txt”)#读取多个文件

                   textFile(“/input/*.txt”)#读取含通配符路径

    二、RDD转换操作(不会立即执行,返回RDD)

    1)   Map

    Map是对RDD中每个元素都执行一个指定的函数来生成一个新的RDD

    Val rdd1=sc.parallelize(1 to 9, 3)

    Val rdd2=rdd1.map(x=>x*2)

    Rdd2.collect

    2)   Filter

    Filter是对RDD元素进行过滤,返回一个新的数据集,是经过func函数后返回值为True的原元素组成

    Val rdd3=rdd2.filter (x=>x>10)

    (12, 14, 16)

    3)   Top

    提取rdd最大的n个元素

    rdd1.top(1)

    rdd1.top(1)(scala.math.Ordering.String.reverse) #倒序 

    4)   flatMap

    类似于map,但是它是一对多关系

    rdd3.flatMap(x => x to 20)

    (12,13,14,15,16,17,18,19,20,14,15,16,17,18,19,20,16,17,18,19,20)

    5)   mapPartitons

    是map的一种变种,mapPartitions的输入函数是每个分区的数据,也就是把每个分区中的内容作为整体来处理 。

    6)   repatition

    再分区

    rdd.repartition(4)

    7)   sample

       Sample(withReplacement, fraction, seed) 第一个参数是是否为又放回抽样,第二个参数是比例。

       Val a=sc.parallelize(1 to 10000,3)

    a.sample(false, 0.1).collect().foreach(println)

    8)   union

    数据合并,返回一个新的数据集

    Val rdd8=rdd1.union(rdd3)

    Rdd8.collect

    9)   intersection

    数据交集

    Val rdd9=rdd8.intersection(rdd1)

    10)  distinct

    数据去重

    Val rdd10=rdd8.union(rdd9).distinct

    11)  groupBy

    对RDD元素进行分组

    val rdd = sc.parallelize(Array((“tom”,10),(“tomas”,12),(“tomlee”,12),(“tomsan”,10))

    val rdd2 = rdd.groupBy(e => e _2)

    rdd2.collect()

    Array((10.CompareBuffer((tom,10),(tomsam,10)),12.CompareBuffer((tomas,12),(tomlee,12))))

    12)  groupByKey

       根据Key进行分组,迭代部分都是value

       Val rdd0=sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)

       Val rdd1=rdd0.groupByKey()

       Array((1,ArrayBuffer(1,2,3)),(2,ArrayBuffer(1,2,3)))

    13)  groupWith

    两个RDD

    100->tom

    200->tomas

    100->20

    200->30

    Val rdd1 = sc.makeRDD(Array((100,”tom”),(200, “tomas”)))

    Val rdd2 = sc.makeRDD(Array((100, 20),(200, 30)))

    rdd1.groupWith(rdd2)

    Array((100,(CompactBuffer(tom),CompactBuffer(20))),(200,(CompactBuffer(tomas),CompactBuffer(30))))

    14)  reduceBykey

    数组的分组聚合操作

    Val rdd12=rdd0.reduceByKey((x,y)=>x+y)

    Array((1,6),(2,6))

    15)  aggregeteByKey

    更加灵活的一个函数。三个参数,第一个是初始值,第二个是给每个元素值进行的函数操作,第三个是根据Key做相应的合并操作

    Val z=sc.parallelize(list((1,3),(1,2),(1,4),(2,3)))

    z.aggregateByKey(0)(math.max(_,_),_+_)  #先将每个value值与初始值0比较大小,然后根据Key求和。

    Array((2,3),(1,9))

    16)  conbineByKey

        更加灵活的一个函数,与reduceByKey不同,它可以同时计算求和和求次数。根据Key进行聚合操作。

    17)  sortByKey

    排序操作

    Val rdd14=rdd0.sortBykey()

    Rdd14.collect

    18)  join

       连接两个RDD,形成新的rdd,(跟groupwith区别是join是一对一,groupwith是分组,相同的放一起)

     (1 tom) join (1 100) --> 1, (tom, 100)

    (2 tomas) join (2 80) --> 2, (tomas, 80)

    Val rdd1 = sc.makeRDD(Array((1, “tom”), (2, “tomas”)))

    Val rdd1 = sc.makeRDD(Array((1, 100), (2, 80)))

    rdd1.join(rdd2).collect()

    array((1,(tom,800)),(2,(tomas,700)))

    19)  intersection

    提取RDD之间的交集

     val rdd1 = sc.makeRDD(Array(“tom”,”tomas”,”tomaslee”))

    val rdd2 = sc.makeRDD(Array(“tomas”,”tomaslee”,”tomason”))

    rdd1.intersection().collect()

    Array(tomaslee, tomas)

    20)  cogroup

    输入数据集(k, v)和另外一个数据集(k, w)进行cogroup,得到一个格式(k, Seq[v], Seq[W])的数据集。

    rdd0 = sc.makeRDD(Array((1, “tom”),(2,”tomas”),(3,”tomasLee”) ))

    rdd0.cogroup(sc.makeRDD(Array((1,”hebei”),(2,”henan”),(3,”hexi”))))

    Array((1,(CompactBuffer(tom),CompactBuffer(hebei))), (2,(CompactBuffer(tomas),CompactBuffer(henan))), (3,(CompactBuffer(tomasLee),CompactBuffer(hex))))

    21)  cache / persist

    cache是特殊的persist,只在内存中对RDD的结果进行保存(一旦关掉就没有了)。

    val rdd = sc.makeRDD(1 to 10).map(e=>(println(e);e))

    rdd.collect

    rdd.cache

    rdd.collect

    rdd.presist() == rdd.persist(StorageLevel.MEMORY_ONLY)

    rdd.presist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

    22)  pipe

    对每个分区执行shell命令

    val r = sc.makeRDD(1 to 5).pipe(“echo hahah”).collect #hahah个数同分区数

    val rdd18=sc.parallelize(1 to 9,3)

    rdd18.pipe(“head -n 1”).collect #取每个分区的第一个数

    23)  randomSplit

     Val rdd19=rdd1.randomSplit(Array(0.3,0.7), 1)

    rdd19(0).collect

    rdd19(1).collect

    24)  Zip

     Val rdd21_1=sc.parallelize(Array(1,2,3,4), 3)

     Val rdd21_2=sc.parallelize(Array(“a”,”b”,”c”,”d”), 3)

     Val rdd21_3=rdd21_1.zip(rdd21_2)

    将每个分区的所有元素放到一个数组中,形成RDD,RDD的每个元素是数组,数组长度等于分区个数

     Val rdd = sc. parallelize(1 to 10, 4).glom().collect()

    Array<array<1,2>, array<3,4,5>,array<6,7>,array<8,9,10>>

    25)  keyBy

    将rdd的元素和一个变换之后的值组合形成元组

    val rdd = sc.makeRDD(1 to 10)

    rdd.keyBy(_ * 2).collect

    Array[(Int, Int)] = Array((2,1), (4,2), (6,3), (8,4), (10,5), (12,6), (14,7), (16,8), (18,9), (20,10))

    26)  max | min | mean

    Rdd.max 

    27)  repartitionAndSortWithinPartitions

    通过指定分区函数实现再分区并在分区内排序

    二、RDD行动操作(会立即执行,返回数组)

    1)     reduce

    val rdd1=sc.parallelize(1 to 9, 3)

    val rdd2=rdd1.reduce(_+_)

    2)     collect

    3)     count

    4)     first

    5)     take

    6)     takesample

    类似于sample,但takeSample是行动操作,所以返回的是数组

    Rdd1.takeSample(true, 4)

    7)     takeOrdered

    takeOrdered(n, [ordering])是返回包含随机的n个元素的数组,按照顺序输出

    8)SaveAsTextFile

       把数据集中的元素写到一个文本文件,Spark会对每个元素调用toString方法来把每个元素存成文本文件的一行。

       r.saveAsTextFile(“/home/centos/aa”)

       cd aa/

       find .

       ll

      nano part-0000

    9)     saveAsSequenceFile

    r.map(w=>(w,1)).saveAsSequenceFile(“home/centos/bb”)

    cd bb/

    ls

    hdfs dfs -text file:///home/centos/bb/part-00000

    10)countByKey

    对于(k,v)类型的RDD,返回一个(k,int)的map, int为k的个数。

     Val rdd =

    sc.makeRDD(Array(1,”tom”),(2,”tomas”),(1,”tomasLee”))).countByKey.foreach(e=>println(e))

    结果 (1,2)

            (2,1)

    11)foreach

       Foreach(func)是对数据集中的每个元素都执行func函数

  • 相关阅读:
    一键java环境配置
    eclipse + tomcat7 + maven 配置过程
    eclipse/myeclipse link 方式安装插件
    eclipse maven plugin 插件 安装 和 配置
    Spring MVC 教程,快速入门,深入分析
    Spring MVC 框架搭建及详解
    Javassist介绍
    OO的奇妙冒险4
    OO的奇妙冒险3
    OO的奇妙冒险2
  • 原文地址:https://www.cnblogs.com/fionacai/p/8805722.html
Copyright © 2011-2022 走看看