zoukankan      html  css  js  c++  java
  • Apache Spark-1.0.0浅析(三 ):资源调度——Job提交

    基本概念:Job,Stage,Task,DagScheduler,TaskScheduler……

    RDD的操作可以分为Transformations和Actions,Transformations是lazy的不立即执行,Action则会触发作业的提交和执行。例如本例中的foreach

    def foreach(f: T => Unit) {
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
    }

    一句话,Actions调用sc.runJob触发作业运行。

    SparkContext中的runJob有多个版本的重载

    foreach调用的版本,以rdd和func为参数,返回执行的结果

    /**
       * Run a job on all partitions in an RDD and return the results in an array.
       */
      def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
        runJob(rdd, func, 0 until rdd.partitions.size, false)
      }

    然后,进入下一个runJob,加入参数partitions和allowLocal

    /**
       * Run a job on a given set of partitions of an RDD, but take a function of type
       * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: Iterator[T] => U,
          partitions: Seq[Int],
          allowLocal: Boolean
          ): Array[U] = {
        runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
      }

    之后调用下一个runJob,将结果返回到result数组中

    /**
       * Run a function on a given set of partitions in an RDD and return the results as an array. The
       * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
       * than shipping it out to the cluster, for short actions like first().
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          allowLocal: Boolean
          ): Array[U] = {
        val results = new Array[U](partitions.size)
        runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
        results
      }

    最后调用,参数中加入resultHandler句柄

    /**
       * Run a function on a given set of partitions in an RDD and pass the results to the given
       * handler function. This is the main entry point for all actions in Spark. The allowLocal
       * flag specifies whether the scheduler can run the computation on the driver rather than
       * shipping it out to the cluster, for short actions like first().
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          allowLocal: Boolean,
          resultHandler: (Int, U) => Unit) {
        if (dagScheduler == null) {
          throw new SparkException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite)
        val start = System.nanoTime
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
          resultHandler, localProperties.get)
        logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
        rdd.doCheckpoint()
      }

    sc.runJob最终调用dagScheduler.runJob。

    需要提到的一点是

    val cleanedFunc = clean(func)

    其作用在注释中

    /**
       * Clean a closure to make it ready to serialized and send to tasks
       * (removes unreferenced variables in $outer's, updates REPL variables)
       */
      private[spark] def clean[F <: AnyRef](f: F): F = {
        ClosureCleaner.clean(f)
        f
      }

    END

  • 相关阅读:
    【cocos2d-js公文】十七、事件分发机制
    UVA它11292
    定义自己的仪表板DashBoard
    ufldl学习笔记和编程作业:Feature Extraction Using Convolution,Pooling(卷积和汇集特征提取)
    互联网和移动互联网怎么赚钱?
    android布局margin和padding差异!
    解决ORA-28000: the account is locked
    网络编程基本知识
    10个优秀的 HTML5 &amp; CSS3 下拉菜单制作教程
    Codeforces554A:Kyoya and Photobooks
  • 原文地址:https://www.cnblogs.com/kevingu/p/4677196.html
Copyright © 2011-2022 走看看