zoukankan      html  css  js  c++  java
  • RDD的三个机制

    1.血统机制

    RDD只支持粗粒度转换,即只记录单个块上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

    RDD与RDD之间存在依赖关系,依赖关系都是通过转换算子构建的。转换算子都是懒加载的。Spark应用程序会通过Action算子触发Job操作,Job在运行过程中 是从后往前回溯的,回溯的时候就是根据RDD的依赖关系。这样就构建了RDD的血统机制。有了依赖链条的存在,当RDD中数据丢失的时候,会根据血统机制进行自动恢复数据。
    窄依赖:
    父RDD中一个partition最多被子RDD中一个partition所依赖,所以当子RDD中一个parition数据丢失时会重算其相应的父RDD中的数据,不需要对整个RDD进行数据恢复。一对一或者多对一情况下:无冗余无浪费。
    宽依赖:父RDD中一个partition被子RDD中多个partition所依赖的, 所以如果子RDD中的一个partition数据丢失,那么他会重算其所依赖的所有父RDD的partition。一对多和多对多:数据有冗余和浪费。
    所以宽依赖操作会出现大量冗余的数据计算。

    2.RDD的缓存机制

    Spark速度非常快的原因之一,就是在不同操作中可以在内存或者磁盘中缓存数据集。当持久化某个RDD后,每一个节点都将把计算分区结果保存在内存中,对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

    RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

    object StorageLevel {
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(false, false, true, false)
      ......
    }
    
    class StorageLevel private(
        private var _useDisk: Boolean,
        private var _useMemory: Boolean,
        private var _useOffHeap: Boolean,
        private var _deserialized: Boolean,
        private var _replication: Int = 1)
      extends Externalizable {
      ......
      def useDisk: Boolean = _useDisk
      def useMemory: Boolean = _useMemory
      def useOffHeap: Boolean = _useOffHeap
      def deserialized: Boolean = _deserialized
      def replication: Int = _replication
      ......
    }
    

    可以看到StorageLevel类的主构造器包含了5个参数:

    useDisk:使用硬盘(外存) useMemory:使用内存 useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。 deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。序列化方式存储对象可以节省磁盘或内存的空间,一般 序列化:反序列化=1:3 replication:备份数(在多个节点上备份) 理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。

    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)

    另外还注意到有一种特殊的缓存级别

    val OFF_HEAP = new StorageLevel(false, false, true, false)

    使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。

    • RDD是如何进行缓存的

      • rdd.cache操作 rdd.persist操作,通过这两个操作就能够缓存RDD的数据

      • rdd缓存操作也是懒加载的,也是有action算子进行触发

      • rdd数据缓存以后,后续在使用这个RDD的时候其运行速度要比第一次rdd创建时候速度要快至少10倍

    • rdd.cache与 rdd.persist区别

      • cache和persist实际上是一个方法 都是调用的 persist(StorageLevel.MEMORY_ONLY)

    • RDD的缓存级别

    总共12中缓存级别
    //不使用缓存
    val NONE = new StorageLevel(false, false, false, false)
    //仅仅缓存到磁盘
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    //缓存到磁盘并且备份
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
    //缓存到内存当中 默认常用
    val MEMORY_ONLY = new StorageLevel(false, true, false, true)
    //缓存到内存当中并且备份
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
    //缓存到内存当中并且序列化
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
    //缓存到内存当中并且序列化 备份
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
    //缓存到内存当中和磁盘  常用
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
    //缓存到内存当中和磁盘 备份
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
    //缓存到内存当中和磁盘 序列化  常用
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
    //缓存到内存当中和磁盘 序列化 备份
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
    //堆外内存
    val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
    
    • RDD的缓存也是分布式的,每个节点只缓存其当前节点的数据。

    • 释放资源rdd.unpersist

    • RDD的缓存机制有没有缺点:

      • RDD使用内存和磁盘缓存,使用内存可能会被JVM垃圾回收。使用磁盘可能会损坏或者被人为删除掉。

    3.checkpoint机制

    • 设置检查点:spark可以通过checkpoint操作将rdd存储到hdfs,hdfs天然备份机制能够最大程度的保证缓存数据的安全性。

    • 操作步骤:

      • sparkContext.setCheckPointDir(hdfs路径)

      • rdd.checkPoint()

    • checkpoint操作也是懒加载的,action算子触发的

      • checkpoint机制也会触发job操作。是在整个job执行完成后,然后启动一个job去执行存储操作。

      • checkpoint会打破rdd的血统机制,checkpoint的job执行完成之后,会清理掉其rdd的所有依赖关系

    • RDD缓存和checkpoint操作应用场景

      • 如果计算特别耗时(耗时操作占用整个应用程序的30%),此时需要考虑缓存

      • shuffle操作之后,有必要将rdd数据缓存或者检查点

      • 读取大量数据操作之后(读取数据占用整个应用程序执行的30%),此时需要考虑缓存

      • 一个计算过程其计算链条过长,可以在中间比较重要的过程设置缓存或者检查点

      • 作为最佳实践操作,一般情况在checkpoint之前会使用缓存机制cache

  • 相关阅读:
    AODH: ALARM EVENTS IN OPENSTACK
    OpenStack企业私有云新需求(1):Nova 虚机支持 GPU
    How to use the ZooKeeper driver for ServiceGroup in OpenStack Nova
    keystone DB in devstack
    用外部物理路由器时使用Neutron dhcp-agent提供的metadata服务(by quqi99)
    调试OpenStack时遇到的主要问题(by quqi99)
    09 算数运算符
    08 常量
    07 值传递和地址传递
    06 指针入门
  • 原文地址:https://www.cnblogs.com/warmsky/p/12642674.html
Copyright © 2011-2022 走看看