  • Spark Source Code Walkthrough 1: RDD

    RDD stands for Resilient Distributed Datasets. It is the core abstraction of Spark.

    An RDD is a read-only, immutable dataset with good fault tolerance. It has five main properties:

       - A list of partitions: the data must be split into partitions so that it can be processed in parallel.

       - A function for computing each split: every partition is computed by the RDD's compute function.

       - A list of dependencies on other RDDs.

       - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned).

       - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file).
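    To make the five properties concrete, here is a minimal sketch in plain Scala with no Spark dependency. `ToyRDD`, `ToyPartition`, `SlicePartition`, and `ToySourceRDD` are hypothetical names for illustration only; they mirror the shape of the properties above, not Spark's real classes.

```scala
// Hypothetical toy model of the five RDD properties (not Spark's real classes).
trait ToyPartition { def index: Int }

trait ToyRDD[T] {
  // 1) A list of partitions
  def partitions: Seq[ToyPartition]
  // 2) A function for computing each split
  def compute(split: ToyPartition): Iterator[T]
  // 3) Dependencies on other RDDs (simplified to the parent RDDs themselves)
  def dependencies: Seq[ToyRDD[_]]
  // 4) Optional partitioner for key-value data: key => partition index
  def partitioner: Option[Any => Int] = None
  // 5) Optional preferred locations (host names) for each split
  def preferredLocations(split: ToyPartition): Seq[String] = Nil

  // Stand-in for an action: compute every partition in order
  def collectAll(): Seq[T] = partitions.flatMap(p => compute(p).toList)
}

final case class SlicePartition(index: Int, data: Seq[Int]) extends ToyPartition

// A source RDD with no parents whose data is already split into slices
final class ToySourceRDD(slices: Seq[Seq[Int]]) extends ToyRDD[Int] {
  def partitions: Seq[ToyPartition] =
    slices.zipWithIndex.map { case (d, i) => SlicePartition(i, d) }
  def compute(split: ToyPartition): Iterator[Int] =
    split.asInstanceOf[SlicePartition].data.iterator // downcast to the concrete partition type
  def dependencies: Seq[ToyRDD[_]] = Nil
}
```

    For example, `new ToySourceRDD(Seq(Seq(1, 2), Seq(3))).collectAll()` computes both partitions and yields `Seq(1, 2, 3)`.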

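     RDD is the foundation on which all Spark components run. It is a fault-tolerant, parallel data structure that provides a rich set of data operations and APIs.

     The RDD API in Spark

    An RDD can contain multiple partitions, and each partition holds a fragment of the dataset. RDDs can depend on one another:

    Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD (one-to-one, or many-to-one as in union).

    Wide dependency: multiple partitions of the child RDD depend on the same partition of the parent RDD (one-to-many); this requires a shuffle.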
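    The difference can be sketched by asking which parent partitions each child partition reads. The helpers below (`oneToOneParents`, `shuffleParents`, `fanOut`) are hypothetical and only model the index mapping, assuming a map-like narrow dependency and a full shuffle for the wide case.

```scala
// Hypothetical helpers: which parent partitions does a given child partition read?
object DependencyDemo {
  // Narrow, one-to-one (e.g. map/filter): child i reads only parent i
  def oneToOneParents(child: Int): Seq[Int] = Seq(child)

  // Wide (shuffle): every child partition may read every parent partition
  def shuffleParents(child: Int, numParentPartitions: Int): Seq[Int] =
    0 until numParentPartitions

  // For each parent partition, count how many child partitions use it
  def fanOut(numChildren: Int, parentsOf: Int => Seq[Int]): Map[Int, Int] =
    (0 until numChildren).flatMap(parentsOf).groupBy(identity).map { case (p, uses) => p -> uses.size }
}
```

    With a narrow dependency every parent partition is used once; with a shuffle every parent partition is used by all child partitions.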

    Figure: RDD overview


    The source file of package org.apache.spark.rdd.RDD contains several important methods:

    1)

      /**
       * Implemented by subclasses to return the set of partitions in this RDD. This method will only
       * be called once, so it is safe to implement a time-consuming computation in it.
       */
      protected def getPartitions: Array[Partition]

    This method returns the RDD's partitions as an array.
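    The doc comment says getPartitions is called only once. A simplified sketch of why: the public accessor caches the first result, roughly as Spark's `RDD.partitions` does with a private field. `CachedPartitions` and `CountingRDD` are hypothetical, and partitions are modeled as plain `Int`s.

```scala
// Hypothetical sketch: an expensive getPartitions is safe because the
// public accessor computes it once and then returns the cached array.
abstract class CachedPartitions {
  private var cached: Option[Array[Int]] = None

  protected def getPartitions: Array[Int] // may be expensive

  final def partitions: Array[Int] = cached match {
    case Some(p) => p
    case None =>
      val p = getPartitions // computed only on first access
      cached = Some(p)
      p
  }
}

// Counts how many times getPartitions actually runs
class CountingRDD extends CachedPartitions {
  var calls = 0
  protected def getPartitions: Array[Int] = { calls += 1; Array(0, 1, 2) }
}
```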

    2)

      /**
       * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
       * be called once, so it is safe to implement a time-consuming computation in it.
       */
      protected def getDependencies: Seq[Dependency[_]] = deps

    It returns the dependencies as a Seq.

    3)

      /**
       * :: DeveloperApi ::
       * Implemented by subclasses to compute a given partition.
       */
      @DeveloperApi
      def compute(split: Partition, context: TaskContext): Iterator[T]

    Every RDD has its own concrete compute function, which returns an iterator over the elements of one partition.

    4)

      /**
       * Optionally overridden by subclasses to specify placement preferences.
       */
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil

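    It returns the preferred locations (hosts) for computing a partition. This is a placement preference used for data-locality-aware scheduling, not a partitioning strategy.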

    RDD Transformations and Actions

    RDD operations fall into two categories:

    Transformations

    map(f: T => U): RDD[T] => RDD[U]
    filter(f: T => Bool): RDD[T] => RDD[T]
    flatMap(f: T => Seq[U]): RDD[T] => RDD[U]
    sample(fraction: Float): RDD[T] => RDD[T] (Deterministic sampling)
    groupByKey(): RDD[(K, V)] => RDD[(K, Seq[V])]
    reduceByKey(f: (V, V) => V): RDD[(K, V)] => RDD[(K, V)]
    union(): (RDD[T], RDD[T]) => RDD[T]
    join(): (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))]
    cogroup(): (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (Seq[V], Seq[W]))]
    crossProduct(): (RDD[T], RDD[U]) => RDD[(T, U)]
    mapValues(f: V => W): RDD[(K, V)] => RDD[(K, W)] (Preserves partitioning)
    sort(c: Comparator[K]): RDD[(K, V)] => RDD[(K, V)]
    partitionBy(p: Partitioner[K]): RDD[(K, V)] => RDD[(K, V)]

    Actions

    count(): RDD[T] => Long
    collect(): RDD[T] => Seq[T]
    reduce(f: (T, T) => T): RDD[T] => T
    lookup(k: K): RDD[(K, V)] => Seq[V] (On hash/range partitioned RDDs)
    save(path: String): Outputs RDD to a storage system, e.g., HDFS
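    Transformations are lazy, and only an action triggers computation. As an analogy only (this is ordinary Scala, not Spark), a collection view behaves similarly: `map` and `filter` build a pipeline, and nothing runs until `size` forces it. `LazyDemo` is a hypothetical name.

```scala
// Analogy with plain Scala views (not Spark): transformations only describe
// work; a terminal "action" forces the whole pipeline to run.
object LazyDemo {
  def run(): (Int, Int, Int) = {
    var evaluated = 0
    val source = (1 to 10).view
    // "Transformations": build a lazy pipeline; the map function has not run yet
    val pipeline = source.map { x => evaluated += 1; x * 2 }.filter(_ > 10)
    val before = evaluated
    // "Action": size forces every element through map and filter
    val n = pipeline.size
    (before, evaluated, n)
  }
}
```

    `LazyDemo.run()` returns `(0, 10, 5)`: no element is mapped before the "action", all ten are mapped during it, and five survive the filter.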

    First, the Transformations part of the source:

      // Transformations (return a new RDD)

      /**
       * Return a new RDD by applying a function to all elements of this RDD.
       */
      def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

      /**
       *  Return a new RDD by first applying a function to all elements of this
       *  RDD, and then flattening the results.
       */
      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
        new FlatMappedRDD(this, sc.clean(f))

      /**
       * Return a new RDD containing only the elements that satisfy a predicate.
       */
      def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

    ......

    Map

      /**
       * Return a new RDD by applying a function to all elements of this RDD.
       */
      def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

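    It returns a MappedRDD, which extends RDD and overrides two methods, getPartitions and compute.

    The first method, getPartitions, takes the first parent RDD and returns its array of partitions:

    override def getPartitions: Array[Partition] = firstParent[T].partitions

    The second method, compute, takes the iterator over the corresponding partition of the parent RDD and applies the function passed to map to each element: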

    override def compute(split: Partition, context: TaskContext) =
      firstParent[T].iterator(split, context).map(f)
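    A minimal sketch of the same two overrides on a simplified, hypothetical `MiniRDD` where partitions are plain indices and there is no `TaskContext`. `MiniMappedRDD` mirrors the idea of MappedRDD (reuse the parent's partitions, map over the parent's iterator) but is not Spark code.

```scala
// Hypothetical simplified RDD: partitions are indices, compute returns an iterator
abstract class MiniRDD[T] {
  def getPartitions: Seq[Int]
  def compute(split: Int): Iterator[T]
  def map[U](f: T => U): MiniRDD[U] = new MiniMappedRDD[U, T](this, f)
}

// A source with pre-split data
class MiniSourceRDD[T](slices: Seq[Seq[T]]) extends MiniRDD[T] {
  def getPartitions: Seq[Int] = slices.indices
  def compute(split: Int): Iterator[T] = slices(split).iterator
}

// Same partitions as the parent; each partition is the parent's iterator mapped by f
class MiniMappedRDD[U, T](prev: MiniRDD[T], f: T => U) extends MiniRDD[U] {
  def getPartitions: Seq[Int] = prev.getPartitions
  def compute(split: Int): Iterator[U] = prev.compute(split).map(f)
}
```

    Because compute only wraps the parent's iterator, elements flow through the map lazily, one at a time, within a single partition.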

    filter

      /**
       * Return a new RDD containing only the elements that satisfy a predicate.
       */
      def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

    filter keeps only the elements that satisfy the predicate, for example mapRDD.filter(_ > 1).

    Union

      /**
       * Return the union of this RDD and another one. Any identical elements will appear multiple
       * times (use `.distinct()` to eliminate them).
       */
      def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

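    union combines multiple RDDs into a new RDD (a UnionRDD), which overrides five RDD methods: getPartitions, getDependencies, compute, getPreferredLocations, and clearDependencies.

    From getPartitions and getDependencies we can see that these are narrow dependencies, not wide ones: each parent gets a RangeDependency (a subclass of NarrowDependency) that maps its partitions one-to-one onto a contiguous range of the UnionRDD's partitions, so no shuffle is needed.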

      override def getDependencies: Seq[Dependency[_]] = {
        val deps = new ArrayBuffer[Dependency[_]]
        var pos = 0
        for (rdd <- rdds) {
          deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
          pos += rdd.partitions.size
        }
        deps
      }
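    The index arithmetic can be checked locally. The sketch below re-creates the `getParents` rule of a range dependency and the offset loop above; `ToyRangeDependency` and `ToyUnion` are hypothetical stand-ins, with each parent identified by a name and a partition count.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a range dependency (parent, inStart, outStart, length)
final case class ToyRangeDependency(parentId: String, inStart: Int, outStart: Int, length: Int) {
  // Parent partitions read by child partition `partitionId` (at most one: narrow)
  def getParents(partitionId: Int): List[Int] =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else Nil
}

object ToyUnion {
  // Same loop shape as UnionRDD.getDependencies: parents laid out back to back
  def dependencies(parents: Seq[(String, Int)]): List[ToyRangeDependency] = {
    val deps = ArrayBuffer.empty[ToyRangeDependency]
    var pos = 0
    for ((id, numPartitions) <- parents) {
      deps += ToyRangeDependency(id, 0, pos, numPartitions)
      pos += numPartitions
    }
    deps.toList
  }
}
```

    For parents with 2 and 3 partitions, the union has 5 partitions; child partition 4 maps to partition 2 of the second parent, and every child partition has exactly one parent partition.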

    groupBy

      /**
       * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
       * mapping to that key.
       *
       * Note: This operation may be very expensive. If you are grouping in order to perform an
       * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
       * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
       */
      def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
        groupBy[K](f, defaultPartitioner(this))

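    groupBy groups the elements by the key that f computes for each of them, producing yet another new RDD. As the doc comment warns, this can be expensive, because all elements with the same key must be brought together by a shuffle.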
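    A rough local simulation of the shuffle behind groupBy, assuming hash partitioning: each element's key is hashed to an output partition with the same non-negative-modulo rule as Spark's HashPartitioner, and each output partition then groups its own records. `ToyGroupBy` is hypothetical and ignores serialization, spilling, and map-side combining.

```scala
// Hypothetical local simulation of a hash-partitioned groupBy
object ToyGroupBy {
  // Modulo that never returns a negative index
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    raw + (if (raw < 0) mod else 0)
  }

  // null keys go to partition 0; others by hashCode
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  // One Map per output partition: key -> elements with that key
  def groupBy[T, K](data: Seq[T], numPartitions: Int)(f: T => K): IndexedSeq[Map[K, Seq[T]]] = {
    // "Shuffle write": route each element to the partition owning its key
    val buckets = data.groupBy(x => partitionFor(f(x), numPartitions))
    // "Shuffle read": each partition groups only the records it received
    (0 until numPartitions).map { p =>
      buckets.getOrElse(p, Seq.empty[T]).groupBy(f)
    }
  }
}
```

    Because a key always hashes to the same partition, each key ends up in exactly one output partition, which is what makes the per-partition grouping correct.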

    Action

    Count

      /**
       * Return the number of elements in the RDD.
       */
      def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

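    Tracing the code: SparkContext.runJob calls dagScheduler.runJob. DAGScheduler submits the job to the job scheduler and gets back a JobWaiter object, which can be used to block until the job finishes executing or to cancel the job.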
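    A simplified sketch of the count path: run one per-partition function (like `Utils.getIteratorSize`) as one task per partition, then sum the per-partition results on the caller's side. `ToyJob` is hypothetical, and the `Future` the caller blocks on plays a role only loosely similar to JobWaiter.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch of count(): one "task" per partition, results summed by the caller
object ToyJob {
  // Run func on every partition concurrently; the returned Future completes when all tasks do
  def runJob[T, U](partitions: Seq[Seq[T]])(func: Iterator[T] => U): Future[Seq[U]] =
    Future.sequence(partitions.map(p => Future(func(p.iterator))))

  // Counts elements by walking the iterator, like a per-partition size function
  def iteratorSize[T](it: Iterator[T]): Long = {
    var n = 0L
    while (it.hasNext) { it.next(); n += 1 }
    n
  }

  // Block until the job finishes, then sum the per-partition counts
  def count[T](partitions: Seq[Seq[T]]): Long =
    Await.result(runJob(partitions)(it => iteratorSize(it)), 10.seconds).sum
}
```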

    Task scheduling for RDDs

    Figure: RDD task scheduling

    From the figure:

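    The RDD objects build a DAG, which is then handed to the DAGScheduler:

    1. DAGScheduler is a high-level, stage-oriented scheduler. It splits the DAG into stages, and each stage in the figure is a set of tasks.

    2. Each shuffle marks the boundary of a new stage. DAGScheduler tracks which RDDs have been materialized (for example, cached data and shuffle output), and to place tasks optimally it first looks for locations where their data is local.

    3. DAGScheduler also handles failures caused by lost shuffle output, resubmitting the affected stages when necessary.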
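    Stage splitting can be sketched for the simple case of a linear lineage. Each hypothetical `Step` records whether its dependency on the previous step is a shuffle, and a new stage starts at each shuffle, so a linear chain has one more stage than it has shuffles. `ToyStages` ignores branching DAGs and is not DAGScheduler's actual algorithm.

```scala
// Hypothetical lineage step: `shuffle` = this step depends on its parent via a shuffle
final case class Step(name: String, shuffle: Boolean)

object ToyStages {
  // Cut a linear lineage into stages at shuffle boundaries
  def split(lineage: List[Step]): List[List[String]] =
    lineage.foldLeft(List(List.empty[String])) { (stages, step) =>
      if (step.shuffle) List(step.name) :: stages    // a shuffle starts a new stage
      else (step.name :: stages.head) :: stages.tail // otherwise pipeline into the current stage
    }.map(_.reverse).reverse
}
```

    For the word count later in this post (textFile, flatMap, map, then reduceByKey), this yields two stages, `List(textFile, flatMap, map)` and `List(reduceByKey)`, which matches the two stages in that example's log.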

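    After dividing the job into stages, DAGScheduler submits tasks to the TaskScheduler one TaskSet at a time:

    1. Each TaskScheduler serves only one SparkContext.

    2. When it receives a TaskSet, it sends the tasks to Executors on Worker nodes to run. The TaskScheduler monitors failed tasks and retries them.

    Each Executor runs tasks in multiple threads, with each thread handling one task.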
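    A minimal sketch of the one-thread-per-task model using a Java fixed thread pool. `ToyExecutor` and its `cores` parameter are hypothetical; Spark's Executor also runs tasks on pool threads but adds deserialization, metrics, and status reporting.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical executor: a thread pool where each task (one per partition) runs on one thread
object ToyExecutor {
  def runTasks(partitions: Seq[Seq[Int]], cores: Int): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(cores)
    try {
      // Submit one task per partition; each sums its own partition
      val futures = partitions.map { p =>
        pool.submit(new Callable[Int] { def call(): Int = p.sum })
      }
      futures.map(_.get()) // block until each task's result is available
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

    With `cores = 2`, at most two partitions are processed at the same time; the remaining tasks wait in the pool's queue.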

    Next, let's trace the source of one of the examples that ship with Spark:

    Source: org.apache.spark.examples.SparkPi

      import scala.math.random

      import org.apache.spark.{SparkConf, SparkContext}

      object SparkPi {
        def main(args: Array[String]): Unit = {
          // Set the application name (shown in the Web UI)
          val conf = new SparkConf().setAppName("Spark Pi")
          // Create a SparkContext
          val spark = new SparkContext(conf)
          // Number of slices (partitions): first argument, or 2 by default
          val slices = if (args.length > 0) args(0).toInt else 2
          val n = 100000 * slices
          val count = spark.parallelize(1 to n, slices).map { i =>
            val x = random() * 2 - 1
            val y = random() * 2 - 1
            if (x * x + y * y < 1) 1 else 0
          }.reduce(_ + _)
          println("Pi is roughly " + 4.0 * count / n)
          spark.stop()
        }
      }
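    For comparison, here is the same Monte Carlo estimate without Spark: the points are generated in a local loop instead of across partitions. `LocalPi` and its `seed` parameter are hypothetical additions that make the run reproducible.

```scala
import scala.util.Random

// Spark-free version of SparkPi's computation, for comparison
object LocalPi {
  def estimate(slices: Int, seed: Long): Double = {
    val n = 100000 * slices
    val rng = new Random(seed)
    // Count random points in [-1, 1) x [-1, 1) that fall inside the unit circle
    val count = (1 to n).iterator.map { _ =>
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.sum
    // Circle area / square area = pi / 4
    4.0 * count / n
  }
}
```

    The ratio of hits to total points approximates pi / 4, which is why both versions multiply by 4.0.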

    In this code, parallelize distributes a local collection to form an RDD, and it does so lazily. Tracing the source:

      /** Distribute a local Scala collection to form an RDD.
       *
       * @note Parallelize acts lazily. If `seq` is a mutable collection and is
       * altered after the call to parallelize and before the first action on the
       * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
       * the argument to avoid this.
       */
      def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
      }

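    The example then calls map on the resulting RDD; as described above, map is a transformation that produces a new RDD. Finally, reduce is an action that triggers the actual computation.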
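    parallelize itself only records the collection; the cutting into `numSlices` partitions happens when ParallelCollectionRDD computes its partitions. A sketch of even, contiguous slicing, assuming a start/end formula similar to the one in Spark's `ParallelCollectionRDD.slice` (which additionally special-cases ranges); `ToySlice` is hypothetical.

```scala
// Hypothetical even slicing of a local Seq into numSlices contiguous pieces
object ToySlice {
  def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "Positive number of slices required")
    val length = seq.length.toLong
    (0 until numSlices).map { i =>
      // Long arithmetic avoids Int overflow for large collections
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      seq.slice(start, end)
    }
  }
}
```

    For `1 to 10` and 3 slices this gives slice sizes 3, 3, and 4; together the slices cover every element exactly once and in order.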

    A word count example in the Spark shell:

    scala> val rdd = sc.textFile("hdfs://192.168.0.245:8020/test/README.md")
    14/12/18 01:12:26 INFO storage.MemoryStore: ensureFreeSpace(82180) called with curMem=331133, maxMem=280248975
    14/12/18 01:12:26 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 80.3 KB, free 266.9 MB)
    rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.0.245:8020/test/README.md MappedRDD[7] at textFile at <console>:12

    scala> rdd.toDebugString
    14/12/18 01:12:29 INFO mapred.FileInputFormat: Total input paths to process : 1
    res3: String =
    (1) hdfs://192.168.0.245:8020/test/README.md MappedRDD[7] at textFile at <console>:12
     |  hdfs://192.168.0.245:8020/test/README.md HadoopRDD[6] at textFile at <console>:12

    Because sc.textFile reads the data from HDFS, the lineage printed by toDebugString bottoms out in a HadoopRDD, with a MappedRDD on top of it.

    scala> val result = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    14/12/18 01:14:51 INFO spark.SparkContext: Starting job: collect at <console>:14
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Registering RDD 9 (map at <console>:14)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:14) with 1 output partitions (allowLocal=false)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:14)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[9] at map at <console>:14), which has no missing parents
    14/12/18 01:14:51 INFO storage.MemoryStore: ensureFreeSpace(3440) called with curMem=413313, maxMem=280248975
    14/12/18 01:14:51 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 3.4 KB, free 266.9 MB)
    14/12/18 01:14:51 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[9] at map at <console>:14)
    14/12/18 01:14:51 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
    14/12/18 01:14:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, ANY, 1185 bytes)
    14/12/18 01:14:51 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 0)
    14/12/18 01:14:51 INFO rdd.HadoopRDD: Input split: hdfs://192.168.0.245:8020/test/README.md:0+4811
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
    14/12/18 01:14:51 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
    14/12/18 01:14:52 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 0). 1860 bytes result sent to driver
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: Stage 1 (map at <console>:14) finished in 1.450 s
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: looking for newly runnable stages
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: running: Set()
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
    14/12/18 01:14:53 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 1419 ms on localhost (1/1)
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: failed: Set()
    14/12/18 01:14:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: Submitting Stage 0 (ShuffledRDD[10] at reduceByKey at <console>:14), which is now runnable
    14/12/18 01:14:53 INFO storage.MemoryStore: ensureFreeSpace(2112) called with curMem=416753, maxMem=280248975
    14/12/18 01:14:53 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.1 KB, free 266.9 MB)
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (ShuffledRDD[10] at reduceByKey at <console>:14)
    14/12/18 01:14:53 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    14/12/18 01:14:53 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 948 bytes)
    14/12/18 01:14:53 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 1)
    14/12/18 01:14:53 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    14/12/18 01:14:53 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
    14/12/18 01:14:53 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 5 ms
    14/12/18 01:14:53 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 1). 8680 bytes result sent to driver
    14/12/18 01:14:53 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:14) finished in 0.108 s
    14/12/18 01:14:53 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 99 ms on localhost (1/1)
    14/12/18 01:14:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    14/12/18 01:14:53 INFO spark.SparkContext: Job finished: collect at <console>:14, took 1.884598939 s
    result: Array[(String, Int)] = Array((For,5), (Programs,1), (gladly,1), (Because,1), (The,1), (agree,1), (cluster.,1), (webpage,1), (its,1), (-Pyarn,3), (under,2), (legal,1), (APIs,1), (1.x,,1), (computation,1), (Try,1), (MRv1,,1), (have,2), (Thrift,2), (add,2), (through,1), (several,1), (This,2), (Whether,1), ("yarn-cluster",1), (%,2), (graph,1), (storage,1), (To,2), (setting,2), (any,2), (Once,1), (application,1), (JDBC,3), (use:,1), (prefer,1), (SparkPi,2), (engine,1), (version,3), (file,1), (documentation,,1), (processing,,2), (Along,1), (the,28), (explicitly,,1), (entry,1), (author.,1), (are,2), (systems.,1), (params,1), (not,2), (different,1), (refer,1), (Interactive,2), (given.,1), (if,5), (`-Pyarn`:,1), (build,3), (when,3), (be,2), (Tests,1), (file's,1), (Apache,6), (./bin/run-e...

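    This is the count for each word after splitting the lines on spaces. The log also shows the stage split described earlier: reduceByKey introduces a shuffle, so Stage 1 runs the map side (up to MappedRDD[9]) and the final Stage 0 reads the shuffled output (ShuffledRDD[10]) for collect.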
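    The same word count can be expressed on an in-memory collection to check the logic without a cluster. `LocalWordCount` is a hypothetical name; it uses the same tokenization as the shell example and a fold in place of reduceByKey, with no partitions or shuffle.

```scala
// Plain-Scala equivalent of rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
object LocalWordCount {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" ")) // same tokenization as the shell example
      .map((_, 1))
      .foldLeft(Map.empty[String, Int]) { case (acc, (w, one)) =>
        acc.updated(w, acc.getOrElse(w, 0) + one) // plays the role of reduceByKey(_ + _)
      }
}
```

    For example, `LocalWordCount.wordCount(Seq("the cat", "the hat"))` returns `Map("the" -> 2, "cat" -> 1, "hat" -> 1)`.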


  • Original article: https://www.cnblogs.com/huwf/p/4273379.html