zoukankan      html  css  js  c++  java
  • 小白学习Spark系列三:RDD常用方法总结

      上一节简单介绍了Spark的基本原理以及如何调用spark进行打包一个独立应用,那么这节我们来学习下在spark中如何编程,同样先抛出以下几个问题。

    • Spark支持的数据集,如何理解?
    • Spark编程中常用到的操作?

    一、RDD基础

      1、RDD简介

      在上一节的组件图Spark Core中我们简单提到了对弹性分布式数据集:RDD(Resilient Distributed DataSet),它表示分布在多个计算节点上可以并行操作的元素集合,是Spark主要得编程抽象。一般我们广为熟知的数值类型是整型、字符型等,那么RDD是包含这些任意数值类型的不可变对象。在实际编程中,和其他数值类型除了操作不同外并没有直观的区别。

      2、惰性求值

      为了节省空间和提高运算效率,Spark使用惰性求值的方式将一些操作合并到一起来减少计算数据的步骤,这就使得RDD支持两种类型的操作:转化操作和行动操作。转化操作会由一个RDD生成另一个新的RDD,而行动操作是会对RDD计算出一个结果,并将结果返回到驱动程序中或者存储到外部存储系统中。

      转化操作是惰性求值的,意思就是在调用行动操作之前Spark是不会开始真正计算的,它只会使用谱系图来记录这些不同RDD之间的依赖关系,这样就可以在后期行动操作时按需计算真正需要的数据,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。例如,在行动操作 first() 中,Saprk只需要扫描文件直到找到第一个数据为止,不需要读取整个文件。默认情况下,Spark的RDD会在每次行动操作时重新计算,如果多个行动操作重用同一个RDD,建议使用RDD.persist(),把该RDD缓存下来,避免不必要的重复计算。

      那么如何区分结果是由转化操作还是行动操作计算而来的呢?可以观察结果的数据类型,转化操作返回的是RDD,而行动操作返回的是其他的数值型类型。

      3、一般流程

      每个Spark程序无外乎如下几个步骤:

      (1)从外部数据创建出输入的RDD

      (2)使用诸如 filter() 这样的转化操作对RDD进行转化,以定义新的RDD

      (3)对需要重用的中间结果进行缓存

      (4)使用行动操作来触发并行计算

    二、RDD常用操作

      1、创建RDD

      以下两种创建RDD的方式相比,第一种较为常用,因为第二种方式需要整个数据集首先放在一台机器内存中。

      (1)外部数据的读取

    val test_rdd = sc.textFile("hdfs:///user/test.txt")

      (2)驱动程序中的集合并行化

    val test1_rdd = sc.parallelize(["pandas","noodles","dog"])
    val test2_rdd = sc.makeRDD(["pandas","noodles","dog"]) 

      2、转化操作

      (1)针对RDD中各元素的操作

    函数 作用
    map()(常用) 将函数应用于RDD中每个元素,将返回值构成新的RDD
    flatMap()(常用) 将函数引用于RDD中每个元素,将返回的迭代器的所有内容取出重新构成新的RDD

    filter()(常用)

    filter()的参数为布尔函数,返回满足该布尔函数的元素构成新的RDD
    distinct() 去重
    sample(withReplacement,[seed]) 对RDD采样,以及是否替换

      

      (2)对RDD进行伪集合的操作

    函数 作用
    union() 生成一个包含两个RDD中所有元素的RDD,不去重。类似并集
    intersection() 将两个RDD共同的元素构成新的RDD,去重。类似交集
    substract() 在左边RDD中移除右边RDD中的内容,类似左连接
    cartesian() 与另一个RDD笛卡尔积

     

      3、行动操作

    函数 作用
    collect()(常用) 返回RDD中全部元素
    count()(常用) 返回RDD中元素个数
    countByValue()(常用) 返回各元素在RDD中出现的次数,返回类型为元组的集合
    take(num)(常用) 返回RDD中num个元素
    top(num) 返回RDD中最前面的num个元素
    takeOrder(num)(ordering) 从RDD中按照提供的顺序返回最前面的num个元素
    takeSample(withReplacement,num,[seed]) 从RDD中返回任意些元素
    reduce(func)(常用) 并行整合RDD中所有数据,类似sum
    fold(zero)(func) 和reduce()一样,但是需要提供初始值
    aggregate(zeroValue)(seq0p, comb0p) 和reduce()相似,但是通常返回不同类型的函数
    foreach(func)(常用) 遍历RDD中每个元素使用传入的函数

     

      4、持久化

       由于Spark RDD是惰性求值的,当我们每次调用行动操作时,都会重算RDD的所有依赖,如果多次行动操作使用同一个RDD,就会导致大量的重复运算。为避免这种现象,可以对数据进行持久化,也就是存储该RDD,保存在各自的分区中。出于不同的目的,可以为RDD选择不同的持久化级别,如下所示:

    级别 使用的空间 CPU时间 是否在内存 是否在磁盘 备注
    MEMORY_ONLY  
    MEMORY_ONLY_SER  
    MEMORY_AND_DISK 中等 部分 部分 如果数据在内存放不下,溢写到磁盘上
    MEMORY_AND_DISK_SER 部分 部分 如果数据在内存放不下,溢写到磁盘上,在内存中放序列化后的数据
    DISK_ONLY  

      scala中使用 persist() 进行缓存,unpersist()方法可以手动地把持久化RDD从缓存中移除。示例:

    import org.apache.spark.storage.StorageLevel
    val result = input.map(x => x*x)
    result.persist(StorageLevel.DISK_ONLY)
    println(result.count())
    println(result.collect().mkstring(","))

     

    结束语:纵使掌握了操作RDD的常用函数,但在实际运用中仍然会出现许多疑问。在做项目的过程中,我总结的一些踩坑经验会留在下一节中讲解~

    参考:Spark大数据快速分析

  • 相关阅读:
    poj 3243 Clever Y(BabyStep GiantStep)
    poj 2417 Discrete Logging
    poj 3481 Double Queue
    hdu 4046 Panda
    hdu 2896 病毒侵袭
    poj 1442 Black Box
    hdu 2815 Mod Tree
    hdu 3065 病毒侵袭持续中
    hdu 1576 A/B
    所有控件
  • 原文地址:https://www.cnblogs.com/hithink/p/9672425.html
Copyright © 2011-2022 走看看