  • Spark notes 10: TaskScheduler

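    TaskScheduler is the interface class for task schedulers. An application can plug in its own scheduler. Spark currently ships only one implementation, TaskSchedulerImpl.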

    ===================task scheduler begin====================
    -> TaskSchedulerImpl::submitTasks(taskSet: TaskSet)  -- accepts the submitted tasks; the body runs inside a synchronized block
    -> new TaskSetManager(this, taskSet, maxTaskFailures)  -- each TaskSet gets its own TaskSetManager, which takes over its scheduling
    -> activeTaskSets(taskSet.id) = manager  -- record it as an active TaskSet
    -> schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  -- hand the TaskSetManager to the scheduling pool
    -> starvationTimer.scheduleAtFixedRate(  -- start an anti-starvation timer; it logs a warning if no task has launched after 15 s (the default)
    -> new TimerTask()
    -> backend.reviveOffers()  backend=(CoarseGrainedSchedulerBackend)  -- tell the scheduler backend that resources are needed now
    -> driverActor ! ReviveOffers
    -> makeOffers
      -> tasks = scheduler.resourceOffers(offers: Seq[WorkerOffer])  -- pass all available resources as the argument;
         the return value, Seq[Seq[TaskDescription]], is the list of tasks to run on each executor
    -> val shuffledOffers = Random.shuffle(offers)  -- shuffle the offers before using them
    -> val sortedTaskSets = rootPool.getSortedTaskSetQueue  -- get the TaskSets to run, in priority order
    -> schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
       sorts all TaskSets by priority; the ordering algorithm deserves a separate look
    -> for (taskSet <- sortedTaskSets)
    -> taskSet.executorAdded()
    -> for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels)
    -> do ... while (launchedTask)  -- keep sweeping the offers for this TaskSet until a full pass launches no task
    -> for (i <- 0 until shuffledOffers.size)  -- walk through every available offer
    -> for (task <- taskSet.resourceOffer(execId, host, maxLocality))  -- record the task the TaskSet returns (at most one per offer, since the result is an Option) as one to launch
    -> launchTasks(tasks: scala.Seq[scala.Seq[TaskDescription]])  -- Launch tasks returned by a set of resource offers
    -> for (task <- tasks.flatten)
    -> // if the serialized task is too large (default limit 10 MB), fail right away
    -> executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))  -- CoarseGrainedSchedulerBackend notifies the executor:
       the task is sent to the executor it was scheduled on, and the Executor takes over running it. From here it is not far to the final task.run().
      ==================end================================

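    The scheduling direction is a little counterintuitive: one would expect the scheduler to take a task and look for a suitable resource, but Spark takes a resource and looks for a suitable task. This may be because tasks (derived from the RDD) are fixed, while resources keep changing.
    TaskScheduler: the scheduling interface. TaskSchedulerImpl is currently its only implementation. An application can supply its own scheduler.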
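The "resources look for tasks" loop described above can be sketched in a few lines of Scala. This is a simplified illustration, not Spark's code: `Offer`, `PendingTaskSet`, and `OfferLoop` are hypothetical stand-ins for `WorkerOffer`, `TaskSetManager`, and `TaskSchedulerImpl.resourceOffers`, tasks are plain strings, and the random shuffle and the locality levels are left out so the result is deterministic.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's WorkerOffer: one executor with free cores.
final case class Offer(executorId: String, host: String, cores: Int)

// Hypothetical stand-in for TaskSetManager: a named queue of pending tasks.
final class PendingTaskSet(val name: String, initial: Seq[String]) {
  private var pending: List[String] = initial.toList

  // Hand out at most one task per offer, mirroring the Option return type.
  def resourceOffer(executorId: String, host: String): Option[String] =
    pending match {
      case head :: tail =>
        pending = tail
        Some(head)
      case Nil => None
    }
}

object OfferLoop {
  val cpusPerTask = 1 // assumption: every task needs one core

  // Returns, for each offer (in the same order), the tasks assigned to it.
  def assign(offers: Seq[Offer], taskSets: Seq[PendingTaskSet]): Seq[List[String]] = {
    val tasks = offers.map(_ => ArrayBuffer.empty[String]).toVector
    val freeCores = offers.map(_.cores).toArray
    for (taskSet <- taskSets) { // task sets are assumed to be sorted by priority already
      var launchedTask = true
      while (launchedTask) { // sweep the offers until a full pass launches nothing
        launchedTask = false
        for (i <- offers.indices) {
          if (freeCores(i) >= cpusPerTask) {
            for (task <- taskSet.resourceOffer(offers(i).executorId, offers(i).host)) {
              tasks(i) += task
              freeCores(i) -= cpusPerTask
              launchedTask = true
            }
          }
        }
      }
    }
    tasks.map(_.toList)
  }
}
```

Because every sweep gives each offer at most one task, a task set is spread across executors instead of filling the first executor before touching the second.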
    /**
    * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
    * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
    * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
    * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
    * them, retrying if there are failures, and mitigating stragglers. They return events to the
    * DAGScheduler.
    */
    private[spark] trait TaskScheduler {

    def rootPool: Pool
    def schedulingMode: SchedulingMode
    def start(): Unit
    // Invoked after system has successfully initialized (typically in spark context).
    // Yarn uses this to bootstrap allocation of resources based on preferred locations,
    // wait for slave registrations, etc.
    def postStartHook() { }
    // Disconnect from the cluster.
    def stop(): Unit
    // Submit a sequence of tasks to run.
    def submitTasks(taskSet: TaskSet): Unit
    // Cancel a stage.
    def cancelTasks(stageId: Int, interruptThread: Boolean)
    // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
    def setDAGScheduler(dagScheduler: DAGScheduler): Unit
    // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
    def defaultParallelism(): Int
    /**
    * Update metrics for in-progress tasks and let the master know that the BlockManager is still
    * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
    * indicating that the block manager should re-register.
    */
    def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
    blockManagerId: BlockManagerId): Boolean
    }

    /**
    * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
    * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
    * It handles common logic, like determining a scheduling order across jobs, waking up to launch
    * speculative tasks, etc.
    *
    * Clients should first call initialize() and start(), then submit task sets through the
    * runTasks method.
    *    
    * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
    * threads, so it needs locks in public API methods to maintain its state. In addition, some
    * SchedulerBackends synchronize on themselves when they want to send events here, and then
    * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
    * we are holding a lock on ourselves.
    */
    private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
    extends TaskScheduler with Logging
    Main fields:
    // TaskSetManagers are not thread safe, so any access to one should be synchronized
    // on this class.
    val activeTaskSets = new HashMap[String, TaskSetManager]

    val taskIdToTaskSetId = new HashMap[Long, String]
    val taskIdToExecutorId = new HashMap[Long, String]
    // Which executor IDs we have executors on
    val activeExecutorIds = new HashSet[String]

    // The set of executors we have on each host; this is used to compute hostsAlive, which
    // in turn is used to decide when we can attain data locality on a given host
    protected val executorsByHost = new HashMap[String, HashSet[String]]

    protected val hostsByRack = new HashMap[String, HashSet[String]]

    protected val executorIdToHost = new HashMap[String, String]
    // Listener object to pass upcalls into
    var dagScheduler: DAGScheduler = null

    var backend: SchedulerBackend = null

    val mapOutputTracker = SparkEnv.get.mapOutputTracker

    var schedulableBuilder: SchedulableBuilder = null
    var rootPool: Pool = null
    Initialization selects the scheduling algorithm (FIFO or FAIR):
    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      // temporarily set rootPool name to empty
      rootPool = new Pool("", schedulingMode, 0, 0)
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
        }
      }
      schedulableBuilder.buildPools()
    }
    override def submitTasks(taskSet: TaskSet) {
      val tasks = taskSet.tasks
      logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
      this.synchronized {
        val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
        activeTaskSets(taskSet.id) = manager
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

        if (!isLocal && !hasReceivedTask) {
          starvationTimer.scheduleAtFixedRate(new TimerTask() {
            override def run() {
              if (!hasLaunchedTask) {
                logWarning(
                  "Initial job has not accepted any resources; " +
                  "check your cluster UI to ensure that workers are registered " +
                  "and have sufficient memory")
              } else {
                this.cancel()
              }
            }
          }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
        }
        hasReceivedTask = true
      }
      backend.reviveOffers()
    }
    /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
    */
    def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {

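    TaskSetManager and TaskSet correspond one-to-one, so they can be treated as the same thing. resourceOffer is its main function: given an offered resource, it returns the task from the TaskSet (if any) that should run on it.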
    /**
    * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
    * each task, retries tasks if they fail (up to a limited number of times), and
    * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
    * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
    * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
    *
    * THREADING: This class is designed to only be called from code with a lock on the
    * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
    *
    * @param sched the TaskSchedulerImpl associated with the TaskSetManager
    * @param taskSet the TaskSet to manage scheduling for
    * @param maxTaskFailures if any particular task fails more than this number of times, the entire
    * task set will be aborted
    */
    private[spark] class TaskSetManager(
        sched: TaskSchedulerImpl,
        val taskSet: TaskSet,
        val maxTaskFailures: Int,
        clock: Clock = SystemClock)
      extends Schedulable with Logging {
    val copiesRunning = new Array[Int](numTasks)
    val successful = new Array[Boolean](numTasks)
    private val numFailures = new Array[Int](numTasks)
    // key is taskId, value is a Map of executor id to when it failed
    private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()

    val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
    var tasksSuccessful = 0
    // Set of pending tasks for each executor. These collections are actually
    // treated as stacks, in which new tasks are added to the end of the
    // ArrayBuffer and removed from the end. This makes it faster to detect
    // tasks that repeatedly fail because whenever a task failed, it is put
    // back at the head of the stack. They are also only cleaned up lazily;
    // when a task is launched, it remains in all the pending lists except
    // the one that it was launched from, but gets removed from them later.
    private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

    // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
    // but at host level.
    private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

    // Set of pending tasks for each rack -- similar to the above.
    private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

    // Set containing pending tasks with no locality preferences.
    var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

    // Set containing all pending tasks (also used as a stack, as above).
    val allPendingTasks = new ArrayBuffer[Int]
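The comments above describe two ideas: the pending lists are used as stacks, and a launched task is removed from the other lists only lazily. A minimal sketch of that pattern follows. The class `PendingLists` and its method names are hypothetical, and only the per-host list and the global list are modeled.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical, simplified model of the pending-task lists for numTasks tasks.
final class PendingLists(numTasks: Int) {
  private val launched = new Array[Boolean](numTasks)
  private val forHost = new HashMap[String, ArrayBuffer[Int]]
  private val all = new ArrayBuffer[Int]

  // Register a task index in every list it belongs to.
  def addPending(index: Int, preferredHosts: Seq[String]): Unit = {
    preferredHosts.foreach { h =>
      forHost.getOrElseUpdate(h, new ArrayBuffer[Int]) += index
    }
    all += index
  }

  // Pop from the end of the list; stale entries (tasks already launched from
  // another list) are discarded on the way. This is the lazy cleanup.
  private def pop(list: ArrayBuffer[Int]): Option[Int] = {
    var found: Option[Int] = None
    while (found.isEmpty && list.nonEmpty) {
      val index = list.remove(list.length - 1)
      if (!launched(index)) {
        launched(index) = true
        found = Some(index)
      }
    }
    found
  }

  // Prefer a task that wants this host, then fall back to any pending task.
  def dequeue(host: String): Option[Int] =
    forHost.get(host).flatMap(pop).orElse(pop(all))
}
```

Removing from the end of an `ArrayBuffer` is cheap, and skipping stale entries at pop time avoids searching every list whenever a task launches.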

    // Tasks that can be speculated. Since these will be a small fraction of total
    // tasks, we'll just hold them in a HashSet.
    val speculatableTasks = new HashSet[Int]

    // Task index, start and finish time for each task attempt (indexed by task ID)
    val taskInfos = new HashMap[Long, TaskInfo]
    // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
    var myLocalityLevels = computeValidLocalityLevels()
    var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level

    // Delay scheduling variables: we keep track of our current locality level and the time we
    // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
    // We then move down if we manage to launch a "more local" task.
    var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
    var lastLaunchTime = clock.getTime() // Time we last launched a task at this level
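The delay-scheduling bookkeeping described in the comments above (relax the locality level when the wait expires, tighten it again after a more local launch) can be sketched as below. `DelayState` and its method names are hypothetical, locality levels are plain strings ordered from most local to least local, and time is passed in explicitly instead of being read from a clock.

```scala
// Hypothetical, simplified model of delay scheduling.
// waitsMs(i) is how long to hold out at levels(i) before relaxing to levels(i + 1).
final class DelayState(levels: Vector[String], waitsMs: Vector[Long], startMs: Long) {
  require(levels.nonEmpty && levels.length == waitsMs.length)

  private var currentIndex = 0
  private var lastLaunchMs = startMs

  // Relax one level for each wait that has fully expired since the last launch.
  def allowedLevel(nowMs: Long): String = {
    while (currentIndex < levels.length - 1 &&
           nowMs - lastLaunchMs >= waitsMs(currentIndex)) {
      // Do not count the time already spent at this level against the next one.
      lastLaunchMs += waitsMs(currentIndex)
      currentIndex += 1
    }
    levels(currentIndex)
  }

  // After a launch, return to the level it was launched at and restart the clock.
  def taskLaunched(level: String, nowMs: Long): Unit = {
    val index = levels.indexOf(level)
    require(index >= 0, s"unknown locality level: $level")
    currentIndex = index
    lastLaunchMs = nowMs
  }
}
```

With waits of 3000 ms at the first two levels, a task set that launches nothing is allowed the second level after 3 s and the last level after 6 s. A launch at the most local level resets it to that level.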
    /**
    * Return a speculative task for a given executor if any are available. The task should not have
    * an attempt running on this host, in case the host is slow. In addition, the task should meet
    * the given locality constraint.
    */
    private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
    Executors as the scheduler sees them: as noted earlier, Spark does not take a task and look for a resource. It takes each offered executor and asks the TaskSet for a suitable task.
    /**
     * Respond to an offer of a single executor from the scheduler by finding a task
    *
    * NOTE: this function is either called with a maxLocality which
    * would be adjusted by delay scheduling algorithm or it will be with a special
    * NO_PREF locality which will be not modified
    *
    * @param execId the executor Id of the offered resource
    * @param host the host Id of the offered resource
    * @param maxLocality the maximum locality we want to schedule the tasks at
    */
    def resourceOffer(
        execId: String,
        host: String,
        maxLocality: TaskLocality.TaskLocality)
      : Option[TaskDescription] =
    {

    SchedulableBuilder covers the two scheduling modes, FIFO and FAIR. Both builders are wrappers around Pool, so Pool is the part to focus on.
    /**
    * An interface to build Schedulable tree
    * buildPools: build the tree nodes(pools)
    * addTaskSetManager: build the leaf nodes(TaskSetManagers)
    */
    private[spark] trait SchedulableBuilder {
    def rootPool: Pool

    def buildPools()

    def addTaskSetManager(manager: Schedulable, properties: Properties)
    }
    private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
    extends SchedulableBuilder with Logging {
    private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
    extends SchedulableBuilder with Logging {
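    Pool: corresponds to the scheduling group (pool) described in the Spark documentation. Each Pool can be given its own scheduling algorithm.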
    /**
    * An Schedulable entity that represent collection of Pools or TaskSetManagers
    */

    private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
    extends Schedulable
    with Logging {
    val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
    val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]

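    Schedulable: anything the scheduler operates on must implement this interface. Put differently, any class that implements it can be handed to the scheduler and ordered by priority, much as a Task is handed to an executor. getSortedTaskSetQueue is the main method: it returns the TaskSetManagers to the outer scheduler in priority order.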
    /**
    * An interface for schedulable entities.
    * there are two type of Schedulable entities(Pools and TaskSetManagers)
    */
    private[spark] trait Schedulable {
    var parent: Pool
    // child queues
    def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
    def schedulingMode: SchedulingMode
    def weight: Int
    def minShare: Int
    def runningTasks: Int
    def priority: Int
    def stageId: Int
    def name: String

    def addSchedulable(schedulable: Schedulable): Unit
    def removeSchedulable(schedulable: Schedulable): Unit
    def getSchedulableByName(name: String): Schedulable
    def executorLost(executorId: String, host: String): Unit
    def checkSpeculatableTasks(): Boolean
    def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
    }
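The call trace earlier sorts the queue with `sortWith(taskSetSchedulingAlgorithm.comparator)` and leaves the ordering itself for later. As a rough sketch of the FIFO case, and assuming (from my reading of Spark 1.x, not from the excerpt above) that FIFO compares `priority` first and `stageId` second, smaller values first, the ordering looks like this. `Entry` and `FifoOrdering` are hypothetical names that model only the two fields involved.

```scala
// Hypothetical stand-in for the Schedulable fields that FIFO ordering looks at.
final case class Entry(name: String, priority: Int, stageId: Int)

object FifoOrdering {
  // Returns true when s1 should be scheduled before s2.
  def comparator(s1: Entry, s2: Entry): Boolean =
    if (s1.priority != s2.priority) s1.priority < s2.priority
    else s1.stageId < s2.stageId

  // Same shape as schedulableQueue.toSeq.sortWith(comparator) in the call trace.
  def sorted(queue: Seq[Entry]): Seq[Entry] = queue.sortWith(comparator)
}
```

Under this assumption, task sets with a lower priority value are served first, and ties are broken by the lower stage id.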

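    CoarseGrainedSchedulerBackend: the abstraction that hides the differences between resource managers. Backends such as standalone and yarn-client are subclasses of it (the local mode's LocalBackend, as far as I can tell, implements SchedulerBackend directly rather than extending this class). The rest of Spark talks only to the backend, which is why it acts as a shield.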
    /**
    * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
    * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
    * executors whenever a task is done and asking the scheduler to launch a new executor for
    * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
    * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
    * (spark.deploy.*).
    */
    private[spark]
    class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
    extends SchedulerBackend with Logging
    Main fields:
    // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
    var totalCoreCount = new AtomicInteger(0)
    var totalRegisteredExecutors = new AtomicInteger(0)
    val conf = scheduler.sc.conf
    private val timeout = AkkaUtils.askTimeout(conf)
    private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    // Submit tasks only after (registered resources / total expected resources)
    // is equal to at least this value, that is double between 0 and 1.
    var minRegisteredRatio =
    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
    // Submit tasks after maxRegisteredWaitingTime milliseconds
    // if minRegisteredRatio has not yet been reached
    val maxRegisteredWaitingTime =
    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
    val createTime = System.currentTimeMillis()
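The two settings above amount to a gate in front of task submission: start once enough resources have registered, or once the maximum waiting time has passed. A minimal sketch of that check follows. `RegistrationGate` and its parameter names are hypothetical, and the counts and timestamps are passed in explicitly rather than read from the backend's fields.

```scala
object RegistrationGate {
  // registered / expected: resources registered so far and resources expected in total.
  // minRatio is capped at 1.0, like minRegisteredRatio in the excerpt above.
  def isReady(
      registered: Int,
      expected: Int,
      minRatio: Double,
      createTimeMs: Long,
      nowMs: Long,
      maxWaitMs: Long): Boolean = {
    val ratio = math.min(1.0, minRatio)
    val enoughResources = registered >= expected * ratio
    val waitedLongEnough = nowMs - createTimeMs >= maxWaitMs
    enoughResources || waitedLongEnough
  }
}
```

With the default ratio of 0 the first condition is always true, so the gate only has an effect once the ratio is raised.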

    class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {

    override protected def log = CoarseGrainedSchedulerBackend.this.log

    private val executorActor = new HashMap[String, ActorRef]
    private val executorAddress = new HashMap[String, Address]
    private val executorHost = new HashMap[String, String]
    private val freeCores = new HashMap[String, Int]
    private val totalCores = new HashMap[String, Int]
    private val addressToExecutorId = new HashMap[Address, String]

  • Original article: https://www.cnblogs.com/zwCHAN/p/4246919.html