  A Brief Analysis of Apache Spark 1.0.0 (7): Resource Scheduling - Returning Results

    For a ResultTask, func is executed directly on the partition and the outcome is reported back to the scheduler; for a ShuffleMapTask, the intermediate results are written out for the next stage's tasks to use, and an instantiated MapStatus describing those outputs is returned. In either case the return value is wrapped in a DirectTaskResult before being sent back.
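
    A simplified sketch of the two runTask shapes (metrics bookkeeping and the actual shuffle writing are elided; this is an illustration, not the verbatim source):

    // ResultTask: run the user's func over the partition and return its value.
    override def runTask(context: TaskContext): U = {
      try {
        func(context, rdd.iterator(split, context))
      } finally {
        context.executeOnCompleteCallbacks()
      }
    }

    // ShuffleMapTask: write each record into its shuffle bucket, then return a
    // MapStatus recording where the outputs live and their compressed sizes.
    override def runTask(context: TaskContext): MapStatus = {
      // ... partition rdd.iterator(split, context) into shuffle buckets and
      // record each bucket's size ...
      new MapStatus(blockManager.blockManagerId, compressedSizes)
    }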

    In Executor.run, once the task finishes, execBackend.statusUpdate is called. CoarseGrainedExecutorBackend extends ExecutorBackend and overrides statusUpdate to send a StatusUpdate message to the driver:

    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
        driver ! StatusUpdate(executorId, taskId, state, data)
      }

    The driverActor defined in CoarseGrainedSchedulerBackend receives the message. It first calls scheduler.statusUpdate to update the task's state, then releases the cores the task held and re-offers them:

    case StatusUpdate(executorId, taskId, state, data) =>
            scheduler.statusUpdate(taskId, state, data.value)
            if (TaskState.isFinished(state)) {
              if (executorActor.contains(executorId)) {
                freeCores(executorId) += scheduler.CPUS_PER_TASK
                makeOffers(executorId)
              } else {
                // Ignoring the update since we don't know about the executor.
                val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
                logWarning(msg.format(taskId, state, sender, executorId))
              }
            }
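
    The makeOffers(executorId) call above re-offers the freed cores on just that executor, so a queued task can be launched immediately. In the same class it is essentially a one-liner (quoted from memory of the 1.0 code, so treat it as a close sketch):

    // Make resource offers on just the one executor whose cores were freed.
    def makeOffers(executorId: String) {
      launchTasks(scheduler.resourceOffers(
        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
    }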

    scheduler.statusUpdate mainly removes the finished task from the bookkeeping maps and updates the task sets:

    def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
        var failedExecutor: Option[String] = None
        synchronized {
          try {
            if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
              // We lost this entire executor, so remember that it's gone
              val execId = taskIdToExecutorId(tid)
              if (activeExecutorIds.contains(execId)) {
                removeExecutor(execId)
                failedExecutor = Some(execId)
              }
            }
            taskIdToTaskSetId.get(tid) match {
              case Some(taskSetId) =>
                if (TaskState.isFinished(state)) {
                  taskIdToTaskSetId.remove(tid)
                  taskIdToExecutorId.remove(tid)
                }
                activeTaskSets.get(taskSetId).foreach { taskSet =>
                  if (state == TaskState.FINISHED) {
                    taskSet.removeRunningTask(tid)
                    taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
                  } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                    taskSet.removeRunningTask(tid)
                    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
                  }
                }
              case None =>
                logError(
                  ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                   "likely the result of receiving duplicate task finished status updates)")
                  .format(state, tid))
            }
          } catch {
            case e: Exception => logError("Exception in statusUpdate", e)
          }
        }
        // Update the DAGScheduler without holding a lock on this, since that can deadlock
        if (failedExecutor.isDefined) {
          dagScheduler.executorLost(failedExecutor.get)
          backend.reviveOffers()
        }
      }

    The key call here is taskResultGetter.enqueueSuccessfulTask: it deserializes the result data, handling a direct result and an indirect result differently, and finally invokes scheduler.handleSuccessfulTask:

    def enqueueSuccessfulTask(
        taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
        getTaskResultExecutor.execute(new Runnable {
          override def run(): Unit = Utils.logUncaughtExceptions {
            try {
              val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
                case directResult: DirectTaskResult[_] => directResult
                case IndirectTaskResult(blockId) =>
                  logDebug("Fetching indirect task result for TID %s".format(tid))
                  scheduler.handleTaskGettingResult(taskSetManager, tid)
                  val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
                  if (!serializedTaskResult.isDefined) {
                    /* We won't be able to get the task result if the machine that ran the task failed
                     * between when the task ended and when we tried to fetch the result, or if the
                     * block manager had to flush the result. */
                    scheduler.handleFailedTask(
                      taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                    return
                  }
                  val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                    serializedTaskResult.get)
                  sparkEnv.blockManager.master.removeBlock(blockId)
                  deserializedResult
              }
              result.metrics.resultSize = serializedData.limit()
              scheduler.handleSuccessfulTask(taskSetManager, tid, result)
            } catch {
              case cnf: ClassNotFoundException =>
                val loader = Thread.currentThread.getContextClassLoader
                taskSetManager.abort("ClassNotFound with classloader: " + loader)
              case ex: Exception =>
                taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
            }
          }
        })
      }
    scheduler.handleSuccessfulTask is defined in TaskSchedulerImpl as follows; it simply delegates to taskSetManager.handleSuccessfulTask:

    def handleSuccessfulTask(
        taskSetManager: TaskSetManager,
        tid: Long,
        taskResult: DirectTaskResult[_]) = synchronized {
        taskSetManager.handleSuccessfulTask(tid, taskResult)
      }

    taskSetManager.handleSuccessfulTask marks the task as successful, removes it from the set of running tasks, and then calls sched.dagScheduler.taskEnded:

    /**
       * Marks the task as successful and notifies the DAGScheduler that a task has ended.
       */
      def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
        val info = taskInfos(tid)
        val index = info.index
        info.markSuccessful()
        removeRunningTask(tid)
        sched.dagScheduler.taskEnded(
          tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
        if (!successful(index)) {
          tasksSuccessful += 1
          logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
            tid, info.duration, info.host, tasksSuccessful, numTasks))
          // Mark successful and stop if all the tasks have succeeded.
          successful(index) = true
          if (tasksSuccessful == numTasks) {
            isZombie = true
          }
        } else {
          logInfo("Ignorning task-finished event for TID " + tid + " because task " +
            index + " has already completed successfully")
        }
        failedExecutors.remove(index)
        maybeFinishTaskSet()
      }

    sched.dagScheduler.taskEnded sends a CompletionEvent message to eventProcessActor:

    // Called by TaskScheduler to report task completions or failures.
      def taskEnded(
          task: Task[_],
          reason: TaskEndReason,
          result: Any,
          accumUpdates: Map[Long, Any],
          taskInfo: TaskInfo,
          taskMetrics: TaskMetrics) {
        eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
      }

    The receiving side is defined in DAGScheduler's event actor, which dispatches to dagScheduler.handleTaskCompletion:

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
          dagScheduler.handleTaskCompletion(completion)

    dagScheduler.handleTaskCompletion first posts SparkListenerTaskEnd to the listenerBus (for every task type) and looks up the stage the task belongs to. It defines a local helper, markStageAsFinished, for later use, then dispatches on the completion reason, which can be Success, Resubmitted, FetchFailed, ExceptionFailure, TaskResultLost, and so on. At the end it calls submitWaitingStages() to submit the stages that were waiting on their dependencies.

    On Success it further distinguishes between ResultTask and ShuffleMapTask. For a ResultTask, the corresponding output partition of the job is marked finished and job.listener.taskSucceeded is called; once every partition of the job has finished, the stage is marked finished via markStageAsFinished and SparkListenerJobEnd is posted to the listenerBus.

    For a ShuffleMapTask, the completion is recorded against the executor that ran it and addOutputLoc registers the shuffle output location. If the stage is in runningStages and has no pending tasks left, it is marked finished with markStageAsFinished and its map outputs are registered with the mapOutputTracker; getMissingParentStages then determines which of the waitingStages have become runnable, and submitMissingTasks submits their tasks:

    /**
       * Responds to a task finishing. This is called inside the event loop so it assumes that it can
       * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
       */
      private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
        val task = event.task
        val stageId = task.stageId
        val taskType = Utils.getFormattedClassName(task)
        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
          event.taskMetrics))
        if (!stageIdToStage.contains(task.stageId)) {
          // Skip all the actions if the stage has been cancelled.
          return
        }
        val stage = stageIdToStage(task.stageId)
    
        def markStageAsFinished(stage: Stage) = {
          val serviceTime = stageToInfos(stage).submissionTime match {
            case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
            case _ => "Unknown"
          }
          logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
          stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
          listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
          runningStages -= stage
        }
        event.reason match {
          case Success =>
            logInfo("Completed " + task)
            if (event.accumUpdates != null) {
              Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
            }
            pendingTasks(stage) -= task
            task match {
              case rt: ResultTask[_, _] =>
                resultStageToJob.get(stage) match {
                  case Some(job) =>
                    if (!job.finished(rt.outputId)) {
                      job.finished(rt.outputId) = true
                      job.numFinished += 1
                      // If the whole job has finished, remove it
                      if (job.numFinished == job.numPartitions) {
                        markStageAsFinished(stage)
                        cleanupStateForJobAndIndependentStages(job, Some(stage))
                        listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                      }
                      job.listener.taskSucceeded(rt.outputId, event.result)
                    }
                  case None =>
                    logInfo("Ignoring result from " + rt + " because its job has finished")
                }
    
              case smt: ShuffleMapTask =>
                val status = event.result.asInstanceOf[MapStatus]
                val execId = status.location.executorId
                logDebug("ShuffleMapTask finished on " + execId)
                if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
                  logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
                } else {
                  stage.addOutputLoc(smt.partitionId, status)
                }
                if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
                  markStageAsFinished(stage)
                  logInfo("looking for newly runnable stages")
                  logInfo("running: " + runningStages)
                  logInfo("waiting: " + waitingStages)
                  logInfo("failed: " + failedStages)
                  if (stage.shuffleDep.isDefined) {
                    // We supply true to increment the epoch number here in case this is a
                    // recomputation of the map outputs. In that case, some nodes may have cached
                    // locations with holes (from when we detected the error) and will need the
                    // epoch incremented to refetch them.
                    // TODO: Only increment the epoch number if this is not the first time
                    //       we registered these map outputs.
                    mapOutputTracker.registerMapOutputs(
                      stage.shuffleDep.get.shuffleId,
                      stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                      changeEpoch = true)
                  }
                  clearCacheLocs()
                  if (stage.outputLocs.exists(_ == Nil)) {
                    // Some tasks had failed; let's resubmit this stage
                    // TODO: Lower-level scheduler should also deal with this
                    logInfo("Resubmitting " + stage + " (" + stage.name +
                      ") because some of its tasks had failed: " +
                      stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
                    submitStage(stage)
                  } else {
                    val newlyRunnable = new ArrayBuffer[Stage]
                    for (stage <- waitingStages) {
                      logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
                    }
                    for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                      newlyRunnable += stage
                    }
                    waitingStages --= newlyRunnable
                    runningStages ++= newlyRunnable
                    for {
                      stage <- newlyRunnable.sortBy(_.id)
                      jobId <- activeJobForStage(stage)
                    } {
                      logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                      submitMissingTasks(stage, jobId)
                    }
                  }
                }
              }
    
          case Resubmitted =>
            logInfo("Resubmitted " + task + ", so marking it as still running")
            pendingTasks(stage) += task
    
          case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
            // Mark the stage that the reducer was in as unrunnable
            val failedStage = stageIdToStage(task.stageId)
            runningStages -= failedStage
            // TODO: Cancel running tasks in the stage
            logInfo("Marking " + failedStage + " (" + failedStage.name +
              ") for resubmision due to a fetch failure")
            // Mark the map whose fetch failed as broken in the map stage
            val mapStage = shuffleToMapStage(shuffleId)
            if (mapId != -1) {
              mapStage.removeOutputLoc(mapId, bmAddress)
              mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
            }
            logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
              "); marking it for resubmission")
            if (failedStages.isEmpty && eventProcessActor != null) {
              // Don't schedule an event to resubmit failed stages if failed isn't empty, because
              // in that case the event will already have been scheduled. eventProcessActor may be
              // null during unit tests.
              import env.actorSystem.dispatcher
              env.actorSystem.scheduler.scheduleOnce(
                RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
            }
            failedStages += failedStage
            failedStages += mapStage
            // TODO: mark the executor as failed only if there were lots of fetch failures on it
            if (bmAddress != null) {
              handleExecutorLost(bmAddress.executorId, Some(task.epoch))
            }
    
          case ExceptionFailure(className, description, stackTrace, metrics) =>
            // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
    
          case TaskResultLost =>
            // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
    
          case other =>
            // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
            // will abort the job.
        }
        submitWaitingStages()
      }

    The job.listener.taskSucceeded invoked for a successful ResultTask is defined by JobWaiter, which implements JobListener and overrides taskSucceeded: it applies the resultHandler to the value, and once the number of finished tasks equals the total number of tasks the job is complete, so it records JobSucceeded and wakes up every thread waiting on the JobWaiter:

    override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
        if (_jobFinished) {
          throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
        }
        resultHandler(index, result.asInstanceOf[T])
        finishedTasks += 1
        if (finishedTasks == totalTasks) {
          _jobFinished = true
          jobResult = JobSucceeded
          this.notifyAll()
        }
      }
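
    The notifyAll() above wakes the thread blocked in JobWaiter.awaitResult, which is essentially a standard monitor wait loop (quoted from memory of the 1.0 code, so treat it as a close sketch):

    // Block until the job finishes, then report how it ended.
    def awaitResult(): JobResult = synchronized {
      while (!_jobFinished) {
        this.wait()
      }
      return jobResult
    }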

    Back in DAGScheduler.runJob, the waiter blocks until JobSucceeded arrives, at which point the whole job has finished:

    def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: String,
          allowLocal: Boolean,
          resultHandler: (Int, U) => Unit,
          properties: Properties = null)
      {
        val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
        waiter.awaitResult() match {
          case JobSucceeded => {}
          case JobFailed(exception: Exception) =>
            logInfo("Failed to run " + callSite)
            throw exception
        }
      }
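
    To tie this back to the user-facing API: an action such as RDD.count() reaches this runJob through SparkContext, whose overload allocates a result array and passes a resultHandler that simply drops each partition's value into its slot. Roughly (a paraphrase of the 1.0 code, not a verbatim quote):

    // RDD.count(): one task per partition; the driver sums the partition sizes.
    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

    // SparkContext.runJob: collect per-partition results via the resultHandler.
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean): Array[U] = {
      val results = new Array[U](partitions.size)
      runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
      results
    }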

    END
