zoukankan      html  css  js  c++  java
  • Spark核心 RDD(下)

    Spark核心 RDD(下)

    引言

      Spark核心 RDD(下)主要内容包括:一、Spark编程接口(API),二、使用RDD表示的基于数据并行的应用,三、Spark中的RDD关联关系的源码分析

    一、Spark编程接口

    预备知识:

    1、Scala:是一种基于JVM的静态类型、函数式、面向对象的语言。Scala具有简洁(特别适合交互式使用)、有效(因为是静态类型)特点

    2、Driver定义了一个或多个RDD,并调用RDD上的动作。Worker是长时间运行的进程,将RDD分区以Java对象的形式缓存在内存中

    3、闭包:相当于一个只有一个方法的紧凑对象(a compact object)

    4、用户执行RDD操作时会提供参数,比如map传递一个闭包(closure,函数式编程中的概念)。Scala将闭包表示为Java对象,如果传递的参数是闭包,则这些对象被序列化,通过网络传输到其他节点上进行装载。

    图-1

     

    二、使用RDD表示的基于数据并行的应用

    1、机器学习算法:每次迭代时执行一对map和reduce操作:逻辑回归、kmeans;两个不同的map/reduce步骤交替执行:EM;交替最小二乘矩阵分解和协同过滤算法

    val points = spark.textFile(...)
         .map(parsePoint).persist()
    var w = // random initial vector
    for (i <- 1 to ITERATIONS) {
         val gradient = points.map{ p =>
              p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
         }.reduce((a,b) => a+b)
         w -= gradient
    }
    

     注:定义一个名为points的缓存RDD,这是在文本文件上执行map转换之后得到的,即将每个文本行解析为一个Point对象;然后在points上反复执行map和reduce操作,每次迭代时通过对当前w的函数进行求和来计算梯度

    2、RDD实现集群编程模型(MapReduce、Pregel、Hadoop)

    1、MapReduce

    data.flatMap(myMap)
        .groupByKey()
        .map((k, vs) => myReduce(k, vs))
    

    如果任务包含combiner,则相应的代码为:

    data.flatMap(myMap)
        .reduceByKey(myCombiner)
        .map((k, v) => myReduce(k, v))
    

     注:ReduceByKey操作在mapper节点上执行部分聚集,与MapReduce的combiner类似

    2、Pregel

    3、Hadoop

    三、Spark中的RDD关联关系的源码分析

    表1 Spark中RDD的内部接口

    操作含义
    partitions() 返回一组Partition对象,数据集的原子组成部分
    preferredLocations(p) 根据数据存放的位置,返回分区p在哪些节点访问更快
    dependencies() 返回一组依赖,描述了RDD的Lineage
    iterator(p, parentIters) 按照父分区的迭代器,逐个计算分区p的元素
    partitioner() 返回RDD是否hash/range分区的元数据信息
    1、抽象类Dependency:
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }
    

    注:Dependency有两个子类,一个子类为窄依赖:NarrowDependency;一个为宽依赖ShuffleDependency

    2、抽象类NarrowDependency

    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      /**
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    
      override def rdd: RDD[T] = _rdd
    }
    

     注:NarrowDependency实现了getParents 重写了 rdd 函数。NarrowDependency有两个子类,一个是 OneToOneDependency,一个是 RangeDependency

      NarrowDependency允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区

      NarrowDependency能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区

    2.1、OneToOneDependency:

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
    

    注:getParents实现很简单,就是传进一个partitionId: Int,再把partitionId放在List里面传出去,即去parent RDD 中取与该RDD 相同 partitionID的数据

    2.2、RangeDependency

    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {
    
      override def getParents(partitionId: Int): List[Int] = {
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)
        } else {
          Nil
        }
      }
    }
    

     注:某个parent RDD 从 inStart 开始的partition,逐个生成了 child RDD 从outStart 开始的partition,则计算方式为: partitionId - outStart + inStart ***

    3、ShuffleDependency
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      // Note: It's possible that the combiner class tag is null, if the combineByKey
      // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    //获取shuffleID
      val shuffleId: Int = _rdd.context.newShuffleId()
    //向注册shuffleManager注册Shuffle信息
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.length, this)
    
      _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
    

    注:ShuffleDependency需要首先计算好所有父分区数据,然后在节点之间进行Shuffle

      ShuffleDependency单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算

      ShuffleDependency的Lineage链较长,采用检查点机制,将ShuffleDependency的RDD数据存到物理存储中可以实现优化,单个节点失效可以从物理存储中获取RDD数据,但是一般cpu的计算的速度比读取磁盘的速度快,这得看实际情况权衡。

    1、writePartitionToCheckpointFile:把RDD一个Partition文件里面的数据写到一个Checkpoint文件里面

      def writePartitionToCheckpointFile[T: ClassTag](
          path: String,
          broadcastedConf: Broadcast[SerializableConfiguration],
          blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
        val env = SparkEnv.get
        //获取Checkpoint文件输出路径
        val outputDir = new Path(path)
        val fs = outputDir.getFileSystem(broadcastedConf.value.value)
    
        //根据partitionId 生成 checkpointFileName
        val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
        //拼接路径与文件名
        val finalOutputPath = new Path(outputDir, finalOutputName)
        //生成临时输出路径
        val tempOutputPath =
          new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
    
        if (fs.exists(tempOutputPath)) {
          throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
        }
        //得到块大小,默认为64MB
        val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
        //得到文件输出流
        val fileOutputStream = if (blockSize < 0) {
          fs.create(tempOutputPath, false, bufferSize)
        } else {
          // This is mainly for testing purpose
          fs.create(tempOutputPath, false, bufferSize,
            fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
        }
        //序列化文件输出流
        val serializer = env.serializer.newInstance()
        val serializeStream = serializer.serializeStream(fileOutputStream)
        Utils.tryWithSafeFinally {
        //写数据
          serializeStream.writeAll(iterator)
        } {
          serializeStream.close()
        }
    
        if (!fs.rename(tempOutputPath, finalOutputPath)) {
          if (!fs.exists(finalOutputPath)) {
            logInfo(s"Deleting tempOutputPath $tempOutputPath")
            fs.delete(tempOutputPath, false)
            throw new IOException("Checkpoint failed: failed to save output of task: " +
              s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
          } else {
            // Some other copy of this task must've finished before us and renamed it
            logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
            if (!fs.delete(tempOutputPath, false)) {
              logWarning(s"Error deleting ${tempOutputPath}")
            }
          }
        }
      }
    

     2、writeRDDToCheckpointDirectoryWrite,将一个RDD写入到多个checkpoint文件,并返回一个ReliableCheckpointRDD来代表这个RDD

     def writeRDDToCheckpointDirectory[T: ClassTag](
          originalRDD: RDD[T],
          checkpointDir: String,
          blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    
        val sc = originalRDD.sparkContext
    
        // 生成 checkpoint文件 的输出路径
        val checkpointDirPath = new Path(checkpointDir)
        val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
        if (!fs.mkdirs(checkpointDirPath)) {
          throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
        }
    
        // 保存文件,并重新加载它作为一个RDD
        val broadcastedConf = sc.broadcast(
          new SerializableConfiguration(sc.hadoopConfiguration))
        sc.runJob(originalRDD,
          writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    
        if (originalRDD.partitioner.nonEmpty) {
          writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
        }
    
        val newRDD = new ReliableCheckpointRDD[T](
          sc, checkpointDirPath.toString, originalRDD.partitioner)
        if (newRDD.partitions.length != originalRDD.partitions.length) {
          throw new SparkException(
            s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
              s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
        }
        newRDD
      }
    

     

     

     

     

     

     

  • 相关阅读:
    maven打包额外的资源文件
    阿里巴巴的程序员等级
    sql是最成功的第四代语言
    nginx的配置与应用
    浏览器的同源策略与跨域问题的解决方案
    算法:二分查找(基础)
    动态类型语言和静态类型语言
    【VS开发】单文档中往视图中加入控件
    【VS开发】使用VS2010创建MFC ActiveX工程项目
    【VS开发】使用VS2010创建MFC ActiveX工程项目
  • 原文地址:https://www.cnblogs.com/yinminbo/p/11834899.html
Copyright © 2011-2022 走看看