zoukankan      html  css  js  c++  java
  • 总结《Spark技术内幕》第三章 RDD实现详解

    @

    RDD实现详解

    RDD是Spark最基本也是最根本的数据抽象,本质将数据保存在内存中,并且高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。

    1. 什么是RDD

    RDDDD弹性分布式内存数据集,只读,分区记录的集合,RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。

    RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage)。因此可以在RDD部分分区数据丢失的时候从别的RDD计算出相应分区的数据。

    RDD主要的属性:
    一组分片(Partition),数据集的基本组成单位。每个分片都会被一个计算任务处理,并决定计算的粒度默认值就是程序所分配到的CPU core的个数

    每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。

    RDD之间的依赖关系,RDD每次转化都会形成新的RDD,所以RDD之间会形成流水线一样的依赖关系在部分分区数据丢失时,Spark可以通过依赖关系来重新计算这个分区的数据,而不是对整个RDD来重新计算。

    RDD的分区器,对于有key-value的RDD是hash分区,没有的是range范围分区。

    一个列表,对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置

    1.1 RDD的创建

    两种创建RDD的方式:

    1. 由一个已经存在的Scala集合创建
    2. 外部存储系统数据集创建。(本地文件、HDFS、HBASE)

    RDD支持的操作:

    转化(transformation)从现有的数据集创建一个新的数据集

    动作(action)在数据集上进行计算,将结果返回给driver

    1.2 RDD的转换

    RDD的所有转换都是惰性的,他们只是记住转换动作,只有要求返回结果给Driver时,这些转换才会真正执行。

    1.3 RDD缓存

    RDD可以进行持久化,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其他动作(action)中重用。

    RDD缓存过的Partition有可能丢失,或者存储于内存的数据由于内存不足而被删除。因此有容错机制保证了即使缓存丢失也能保证计算的正确执行

    通过基于RDD的一系列的转换,丢失的数据会被重算。

    RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

    1.4 RDD的checkpoint

    缓存是在计算结束后直接将计算结果通过用户定义的存储级别(内存,本地磁盘)写入不同的介质

    而检查点不同,它是在计算完成后,重新建立一个Job来计算

    2.RDD的转换和DAG的生成

    Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系
    同时这个计算链也就生成了逻辑上的DAG

    接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。

    val file = sc.textFile("hdfs://...")
    val counts = file.flatMap(lines => lines.split(" "))
    			.map(word => (word, 1))
    			.reduceByKey(_+_)
    counts.saveAsTextFile("hdfs://...")
    

    file 和 counts 都是RDD。

    file是从HDFS上读取文件并创建了RDD。

    counts是在file的基础上通过flatMap、map 和 reduceByKey这三个RDD转换而成的,最后counts调用了saveAsTestFile方法。

    1. sc是sparkContext的实例,他是用户程序和spark的交互接口,根据用户设置来申请资源,并创建RDD。

    2. 将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD。

    3. 将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word,1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

    4. 行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

    5. 行5:首先会生成一个MapPartitionsRDD,这个RDD会通过调用org.apache.spark.rdd.PairRDDFunctions#saveAsHadoopDataset向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。

    RDD之间的关系可以从两个维度来理解:

    一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么

    另一个是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖

    2.1 RDD的依赖关系

    RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrowdependency)和宽依赖(wide dependency)。

    窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用。

    在这里插入图片描述

    宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition

    在这里插入图片描述

    对于union,只是将多个RDD合并成一个,parent RDD的Partition(s)不会有任何的变化,可以认为只是把parent RDD的Partition(s)简单进行复制与合并。

    对于join,如果每个Partition仅仅和已知的、特定的Partition进行join,那么这个依赖关系也是窄依赖。

    窄依赖由于RDD每个Partition依赖固定数量的parent RDD(s)的Partition(s),因此可以通过一个计算任务来处理这些Partition,并且这些Partition相互独立,这些计算任务也就可以并行执行了。

    宽依赖子RDD的所有Partition(s)会依赖于parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的结果,因此这两个R D D是不能通过一个计算任务来完成的。

    窄依赖的实现方式

    • 一对一的依赖,即OneToOneDependency
    • 一个是范围的依赖,即RangeDependency union 就是rangeDependency

    宽依赖的实现方式

    只有一种:ShuffleDependency。子RDD依赖于parent RDD的所有Partition

    宽依赖支持两种Shuffle Manager,基于Hash的Shuffle机制基于排序的Shuffle机制

    2.2 DAG的生成

    原始的RDD(s)通过一系列转换就形成了DAG。RDD之间的依赖关系,包含了RDD由哪些Parent RDD(s)转换而来和它依赖parent RDD(s)的哪些Partitions,是DAG的重要属性。DAG可以认为这些RDD之间形成了Lineage(血统)。借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据

    Spark是如何根据DAG来生成计算任务呢?

    根据依赖关系的不同将DAG划分为不同的阶段(Stage)。

    对于窄依赖由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程里完成,窄依赖被Spark划分到同一个执行阶段

    对于宽依赖由于Shuffle的存在,只能在parent RDD(s)Shuffle处理完成后,才能开始接下来的计算,因此宽依赖就是Spark划分Stage的依据,即Spark根据宽依赖将DAG划分为不同的Stage

    在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行的

    Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的。也就是说,Stage只有在它没有parent Stage或者parent Stage都已经执行完成后,才可以执行。

    2.3 RDD的计算

    原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作这个动作会生成一个Job

    在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算 。计算节点执行计算逻辑的部分称为Executor。

    DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleMapTask

    2.3.1 sparkEnv

    在用户创建org.apache.spark.SparkContext时会创建org.apache.spark.SparkEnv

    SparkEnv中包含了一个运行时节点所需要的所有的环境信息。SparkEnv包含以下的角色

    1. akka.actor.ActorSystem 运行在Driver上是sparkDriver,运行在executor上是sparkExecutor
    2. Serializer:序列化和发序列化的工具。
    3. ShuffleManager: Shuffle的管理者,其中Driver端会注册Shuffle的信息,而Executor端会上报和获取Shuffle的信息现阶段内置支持Hash Based ShuffleSort Based Shuffle.
    4. BroadcastManager:广播变量的管理者
    5. BlockManager: 供了Storage模块与其他模块的交互接口,管理Storage模块。

    2.3.2 缓存的处理

    在应用程序中,若某个中间结果(RDD)被多次调用(触发action),则可以将该中间结果(RDD)缓存起来,第一次调用任务(触发action)时会先进行计算再缓存(首次微慢一些),第二次及后续多次调用该中间结果数据执行任务时,则会直接从缓存中读取进行操作。

    这样会带来如下好处:

    1:避免重复劳,相同的活只做一次;

    2:因为数据在缓存中,所以效率会很高。

    2.3.3 checkpoint

    引入checkpoint机制原因:

    如果采用 persists (缓存)把数据持久化在内存中的话,虽然最快速但是也是最不可靠的(内存清理);如果放在磁盘上也不是完全可靠的,例如磁盘会损坏,系统管理员可能会清空磁盘。

    Checkpoint 的产生就是为了相对而言更加可靠的持久化数据,在 Checkpoint 可以指定把数据放在本地并且是多副本的方式,在正常生产环境下通常放在 HDFS 上,借助HDFS 高可靠的特征来实现更可靠的数据持久化。

    2.3.4 RDD的容错机制

    RDD实现了基于Lineage的容错机制。在部分计算结果丢失时,只需要根据这个Lineage重算即可。

  • 相关阅读:
    extJs学习基础4 Ext.each的用法
    extJs学习基础3 ajax与php交互
    extJs学习基础2
    extJs学习基础
    model 的验证
    创建模型,设置id
    inheritableStatics 与statics类
    关于在jeecms中css,图片,html,模板是如何组装成——part2
    FreeMarker的实例通俗理解
    eclipse启动了tomcat,但是浏览器打不开欢迎页
  • 原文地址:https://www.cnblogs.com/erlou96/p/14308224.html
Copyright © 2011-2022 走看看