  • <Yarn> <Capacity Scheduler> <Source Code>

    Yarn capacity scheduler

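    • First, note that the RM has two components: the Scheduler is purely responsible for allocating resources, while the ApplicationsManager accepts applications, negotiates the first container in which to start the ApplicationMaster, and monitors and restarts the AM on failure.
    • The strength of the CapacityScheduler is its flexibility, which gives high cluster utilization. Its weakness comes from the same flexibility: preemption is not enabled by default, so a queue waiting to get its capacity back has to wait until the earlier tasks release their resources voluntarily.
    • As the addApplication walkthrough later in this post shows, as long as the number of submitted apps has not reached the limit, apps can keep being submitted to the cluster; they simply stay pending in the accepted state until the ResourceManager allocates resources to them.
    • The Scheduler therefore schedules continuously and hands resources to those apps; when allocating within a queue, it may take priority into account and combine it with FIFO.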

    AsyncScheduleThread

    • CapacityScheduler contains an AsyncScheduleThread; when asynchronous scheduling is enabled, this thread repeatedly calls the schedule(cs) method.
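
    The loop can be pictured with a minimal sketch. It is not the Hadoop source: the class AsyncScheduleSketch, the method visitOrder, the node names and the sleep interval are all hypothetical, and the wrap-around iteration is just one simple way to start at a random point and still visit every node once per pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch; none of these names come from Hadoop.
final class AsyncScheduleSketch {

    /** Visits every node exactly once, starting at a random offset and wrapping around. */
    static List<String> visitOrder(List<String> nodes, Random random) {
        List<String> order = new ArrayList<>();
        if (nodes.isEmpty()) {
            return order;
        }
        int start = random.nextInt(nodes.size());
        for (int i = 0; i < nodes.size(); i++) {
            order.add(nodes.get((start + i) % nodes.size()));
        }
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> nodes = Arrays.asList("node-1", "node-2", "node-3");
        Random random = new Random();

        // Background thread that keeps running scheduling passes until interrupted.
        Thread scheduler = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                for (String node : visitOrder(nodes, random)) {
                    // Stand-in for cs.allocateContainersToNode(node).
                    System.out.println("allocateContainersToNode(" + node + ")");
                }
                try {
                    Thread.sleep(5); // assumed pause between passes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag so the loop exits
                }
            }
        }, "async-schedule-sketch");
        scheduler.setDaemon(true);
        scheduler.start();

        Thread.sleep(20);
        scheduler.interrupt();
        scheduler.join();
    }
}
```

    Starting each pass at a random node keeps the nodes at the front of the collection from always being offered containers first.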

    schedule(cs)

    • // Schedule on all nodes by starting at a random point.
    • static void schedule(CapacityScheduler cs) 
      • // first randomize the start point
      • Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values()    // get all the nodes in the cluster
      • for each node: cs.allocateContainersToNode(node)
        • // assign new containers:  1. check for reserved apps   2. schedule if there are no reservations
        • if reservedContainer != null
          • get the reserved apps according to reservedContainers
          • // try to fulfill the reservation
          • LeafQueue queue = (LeafQueue) reservedApplication.getQueue();
          • CSAssignment assignment = queue.assignContainers(clusterResource, node, false); // assignContainers(Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve);
            • // if our queue cannot access this node, just return
            • // check for reserved resources
          • TBD...
        • // try to schedule more if there are no reservations to fulfill
        • if (node.getReservedContainer() == null)
          • if (calculator.computeAvailableContainers(node.getAvailableResource(), minimumAllocation) > 0), then  // computeAvailableContainers(Resource available, Resource required); DominantResourceCalculator returns the smaller of the memory ratio and the vcores ratio.
            • assignContainers(clusterResource, node, false)   // assignContainers(Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve)
              • // if our queue cannot access this node, just return
              • // check for reserved resources    // TBD...
              • // try to assign containers to apps in order
              • for (FiCaSchedulerApp application : activeApplications) 
                • if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) // skip this app if the node/rack is on its blacklist, i.e. the app cannot run here
                • // schedule in priority order; these are the priorities of this app's ResourceRequests
                • for (Priority priority : application.getPriorities())
                  • ResourceRequest anyRequest = application.getResourceRequest(priority, ResourceRequest.ANY)
                  • Resource required = anyRequest.getCapacity()
                  • Set<String> requestedNodeLabels = getRequestLabelSetByExpression(anyRequest.getNodeLabelExpression());
                  • // compute user-limit & set headroom
                  • Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, required, requestedNodeLabels)
                    • // compute the user limit with respect to the requested labels
                    • // TODO: headroom also needs to take labels into account
                    • Resource userLimit = computeUserLimit(application, clusterResource, required, queueUser, requestedLabels)
                      • // our current capacity: equal to the max(required, queue-capacity) if we're running below capacity, equal to (usedResources + required) if running over capacity.
                      • // if we have labels to request(choose to use the first one).
                      • // else if no label on request, just use absolute capacity as capacity for nodes without label.
                      • // TBD...
                  • // max avail capacity needs to take into account usage by ancestor-siblings which are greater than their base 
                  • // calculate absoluteMaxAvailCapacity: my max avail is min(my max capacity, unused from my parent by my siblings if they are beyond their base capacity)
                  • // then calculate queueMaxCap using absoluteMaxAvailCapacity
                  • // check canAssignToThisQueue
                    • // consider the intersection of the queue's accessible labels and the node's labels; if any of those labels is beyond the queue limit, we cannot allocate on this node.
                    • // check user limit
                    • application.addSchedulingOpportunity(priority);
                    • // try to schedule...
                    • TBD...
    • FYI:
      /** 
      * Headroom is:
      * min(
      * min(userLimit, queueMaxCap) - userConsumed,
      * queueMaxCap - queueUsedResources
      * )
      *
      * ( which can be expressed as,
      * min (userLimit - userConsumed, queueMaxCap - userConsumed,
      * queueMaxCap - queueUsedResources)
      * )
      *
      * given that queueUsedResources >= userConsumed, this simplifies to
      *
      * >> min (userLimit - userConsumed, queueMaxCap - queueUsedResources) <<
      *
      */
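
    To check the simplification in the comment above, here is a minimal sketch that evaluates both forms on plain numbers. It is an illustration only: HeadroomSketch and its methods are hypothetical names, and a single long (think of MB of memory) stands in for the multi-dimensional Resource objects the scheduler actually works with.

```java
// Hypothetical sketch; a single long stands in for a Resource.
final class HeadroomSketch {

    /** Full form: min(min(userLimit, queueMaxCap) - userConsumed, queueMaxCap - queueUsed). */
    static long headroomFull(long userLimit, long queueMaxCap, long userConsumed, long queueUsed) {
        return Math.min(Math.min(userLimit, queueMaxCap) - userConsumed, queueMaxCap - queueUsed);
    }

    /** Simplified form; equal to the full form whenever queueUsed >= userConsumed. */
    static long headroomSimplified(long userLimit, long queueMaxCap, long userConsumed, long queueUsed) {
        return Math.min(userLimit - userConsumed, queueMaxCap - queueUsed);
    }

    public static void main(String[] args) {
        // Assumed example values: the user is limited to 40, the queue to 100.
        long userLimit = 40;
        long queueMaxCap = 100;
        long userConsumed = 10;
        long queueUsed = 80; // includes the 10 consumed by this user

        System.out.println("full       = " + headroomFull(userLimit, queueMaxCap, userConsumed, queueUsed));
        System.out.println("simplified = " + headroomSimplified(userLimit, queueMaxCap, userConsumed, queueUsed));
    }
}
```

    With these values the user could still grow by 30, but the queue only has 20 left, so both forms give a headroom of 20.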

    addApplication

    To start, I picked an arbitrary method in CapacityScheduler:

    synchronized addApplication(ApplicationAttemptId applicationAttemptId, String queueName, String user)

    • sanity check
      • queue == null
      • !(queue instanceof LeafQueue)
    • Represents an application from the viewpoint of the scheduler. (Each running app in the RM corresponds to one instance of the FiCaSchedulerApp class)
      • FiCaSchedulerApp SchedulerApp = new FiCaSchedulerApp(applicationAttemptId, user, queue, queue.getActiveUsersManager(), rmContext);
        • ActiveUsersManager tracks users in the system. (An active user is defined as someone with outstanding resource requests.)
        • rmContext is the context of the RM.
    • submit to the queue
      • try: queue.submitApplication(SchedulerApp, user, queueName)
        • check queue ACLs
        • synchronized(this) 
          • check if the queue is accepting jobs: if (getState() != QueueState.RUNNING) throw Exception
          • check submission limits for queues:
            • if (getNumApplications() >= getMaxApplications()) throw Exception
          • check submission limits for the user on this queue
          • addApplication(application, user)
            • user.submitApplication() : pendingApp ++; // accepted
            • activateApplications():
              • for each pending app: 
                • check queue limit & user limit again (same as above)
                • activateApplication(): --pendingApp; ++activeApp;
        • metrics.submitApp(userName, attemptId): // each queue has a metrics object, an instance of QueueMetrics
          • update metrics: appsSubmitted, appsFailed, appsPending
          • if (parent != null) parent.submitApp(user, attemptId)   // to inform the parents recursively
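
    The submission path outlined above can be sketched as a small, self-contained class. This is not the LeafQueue implementation: LeafQueueSketch, its fields and the maxActiveApplications limit are assumptions made for illustration, and the ACL and per-user checks are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a queue that accepts apps up to a limit and activates them in FIFO order.
final class LeafQueueSketch {
    private final int maxApplications;       // cap on pending + active apps
    private final int maxActiveApplications; // assumed cap on concurrently active apps
    private boolean running = true;          // stand-in for QueueState.RUNNING
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> active = new ArrayList<>();

    LeafQueueSketch(int maxApplications, int maxActiveApplications) {
        this.maxApplications = maxApplications;
        this.maxActiveApplications = maxActiveApplications;
    }

    /** Accepts the app if the queue is running and below its submission limit. */
    synchronized void submitApplication(String appId) {
        if (!running) {
            throw new IllegalStateException("queue is not RUNNING, cannot accept " + appId);
        }
        if (getNumApplications() >= maxApplications) {
            throw new IllegalStateException("queue already has " + getNumApplications() + " applications");
        }
        pending.addLast(appId); // the app is accepted but not yet active
        activateApplications();
    }

    /** Moves pending apps to active, oldest first, while the active limit allows it. */
    private void activateApplications() {
        while (!pending.isEmpty() && active.size() < maxActiveApplications) {
            active.add(pending.removeFirst());
        }
    }

    synchronized int getNumApplications() { return pending.size() + active.size(); }
    synchronized int getNumPending() { return pending.size(); }
    synchronized int getNumActive() { return active.size(); }

    public static void main(String[] args) {
        LeafQueueSketch queue = new LeafQueueSketch(3, 1);
        queue.submitApplication("app-1");
        queue.submitApplication("app-2");
        queue.submitApplication("app-3");
        System.out.println("active=" + queue.getNumActive() + ", pending=" + queue.getNumPending());
        try {
            queue.submitApplication("app-4");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    Note that nothing here looks at memory or vcores: submission only counts applications, which matches the outline above.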

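    As shown above, addApplication mainly checks ACLs and the upper limit on the number of apps; it performs no resource-related allocation or checks. Resources (containers) are requested from the ResourceManager by the corresponding ApplicationMaster. ResourceRequest uses protobuf.

    • The user submits an application --> ResourceManager --> ACL and other checks --> app accepted.
    • Once the Scheduler has enough resources to satisfy the request --> the app moves from accepted to running --> the RM allocates a container for the ApplicationMaster and is responsible for launching it on a node.
    • The AM is the main process of each user job. It manages the job lifecycle, including dynamically growing or shrinking its resources (containers), managing the execution flow, and handling failures and computation skew.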
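
    The flow above can be read as a small state machine. The sketch below is a simplification under stated assumptions: AppLifecycleSketch, its State enum and the two boolean inputs are hypothetical, and the states YARN actually tracks are more numerous than the four used here.

```java
// Hypothetical sketch of the submit --> accepted --> running flow.
final class AppLifecycleSketch {

    enum State { SUBMITTED, ACCEPTED, RUNNING, FAILED }

    /**
     * Returns the next state of an app.
     * checksPassed: ACL and limit checks succeeded at submission time.
     * amContainerAllocated: the scheduler found room for the ApplicationMaster container.
     */
    static State next(State current, boolean checksPassed, boolean amContainerAllocated) {
        switch (current) {
            case SUBMITTED:
                return checksPassed ? State.ACCEPTED : State.FAILED;
            case ACCEPTED:
                // The app stays accepted (pending) until resources are available.
                return amContainerAllocated ? State.RUNNING : State.ACCEPTED;
            default:
                return current;
        }
    }

    public static void main(String[] args) {
        State state = State.SUBMITTED;
        state = next(state, true, false); // checks pass, no resources yet
        System.out.println(state);
        state = next(state, true, false); // still waiting for the AM container
        System.out.println(state);
        state = next(state, true, true);  // AM container allocated
        System.out.println(state);
    }
}
```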

    Yarn Queues

  • Original post: https://www.cnblogs.com/wttttt/p/7554463.html