  • Apache Spark-1.0.0浅析(五):资源调度——Task创建和分发

    前面提到,submitMissingTasks是分发任务的开始。首先,submitMissingTasks判断该stage是否为shuffle map stage:如果是,则调用getPreferredLocs并实例化ShuffleMapTask,得到一组task集合;否则该stage是final stage,同样调用getPreferredLocs并实例化ResultTask,得到一组task集合。然后向listenerBus发送SparkListenerStageSubmitted事件;接着提前序列化一个task,以保证其可以被序列化;最后通过taskScheduler.submitTasks提交TaskSet。

    /**
     * Called when a stage's parents are available and we can now run its tasks.
     *
     * Builds one task per partition that still needs work (ShuffleMapTask for a shuffle map
     * stage, ResultTask otherwise), posts SparkListenerStageSubmitted, and then either adds the
     * tasks to the stage's pending set or, when there is nothing to run, removes the stage from
     * runningStages.
     *
     * NOTE(review): this listing is an excerpt. Closing braces, both branches of `properties`,
     * the `try` body and the call that receives the new TaskSet are not shown — confirm against
     * the upstream DAGScheduler source before relying on it.
     *
     * @param stage the stage whose missing tasks should be created
     * @param jobId ID used to look up the active job in jobIdToActiveJob
     */
      private def submitMissingTasks(stage: Stage, jobId: Int) {
        logDebug("submitMissingTasks(" + stage + ")")
        // Get our pending tasks and remember them in our pendingTasks entry
        val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
        var tasks = ArrayBuffer[Task[_]]()
        if (stage.isShuffleMap) {
          // Only partitions with no recorded output location (outputLocs(p) == Nil) get a task
          for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
            val locs = getPreferredLocs(stage.rdd, p)
            tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
        } else {
          // This is a final stage; figure out its job's missing partitions
          val job = resultStageToJob(stage)
          // Only output ids the job has not marked finished get a task
          for (id <- 0 until job.numPartitions if !job.finished(id)) {
            val partition = job.partitions(id)
            val locs = getPreferredLocs(stage.rdd, partition)
            tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
        // NOTE(review): the value produced by each branch below is missing from this excerpt
        val properties = if (jobIdToActiveJob.contains(jobId)) {
        } else {
          // this stage will be assigned to "default" pool
        // The listener event must be posted before a possible NotSerializableException, so that
        // "StageSubmitted" is observed first and "JobEnded" after it
        listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
        if (tasks.size > 0) {
          // Preemptively serialize a task to make sure it can be serialized. We are catching this
          // exception here because it would be fairly hard to catch the non-serializable exception
          // down the road, where we have several different implementations for local scheduler and
          // cluster schedulers.
          try {
            // NOTE(review): the serialization statement itself is missing from this excerpt
          } catch {
            case e: NotSerializableException =>
              // Abort the stage and stop tracking it as running
              abortStage(stage, "Task not serializable: " + e.toString)
              runningStages -= stage
          logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
          myPending ++= tasks
          logDebug("New pending tasks: " + myPending)
            // NOTE(review): the call that takes this TaskSet is missing; the accompanying text
            // names taskScheduler.submitTasks — confirm
            new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
          // Record when the stage was submitted
          stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
        } else {
          // No task was created for this stage: log its availability counters and drop it from
          // the running set
          logDebug("Stage " + stage + " is actually done; %b %d %d".format(
            stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
          runningStages -= stage


    // Registers a new TaskSet: wraps it in a TaskSetManager, records the manager in
    // activeTaskSets under the task set's id and hands it to schedulableBuilder. When isLocal is
    // false and no task set has been received before, a repeating timer is also scheduled that
    // warns while no task has been launched.
    // NOTE(review): this listing is an excerpt; the `else` body, the timer's remaining arguments
    // and the closing braces are not shown — confirm against the upstream source.
    override def submitTasks(taskSet: TaskSet) {
        val tasks = taskSet.tasks
        logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
        // Registration and the flag update below happen while holding this object's monitor
        this.synchronized {
          val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
          activeTaskSets(taskSet.id) = manager
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
          // The timer is only set up for the first task set (hasReceivedTask is still false)
          if (!isLocal && !hasReceivedTask) {
            starvationTimer.scheduleAtFixedRate(new TimerTask() {
              override def run() {
                // Warn on every tick for as long as nothing has been launched
                if (!hasLaunchedTask) {
                  logWarning("Initial job has not accepted any resources; " +
                    "check your cluster UI to ensure that workers are registered " +
                    "and have sufficient memory")
                } else {
          hasReceivedTask = true


    /** Sends a ReviveOffers message to driverActor. */
    override def reviveOffers() {
      driverActor ! ReviveOffers
    }


    // Message handler: dispatches on the type of the incoming message.
    // NOTE(review): this listing is an excerpt; closing braces and the bodies of some cases
    // (ReviveOffers, part of DisassociatedEvent) are not shown — confirm against the upstream
    // source.
    def receive = {
          // An executor announces itself; a duplicate ID is rejected, otherwise the executor is
          // recorded in all bookkeeping maps
          case RegisterExecutor(executorId, hostPort, cores) =>
            Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
            if (executorActor.contains(executorId)) {
              sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
            } else {
              logInfo("Registered executor: " + sender + " with ID " + executorId)
              sender ! RegisteredExecutor(sparkProperties)
              executorActor(executorId) = sender
              executorHost(executorId) = Utils.parseHostPort(hostPort)._1
              // All of the executor's cores start out free
              totalCores(executorId) = cores
              freeCores(executorId) = cores
              executorAddress(executorId) = sender.path.address
              addressToExecutorId(sender.path.address) = executorId
          // A task changed state: forward it to the scheduler and, once the task has finished,
          // give its cores back to the executor
          case StatusUpdate(executorId, taskId, state, data) =>
            scheduler.statusUpdate(taskId, state, data.value)
            if (TaskState.isFinished(state)) {
              if (executorActor.contains(executorId)) {
                freeCores(executorId) += scheduler.CPUS_PER_TASK
              } else {
                // Ignoring the update since we don't know about the executor.
                val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
                logWarning(msg.format(taskId, state, sender, executorId))
          // NOTE(review): the body of this case is missing from the excerpt
          case ReviveOffers =>
          // Relay the kill request unchanged to the actor of the executor running the task
          case KillTask(taskId, executorId, interruptThread) =>
            executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)
          case StopDriver =>
            sender ! true
          // Tell every registered executor to stop, then acknowledge to the sender
          case StopExecutors =>
            logInfo("Asking each executor to shut down")
            for (executor <- executorActor.values) {
              executor ! StopExecutor
            sender ! true
          case RemoveExecutor(executorId, reason) =>
            removeExecutor(executorId, reason)
            sender ! true
          // NOTE(review): the start of this case body is missing; only the trailing reason string
          // of the call survives
          case DisassociatedEvent(_, address, _) =>
              "remote Akka client disassociated"))


    // Make fake resource offers on all executors: build one WorkerOffer per known executor from
    // its host and current free cores, pass the offers to the scheduler and launch whatever
    // tasks it returns.
    // NOTE(review): the opening of the call was missing from the excerpt. It is restored as
    // launchTasks(scheduler.resourceOffers(...)) based on the signatures of those two methods
    // and the two unmatched closing parentheses — confirm against the upstream source.
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(
        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
    }


    /** Represents free resources available on an executor. */
    case class WorkerOffer(
        executorId: String, // ID of the executor the offer is for
        host: String,       // host associated with that executor
        cores: Int)         // number of cores being offered


      /**
       * Called by cluster manager to offer resources on slaves. We respond by asking our active task
       * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
       * that tasks are balanced across the cluster.
       */
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
        // Returns one buffer of task descriptions per offer, in the order of the shuffled offers.
        // NOTE(review): this listing is an excerpt; closing braces are not shown — confirm
        // against the upstream source.
        // Mark each slave as alive and remember its hostname
        for (o <- offers) {
          executorIdToHost(o.executorId) = o.host
          // First offer seen from this host: create its executor set and report the executor
          if (!executorsByHost.contains(o.host)) {
            executorsByHost(o.host) = new HashSet[String]()
            executorAdded(o.executorId, o.host)
        // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
        val shuffledOffers = Random.shuffle(offers)
        // Build a list of tasks to assign to each worker.
        // One buffer per offer, created with the offer's core count as its initial size
        val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
        // Cores still unassigned for each offer; decremented as tasks are placed
        val availableCpus = shuffledOffers.map(o => o.cores).toArray
        val sortedTaskSets = rootPool.getSortedTaskSetQueue
        for (taskSet <- sortedTaskSets) {
          logDebug("parentName: %s, name: %s, runningTasks: %s".format(
            taskSet.parent.name, taskSet.name, taskSet.runningTasks))
        // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
        // of locality levels so that it gets a chance to launch local tasks on all of them.
        var launchedTask = false
        for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
          // Keep sweeping over all offers at this locality level until a full pass places nothing
          do {
            launchedTask = false
            for (i <- 0 until shuffledOffers.size) {
              val execId = shuffledOffers(i).executorId
              val host = shuffledOffers(i).host
              // Skip offers that no longer have enough cores for one more task
              if (availableCpus(i) >= CPUS_PER_TASK) {
                // resourceOffer yields at most one task; record it and charge its cores
                for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                  tasks(i) += task
                  val tid = task.taskId
                  taskIdToTaskSetId(tid) = taskSet.taskSet.id
                  taskIdToExecutorId(tid) = execId
                  activeExecutorIds += execId
                  executorsByHost(host) += execId
                  availableCpus(i) -= CPUS_PER_TASK
                  assert (availableCpus(i) >= 0)
                  launchedTask = true
          } while (launchedTask)
        // NOTE(review): `tasks` holds one buffer per offer, so this condition is true whenever
        // any offer was passed in, even if no task was placed — confirm that is intended
        if (tasks.size > 0) {
          hasLaunchedTask = true
        return tasks


      /**
       * Respond to an offer of a single executor from the scheduler by finding a task
       */
      def resourceOffer(
          execId: String,
          host: String,
          maxLocality: TaskLocality.TaskLocality)
        : Option[TaskDescription] =
        // Looks for a task to run on the offered executor, searching no farther than
        // maxLocality; when one is found it is recorded as running, serialized and returned as
        // a TaskDescription.
        // NOTE(review): this listing is an excerpt; the opening brace of the body, the `case _`
        // body and the closing braces are not shown — confirm against the upstream source.
        // Nothing is searched for once isZombie is set
        if (!isZombie) {
          val curTime = clock.getTime()
          // Use the level currently allowed, capped at the caller's maxLocality
          var allowedLocality = getAllowedLocalityLevel(curTime)
          if (allowedLocality > maxLocality) {
            allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
          findTask(execId, host, allowedLocality) match {
            case Some((index, taskLocality)) => {
              // Found a task; do some bookkeeping and return a task description
              val task = tasks(index)
              val taskId = sched.newTaskId()
              // Figure out whether this should count as a preferred launch
              logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
                taskSet.id, index, taskId, execId, host, taskLocality))
              // Do various bookkeeping
              copiesRunning(index) += 1
              val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
              taskInfos(taskId) = info
              // Newest attempt goes to the front of this task's attempt list
              taskAttempts(index) = info :: taskAttempts(index)
              // Update our locality level for delay scheduling
              currentLocalityIndex = getLocalityIndex(taskLocality)
              lastLaunchTime = curTime
              // Serialize and return the task
              val startTime = clock.getTime()
              // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
              // we assume the task can be serialized without exceptions.
              val serializedTask = Task.serializeWithDependencies(
                task, sched.sc.addedFiles, sched.sc.addedJars, ser)
              val timeTaken = clock.getTime() - startTime
              logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
                taskSet.id, index, serializedTask.limit, timeTaken))
              val taskName = "task %s:%d".format(taskSet.id, index)
              // Notify the DAGScheduler that the task has started
              sched.dagScheduler.taskStarted(task, info)
              return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
            // No task found at the allowed locality
            case _ =>


    // Excerpt repeated from resourceOffer: the task is serialized together with
    // sched.sc.addedFiles and sched.sc.addedJars, and the elapsed time is computed from the
    // clock. `task`, `ser` and `startTime` are defined in the enclosing method, not here.
    val serializedTask = Task.serializeWithDependencies(
                task, sched.sc.addedFiles, sched.sc.addedJars, ser)
              val timeTaken = clock.getTime() - startTime


    // Launch tasks returned by a set of resource offers.
    // For every task description across all offers, CPUS_PER_TASK cores are deducted from the
    // target executor's free cores and a LaunchTask message is sent to that executor's actor.
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
        executorActor(task.executorId) ! LaunchTask(task)
      }
    }



