Apache Spark 1.0.0 Analysis (Part 3): Resource Scheduling - Job Submission

    Basic concepts: Job, Stage, Task, DAGScheduler, TaskScheduler...

    Operations on an RDD fall into two categories, transformations and actions. Transformations are lazy and do not execute immediately; an action triggers the submission and execution of a job. In this walkthrough the action is foreach:

    def foreach(f: T => Unit) {
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
    }

    In one sentence: actions call sc.runJob to trigger job execution.
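
    To make the lazy/eager split concrete, here is a minimal sketch; it assumes an active SparkContext named sc, and the RDD names are purely illustrative:

    val nums = sc.parallelize(1 to 4)    // define an RDD; nothing runs yet
    val doubled = nums.map(_ * 2)        // transformation: lazy, only records lineage
    doubled.foreach(x => println(x))     // action: invokes sc.runJob and runs the job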

    SparkContext defines several overloads of runJob.

    The overload that foreach calls takes the rdd and func as arguments and returns the results of the execution:

    /**
     * Run a job on all partitions in an RDD and return the results in an array.
     */
    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
      runJob(rdd, func, 0 until rdd.partitions.size, false)
    }
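
    Called directly, this overload evaluates func once per partition and returns one result per partition. A minimal sketch under the same assumptions (sc is an active SparkContext, rdd is illustrative):

    val rdd = sc.parallelize(1 to 100, 4)    // an RDD with 4 partitions
    val partitionSums: Array[Int] =
      sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
    // partitionSums(i) is the sum of the elements in partition i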

    This then steps into the next runJob, adding the partitions and allowLocal parameters:

    /**
     * Run a job on a given set of partitions of an RDD, but take a function of type
     * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
     */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: Iterator[T] => U,
        partitions: Seq[Int],
        allowLocal: Boolean
        ): Array[U] = {
      runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
    }
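
    The added parameters let a caller compute only a subset of the partitions. A sketch, reusing the hypothetical rdd from above:

    // Run only partitions 0 and 1; allowLocal = false keeps the work on the cluster
    val firstTwo: Array[Int] =
      sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(0, 1), false)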

    That in turn calls the next runJob, which collects the return values into a results array:

    /**
     * Run a function on a given set of partitions in an RDD and return the results as an array. The
     * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
     * than shipping it out to the cluster, for short actions like first().
     */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean
        ): Array[U] = {
      val results = new Array[U](partitions.size)
      runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
      results
    }
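
    From this overload on, func also receives the TaskContext, giving the job access to per-task metadata. A sketch under the same assumptions (in the 1.0.x code base TaskContext exposes partitionId):

    val counts: Array[Long] = sc.runJob(
      rdd,
      (ctx: TaskContext, iter: Iterator[Int]) => {
        println("computing partition " + ctx.partitionId)  // per-task metadata
        iter.size.toLong                                   // count this partition
      },
      0 until rdd.partitions.size,
      false)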

    Finally it reaches the bottom-most overload, which adds the resultHandler callback to the parameter list:

    /**
     * Run a function on a given set of partitions in an RDD and pass the results to the given
     * handler function. This is the main entry point for all actions in Spark. The allowLocal
     * flag specifies whether the scheduler can run the computation on the driver rather than
     * shipping it out to the cluster, for short actions like first().
     */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit) {
      if (dagScheduler == null) {
        throw new SparkException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite)
      val start = System.nanoTime
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
        resultHandler, localProperties.get)
      logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
      rdd.doCheckpoint()
    }
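
    Because resultHandler runs on the driver as each partition's result arrives, a caller can fold results incrementally instead of materializing an Array[U]; that is exactly how the array-returning overload above is built. A minimal sketch, again with the hypothetical rdd:

    var total = 0L
    sc.runJob(
      rdd,
      (ctx: TaskContext, iter: Iterator[Int]) => iter.foldLeft(0L)(_ + _),  // partial sum per partition
      0 until rdd.partitions.size,
      false,
      (partitionIndex: Int, partial: Long) => total += partial)             // handler runs on the driver
    println("sum = " + total)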

    sc.runJob ultimately delegates to dagScheduler.runJob.

    One line worth pointing out is

    val cleanedFunc = clean(func)

    Its purpose is described in its comment:

    /**
     * Clean a closure to make it ready to be serialized and sent to tasks
     * (removes unreferenced variables in $outer's, updates REPL variables)
     */
    private[spark] def clean[F <: AnyRef](f: F): F = {
      ClosureCleaner.clean(f)
      f
    }
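
    To see why cleaning is needed, consider a closure defined inside a class: it implicitly captures the enclosing instance ($outer), and if that instance is not serializable the task cannot be shipped. ClosureCleaner nulls out unused outer references automatically; the manual workaround is to copy the needed field into a local val. A hypothetical sketch:

    class Scaler {                     // hypothetical class, not Serializable
      val factor = 2
      def scale(rdd: RDD[Int]): RDD[Int] = {
        val f = factor                 // local copy: the closure below captures f,
        rdd.map(_ * f)                 // not this.factor, so $outer need not ship
      }
    }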

    END
