zoukankan      html  css  js  c++  java
  • Spark快速大数据分析_3:第三章

    第 3 章 RDD 编程

    目录:

    3.1 RDD基础

    3.2 创建RDD

    3.3 RDD操作

    3.4 向spark传递函数

    3.5 常见的RDD操作

    3.6 持久化

    3.1 RDD基础

    RDD(Resilient Distributed Dataset):弹性分布式数据集,是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

    RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。

    RDD 的转化操作都是惰性求值的,所以我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。

    3.2 创建RDD

    1、创建RDD的2种方式:

    (1)读取一个外部数据集:eg 

    lines = sc.textFile("README.md")

    (2)SparkContext 的 parallelize() 方法,在驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set),eg:

    lines = sc.parallelize(["pandas", "i like pandas"])

    3.3 RDD操作

    1、2种RDD操作:

    (1)转化操作:由一个 RDD 生成一个新的 RDD,返回值类型:1个RDD

    (2)行动操作:对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。返回值类型:其他数据类型

    # 转化操作
    >>> pythonLines = lines.filter(lambda line: "Python" in line)
    # 行动操作
    >>> pythonLines.first() u'## Interactive Python Shell'

    2、转化操作

    (1)返回一个新的RDD

    (2)惰性计算,不会触发实际的计算,比如 map() 和 filter()

    3、行动操作

    (1)返回其他数据类型数据,会把结果返回给驱动器程序(eg:take()、collect() )或把结果写入外部系统(eg: saveAsTextFile()saveAsSequenceFile())

    (2)会触发实际的计算,比如 count() 和 first()

    4、惰性求值

    (1)RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前 Spark 不会开始计算。

    (2)优点:可以把一些操作合并到一起来减少计算数据的步骤。用户可以用更小的操作来组织他们的程序, 这样也使这些操作更容易管理。

    5、持久化:RDD.persist()

    因为RDD是惰性求值的,所以Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。

    在第一次对持久化的 RDD 计算之后,Spark 会把 RDD 的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了。我们也可以把 RDD 缓存到磁盘上而不是内存中。

    6、每个 Spark 程序或 shell 会话的工作流程

    (1) 创建RDD:从外部数据创建出输入 RDD。

    (2) 转化操作:使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。

    (3) 持久化RDD:告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。

    (4) 行动操作:使用行动操作(例如 count() 和 first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。

    3.4  向spark传递函数

    Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。在我们支持的三种主要语言中,向 Spark 传递函数的方式略有区别。

    在 Python 中,我们有三种方式来把函数传递给 Spark。

    (1)传递比较短的函数时,可以使用 lambda 表达式来传递

    (2)传递顶层函数

    (3)传递定义的局部函数

    word = rdd.filter(lambda s: "error" in s)
    
    def containsError(s):
        return "error" in s
    word = rdd.filter(containsError)

    3.5 常见的转化操作和行动操作

    1、RDD类型及常见基本操作

    (1)RDD类型:数字类型、键值对类型

    (2)基本操作:适用于任何类型的RDD

    (3)特殊类型的特殊操作:例如:数字类型的 RDD 支持统计型函数操作,而键值对形式的 RDD 则支持诸如根据键聚合数据的键值对操作

    2、基本操作

    (1)基本转化操作

    表3-2:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作

    函数名

    目的

    示例

    结果

    map()

    将函数应用于 RDD 中的每个元素,将返回值构成新的 RDD

    rdd.map(x => x + 1)

    {2, 3, 4, 4}

    flatMap()

    将函数应用于 RDD 中的每个元素,将返回的迭代器的所有内容构成新的 RDD。通常用来切分单词

    rdd.flatMap(x => x.to(3))

    {1, 2, 3, 2, 3, 3, 3}

    filter()

    返回一个由通过传给 filter() 的函数的元素组成的 RDD

    rdd.filter(x => x != 1)

    {2, 3, 3}

    distinct()

    去重

    rdd.distinct()

    {1, 2, 3}

    sample(withReplacement, fraction, [seed])

    对 RDD 采样,以及是否替换

    rdd.sample(false, 0.5)

    非确定的

    表3-3:对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作

    函数名

    目的

    示例

    结果

    union()

    生成一个包含两个 RDD 中所有元素的 RDD

    rdd.union(other)

    {1, 2, 3, 3, 4, 5}

    intersection()

    求两个 RDD 共同的元素的 RDD

    rdd.intersection(other)

    {3}

    subtract()

    移除一个 RDD 中的内容(例如移除训练数据)

    rdd.subtract(other)

    {1, 2}

    cartesian()

    与另一个 RDD 的笛卡儿积

    rdd.cartesian(other)

    {(1, 3), (1, 4), ...(3, 5)}

    (2)基本行动操作

    表3-4:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作

    函数名

    目的

    示例

    结果

    collect()

    返回 RDD 中的所有元素

    rdd.collect()

    {1, 2, 3, 3}

    count()

    RDD 中的元素个数

    rdd.count()

    4

    countByValue()

    各元素在 RDD 中出现的次数

    rdd.countByValue()

    {(1, 1),(2, 1),(3, 2)}

    take(num)

    从 RDD 中返回 num 个元素

    rdd.take(2)

    {1, 2}

    top(num)

    从 RDD 中 返回最前面的 num 个元素

    rdd.top(2)

    {3, 3}

    takeOrdered(num)(ordering)

    从 RDD 中按照提供的顺序返回最前面的 num 个元素

    rdd.takeOrdered(2)(myOrdering)

    {3, 3}

    takeSample(withReplacement, num, [seed])

    从 RDD 中返回任意一些元素

    rdd.takeSample(false, 1)

    非确定的

    reduce(func)

    并行整合 RDD 中 所有数据(例如 sum

    rdd.reduce((x, y) => x + y)

    9

    fold(zero)(func)

    和 reduce() 一样,但是需要提供初始值

    rdd.fold(0)((x, y) => x + y)

    9

    aggregate(zeroValue)(seqOp, combOp)

    和 reduce() 相似,但是通常返回不同类型的函数

     

    rdd.aggregate((0, 0))
    ((x, y) =>
    (x._1 + y, x._2 + 1),
    (x, y) =>
    (x._1 + y._1, x._2 + y._2))

    (9,4)

    foreach(func)

    对 RDD 中的每个元素使用给定的函数

    rdd.foreach(func)

    3、如何转换RDD类型

    有些函数只能用于特定类型的 RDD,比如 mean() 和 variance() 只能用在数值 RDD 上,而 join() 只能用在键值对 RDD 上。我们会在第 6 章讨论数值 RDD 的专门函数,在第 4 章讨论键值对 RDD 的专有操作。在 Scala 和 Java 中,这些函数都没有定义在标准的 RDD 类中,所以要访问这些附加功能,必须要确保获得了正确的专用 RDD 类。

    Python 的 API 结构与 Java 和 Scala 有所不同。在 Python 中,所有的函数都实现在基本的 RDD 类中,但如果操作对应的 RDD 数据类型不正确,就会导致运行时错误。

    3.6 持久化(缓存)

    (1) persist() 方法可以让 Spark 对数据进行持久化。

    (2)当我们让 Spark 持久化存储一个 RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。

    (3)我们在第一次对这个 RDD 调用行动操作前就调用了 persist() 方法。persist() 调用本身不会触发强制求值。

    (4)RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除。

  • 相关阅读:
    c# 使用Selenium模拟登录和操作数据的学习记录【续】
    c# 使用Selenium模拟登录和操作数据的学习记录
    使用bat一键安装mysql
    使用c#程序 添加iis网站目录的用户权限
    NOIp游记
    线规集合
    背包规划集合
    阴间扫描线
    11.30数学集合
    高精度运算
  • 原文地址:https://www.cnblogs.com/hailin2018/p/13902533.html
Copyright © 2011-2022 走看看