zoukankan      html  css  js  c++  java
  • Spark 基础及RDD基本操作

     

    什么是RDD

     

    RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。 
    RDD的属性

     

    一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

     

    一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

     

    RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

     

    一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

     

    一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。 
    基本RDD操作 
    创建RDD: 
    1)读取外部数据集 
    val file=sc.textFile(“hdfs://hadoop1:9000/input/word/word.txt”)

     

    2)在驱动器程序中对一个集合进行并行化
            val lines = sc.parallelize(List("pandas","i like pandas"))
    

     

    RDD操作: 
    RDD转化操作是返回一个新的RDD的操作,比如map()和filter() 
    RDD行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算 
    1)转化操作 
    val inputRDD = sc.textFile(“hdfs://hadoop1:9000/input/word/word.txt”)

     

        val keyRDD = inputRDD.filter(line => line.contains("guofei"))
    
    2)行动操作0
        val keyRDD = inputRDD.filter(line => line.contains("guofei"))
    
        wantRDD.take(10).foreach(println)
    

     

    常见的转化操作和行动操作 
    1.转化操作 
    map()与flatMap()区别 
    flatMap 将函数应用于RDD中的每个元素,将返回的迭代器的所有的内容构成新的RDD,通常用来切分单词 
    val lines = sc.parallelize(List(“come on”,”guofei”)) 
    var words = lines.flatMap(line => line.split(” “)) 
    words.collect()

     

    map 将函数应用于RDD中的每个元素,将返回值构成新的RDD
    var words1 = lines.map(line => line.split(" "))
    words1.collect()
    
    
    filter 返回一个由通过传给filter()的函数的元素组成的RDD
    val list = sc.parallelize(List(1,2,3,3))
    val listFilter = list.filter(x => x != 1)
    listFilter.collect()
    
    
    distinct 去重
    val listDistinct = list.distinct()
    listDistinct.collect()
    
    
    union() 生成一个包含俩哥哥RDD中所有元素的RDD
    val list = sc.parallelize(List(3,4,5))
    val list1 = sc.parallelize(List(1,2,3))
    val union = list.union(list1)
    union.collect()
    
    
    intersection() 求两个RDD共同的元素的RDD
    list.intersection(list1).collect()
    
    
    subtract() 移除里一个RDD中的内容
    list.subtract(list1).collect()
    
    
    cartesian() 与另一个RDD的笛卡儿积
    list.cartesian(list1).collect()
    

     

    2.行动操作 
    reduce() 
    val list = sc.parallelize(List(3,4,5)) 
    list.reduce((x,y) => x + y)

     

    collect() 返回RDD中的所有元素
    count() RDD中的元素个数
    countByValue() 各元素在RDD中出现的次数
    take(num) 从RDD中返回num个数
    top(num) RDD中返回最前面的num个元素
    takeOrdered(num)(ordering) 从RDD中按照提供的舒徐返回最前见的num元素
    reduce(func) 并行整合RDD中左右数据
    fold(zero)(func) 和reduce一样,但是需要提供初始值
    aggregate(zeroValue)(seqOp,combOp) 和reduce相似,但是通常返回不同类型的函数
    

     

    键值对操作: 
    创建Pair RDD

     

    使用第一个单词作为键创建出一个pair RDD 
    val file=sc.textFile(“hdfs://hadoop1:9000/input/word/word.txt”) 
    file.map(x => (x.split(” “)(0),x)).collect()

     

    Pair RDD的转化操作 
    创建Pair 
    val list1 = sc.parallelize(List((1,2),(3,4),(3,6))) 
    list1.collect()

     

    reduceByKey(func) 合并具有相同键的值 
    list1.reduceByKey((x,y) => x+y).collect()

     

    groupByKey() 对具有相同键的值进行分组 
    list1.groupByKey.collect()

     

    mapValues(func) 对pair RDD中的每个值应用一个函数而不改变键 
    list1.mapValues(x => x+1).collect()

     

    flatMapValues(func) 对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键对记录。通常用于符号化 
    list1.flatMapValues(x => (x to 5)).collect()

     

    keys() 返回一个仅包含键的RDD 
    list1.keys.collect()

     

    values() 返回一个仅包含值得RDD 
    list1.values.collect()

     

    sortByKey() 返回一个根据键排序的RDD 
    list1.sortByKey().collect()

     

    针对两个pair RDD的转化操作 
    val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) 
    val other = sc.parallelize(List((1,2)))

     

    subtractByKey 删掉RDD中键与other中的键相同的元素 
    rdd.subtractByKey(other).collect()

     

    join 对两个RDD进行内连接 
    rdd.join(other).collect()

     

    leftOuterJoin() 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接) 
    rdd.leftOuterJoin(other).collect()

     

    cogroup() 将两个RDD中拥有相同键的数据分组到一起 
    rdd.cogroup(other).collect()

     

  • 相关阅读:
    根据访问ip的地区跳转到指定地址
    js生成vCard,以及格式参数详细说明
    min_to_split_multiprocessing多进程,用于平时快速补充数据
    min_to_split.py按日存储到按个股存储
    readzip_minute_data 多进程处理数据
    打包成7zfile,to7zfile
    baostock_multiprocessing 多进程取数据
    终止阻塞线程(有共享锁的线程无效)
    readzip_add_maxL3多线程
    readzip_add_maxL2
  • 原文地址:https://www.cnblogs.com/awishfullyway/p/6509244.html
Copyright © 2011-2022 走看看