zoukankan      html  css  js  c++  java
  • spark学习(10)-RDD的介绍和常用算子

    RDD(弹性分布式数据集,里面并不存储真正要计算的数据,你对RDD的操作,
    他会在Driver端转换成Task,下发到Executor计算分散在多台集群上的数据)


    RDD是一个代理,你对代理进行操作,他会生成Task,帮你计算
    你操作这个代理,就像操作本地集合一样,不用关心任务调度,容错等

    RDD的属性

    1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

     

    2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

     

    3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

     

    4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

     

    5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”

    val r1 = sc.textFile("hdfs://hdp-02:9000/wc")

    r1.count //这样就统计出有多少行

    创建RDD的方式
    生成一个RDD
    sc.textFile("hdfs://hdp-01:9000/wc")
    源代码还是hadoop的API
    RDD是一个基本的抽象
    RDD的算子一类是Transformation(Lazy)
    一种是Action(触发任务执行)

    创建方式
    1。通过外部的储存系统创建RDD
    val rdd1 = sc.textFile("hdfs://hdp-01:9000/wc")
    2.将Driver的scala集合通过并行化的方式编程RDD(学习,实验
    val arr = Array(1,2,3,4,5,6)
    val rdd2 = sc.parallelize(arr) //此时rdd2为集合转化成的一个RDD
    3.条用一个已经存在的RDD的Transformation,生成一个新的RDD
    val rdd3 = rdd.map(_ * 10))//这个rdd3就是通过rdd2转化而得到的

     

     

    #RDD中的常用算子(方法) 
    #常用Transformation(即转换,延迟加载)
    
    //特点是1.lazy
    //2.生成新的RDD 
    #通过并行化scala集合创建RDD
    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    #查看该rdd的分区数量
    rdd1.partitions.length
    
    
    val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))//把list转化成rdd 
    val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)//按照本身的数字排序 
    val rdd3 = rdd2.filter(_>10) //过滤出rdd2中大于10的 
    val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)//传进来的排序规则是按照字符串排序的,下同 
    val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
    
    
    val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j"))
    rdd4.flatMap(_.split(' ')).collect
    
    val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
    
    
    List("a b c", "a b b") =List("a","b",))
    
    rdd5.flatMap(_.flatMap(_.split(" "))).collect
    
    #union求并集,注意类型要一致
    val rdd6 = sc.parallelize(List(5,6,4,7))
    val rdd7 = sc.parallelize(List(1,2,3,4))
    val rdd8 = rdd6.union(rdd7)//将两个Rdd合成一个rdd 不去重 
    rdd8.distinct.sortBy(x=>x).collect
    
    #intersection求交集
    val rdd9 = rdd6.intersection(rdd7) //将两个rdd合成一个交集的rdd 
    
    #join(连接)
    val rdd11 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
    val rdd12 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))
    
    
    val rdd13 = rdd11.join(rdd12)
    //结果为Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9))) 
    val rdd13 = rdd11.leftOuterJoin(rdd12)
    //结果为Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (kitty,(3,None)), (jerry,(2,Some(9)))) 
    val rdd13 = rdd11.rightOuterJoin(rdd12)
    //结果为Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (tom,(Some(1),2)), (shuke,(None,7)), (jerry,(Some(2),9))) 
    
    
    #groupByKey
    val rdd3 = rdd1 union rdd2
    rdd3.groupByKey
    //Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(8, 2, 1)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))
    rdd3.groupByKey.map(x=>(x._1,x._2.sum))//结果与下同 
    rdd3.groupByKey.mapValues(_.sum).collect
    //Array[(String, Int)] = Array((tom,11), (shuke,7), (kitty,3), (jerry,11))
    
    
    
    #WordCount
    
    //reduceByKey是先局部聚合  然后shuffle,shuffle的次数少,然后groupByKey再map的话是要把很多(单词,1)shuffle到一台机器上再聚合,这样shuffle的次数会很多
    sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
    sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).groupByKey.map(t=>(t._1, t._2.sum)).collect
    
    #cogroup
    val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    //Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), 
    //(shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())), (jerry,(CompactBuffer(3),CompactBuffer(2)))) 
    val rdd3 = rdd1.cogroup(rdd2)
    val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))
    
    #cartesian笛卡尔积
    val rdd1 = sc.parallelize(List("tom", "jerry"))
    val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
    val rdd3 = rdd1.cartesian(rdd2)
    
    ###################################################################################################
    
    #spark action
    #后面2位一个分区 
    #创建几个分区就会生成几个task,这几个task会将数据进行处理然后写入到hdfs
    #几个task就会生成几个文件 
    #如果没有指定的话,那么task的数据跟启动时设置的core的数量一样 
    #但并不是越多越好比如:你电脑上可以有8个线程同时运行,但你可以在你电脑上启动80个线程,他对于这80个线程是每8个线程作为一个批次进行切换的 
    val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
    
    #collect
    rdd1.collect
    
    #reduce
    val r = rdd1.reduce(_+_)
    
    #count
    rdd1.count
    
    #top
    rdd1.top(2)
    
    #take
    rdd1.take(2)
    
    #first(similer to take(1))
    rdd1.first
    
    #takeOrdered
    rdd1.takeOrdered(3)
    
    
    
    #查看一个rdd有几个分区 
    rdd1.partitions.length
    #rdd最小的分区数量是2 
    
    #这样会先把test下的左右文件大小加在一起, 然后再重新计算应该分区的大小 
    #触发Action的时候才会指定有几个分区  
    sc.textFile("hdfs://hdp-01:9099/test)
     
    mapPartitionsWithIndex
    一次拿出一个分区(分区中并没有数据,而是记录尧都区那些数据,真正生成的Task弧度去多条数据)
    ,并且可以将分区的编号取出来,
    功能:取出分区中对应的数据时,还可以将分区的编号取出来,知道数据是属于那个分区 
        //该函数的功能是将对应分区中的数据取出来,并且带上分区编号
        val func = (index: Int, it: Iterator[Int]) => {
          it.map(e => s"part: $index, ele: $e")
        }
    val rdd1 = rdd.mapPartitionsWithIndex(func)
    
    
    aggregate方法
    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
    rdd1.aggregate(0)(_+_,_+_) //结果为45,第一个括号是初始值,第二个括号表达先是对每个分区的操作,再总的操作 
    //下面这个意思是先求出每个分区的最大值,然后加起来 
    rdd1.aggregate(0)(math.max(_,_), _+_)
    
    #结果是25第一个分区1,2,3,和初始值比结果为5,第二个分区4,5,6,结果为6,第三个为9,这三个最大值加起来,再加上一个初始值 
    rdd1.aggregate(5)(math.max(_._), _+_) 
    
    
    val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
      iter.map(x => "[partID:" +  index + ", val: " + x + "]")
    }
    pairRDD.mapPartitionsWithIndex(func2).collect
    pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
    // Array[(String, Int)] = Array((dog,112), (cat,219), (mouse,206)) 因为cat在第一个分区加了100,在第二个分区也加了100,所以加了200,dog只在一个分区里出现,加了100 
    pairRDD.aggregateByKey(100)(_+_, _ + _).collect
    
    
    collection.map
    var rdd = sc.parallelize(List(("a",1), ("b",2))) 
    
    rdd.mapValues(_*100)
    
    rdd.mapValues(_*100).collectAsMap
    
    
    collect的执行过程
    rdd执行action的方法之后,会从后往前推
    知道找到一个数据源,有几个分区就会在driver端生成几个task ,然后发送给worker
    worker会找到数据源,下载数据,边下载边计算,
    计算完之后会发送给driver端(如果数据太大的话,不要把数据收集到driver端,在写入数据库啥的,这样driver是一个瓶颈,直接把计算结果写入到数据库或者啥的) 
    
    countByKey 
    
    val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
    rdd1.countByKey //scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2) 
    rdd1.countByValue//scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (c,1) -> 1, (a,1) -> 1)
    
    filterByRange
    
    val rdd1 = sc.parallelize(List(("e", 5),("b", 5),("c", 3), ("d", 4), ("c", 2), ("a", 1)))
    val rdd2 = rdd1.filterByRange("b", "d")//按照key在b,到d之间的范围取出 
    rdd2.colllect
    
    
    flatMapValues
    val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
    rdd3.flatMapValues(_.split(" "))//把value进行flatmap,再和key结合 
    
    foreach
    这是一个action,但不会发生在driver,会在executor执行,在driver端是看不到的
    
    foreachPartition
    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
    rdd1.foreachPartition(x => println(x.reduce(_ + _)))//一次拿出来一个分区,然后打印出每个分区中的数据进行聚合
    #这说明任务执行的时候是在executor执行的 

     

    RDD的产生和分配

     

  • 相关阅读:
    PAT 甲级 1027 Colors in Mars
    PAT 甲级 1026 Table Tennis(模拟)
    PAT 甲级 1025 PAT Ranking
    PAT 甲级 1024 Palindromic Number
    PAT 甲级 1023 Have Fun with Numbers
    PAT 甲级 1021 Deepest Root (并查集,树的遍历)
    Java实现 蓝桥杯VIP 算法训练 无权最长链
    Java实现 蓝桥杯VIP 算法训练 无权最长链
    Java实现 蓝桥杯 算法提高 抽卡游戏
    Java实现 蓝桥杯 算法提高 抽卡游戏
  • 原文地址:https://www.cnblogs.com/wpbing/p/11331376.html
Copyright © 2011-2022 走看看