zoukankan      html  css  js  c++  java
  • Spark 概念学习系列之从物理执行的角度透视spark Job(十七)

      

    本博文主要内容:

      1、再次思考pipeline

      2、窄依赖物理执行内幕

      3、宽依赖物理执行内幕

      4、Job提交流程

     

    一:再次思考pipeline

      即使采用pipeline的方式,函数f对依赖的RDD中的数据的操作也会有2种方式:

        1:f(record), f作用于集合的每一条记录,每次只作用于一条记录。

        2、f(redord), f一次性作用于集合的全部数据。

         Spark采用的是第一种方式,原因:

        1、spark无需等待,可以最大化的使用集群计算资源。

        2、减少OOM的发生

        3、最大化的有利于开发

        4、可以精准的控制每一个Partition本身(Dependency)及内部的计算(computer)

        5、基于lineage的算子流动函数式编程,节省了中间结果的产生,并可以最快的恢复

      疑问:会不会增加网络通信?当然不会, 因为在pipeline!

    二: 思考Spark Job 具体的物理执行

      Spark Application 里面可以产生1个或者多个job,例如spark-shell默认启动的时候内部就没有Job,只是作为资源的分配程序,可以在spark-shell里面写代码

    产生若干个Job,普通程序中一般而言可以有不同的Action,每一个Action一般也就触发一个/job.

      Spark 是 MapReduce思想的一种更加精致和高效的实现,MapReduce有很多具体不同的实现,例如Hadoop 的Mapreduce基本计算流程如下

    :首先是以JVM为对象的并发 执行Mapper,Mapper中map的执行会产生输出数据,输出数据会经过Partition指定的规则放在Local FileSystem中,然后

    经由Shuffle、 sort、Aggreate变成Reducer中的reduce的输入,执行reduce产生最终的执行结果:Hadoop Mapreduce执行的流程虽然很简单,但是过于死板,尤其

    在构造复杂算法(迭代)的时候非常不利于算法的实现。且执行效率极为低下。

      Spark算法构造和物理执行是最基本核心算法:最大化pipeline!

        基于Pipeline的思想,数据被使用的时候才开始计算,从数据流动的角度来讲,是数据流动到计算的位置!!!实际上从逻辑的角度来看, 是算子在数据上流动。

      从算法构建的角度而言:肯定是算子作用于数据,所以是算子在数据上流动,方便算法的构建!

      从物理执行的角度而言:是数据流动到计算的位置。方便系统最为高效的运行!

      对于pipeline而言,数据计算的位置就是每个Stage中最后的RDD, 一个震撼人心的内幕真想就是:每个Stage中除了最后一个RDD 算子是真实的外,前面的算子都是假的。

         由于计算的Lazy特性,导致计算从后往前回溯形成Computing  Chain,导致的结果就是需要首先计算出具体一个Stage内部左侧的RDD中本次计算依赖的Partiton。

      

      三:窄依赖的物理执行内幕

      1、 一个Stage内部的RDD都是窄依赖,窄依赖计算本身是逻辑上看是从Stage内部最左侧的RDD开始立即计算的,根据Computing Chain,数据

    从一个计算步骤流动到下一个结算步骤,以此类推(算的时候从前往后), 直到计算到Stage内部的最后一个RDD产生计算结果。

      Computiing Chain 的构建是从后往前回溯构建而成,而实际的物理计算则是让数据从前往后在算子上流动,直到流动到不能在流动位置才开始计算下一个Record。这就导致一个美好的结果后面的RDD 对前面RDD的依赖虽然是Partition级别数据集合的依赖,但是并不需要父RDD把partition中所有Records计算完毕才整体往后流动数据进行计算,这就极大的 提高了计算速率!

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.spark.rdd
    
    import scala.reflect.ClassTag
    
    import org.apache.spark.{Partition, TaskContext}
    
    /**
     * An RDD that applies the provided function to every partition of the parent RDD.
     */
    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
        preservesPartitioning: Boolean = false)
      extends RDD[U](prev) {
    
      override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
    
      override def getPartitions: Array[Partition] = firstParent[T].partitions
    
      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))
    }




    四: 宽依赖物理执行内幕

      提示:写代码的时候尽量减少宽依赖

      必须等待依赖的父Stage中的最后一个RDD把全部数据彻底计算完成,才能够经过shuffle来计算当前的Stage

      遇到 shuffle级别的就是形成stage

      所有依赖父Stage,是拿所有Stage的数据还是拿一部分数据:拿一部分数据,算一部分。

      计算数据是从Dependency来的;

      spark作业提交都是触发Action

    源码分析类:

        ShuffleDependency  
        --->
    MapPartitionRDD   override def compute(split : org.apache.spark.Partition, context : org.apache.spark.TaskContext) : scala.Iterator[U]
         -->
       RDD  (count,--runJob-onReceive-doOnReceive)

      

    ==========宽依赖物理执行内幕 ============

    必须等到依赖的父Stage中的最后一个RDD把全部数据彻底计算完毕才能够经过shuffle来计算当前的Stage。

    这样写代码的时候尽量避免宽依赖!!!

    /**
     * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
     * be called once, so it is safe to implement a time-consuming computation in it.
     */
    protected def getDependencies: Seq[Dependency[_]] = deps

    compute负责接受父Stage的数据流,计算出record

    五、Job提交流程

      

    ==========Job提交流程 ============

    作业提交,触发Action

    /**
     * Run a function on a given set of partitions in an RDD and pass the results to the given
     * handler function. This is the main entry point for all actions in Spark.
     */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
      if (stopped.get()) {
        throw new IllegalStateException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      if (conf.getBoolean("spark.logLineage", false)) {
        logInfo("RDD‘s recursive dependencies:
    " + rdd.toDebugString)
      }
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      progressBar.foreach(_.finishAll())
      rdd.doCheckpoint()
    }
    
    /**
     * Run an action job on the given RDD and pass all the results to the resultHandler function as
     * they arrive.
     *
     * @param rdd target RDD to run tasks on
     * @param func a function to run on each partition of the RDD
     * @param partitions set of partitions to run on; some jobs may not want to compute on all
     *   partitions of the target RDD, e.g. for operations like first()
     * @param callSite where in the user program this job was called
     * @param resultHandler callback to pass each result to
     * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
     *
     * @throws Exception when the job fails
     */
    def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
      val start = System.nanoTime
      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
      waiter.awaitResult() match {
        case JobSucceeded =>
          logInfo("Job %d finished: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        case JobFailed(exception: Exception) =>
          logInfo("Job %d failed: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
          val callerStackTrace = Thread.currentThread().getStackTrace.tail
          exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
          throw exception
      }
    }
    
    /**
     * Submit an action job to the scheduler.
     *
     * @param rdd target RDD to run tasks on
     * @param func a function to run on each partition of the RDD
     * @param partitions set of partitions to run on; some jobs may not want to compute on all
     *   partitions of the target RDD, e.g. for operations like first()
     * @param callSite where in the user program this job was called
     * @param resultHandler callback to pass each result to
     * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
     *
     * @return a JobWaiter object that can be used to block until the job finishes executing
     *         or can be used to cancel the job.
     *
     * @throws IllegalArgumentException when partitions ids are illegal
     */
    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): JobWaiter[U] = {
      // Check to make sure we are not launching a task on a partition that does not exist.
      val maxPartitions = rdd.partitions.length
      partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(
          "Attempting to access a non-existent partition: " + p + ". " +
            "Total number of partitions: " + maxPartitions)
      }
    
      val jobId = nextJobId.getAndIncrement()
      if (partitions.size == 0) {
        // Return immediately if the job is running 0 tasks
        return new JobWaiter[U](this, jobId, 0, resultHandler)
      }
    
      assert(partitions.size > 0)
      val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
      val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
      eventProcessLoop.post(JobSubmitted(
        jobId, rdd, func2, partitions.toArray, callSite, waiter,
        SerializationUtils.clone(properties)))
      waiter
    }

    作业:

      写一下我理解中的spark job物理执行。

    感谢下面的博主:

    http://feiweihy.blog.51cto.com/6389397/1743588 

  • 相关阅读:
    次小生成树
    [bzoj5329] P4606 [SDOI2018]战略游戏
    CF487E Tourists
    P3225 [HNOI2012]矿场搭建
    CF #636 (Div. 3) 对应题号CF1343
    P3469 [POI2008]BLO-Blockade
    大假期集训模拟赛12
    大假期集训模拟赛11
    大假期集训模拟赛10
    小奇画画——BFS
  • 原文地址:https://www.cnblogs.com/zlslch/p/5942347.html
Copyright © 2011-2022 走看看