• Spark Notes 9: Task/TaskContext

The DAGScheduler ultimately creates the task set and submits it to the TaskScheduler, so first we need to look at how a task is defined and executed.
A Task is a single unit of execution.

Task: the basic unit executed by an executor, and the smallest unit of work in a Spark operation. It means essentially the same thing as a task submitted to a Java executor.
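The analogy is quite literal: a Callable handed to a Java ExecutorService is also a self-contained unit of work that is shipped to a worker thread and returns a value. A minimal sketch of that Java-side counterpart (plain Scala, no Spark dependency; CallableAnalogy is an illustrative name):

import java.util.concurrent.{Callable, Executors}

object CallableAnalogy extends App {
  val pool = Executors.newFixedThreadPool(2)
  // Like a Spark Task: a self-contained unit of work that returns a value.
  val task = new Callable[Int] {
    override def call(): Int = (1 to 10).sum
  }
  println(pool.submit(task).get()) // 55
  pool.shutdown()
}

The Spark source declares the real thing as follows: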
/**
 * A unit of execution. We have two kinds of Task's in Spark:
 * - [[org.apache.spark.scheduler.ShuffleMapTask]]
 * - [[org.apache.spark.scheduler.ResultTask]]
 *
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task
 * and divides the task output to multiple buckets (based on the task's partitioner).
 *
 * @param stageId id of the stage this task belongs to
 * @param partitionId index of the number in the RDD
 */
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
Key members: the final run() builds a TaskContext and delegates to the abstract runTask(), which each concrete task type overrides:

final def run(attemptId: Long): T = {
  context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
  context.taskMetrics.hostname = Utils.localHostName()
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  runTask(context)
}

def runTask(context: TaskContext): T

// Map output tracker epoch. Will be set by TaskScheduler.
var epoch: Long = -1

var metrics: Option[TaskMetrics] = None

// Task context, to be initialized in run().
@transient protected var context: TaskContext = _

// The actual Thread on which the task is running, if any. Initialized in run().
@volatile @transient private var taskThread: Thread = _
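To make the contract concrete, here is a minimal stand-in for this template-method shape: a final run() that owns the lifecycle and an abstract runTask() that concrete tasks implement. This is a sketch, not Spark's API; TaskSketch, TaskContextSketch and SumTask are invented names:

case class TaskContextSketch(stageId: Int, partitionId: Int, attemptId: Long)

abstract class TaskSketch[T](val stageId: Int, val partitionId: Int) {
  @volatile private var killed = false
  def kill(): Unit = { killed = true }

  // Mirrors the shape above: run() sets up the context, checks the kill
  // flag, then hands control to the subclass.
  final def run(attemptId: Long): T = {
    val context = TaskContextSketch(stageId, partitionId, attemptId)
    if (killed) throw new InterruptedException("task killed")
    runTask(context)
  }

  def runTask(context: TaskContextSketch): T
}

// A concrete task that sums its slice of the data, like a tiny ResultTask.
class SumTask(stageId: Int, partitionId: Int, data: Seq[Int])
  extends TaskSketch[Int](stageId, partitionId) {
  override def runTask(context: TaskContextSketch): Int = data.sum
}

object TaskSketchDemo extends App {
  println(new SumTask(0, 0, 1 to 100).run(attemptId = 1L)) // 5050
}

The point of the template is that lifecycle concerns (context creation, metrics, kill handling) live in run() once, while ShuffleMapTask and ResultTask differ only in runTask().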

/**
 * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
 * need to send the list of JARs and files added to the SparkContext with each task to ensure that
 * worker nodes find out about it, but we can't make it part of the Task because the user's code in
 * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
 * first writing out its dependencies.
 */
private[spark] object Task {
/**
 * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
 */
def serializeWithDependencies(
    task: Task[_],
    currentFiles: HashMap[String, Long],
    currentJars: HashMap[String, Long],
    serializer: SerializerInstance): ByteBuffer

/**
 * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
 * and return the task itself as a serialized ByteBuffer. The caller can then update its
 * ClassLoaders and deserialize the task.
 *
 * @return (taskFiles, taskJars, taskBytes)
 */
def deserializeWithDependencies(serializedTask: ByteBuffer)
  : (HashMap[String, Long], HashMap[String, Long], ByteBuffer)
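The framing idea behind these two helpers is simple: write the file and JAR dependency lists (name plus timestamp) first, then the task bytes, so an executor can update its ClassLoaders before it attempts to deserialize the task itself. A simplified round-trip of that framing, assuming nothing but java.io (this is not Spark's actual wire format):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object DependencyFraming extends App {
  // Layout: [numFiles, (name, timestamp)*] [numJars, (name, timestamp)*] [task bytes]
  def serialize(files: Map[String, Long], jars: Map[String, Long], taskBytes: Array[Byte]): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    def writeMap(m: Map[String, Long]): Unit = {
      out.writeInt(m.size)
      for ((name, ts) <- m) { out.writeUTF(name); out.writeLong(ts) }
    }
    writeMap(files)
    writeMap(jars)
    out.write(taskBytes)
    out.flush()
    buf.toByteArray
  }

  // Read the dependency sections; whatever remains is the serialized task.
  def deserialize(bytes: Array[Byte]): (Map[String, Long], Map[String, Long], Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    def readMap(): Map[String, Long] =
      (1 to in.readInt()).map(_ => in.readUTF() -> in.readLong()).toMap
    val files = readMap()
    val jars = readMap()
    val task = new Array[Byte](in.available())
    in.readFully(task)
    (files, jars, task)
  }

  val blob = serialize(Map("data.txt" -> 1L), Map("app.jar" -> 2L), "task-payload".getBytes("UTF-8"))
  val (files, jars, task) = deserialize(blob)
  println((files, jars, new String(task, "UTF-8")))
}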
ShuffleMapTask: the task type that corresponds to transformation operations; its main job is to produce the data that action operations need. In other words, it is the task that actions depend on, so it has to run first.
/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
 * specified in the ShuffleDependency).
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation])
  extends Task[MapStatus](stageId, partition.index) with Logging {
  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get // e.g. a HashShuffleWriter
    } catch {
      case e: Exception =>
        if (writer != null) {
          writer.stop(success = false)
        }
        throw e
    } finally {
      context.markTaskCompleted()
    }
  }
}
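What "divides the task output into multiple buckets" means can be shown without any shuffle machinery: every output record's key is routed through the stage's partitioner, and the record lands in the bucket of the corresponding downstream reduce partition. A toy sketch of that routing, assuming simple hash partitioning (no Spark dependency; BucketSketch is an invented name):

object BucketSketch extends App {
  // Same contract as Spark's Partitioner.getPartition: key => partition index.
  def partition(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  val records = Seq("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4)
  val numReducers = 2

  // Conceptually what the ShuffleWriter does: route each record to a bucket.
  val buckets: Map[Int, Seq[(String, Int)]] =
    records.groupBy { case (k, _) => partition(k, numReducers) }

  buckets.toSeq.sortBy(_._1).foreach { case (bucket, recs) =>
    println(s"bucket $bucket -> $recs")
  }
}

In the real task, writer.write(...) streams these records into per-bucket shuffle output, and the returned MapStatus tells the driver where each bucket lives so that reducers in the next stage can fetch it.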
ResultTask: the task type that corresponds to an action operation; that is, it sits at the leaf nodes of the dependency tree.
/**
 * A task that sends back the output to the driver application.
 *
 * See [[Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
 *                   partition of the given RDD. Once deserialized, the type should be
 *                   (RDD[T], (TaskContext, Iterator[T]) => U).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
 *                 input RDD's partitions).
 */
private[spark] class ResultTask[T, U](
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int)
  extends Task[U](stageId, partition.index) with Serializable {

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(partition, context))
    } finally {
      context.markTaskCompleted()
    }
  }
}
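The (rdd, func) pair deserialized above is exactly what an action ships to the cluster: for count(), for example, the function has the shape (context, iterator) => number of elements, and the driver combines the per-partition answers. A hedged sketch of that shape (plain Scala; the stand-in Ctx type and the driver-side sum are illustrative, not Spark's code):

object ResultFuncSketch extends App {
  // The shape a ResultTask executes: (TaskContext, Iterator[T]) => U.
  type Ctx = Int // stand-in for TaskContext (just a partition id here)
  val countFunc: (Ctx, Iterator[String]) => Long = (_, iter) => iter.size.toLong

  // Two "partitions" of an RDD, each processed by one ResultTask.
  val partitions = Seq(Iterator("a", "b", "c"), Iterator("d", "e"))
  val perPartition = partitions.zipWithIndex.map { case (iter, pid) => countFunc(pid, iter) }

  // Driver side: combine the per-task results into the action's answer.
  println(perPartition.sum) // 5, like rdd.count()
}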