zoukankan      html  css  js  c++  java
  • Spark RDD深度解析-RDD计算流程

    Spark RDD深度解析-RDD计算流程

    摘要  RDDResilient Distributed Datasets)是Spark的核心数据结构,所有数据计算操作均基于该结构进行,包括Spark sql Spark Streaming。理解RDD有助于了解分布式计算引擎的基本架构,更好地使用Spark进行批处理与流计算。本文以Spark2.0源代码为主,对RDD的生成、计算流程、加载顺序等作深入的解析。

    RDD印象

    直观上,RDD可理解为下图所示结构,即RDD包含多个Partition(分区),每个Partition代表一部分数据并位于一个计算节点。

     

    RDD本质上Spark中的一个抽象类,所有子RDDHadoopRDDMapPartitionRDDJdbcRDD等)都要继承并实现其中的方法。

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {

    RDD包含以下成员方法或属性

    1、compute方法

    提供在计算过程中Partition元素的获取与计算方式

    2、partition的列表

    每一个partition代表一个并行的最小划分单元;

    3、dependencies列表

    描述RDD依赖哪些父RDD生成即RDD的血缘关系

    4、partition的位置列表

    定义如何最快速的获取partition数据,加快计算,这个是可选的,可作为本地化计算的优化选项;

    5、partitioner方法

    定义如何对数据进行分区。

    RDD生成方式

    1、scala集合

    Partition的默认值:defaultParallelism

    defaultParallelismspark的部署模式相关:

      • Local 模式:本机 cpu cores 的数量
      • Mesos 模式:8
      • Yarnmax(2, 所有 executors cpu cores 个数总和)

    2、物理数据载入

    默认为min(defaultParallelism, 2)

    3、其他RDD转换

    根据具体的转换算子而定

    Partition

    Partiton不直接持有数据,仅仅代表了分区的位置(index的值)。

    trait Partition extends Serializable {
      /**
       * Get the partition's index within its parent RDD
       */
      def index: Int

      // A better default implementation of HashCode
      override def hashCode(): Int = index

      override def equals(other: Any): Boolean = super.equals(other)
    }

    Dependency

    从名字可以猜想,他描述了RDD之间的依赖关系。成员rdd就是父RDD,会在构造RDD时被赋值。

    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }

    由上述RDDDependcy关系可画出下图,通过这种方式,子RDD能轻易找到父RDD的位置等信息,从而构建出RDD的转换路径,为DAGScheduler的任务划分及任务执行时寻找依赖的数据提供依据。

     

    到此应该能大致明白RDD中涉及的各个概念的含义及其之间的联系。但是仔细思考,会发现存在很多问题,比如:

    既然RDD不携带数据,那么数据是何时加载的?怎么加载的?怎么分布到不同计算节点的?

    不同类型RDD是怎么完成转换的?


    RDD计算流程

    以下面几行代码为例,解答上述问题。

    var sc = new SparkContext();

    var hdfs_rdd = sc.textFile(hdfs://master:9000/examples/people.txt);  //  加载数据

    var rdd = hdfs_rdd.map(_.split(“,”));  //  对每行数据按逗号分隔

    print(rdd.count());  //  打印数据的条数

    RDD的转换

     

    首先从直观上了解上述代码执行过程中RDD的转换,如下图,Spark按照HDFS中文件的block将数据加载到内存,成为初始RDD1,经过每一步操作后转换为相应RDD

     

    首先分析textFile方法的作用,源码如下:

     def textFile(

        path: String,
        minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
      assertNotStopped()
      hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
        minPartitions).map(pair => pair._2.toString).setName(path)
    }

    着重看红色语句,textFile方法实际上是调用了hadoopFile方法,再利用其返回值调用map方法,HadoopFile执行了什么,返回了什么呢?

    def hadoopFile[K, V](
        path: String,
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
      assertNotStopped()

      // This is a hack to enforce loading hdfs-site.xml.
      // See SPARK-11227 for details.
      FileSystem.getLocal(hadoopConfiguration)

      // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
      val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
      val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
      new HadoopRDD(
        this,
        confBroadcast,
        Some(setInputPathsFunc),
        inputFormatClass,
        keyClass,
        valueClass,
        minPartitions).setName(path)
    }

    很明显,hadoopFile实际上是获取了HADOOP的配置,然后构造并返回了HadoopRDD对象,HadoopRDDRDD的子类。因此textFile最后调用的是HadoopRDD对象的map方法,其实RDD接口中定义并实现了map方法,所有继承了RDD的类调用的map方法都来自于此。

    观察RDDmap方法:

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }

    map方法很简单,首先包装一下传进来的函数,然后返回MapPartitionsRDD对象。至此,textFile结束,他最终只是返回了MapPartitionsRDD,并没有执行数据读取、计算操作。

     接着看下一语句:var rdd = hdfs_rdd.map(_.split(“,”));

    由上面的分析可知hdfs_rdd是一个MapPartitionsRDD对象,于是其map方法内容与上文的一模一样,也只是返回一个包含用户函数的MapPartitionsRDD对象。

    目前为止每个方法的调用只是返回不同类型的RDD对象,还未真正执行计算。

    接着var cnt = rdd.count();

    count是一种action类型的操作,会触发RDD的计算,为什么说count会触发RDD的计算呢?需要看看count的实现:

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

    可以看到,count方法中调用了scsparkContext)的runJob方法,该操作将触发DagScheduler去分解任务并提交到集群执行。count方法会返回Array[U]类型的结果,数组中每个值代表了当前RDD每个分区中包含的元素个数,因此sum的结果就是RDD中所有元素的个数,本例的结果就是HDFS文件中存在几行数据。

    RDD的计算

    下面介绍任务提交后RDD是怎么计算出来的。

    任务分解并提交后开始执行,task最后一个RDD上执行compute方法

    以上述代码为例,最后一个RDD的类型是MapPartitionsRDD,看其compute方法:

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))

    其中splitRDD的分区,firstParent是父RDD;最外层的f其实是构造MapPartitionsRDD时传入的一个参数,改参数是一个函数对象,接收三个参数并返回Iterator

    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        var prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
        preservesPartitioning: Boolean = false)

    f是何时生成的呢?就看何时生成的MapPartitionsRDD,参考上文可知MapPartitionsRDD是在map方法里构造的第二个构造参数就是f的具体实现。

    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

    综上可知,MapPartitionsRDDcomputef的作用就是就是对f的第三个参数iter执行iter.map(cleanF),其中cleanF就是用户调用map时传入的函数,而iter又firstParent[T].iterator(split, context)的返回值。

    firstParent[T].iterator(split, context)又是什么呢?他是对父RDD执行iterator方法,该方法是RDD接口的final方法,因此所有子RDD调用的都是该方法。

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        getOrCompute(split, context)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }

    通过进一步查看可知,iterator先判断 RDD storageLevel 是否为 NONE,若不是,则尝试从缓存中读取,读取不到则通过计算来获取该 Partition 对应的数据的迭代器;若是,尝试从 checkpoint 中获取 Partition 对应数据的迭代器,若 checkpoint 不存在则通过计算来获取。

    Iterator方法将返回一个迭代器,通过迭代器可以访问父RDD个分区的每个元素,如果内存中不存在父RDD的数据,则调用父RDDcompute方法进行计算。

    RDD真正的计算由RDDaction 操作触发,对于action 操作之前的所有Transformation 操作,Spark只记录Transformation的RDD生成轨迹,即各个RDD之间的相互依赖关系。

    总结

    Spark RDD的计算方式为:spark是从最后一个RDD开始计算(调用compute),计算时寻找父RDD,若父RDD在内存就直接使用,否则调用父RDDcompute计算得出,以此递归,过程可抽象为下图:

     

    从对象产生的顺序看,先生成了HadoopRDD,调用两次map方法后依次产生两个MapPartitionsRDD;从执行的角度看,先执行最后一个RDDcompute方法,在计算过程中递归执行父RDDcompute,以生成对应RDD的数据数据加载角度看,第一个构造出来的RDD执行compute时才会将数据载入内存(本例中为HDFS读入内存),然后在这些数据上执行用户传入的方法,依次生成子RDD的内存数据。

  • 相关阅读:
    mysql字符串连接,重复等字符串函数总结
    mysql字符串连接,重复等字符串函数总结
    mysql字符串连接,重复等字符串函数总结
    solr4.x设置默认查询字段
    一)如何开始 ehcache ?
    tomcat 配置图片服务器
    Cache Algorithms
    shiro 和 spring
    Tomcat 环境变量配置
    No cache or cacheManager properties have been set. Authorization cache cannot be obtained.
  • 原文地址:https://www.cnblogs.com/candl/p/9604351.html
Copyright © 2011-2022 走看看