zoukankan      html  css  js  c++  java
  • 【原】 Spark中Task的提交源码解读

    版权声明:本文为原创文章,未经允许不得转载。
    复习内容:
    Spark中Stage的提交 http://www.cnblogs.com/yourarebest/p/5356769.html

    Spark中Task的提交

    1.在复习内容部分我们介绍了在方法onStageSubmitted中,Stage的提交,那么在该方法中还有Task的提交,如下所示:

    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
    //(1)Stage的提交,详见文章-Spark中Task的提交
    //(2)Task的提交
    //broadcasted task的二进制,用来分发tasks给executors。
    //注意:我们broadcast RDD的拷贝并且对于每一个task我们将要反序列化,这意味着每个task得到一个不同的RDD 拷贝
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
    closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
    case stage: ResultStage =>
    closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
    }
    //将序列化后的task广播出去
    taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
    case e: NotSerializableException =>
    abortStage(stage, "Task not serializable: " + e.toString, Some(e))
    runningStages -= stage
    return
    case NonFatal(e) =>
    abortStage(stage, s"Task serialization failed: $e ${e.getStackTraceString}", Some(e))
    runningStages -= stage
    return
    }
    //根据stage生成tasks
    val tasks: Seq[Task[]] = try {
    stage match {
    //对于ShuffleMapStages生成ShuffleMapTask
    case stage: ShuffleMapStage =>
    partitionsToCompute.map { id =>
    val locs = taskIdToLocations(id)
    val part = stage.rdd.partitions(id)
    //可见一个partition,一个task,一个位置信息
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
    taskBinary, part, locs, stage.internalAccumulators)
    }
    //对于ResultStage生成ResultTask
    case stage: ResultStage =>
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
    val p: Int = stage.partitions(id)
    val part = stage.rdd.partitions(p)
    val locs = taskIdToLocations(id)
    new ResultTask(stage.id, stage.latestInfo.attemptId,
    taskBinary, part, locs, id, stage.internalAccumulators)
    }
    }
    } catch {
    case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e ${e.getStackTraceString}", Some(e))
    runningStages -= stage
    return
    }
    //如果tasks的num大于0
    if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(
    .partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    //调用taskScheduler提交TaskSet,详见2
    taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
    //因为我们之前就已经发送了事件SparkListenerStageSubmitted,所以我们标记Stage为completed防止没有任务提交
    markStageAsFinished(stage, None)
    //将debugString记录到日志中
    val debugString = stage match {
    case stage: ShuffleMapStage =>
    s"Stage ${stage} is actually done; " +
    s"(available: ${stage.isAvailable}," +
    s"available outputs: ${stage.numAvailableOutputs}," +
    s"partitions: ${stage.numPartitions})"
    case stage : ResultStage =>
    s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
    }
    }

    2.Task的提交会调用taskScheduler的submitTasks方法进行,TaskScheduler是trait,它的唯一的具体实现是TaskSchedulerImpl,submitTasks方法如下所示:

    override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
    //为一个taskSet创建一个TaskSetManager
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
    taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (, ts) =>
    ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
    throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
    s" ${stageTaskSets.toSeq.map{
    ._2.taskSet.id}.mkString(",")}")
    }
    //将taskSetManager和taskSet添加到两种可调度的tree中,FIFO or FAIR
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
    //一个定时器
    starvationTimer.scheduleAtFixedRate(new TimerTask() {
    override def run() {
    if (!hasLaunchedTask) {
    logWarning("Initial job has not accepted any resources; " +
    "check your cluster UI to ensure that workers are registered " +
    "and have sufficient resources")
    } else {
    this.cancel()
    }
    }
    }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
    }
    //不同(集群)模式进行资源的分配
    backend.reviveOffers()
    }

    这样我们就完成了Task的提交,那么不同模式对于Task的资源又是如何分配的呢,我们后面介绍。

  • 相关阅读:
    Docker安装ngnix进行挂载
    Linux上传下载小工具
    uniapp——原生导航栏
    uniapp——scroll-view组件隐藏滚动条
    边框阴影——box-shadow
    uniapp-监听自定义跳转
    uniapp整屏滑动
    用伪类写圆点css
    Vue——生命周期
    uniapp多选按钮
  • 原文地址:https://www.cnblogs.com/yourarebest/p/5423906.html
Copyright © 2011-2022 走看看