  • Hadoop's Three Schedulers: Source Code Analysis and Writing Your Own Scheduler

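    If you repost this article, please credit the author and the source. My knowledge is limited, so please point out any errors you find.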

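    Note: we use the hadoop-2.7.3-src source release. The default scheduler in this version is the Capacity scheduler. Around 2.0.2-alpha, someone reported a bug in the FIFO scheduler, and the community switched the default scheduler from FIFO to Capacity.  Reference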

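      In Hadoop, the scheduler is a pluggable module. Users can design a scheduler to suit their own workload and name it in the configuration file, and the Hadoop cluster loads that scheduler when it starts. Hadoop currently ships with several schedulers: FIFO (the default in Hadoop 1), Capacity Scheduler, and FairScheduler. These rarely satisfy a company's more complex requirements, so teams often end up developing their own scheduler. This article introduces the basic approach to writing a Hadoop scheduler.  Reference 1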

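      Hadoop 1 scheduling framework: the scheduler is loaded and invoked by the JobTracker. Users specify it through the mapred.jobtracker.taskScheduler property in the mapred-site.xml configuration file. Analyzing this framework comes down to the relationship between two important classes, TaskScheduler and JobTracker.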

      Hadoop 2 scheduling framework: the scheduler is loaded and invoked by the ResourceManager. Users specify it through the yarn.resourcemanager.scheduler.class property in the yarn-site.xml configuration file. The default is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; the FIFO scheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler, or the Fair scheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler, can be configured instead. By analogy with Hadoop 1, the common base class of the three schedulers is AbstractYarnScheduler<T extends SchedulerApplicationAttempt, N extends SchedulerNode>, which plays a role similar to Hadoop 1's TaskScheduler. To write your own scheduler, extend the abstract class AbstractYarnScheduler.
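
    The pluggable loading described above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the ResourceManager's actual code: ToyScheduler, ToyFifoScheduler, ToyCapacityScheduler, and SchedulerLoader are hypothetical names, and java.util.Properties stands in for Hadoop's Configuration. Only the property name yarn.resourcemanager.scheduler.class comes from the text. The idea is to read a class name from the configuration, fall back to a default, and instantiate the class reflectively.

```java
import java.util.Properties;

/** Hypothetical loader; java.util.Properties stands in for Hadoop's Configuration. */
class SchedulerLoader {
  // Property name taken from the text above.
  static final String SCHEDULER_KEY = "yarn.resourcemanager.scheduler.class";

  /** Instantiates the configured scheduler, defaulting to the toy capacity scheduler. */
  static ToyScheduler load(Properties conf) {
    String className =
        conf.getProperty(SCHEDULER_KEY, ToyCapacityScheduler.class.getName());
    try {
      // asSubclass rejects classes that do not implement the scheduler contract.
      Class<? extends ToyScheduler> clazz =
          Class.forName(className).asSubclass(ToyScheduler.class);
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException("Could not load scheduler " + className, e);
    }
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    // No property set: the default scheduler is used.
    System.out.println(load(conf).name());
    // Property set: the named scheduler is used instead.
    conf.setProperty(SCHEDULER_KEY, ToyFifoScheduler.class.getName());
    System.out.println(load(conf).name());
  }
}

/** Hypothetical stand-in for a scheduler contract; not the real YARN interface. */
interface ToyScheduler {
  String name();
}

/** Hypothetical FIFO-style scheduler, used only for this illustration. */
class ToyFifoScheduler implements ToyScheduler {
  @Override
  public String name() {
    return "fifo";
  }
}

/** Hypothetical capacity-style scheduler, used as the default in this illustration. */
class ToyCapacityScheduler implements ToyScheduler {
  @Override
  public String name() {
    return "capacity";
  }
}
```

    Because the class is chosen by name at startup, swapping schedulers in this sketch needs only a configuration change and no change to the loader, which is the point of making the module pluggable.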

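      MapReduce in Hadoop 2 is called MR2 (MRv2) or YARN. It splits the JobTracker's two responsibilities, resource management and job life-cycle management (including scheduling and monitoring), into two separate services: a ResourceManager that manages all resources, and a per-application ApplicationMaster. The ResourceManager arbitrates the allocation of compute resources among applications, while each ApplicationMaster manages, schedules, and coordinates its own application. An application is either a single job in the classic MapReduce sense or a DAG (directed acyclic graph) of such jobs. The ResourceManager, together with the NodeManager service on each machine, which manages the user processes on that machine, forms the computation fabric. The per-application ApplicationMaster is in effect a framework-specific library, responsible for negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor tasks.  Reference 2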

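      To address the shortcomings of MapReduce in Hadoop 1.0 in scalability and multi-framework support, Hadoop 2.0 separates the JobTracker's resource management and job control functions and implements them in the ResourceManager and ApplicationMaster components respectively. The ResourceManager allocates resources for all applications, while an ApplicationMaster manages only a single application. The result is YARN, a new general-purpose resource management framework. On YARN, users can run many kinds of applications (no longer only MapReduce, as in 1.0), from offline computation with MapReduce to online (stream) processing with Storm. Hadoop 2.0 corresponds to Apache Hadoop 0.23.x and 2.x, and to CDH4.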

    Architecture diagram:

    The ResourceManager has two main components: the Scheduler and the ApplicationsManager.

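    Scheduler: allocates resources to applications. It does no monitoring or status tracking of applications, and it gives no guarantee of restarting tasks that fail because of application errors or hardware faults.

    ApplicationsManager: accepts job submissions, negotiates the first container for running the ApplicationMaster, and restarts the ApplicationMaster container when it fails.

    NodeManager: the ResourceManager's agent on each machine. It manages containers, monitors their resource usage (CPU, memory, disk, network, and so on), and reports that usage to the ResourceManager/Scheduler.

    ApplicationMaster: one per application. It requests resources from the Scheduler, tracks how those resources are used, and monitors task progress.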

    1  Schedulers

    To analyze the schedulers, we first need to look at their parent class, its ancestors, and the interfaces they implement: AbstractService, YarnScheduler, ResourceScheduler, and AbstractYarnScheduler, shown below:

    AbstractService.java is located at hadoop-2.7.3-src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/AbstractService.java 

    package org.apache.hadoop.service;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.classification.InterfaceAudience.Public;
    import org.apache.hadoop.classification.InterfaceStability.Evolving;
    import org.apache.hadoop.conf.Configuration;
    
    import com.google.common.annotations.VisibleForTesting;
    
    /**
     * This is the base implementation class for services.
     */
    @Public
    @Evolving
    public abstract class AbstractService implements Service {
    
      private static final Log LOG = LogFactory.getLog(AbstractService.class);
    
      /**
       * Service name.
       */
      private final String name;
    
      /** service state */
      private final ServiceStateModel stateModel;
    
      /**
       * Service start time. Will be zero until the service is started.
       */
      private long startTime;
    
      /**
       * The configuration. Will be null until the service is initialized.
       */
      private volatile Configuration config;
    
      /**
       * List of state change listeners; it is final to ensure
       * that it will never be null.
       */
      private final ServiceOperations.ServiceListeners listeners
        = new ServiceOperations.ServiceListeners();
      /**
       * Static listeners to all events across all services
       */
      private static ServiceOperations.ServiceListeners globalListeners
        = new ServiceOperations.ServiceListeners();
    
      /**
       * The cause of any failure; will be null
       * if the service did not stop due to a failure.
       */
      private Exception failureCause;
    
      /**
       * the state in which the service was when it failed.
       * Only valid when the service is stopped due to a failure
       */
      private STATE failureState = null;
    
      /**
       * object used to co-ordinate {@link #waitForServiceToStop(long)}
       * across threads.
       */
      private final AtomicBoolean terminationNotification =
        new AtomicBoolean(false);
    
      /**
       * History of lifecycle transitions
       */
      private final List<LifecycleEvent> lifecycleHistory
        = new ArrayList<LifecycleEvent>(5);
    
      /**
       * Map of blocking dependencies
       */
      private final Map<String,String> blockerMap = new HashMap<String, String>();
    
      private final Object stateChangeLock = new Object();
     
      /**
       * Construct the service.
       * @param name service name
       */
      public AbstractService(String name) {
        this.name = name;
        stateModel = new ServiceStateModel(name);
      }
    
      /*
       * Get the current service state.
       * Returns: the state of the service
       */
      @Override
      public final STATE getServiceState() {
        return stateModel.getState();
      }
    
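      /*
       * Get the first exception raised during the service failure. If null, no exception was logged.
       * Returns: the failure logged during a transition to the stopped state
       */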
      @Override
      public final synchronized Throwable getFailureCause() {
        return failureCause;
      }
    
      /*
       * Get the state in which the failure in {@link #getFailureCause()} occurred.
       * Returns: the state, or null if there was no failure
       */
      @Override
      public synchronized STATE getFailureState() {
        return failureState;
      }
    
      /**
       * Set the configuration for this service.
       * This method is called during {@link #init(Configuration)}
       * and should only be needed if for some reason a service implementation
       * needs to override that initial setting -for example replacing
       * it with a new subclass of {@link Configuration}
       * @param conf new configuration.
       */
      protected void setConfig(Configuration conf) {
        this.config = conf;
      }
    
      /**
       * {@inheritDoc}
       * This invokes {@link #serviceInit}
       * @param conf the configuration of the service. This must not be null
       * @throws ServiceStateException if the configuration was null,
       * the state change not permitted, or something else went wrong
       */
      // A subclass's serviceInit initializes the services it needs: it creates the
      // corresponding service objects and adds them to the service list.
      @Override
      public void init(Configuration conf) {
        // reject a null configuration
        if (conf == null) {
          throw new ServiceStateException("Cannot initialize service "
                                          + getName() + ": null configuration");
        }
        // nothing to do if the service is already initialized
        if (isInState(STATE.INITED)) {
          return;
        }
        synchronized (stateChangeLock) {
          if (enterState(STATE.INITED) != STATE.INITED) {
            setConfig(conf);
            try {
              // initialize the service; this dispatches to the subclass's override of serviceInit
              serviceInit(config);
              if (isInState(STATE.INITED)) {
                //if the service ended up here during init,
                //notify the listeners
                notifyListeners();
              }
            } catch (Exception e) {
              noteFailure(e);
              ServiceOperations.stopQuietly(LOG, this);
              throw ServiceStateException.convert(e);
            }
          }
        }
      }
    
      /**
       * {@inheritDoc}
       * @throws ServiceStateException if the current service state does not permit
       * this action
       */
      // start the service
      @Override
      public void start() {
        if (isInState(STATE.STARTED)) {
          return;
        }
        //enter the started state
        synchronized (stateChangeLock) {
          if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
            try {
              startTime = System.currentTimeMillis();
              serviceStart();
              if (isInState(STATE.STARTED)) {
                //if the service started (and isn't now in a later state), notify
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Service " + getName() + " is started");
                }
                notifyListeners();
              }
            } catch (Exception e) {
              noteFailure(e);
              ServiceOperations.stopQuietly(LOG, this);
              throw ServiceStateException.convert(e);
            }
          }
        }
      }
    
      /**
       * {@inheritDoc}
       */
      // stop the service
      @Override
      public void stop() {
        if (isInState(STATE.STOPPED)) {
          return;
        }
        synchronized (stateChangeLock) {
          if (enterState(STATE.STOPPED) != STATE.STOPPED) {
            try {
              serviceStop();
            } catch (Exception e) {
              //stop-time exceptions are logged if they are the first one,
              noteFailure(e);
              throw ServiceStateException.convert(e);
            } finally {
              //report that the service has terminated
              terminationNotification.set(true);
              synchronized (terminationNotification) {
                terminationNotification.notifyAll();
              }
              //notify anything listening for events
              notifyListeners();
            }
          } else {
            //already stopped: note it
            if (LOG.isDebugEnabled()) {
              LOG.debug("Ignoring re-entrant call to stop()");
            }
          }
        }
      }
    
      /**
       * Relay to {@link #stop()}
       * @throws IOException
       */
      @Override
      public final void close() throws IOException {
        stop();
      }
    
      /**
       * Failure handling: record the exception
       * that triggered it -if there was not one already.
       * Services are free to call this themselves.
       * @param exception the exception
       */
      protected final void noteFailure(Exception exception) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("noteFailure " + exception, null);
        }
        if (exception == null) {
          //make sure failure logic doesn't itself cause problems
          return;
        }
        //record the failure details, and log it
        synchronized (this) {
          if (failureCause == null) {
            failureCause = exception;
            failureState = getServiceState();
            LOG.info("Service " + getName()
                     + " failed in state " + failureState
                     + "; cause: " + exception,
                     exception);
          }
        }
      }
    
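      /*
       * Block waiting for the service to stop; uses the termination notification object to do so.
       * This method only returns after all the service stop actions have been executed
       * (successfully or not), or the timeout has elapsed.
       * This method can be called before the service is inited or started; this is to
       * eliminate any race condition with the service stopping before this event occurs.
       */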
      @Override
      public final boolean waitForServiceToStop(long timeout) {
        boolean completed = terminationNotification.get();
        while (!completed) {
          try {
            synchronized(terminationNotification) {
              terminationNotification.wait(timeout);
            }
            // here there has been a timeout, the object has terminated,
            // or there has been a spurious wakeup (which we ignore)
            completed = true;
          } catch (InterruptedException e) {
            // interrupted; have another look at the flag
            completed = terminationNotification.get();
          }
        }
        return terminationNotification.get();
      }
    
      /* ===================================================================== */
      /* Override Points */
      /* ===================================================================== */
    
      /**
       * All initialization code needed by a service.
       *
       * This method will only ever be called once during the lifecycle of
       * a specific service instance.
       *
       * Implementations do not need to be synchronized as the logic
       * in {@link #init(Configuration)} prevents re-entrancy.
       *
       * The base implementation checks to see if the subclass has created
       * a new configuration instance, and if so, updates the base class value
       * @param conf configuration
       * @throws Exception on a failure -these will be caught,
       * possibly wrapped, and will trigger a service stop
       */
      protected void serviceInit(Configuration conf) throws Exception {
        if (conf != config) {
          LOG.debug("Config has been overridden during init");
          setConfig(conf);
        }
      }
    
      /**
       * Actions called during the INITED to STARTED transition.
       *
       * This method will only ever be called once during the lifecycle of
       * a specific service instance.
       *
       * Implementations do not need to be synchronized as the logic
       * in {@link #start()} prevents re-entrancy.
       *
       * @throws Exception if needed -these will be caught,
       * wrapped, and trigger a service stop
       */
      protected void serviceStart() throws Exception {
    
      }
    
      /**
       * Actions called during the transition to the STOPPED state.
       *
       * This method will only ever be called once during the lifecycle of
       * a specific service instance.
       *
       * Implementations do not need to be synchronized as the logic
       * in {@link #stop()} prevents re-entrancy.
       *
       * Implementations MUST write this to be robust against failures, including
       * checks for null references -and for the first failure to not stop other
       * attempts to shut down parts of the service.
       *
       * @throws Exception if needed -these will be caught and logged.
       */
      protected void serviceStop() throws Exception {
    
      }
    
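      /*
       * Register a listener to the service state change events.
       * If the supplied listener is already listening to this service, this method is a no-op.
       * Parameter l: a new listener
       */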
      @Override
      public void registerServiceListener(ServiceStateChangeListener l) {
        listeners.add(l);
      }
    
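      /*
       * Unregister a previously registered listener of the service state change events.
       * This is a no-op if the listener is already unregistered.
       * Parameter l: the listener to unregister
       */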
      @Override
      public void unregisterServiceListener(ServiceStateChangeListener l) {
        listeners.remove(l);
      }
    
      /**
       * Register a global listener, which receives notifications
       * from the state change events of all services in the JVM
       * @param l listener
       */
      public static void registerGlobalListener(ServiceStateChangeListener l) {
        globalListeners.add(l);
      }
    
      /**
       * unregister a global listener.
       * @param l listener to unregister
       * @return true if the listener was found (and then deleted)
       */
      public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
        return globalListeners.remove(l);
      }
    
      /**
       * Package-scoped method for testing -resets the global listener list
       */
      @VisibleForTesting
      static void resetGlobalListeners() {
        globalListeners.reset();
      }
    
      /*
       * Get the name of this service.
       * Returns: the service name
       */
      @Override
      public String getName() {
        return name;
      }
    
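      /*
       * Get the configuration of this service.
       * This is normally not a clone and may be manipulated, though there are no
       * guarantees as to what the consequences of such actions may be.
       * Returns: the current configuration, unless a specific implementation chooses otherwise.
       */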
      @Override
      public synchronized Configuration getConfig() {
        return config;
      }
    
      /*
       * Get the service start time.
       * Returns: the start time of the service. This is zero if the service has not yet been started.
       */
      @Override
      public long getStartTime() {
        return startTime;
      }
    
      /**
       * Notify local and global listeners of state changes.
       * Exceptions raised by listeners are NOT passed up.
       */
      private void notifyListeners() {
        try {
          listeners.notifyListeners(this);
          globalListeners.notifyListeners(this);
        } catch (Throwable e) {
          LOG.warn("Exception while notifying listeners of " + this + ": " + e,
                   e);
        }
      }
    
      /**
       * Add a state change event to the lifecycle history
       */
      private void recordLifecycleEvent() {
        LifecycleEvent event = new LifecycleEvent();
        event.time = System.currentTimeMillis();
        event.state = getServiceState();
        lifecycleHistory.add(event);
      }
    
      /*
       * Get a snapshot of the lifecycle history; it is a static list.
       * Returns: a possibly empty but never null list of lifecycle events.
       */
      @Override
      public synchronized List<LifecycleEvent> getLifecycleHistory() {
        return new ArrayList<LifecycleEvent>(lifecycleHistory);
      }
    
      /**
       * Enter a state; record this via {@link #recordLifecycleEvent}
       * and log at the info level.
       * @param newState the proposed new state
       * @return the original state
       * it wasn't already in that state, and the state model permits state re-entrancy.
       */
      private STATE enterState(STATE newState) {
        assert stateModel != null : "null state in " + name + " " + this.getClass();
        STATE oldState = stateModel.enterState(newState);
        if (oldState != newState) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(
              "Service: " + getName() + " entered state " + getServiceState());
          }
          recordLifecycleEvent();
        }
        return oldState;
      }
    
      /*
       * Query whether the service is in a specific state.
       * Parameter expected: the state to check against
       */
      @Override
      public final boolean isInState(Service.STATE expected) {
        return stateModel.isInState(expected);
      }
    
      @Override
      public String toString() {
        return "Service " + name + " in state " + stateModel;
      }
    
      /**
       * Put a blocker to the blocker map -replacing any
       * with the same name.
       * @param name blocker name
       * @param details any specifics on the block. This must be non-null.
       */
      protected void putBlocker(String name, String details) {
        synchronized (blockerMap) {
          blockerMap.put(name, details);
        }
      }
    
      /**
       * Remove a blocker from the blocker map -
       * this is a no-op if the blocker is not present
       * @param name the name of the blocker
       */
      public void removeBlocker(String name) {
        synchronized (blockerMap) {
          blockerMap.remove(name);
        }
      }
    
      /*
       * Get the blockers on a service: remote dependencies that keep the service from being live.
       * Returns: a (snapshot) map of blocker name -> description values
       */
      @Override
      public Map<String, String> getBlockers() {
        synchronized (blockerMap) {
          Map<String, String> map = new HashMap<String, String>(blockerMap);
          return map;
        }
      }
    }
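
    The lifecycle that AbstractService enforces can be condensed into a small sketch. This is a simplified illustration under assumptions, not the Hadoop implementation: ToyService and ToySchedulerService are hypothetical names, the transition rule is a simplification of what ServiceStateModel checks, and listeners, blockers, and failure recording are left out. It shows the template-method shape seen above: the public init(), start(), and stop() methods guard the state change, then call the overridable serviceInit(), serviceStart(), and serviceStop() hooks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical subclass that only records which hooks were called. */
class ToySchedulerService extends ToyService {
  final List<String> calls = new ArrayList<>();

  @Override
  protected void serviceInit() {
    calls.add("serviceInit");
  }

  @Override
  protected void serviceStart() {
    calls.add("serviceStart");
  }

  @Override
  protected void serviceStop() {
    calls.add("serviceStop");
  }

  public static void main(String[] args) {
    ToySchedulerService service = new ToySchedulerService();
    service.init();
    service.start();
    service.start(); // re-entrant call: ignored, hook is not run again
    service.stop();
    System.out.println(service.calls + " " + service.getHistory());
  }
}

/** Simplified, hypothetical stand-in for AbstractService; not the Hadoop class. */
abstract class ToyService {
  enum State { NOTINITED, INITED, STARTED, STOPPED }

  /** A lifecycle hook that may throw, like serviceInit/serviceStart/serviceStop. */
  private interface Hook {
    void run() throws Exception;
  }

  private State state = State.NOTINITED;
  private final List<State> history = new ArrayList<>();

  // Override points; the defaults do nothing.
  protected void serviceInit() throws Exception { }
  protected void serviceStart() throws Exception { }
  protected void serviceStop() throws Exception { }

  public final synchronized void init() {
    transition(State.INITED, this::serviceInit);
  }

  public final synchronized void start() {
    transition(State.STARTED, this::serviceStart);
  }

  public final synchronized void stop() {
    transition(State.STOPPED, this::serviceStop);
  }

  public final synchronized State getState() {
    return state;
  }

  /** Returns a copy so callers cannot modify the recorded history. */
  public final synchronized List<State> getHistory() {
    return new ArrayList<>(history);
  }

  private void transition(State target, Hook hook) {
    if (state == target) {
      return; // already in the target state: no-op
    }
    // Assumed rule: stopping is always allowed, otherwise only the next state is.
    boolean allowed =
        target == State.STOPPED || target.ordinal() == state.ordinal() + 1;
    if (!allowed) {
      throw new IllegalStateException("Cannot go from " + state + " to " + target);
    }
    state = target;
    history.add(target);
    try {
      hook.run();
    } catch (Exception e) {
      if (target != State.STOPPED) {
        stop(); // a failed init or start triggers a stop, as in the listing above
      }
      throw new IllegalStateException("Service failed entering " + target, e);
    }
  }
}
```

    A scheduler written against the real base class would put its setup in serviceInit(Configuration) and its thread startup in serviceStart(), and leave the state bookkeeping to the parent class in the same way.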

    YarnScheduler.java is located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java

    Reference 3

    package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
    
    import java.io.IOException;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    
    import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
    import org.apache.hadoop.classification.InterfaceAudience.Public;
    import org.apache.hadoop.classification.InterfaceStability.Evolving;
    import org.apache.hadoop.classification.InterfaceStability.Stable;
    import org.apache.hadoop.classification.InterfaceStability.Unstable;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.QueueACL;
    import org.apache.hadoop.yarn.api.records.QueueInfo;
    import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
    import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
    import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
    
    /**
     * This interface is used by the components to talk to the
     * scheduler for allocating of resources, cleaning up resources.
     *
     */
    public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    
      /**
       * Get queue information
       * @param queueName queue name
       * @param includeChildQueues include child queues?
       * @param recursive get children queues?
       * @return queue information
       * @throws IOException
       */
      @Public
      @Stable
      public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
          boolean recursive) throws IOException;
    
      /**
       * Get acls for queues for current user.
       * @return acls for queues for current user
       */
      @Public
      @Stable
      public List<QueueUserACLInfo> getQueueUserAclInfo();
    
      /**
       * Get the whole resource capacity of the cluster.
       * @return the whole resource capacity of the cluster.
       */
      @LimitedPrivate("yarn")
      @Unstable
      public Resource getClusterResource();
    
      /**
       * Get minimum allocatable {@link Resource}.
       * @return minimum allocatable resource
       */
      @Public
      @Stable
      public Resource getMinimumResourceCapability();
      
      /**
       * Get maximum allocatable {@link Resource} at the cluster level.
       * @return maximum allocatable resource
       */
      @Public
      @Stable
      public Resource getMaximumResourceCapability();
    
      /**
       * Get maximum allocatable {@link Resource} for the queue specified.
       * @param queueName queue name
       * @return maximum allocatable resource
       */
      @Public
      @Stable
      public Resource getMaximumResourceCapability(String queueName);
    
      @LimitedPrivate("yarn")
      @Evolving
      ResourceCalculator getResourceCalculator();
    
      /**
       * Get the number of nodes available in the cluster.
       * @return the number of available nodes.
       */
      @Public
      @Stable
      public int getNumClusterNodes();
      
      /**
       * The main api between the ApplicationMaster and the Scheduler.
       * The ApplicationMaster is updating his future resource requirements
       * and may release containers he doesn't need.
       * 
       * @param appAttemptId
       * @param ask
       * @param release
       * @param blacklistAdditions 
       * @param blacklistRemovals 
       * @return the {@link Allocation} for the application
       */
      @Public
      @Stable
      Allocation 
      allocate(ApplicationAttemptId appAttemptId, 
          List<ResourceRequest> ask,
          List<ContainerId> release, 
          List<String> blacklistAdditions, 
          List<String> blacklistRemovals);
    
      /**
       * Get node resource usage report.
       * @param nodeId
       * @return the {@link SchedulerNodeReport} for the node or null
       * if nodeId does not point to a defined node.
       */
      @LimitedPrivate("yarn")
      @Stable
      public SchedulerNodeReport getNodeReport(NodeId nodeId);
      
      /**
       * Get the Scheduler app for a given app attempt Id.
       * @param appAttemptId the id of the application attempt
       * @return SchedulerApp for this given attempt.
       */
      @LimitedPrivate("yarn")
      @Stable
      SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);
    
      /**
       * Get a resource usage report from a given app attempt ID.
       * @param appAttemptId the id of the application attempt
       * @return resource usage report for this given attempt
       */
      @LimitedPrivate("yarn")
      @Evolving
      ApplicationResourceUsageReport getAppResourceUsageReport(
          ApplicationAttemptId appAttemptId);
      
      /**
       * Get the root queue for the scheduler.
       * @return the root queue for the scheduler.
       */
      @LimitedPrivate("yarn")
      @Evolving
      QueueMetrics getRootQueueMetrics();
    
      /**
       * Check if the user has permission to perform the operation.
       * If the user has {@link QueueACL#ADMINISTER_QUEUE} permission,
       * this user can view/modify the applications in this queue
       * @param callerUGI
       * @param acl
       * @param queueName
       * @return <code>true</code> if the user has the permission,
       *         <code>false</code> otherwise
       */
      boolean checkAccess(UserGroupInformation callerUGI,
          QueueACL acl, String queueName);
      
      /**
       * Gets the apps under a given queue
       * @param queueName the name of the queue.
       * @return a collection of app attempt ids in the given queue.
       */
      @LimitedPrivate("yarn")
      @Stable
      public List<ApplicationAttemptId> getAppsInQueue(String queueName);
    
      /**
       * Get the container for the given containerId.
       * @param containerId
       * @return the container for the given containerId.
       */
      @LimitedPrivate("yarn")
      @Unstable
      public RMContainer getRMContainer(ContainerId containerId);
    
      /**
       * Moves the given application to the given queue
       * @param appId
       * @param newQueue
       * @return the name of the queue the application was placed into
       * @throws YarnException if the move cannot be carried out
       */
      @LimitedPrivate("yarn")
      @Evolving
      public String moveApplication(ApplicationId appId, String newQueue)
          throws YarnException;
    
      /**
       * Completely drain sourceQueue of applications, by moving all of them to
       * destQueue.
       *
       * @param sourceQueue
       * @param destQueue
       * @throws YarnException
       */
      void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
    
      /**
       * Terminate all applications in the specified queue.
       *
       * @param queueName the name of queue to be drained
       * @throws YarnException
       */
      void killAllAppsInQueue(String queueName) throws YarnException;
    
      /**
       * Remove an existing queue. Implementations might limit when a queue could be
       * removed (e.g., must have zero entitlement, and no applications running, or
       * must be a leaf, etc..).
       *
       * @param queueName name of the queue to remove
       * @throws YarnException
       */
      void removeQueue(String queueName) throws YarnException;
    
      /**
       * Add to the scheduler a new Queue. Implementations might limit what type of
       * queues can be dynamically added (e.g., Queue must be a leaf, must be
       * attached to existing parent, must have zero entitlement).
       *
       * @param newQueue the queue being added.
       * @throws YarnException
       */
      void addQueue(Queue newQueue) throws YarnException;
    
      /**
       * This method increase the entitlement for current queue (must respect
       * invariants, e.g., no overcommit of parents, non negative, etc.).
       * Entitlement is a general term for weights in FairScheduler, capacity for
       * the CapacityScheduler, etc.
       *
       * @param queue the queue for which we change entitlement
       * @param entitlement the new entitlement for the queue (capacity,
       *              maxCapacity, etc..)
       * @throws YarnException
       */
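      /*
        Changes the entitlement of the given queue. Invariants must be respected (for example, parent queues must not be overcommitted and values must be non-negative).
        "Entitlement" is a generic term: it means weights in FairScheduler and capacity in CapacityScheduler.
      */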
      void setEntitlement(String queue, QueueEntitlement entitlement)
          throws YarnException;
    
      /**
       * Gets the list of names for queues managed by the Reservation System
       * @return the list of queues which support reservations
       */
      /*
        Gets the names of the queues managed by the Reservation System.
        Returns: the queues that support reservations.
      */
      public Set<String> getPlanQueues() throws YarnException;  
    
      /**
       * Return a collection of the resource types that are considered when
       * scheduling
       *
       * @return an EnumSet containing the resource types
       */
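      /*
        Returns the collection of resource types that are taken into account when scheduling.
        Returns: an EnumSet containing those resource types.
      */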
      public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes();
    }
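
The queue-management methods that close the YarnScheduler interface (moveAllApps, removeQueue, addQueue) are easier to picture with a toy model. The following is only a minimal sketch under stated assumptions: `ToyQueues` and all of its methods are hypothetical names invented for illustration, application IDs are plain strings, and nothing here is Hadoop API. It shows "drain a queue by moving everything to another queue" and "a queue can only be removed once it is empty", which is one of the restrictions the Javadoc says an implementation might impose.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of queue draining; not a Hadoop class. */
public class ToyQueues {
  // queue name -> ids of the applications currently in that queue
  private final Map<String, List<String>> queues = new LinkedHashMap<>();

  public void addQueue(String name) {
    if (queues.containsKey(name)) {
      throw new IllegalArgumentException("Queue already exists: " + name);
    }
    queues.put(name, new ArrayList<>());
  }

  public void submit(String queue, String appId) {
    requireQueue(queue).add(appId);
  }

  /** Moves every application out of sourceQueue and into destQueue. */
  public void moveAllApps(String sourceQueue, String destQueue) {
    List<String> dest = requireQueue(destQueue);     // validate the destination first
    List<String> source = requireQueue(sourceQueue); // then the source
    if (source == dest) {
      return; // moving a queue onto itself is a no-op
    }
    dest.addAll(source);
    source.clear();
  }

  /** In this toy model a queue may only be removed once it holds no applications. */
  public void removeQueue(String name) {
    if (!requireQueue(name).isEmpty()) {
      throw new IllegalStateException("Queue still has applications: " + name);
    }
    queues.remove(name);
  }

  /** Returns a copy so callers cannot modify the internal list. */
  public List<String> appsIn(String queue) {
    return new ArrayList<>(requireQueue(queue));
  }

  public boolean hasQueue(String name) {
    return queues.containsKey(name);
  }

  private List<String> requireQueue(String name) {
    List<String> apps = queues.get(name);
    if (apps == null) {
      throw new IllegalArgumentException("The specified Queue: " + name + " doesn't exist");
    }
    return apps;
  }

  public static void main(String[] args) {
    ToyQueues q = new ToyQueues();
    q.addQueue("source");
    q.addQueue("dest");
    q.submit("source", "app_1");
    q.submit("source", "app_2");
    q.moveAllApps("source", "dest");
    System.out.println(q.appsIn("dest"));
    q.removeQueue("source"); // allowed now, because the queue has been drained
  }
}
```

A real scheduler does not move applications synchronously like this; the sketch only captures the ordering of the checks and the end state.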

    ResourceScheduler.java is located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java

    package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
    
    import java.io.IOException;
    
    import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
    import org.apache.hadoop.classification.InterfaceStability.Evolving;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
    import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
    
    /**
     * This interface is the one implemented by the schedulers. It mainly extends 
     * {@link YarnScheduler}. 
     *
     */
    // Every scheduler implements this interface; it mostly just extends YarnScheduler (and Recoverable).
    @LimitedPrivate("yarn")
    @Evolving
    public interface ResourceScheduler extends YarnScheduler, Recoverable {
    
      /**
       * Set RMContext for <code>ResourceScheduler</code>.
       * This method should be called immediately after instantiating
       * a scheduler once.
       * @param rmContext created by ResourceManager
       */
      /*
       * Sets the RMContext for the ResourceScheduler.
       * This method should be called once, immediately after the scheduler is instantiated.
       * Parameter rmContext: the context created by the ResourceManager.
       */
      void setRMContext(RMContext rmContext);
    
      /**
       * Re-initialize the <code>ResourceScheduler</code>.
       * @param conf configuration
       * @throws IOException
       */
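      /*
       * Re-initializes the ResourceScheduler.
       * Parameter conf: the configuration. The method also takes the RMContext, which the Javadoc above does not document.
       */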
      void reinitialize(Configuration conf, RMContext rmContext) throws IOException;
    }
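
Because the scheduler is named in configuration and handed its context right after being instantiated, the call order matters: create the instance, set the context, then (re)initialize it. The sketch below illustrates that order with plain reflection. It is an assumption-laden toy, not the ResourceManager's actual loading code: `ToySchedulerLoader`, `ToyScheduler`, `ToyFifo`, the `toy.scheduler.class` key and the use of `Map<String, String>` in place of `Configuration` and `RMContext` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of a pluggable scheduler; not Hadoop code. */
public class ToySchedulerLoader {

  /** Stand-in for ResourceScheduler, reduced to the two lifecycle calls. */
  public interface ToyScheduler {
    void setContext(Map<String, String> context);
    void reinitialize(Map<String, String> conf);
    String describe();
  }

  /** A trivial scheduler implementation with a public no-argument constructor. */
  public static class ToyFifo implements ToyScheduler {
    private Map<String, String> context;
    private Map<String, String> conf = new HashMap<>();

    public ToyFifo() {
    }

    @Override
    public void setContext(Map<String, String> context) {
      this.context = context;
    }

    @Override
    public void reinitialize(Map<String, String> conf) {
      // The context must have been set before the scheduler is (re)initialized.
      if (context == null) {
        throw new IllegalStateException("setContext() must be called first");
      }
      this.conf = new HashMap<>(conf);
    }

    @Override
    public String describe() {
      return "ToyFifo[cluster=" + context.get("cluster")
          + ", maxApps=" + conf.getOrDefault("maxApps", "unset") + "]";
    }
  }

  /** Instantiates the class named in conf, then runs the lifecycle calls in order. */
  public static ToyScheduler load(Map<String, String> conf, Map<String, String> context) {
    String className = conf.get("toy.scheduler.class"); // hypothetical key
    try {
      Class<? extends ToyScheduler> cls =
          Class.forName(className).asSubclass(ToyScheduler.class);
      ToyScheduler scheduler = cls.getDeclaredConstructor().newInstance();
      scheduler.setContext(context); // 1. hand over the context right after instantiation
      scheduler.reinitialize(conf);  // 2. only then apply the configuration
      return scheduler;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load scheduler " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("toy.scheduler.class", ToyFifo.class.getName());
    conf.put("maxApps", "5");
    Map<String, String> context = new HashMap<>();
    context.put("cluster", "test");
    System.out.println(load(conf, context).describe());
  }
}
```

Swapping in another scheduler then only requires changing the class name stored under the configuration key, which is the point of making the scheduler pluggable.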

    AbstractYarnScheduler.java is located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

    package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
    
    import java.io.IOException;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.classification.InterfaceAudience.Private;
    import org.apache.hadoop.classification.InterfaceStability.Unstable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.service.AbstractService;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerState;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceOption;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
    import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
    import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
    import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
    import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
    import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
    import org.apache.hadoop.yarn.util.resource.Resources;
    
    import com.google.common.util.concurrent.SettableFuture;
    
    
    @SuppressWarnings("unchecked")
    @Private
    @Unstable
    public abstract class AbstractYarnScheduler
        <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
        extends AbstractService implements ResourceScheduler {
    
      private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
    
      // Nodes in the cluster, indexed by NodeId
      protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
    
      // Whole capacity of the cluster
      protected Resource clusterResource = Resource.newInstance(0, 0);
    
      protected Resource minimumAllocation;
      private Resource maximumAllocation;
      private Resource configuredMaximumAllocation;
      private int maxNodeMemory = -1;
      private int maxNodeVCores = -1;
      private final ReadLock maxAllocReadLock;
      private final WriteLock maxAllocWriteLock;
    
      private boolean useConfiguredMaximumAllocationOnly = true;
      private long configuredMaximumAllocationWaitTime;
    
      protected RMContext rmContext;
      
      /*
       * All schedulers which are inheriting AbstractYarnScheduler should use
       * concurrent version of 'applications' map.
       */
      // i.e. every scheduler that extends AbstractYarnScheduler must back 'applications' with a concurrent map.
      protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
      protected int nmExpireInterval;
    
      protected final static List<Container> EMPTY_CONTAINER_LIST =
          new ArrayList<Container>();
      protected static final Allocation EMPTY_ALLOCATION = new Allocation(
        EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
    
      /**
       * Construct the service.
       *
       * @param name service name
       */
      /*
       * Constructs the service.
       * Parameter name: the service name.
       */
      public AbstractYarnScheduler(String name) {
        super(name);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.maxAllocReadLock = lock.readLock();
        this.maxAllocWriteLock = lock.writeLock();
      }
    
      // Performs all the initialization the service needs.
      @Override
      public void serviceInit(Configuration conf) throws Exception {
        // getInt(String name, int defaultValue) returns the value of the name property as an int.
        // If the property is not set, the supplied default is returned; if the value is not a valid int, an error is thrown.
        // DEFAULT_RM_NM_EXPIRY_INTERVAL_MS is how long to wait before a NodeManager is considered dead; the default is 600000 ms.
        nmExpireInterval =
            conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
              YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
        // getLong(String name, long defaultValue) returns the value of the name property as a long.
        // If the property is not set, the supplied default is returned; if the value is not a valid long, an error is thrown.
        // DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS is the scheduling wait time used with
        // work-preserving recovery; the default is 10000 ms.
        configuredMaximumAllocationWaitTime =
            conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
              YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
        // Schedule the timer that clears the pending-release cache (see createReleaseCache() below).
        createReleaseCache();
        super.serviceInit(conf);
      }
    
      public List<Container> getTransferredContainers(
          ApplicationAttemptId currentAttempt) {
        // Get the ApplicationId from the ApplicationAttemptId.
        ApplicationId appId = currentAttempt.getApplicationId();
        // Map.get() returns the value mapped to the key, or null if the map has no mapping for it.
        SchedulerApplication<T> app = applications.get(appId);
        // Create an empty list that only accepts Container elements.
        List<Container> containerList = new ArrayList<Container>();
        // rmContext is an instance of the RMContext interface, which is implemented by RMContextImpl.
        // rmContext.getRMApps() returns a ConcurrentMap<ApplicationId, RMApp>,
        // so rmContext.getRMApps().get(appId) is again a Map.get() lookup.
        RMApp appImpl = this.rmContext.getRMApps().get(appId);
        // appImpl is an RMApp.
        // appImpl.getApplicationSubmissionContext() returns the ApplicationSubmissionContext the RMApp was submitted with.
        // getUnmanagedAM() returns true when the AM is unmanaged, i.e. the RM neither allocates a container for the AM nor launches it.
        if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
          return containerList;
        }
        if (app == null) {
          return containerList;
        }
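        // getLiveContainers() returns the live containers of the application attempt.
        Collection<RMContainer> liveContainers =
            app.getCurrentAppAttempt().getLiveContainers();
        // getMasterContainer() returns the container the ApplicationMaster runs in,
        // and Container.getId() returns that container's globally unique identifier,
        // so amContainerId ends up holding the ApplicationMaster's container ID.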
        ContainerId amContainerId =
            rmContext.getRMApps().get(appId).getCurrentAppAttempt()
              .getMasterContainer().getId();
        for (RMContainer rmContainer : liveContainers) {
          // Check whether this is the ApplicationMaster's container.
          if (!rmContainer.getContainerId().equals(amContainerId)) {
            // It is not, so add the container to the result list.
            containerList.add(rmContainer.getContainer());
          }
        }
        return containerList;
      }
    
      public Map<ApplicationId, SchedulerApplication<T>>
          getSchedulerApplications() {
        return applications;
      }
    
      // Returns the total resource capacity of the cluster.
      @Override
      public Resource getClusterResource() {
        return clusterResource;
      }
    
      // Returns the minimum allocatable Resource.
      @Override
      public Resource getMinimumResourceCapability() {
        return minimumAllocation;
      }
    
      // Returns the maximum allocatable Resource at the cluster level.
      @Override
      public Resource getMaximumResourceCapability() {
        Resource maxResource;
        maxAllocReadLock.lock();
        try {
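          // useConfiguredMaximumAllocationOnly starts out as true (see the field declaration above).
          if (useConfiguredMaximumAllocationOnly) {
            // System.currentTimeMillis() is the current time in milliseconds since 1970-01-01 00:00 UTC.
            // ResourceManager.getClusterTimeStamp() is a timestamp that was taken once, with
            // System.currentTimeMillis(), when the ResourceManager started, so the difference is the RM's uptime.
            // configuredMaximumAllocationWaitTime defaults to 10000 ms.
            if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
                > configuredMaximumAllocationWaitTime) {
              useConfiguredMaximumAllocationOnly = false;    // the wait time has elapsed
            }
            // Return a clone rather than the internal instance.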
            maxResource = Resources.clone(configuredMaximumAllocation);
          } else {
            maxResource = Resources.clone(maximumAllocation);
          }
        } finally {
          maxAllocReadLock.unlock();
        }
        return maxResource;
      }
    
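      // Per-queue variant: this base implementation ignores queueName and returns the cluster-level maximum.
      @Override
      public Resource getMaximumResourceCapability(String queueName) {
        return getMaximumResourceCapability();
      }
    
      // Initializes the maximum resource capability; only the first call has any effect.
      protected void initMaximumResourceCapability(Resource maximumAllocation) {
        maxAllocWriteLock.lock();
        try {
          if (this.configuredMaximumAllocation == null) {
            // Store independent clones of the resource.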
            this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
            this.maximumAllocation = Resources.clone(maximumAllocation);
          }
        } finally {
          maxAllocWriteLock.unlock();
        }
      }
    
      // Called when a container has been launched on a node.
      protected synchronized void containerLaunchedOnNode(
          ContainerId containerId, SchedulerNode node) {
        // Get the application for the finished container
        // (here: the current attempt of the application that owns the launched container)
        SchedulerApplicationAttempt application = getCurrentAttemptForContainer
            (containerId);
        if (application == null) {
          // getApplicationAttemptId() returns the ApplicationAttemptId of the application the container was allocated to.
          // getApplicationId() returns the ApplicationId of that ApplicationAttemptId.
          LOG.info("Unknown application "
              + containerId.getApplicationAttemptId().getApplicationId()
              + " launched container " + containerId + " on node: " + node);
          // rmContext.getDispatcher() returns the Dispatcher and getEventHandler() returns its EventHandler,
          // whose handle() method is then called with the event.
          // RMNodeCleanContainerEvent asks the node to clean up the container; its constructor uses RMNodeEventType.CLEANUP_CONTAINER.
          this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
          return;
        }
    
        application.containerLaunchedOnNode(containerId, node.getNodeID());
      }
    
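      // Returns the current scheduler attempt of the application the given attempt ID belongs to, or null if the application is unknown.
      public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
        // getApplicationId() returns the ApplicationId of the ApplicationAttemptId.
        // Map.get() returns the value mapped to the key, or null if the map has no mapping for it.
        SchedulerApplication<T> app =
            applications.get(applicationAttemptId.getApplicationId());
        // getCurrentAppAttempt() returns the application's current attempt, a SchedulerApplicationAttempt of type T.
        return app == null ? null : app.getCurrentAppAttempt();
      }
    
      // Returns the scheduler's application report for the given application attempt ID.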
      @Override
      public SchedulerAppReport getSchedulerAppInfo(
          ApplicationAttemptId appAttemptId) {
        SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
        if (attempt == null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
          }
          return null;
        }
        // SchedulerAppReport describes an application attempt and the resources it is using.
        return new SchedulerAppReport(attempt);
      }
    
      // Returns the resource usage report for the given application attempt ID.
      @Override
      public ApplicationResourceUsageReport getAppResourceUsageReport(
          ApplicationAttemptId appAttemptId) {
        SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
        if (attempt == null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
          }
          return null;
        }
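        // Let the attempt build its own usage report.
        return attempt.getResourceUsageReport();
      }
    
      // Returns the current application attempt that owns the given container ID.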
      public T getCurrentAttemptForContainer(ContainerId containerId) {
        return getApplicationAttempt(containerId.getApplicationAttemptId());
      }
    
      // Returns the RMContainer for the given containerId.
      @Override
      public RMContainer getRMContainer(ContainerId containerId) {
        SchedulerApplicationAttempt attempt =
            getCurrentAttemptForContainer(containerId);
        // getRMContainer() looks the RMContainer up within the attempt.
        return (attempt == null) ? null : attempt.getRMContainer(containerId);
      }
    
      // Returns the resource usage report for a node.
      @Override
      public SchedulerNodeReport getNodeReport(NodeId nodeId) {
        // Map.get() lookup
        N node = nodes.get(nodeId);
        // SchedulerNodeReport is the node's usage report.
        return node == null ? null : new SchedulerNodeReport(node);
      }
    
      // Moves the given application to the given queue (not supported by this base class).
      @Override
      public String moveApplication(ApplicationId appId, String newQueue)
          throws YarnException {
        throw new YarnException(getClass().getSimpleName()
            + " does not support moving apps between queues");
      }
    
      // Removes an existing queue (not supported by this base class).
      public void removeQueue(String queueName) throws YarnException {
        throw new YarnException(getClass().getSimpleName()
            + " does not support removing queues");
      }
    
      // Adds a new queue to the scheduler (not supported by this base class).
      @Override
      public void addQueue(Queue newQueue) throws YarnException {
        throw new YarnException(getClass().getSimpleName()
            + " does not support this operation");
      }
    
      // Sets the entitlement of the given queue (not supported by this base class).
      @Override
      public void setEntitlement(String queue, QueueEntitlement entitlement)
          throws YarnException {
        throw new YarnException(getClass().getSimpleName()
            + " does not support this operation");
      }
    
      // Kills an orphaned container on a node.
      private void killOrphanContainerOnNode(RMNode node,
          NMContainerStatus container) {
        // getContainerState() returns the state of the container.
        // Enum.equals() returns true only if the argument is this same enum constant.
        // ContainerState has three values: NEW, RUNNING and COMPLETE. COMPLETE marks a finished container.
        if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
          // Same event dispatch as in containerLaunchedOnNode() above.
          this.rmContext.getDispatcher().getEventHandler().handle(
            new RMNodeCleanContainerEvent(node.getNodeID(),
              container.getContainerId()));
        }
      }
    
      // Recovers containers on a node.
      public synchronized void recoverContainersOnNode(
          List<NMContainerStatus> containerReports, RMNode nm) {
        if (!rmContext.isWorkPreservingRecoveryEnabled()
            || containerReports == null
            || (containerReports != null && containerReports.isEmpty())) {
          return;
        }
    
        for (NMContainerStatus container : containerReports) {
          /*
           * container.getContainerId() returns the ContainerId of the container.
           * getApplicationAttemptId() returns the ApplicationAttemptId of the application the container was allocated to.
           * getApplicationId() returns the ApplicationId of that ApplicationAttemptId.
           */
          ApplicationId appId =
              container.getContainerId().getApplicationAttemptId().getApplicationId();
          // Look up the RMApp for this application.
          RMApp rmApp = rmContext.getRMApps().get(appId);
          if (rmApp == null) {
            LOG.error("Skip recovering container " + container
                + " for unknown application.");
            killOrphanContainerOnNode(nm, container);
            continue;
          }
    
          // Unmanaged AM recovery is addressed in YARN-1815
          // rmApp.getApplicationSubmissionContext() returns the ApplicationSubmissionContext of the RMApp.
          // getUnmanagedAM() returns true when the AM is unmanaged; in that case the RM does not allocate a container for the AM or launch it.
          if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
            LOG.info("Skip recovering container " + container + " for unmanaged AM."
                + rmApp.getApplicationId());
            killOrphanContainerOnNode(nm, container);
            continue;
          }
    
          // Map.get() lookup
          SchedulerApplication<T> schedulerApp = applications.get(appId);
          if (schedulerApp == null) {
            // rmApp.getState() returns the current state of the RMApp.
            LOG.info("Skip recovering container  " + container
                + " for unknown SchedulerApplication. Application current state is "
                + rmApp.getState());
            killOrphanContainerOnNode(nm, container);
            continue;
          }
    
          LOG.info("Recovering container " + container);
          SchedulerApplicationAttempt schedulerAttempt =
              schedulerApp.getCurrentAppAttempt();
    
          // getKeepContainersAcrossApplicationAttempts() returns the flag that says whether containers are kept across application attempts.
          if (!rmApp.getApplicationSubmissionContext()
            .getKeepContainersAcrossApplicationAttempts()) {
            // Do not recover containers for stopped attempt or previous attempt.
            if (schedulerAttempt.isStopped()
                || !schedulerAttempt.getApplicationAttemptId().equals(
                  container.getContainerId().getApplicationAttemptId())) {
              LOG.info("Skip recovering container " + container
                  + " for already stopped attempt.");
              killOrphanContainerOnNode(nm, container);
              continue;
            }
          }
    
          // create container
          RMContainer rmContainer = recoverAndCreateContainer(container, nm);
    
          // recover RMContainer
          rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
            container));
    
          // recover scheduler node
          nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
    
          // recover queue: update headroom etc.
          Queue queue = schedulerAttempt.getQueue();
          queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
    
          // recover scheduler attempt
          schedulerAttempt.recoverContainer(rmContainer);
                
          // set master container for the current running AMContainer for this
          // attempt.
          RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
          if (appAttempt != null) {
            // getMasterContainer() returns the container the ApplicationMaster runs in.
            Container masterContainer = appAttempt.getMasterContainer();
    
            // Mark current running AMContainer's RMContainer based on the master
            // container ID stored in AppAttempt.
            if (masterContainer != null
                && masterContainer.getId().equals(rmContainer.getContainerId())) {
              // Flag this RMContainer as the ApplicationMaster container.
              ((RMContainerImpl)rmContainer).setAMContainer(true);
            }
          }
    
          synchronized (schedulerAttempt) {
            // pendingRelease is used in the work-preserving recovery scenario to keep track of
            // the AM's outstanding release requests. During recovery the RM may receive a release
            // request from the AM before it receives the container status from the NM.
            // In that case the to-be-recovered containers reported by the NM should not be recovered.
            Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
            // Set.contains(o) returns true if and only if the set contains an element e
            // such that (o==null ? e==null : o.equals(e)).
            if (releases.contains(container.getContainerId())) {
              // release the container
              rmContainer.handle(new RMContainerFinishedEvent(container
                .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
                container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
                RMContainerEventType.RELEASED));
              releases.remove(container.getContainerId());
              LOG.info(container.getContainerId() + " is released by application.");
            }
          }
        }
      }
    
      // Recovers a container by re-creating its RMContainer.
      // NMContainerStatus carries the current information about the container.
      // RMNode represents a NodeManager's available resources and other static information.
      private RMContainer recoverAndCreateContainer(NMContainerStatus status,
          RMNode node) {
        // Create the Container instance.
        Container container =
            Container.newInstance(status.getContainerId(), node.getNodeID(),
              node.getHttpAddress(), status.getAllocatedResource(),
              status.getPriority(), null);
        // Get the application attempt ID.
        ApplicationAttemptId attemptId =
            container.getId().getApplicationAttemptId();
        // Create the RMContainerImpl object.
        RMContainer rmContainer =
            new RMContainerImpl(container, attemptId, node.getNodeID(),
              applications.get(attemptId.getApplicationId()).getUser(), rmContext,
              status.getCreationTime());
        return rmContainer;
      }
    
      /**
       * Recover resource request back from RMContainer when a container is 
       * preempted before AM pulled the same. If container is pulled by
       * AM, then RMContainer will not have resource request to recover.
       * @param rmContainer
       */
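      /*
       * Puts the container's resource request back when the container was preempted before the AM pulled it.
       * Once the AM has pulled the container, the RMContainer no longer holds a resource request to recover.
       */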
      protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
        // getResourceRequests() returns the resource requests stored with the container.
        List<ResourceRequest> requests = rmContainer.getResourceRequests();
    
        // If container state is moved to ACQUIRED, request will be empty.
        if (requests == null) {
          return;
        }
        // Add resource request back to Scheduler.
        SchedulerApplicationAttempt schedulerAttempt 
            = getCurrentAttemptForContainer(rmContainer.getContainerId());
        if (schedulerAttempt != null) {
          // Recover the resource requests.
          schedulerAttempt.recoverResourceRequests(requests);
        }
      }
    
      protected void createReleaseCache() {
        // Cleanup the cache after nm expire interval.
        // new Timer() creates a timer; schedule(task, delay) runs the task once after the given delay in milliseconds.
        new Timer().schedule(new TimerTask() {
          @Override
          public void run() {
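            // Map.values() returns a Collection view of the values contained in the map.
            for (SchedulerApplication<T> app : applications.values()) {
    
              // Get the application's current attempt.
              T attempt = app.getCurrentAppAttempt();
              synchronized (attempt) {
                // Every release request still pending at this point was never matched to a container.
                for (ContainerId containerId : attempt.getPendingRelease()) {
                  // logFailure() writes a readable, parseable audit log entry for a failed event.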
                  RMAuditLogger.logFailure(
                    app.getUser(),
                    AuditConstants.RELEASE_CONTAINER,
                    "Unauthorized access or invalid container",
                    "Scheduler",
                    "Trying to release container not owned by app or with invalid id.",
                    attempt.getApplicationId(), containerId);
                }
                // Set.clear() removes all elements from the set; the set is empty after the call returns.
                attempt.getPendingRelease().clear();
              }
            }
            LOG.info("Release request cache is cleaned up");
          }
        }, nmExpireInterval);
      }
    
      // clean up a completed container
      protected abstract void completedContainer(RMContainer rmContainer,
          ContainerStatus containerStatus, RMContainerEventType event);
    
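      // Releases the given containers.
      protected void releaseContainers(List<ContainerId> containers,
          SchedulerApplicationAttempt attempt) {
        for (ContainerId containerId : containers) {
          // Look up the RMContainer for the given containerId.
          RMContainer rmContainer = getRMContainer(containerId);
          if (rmContainer == null) {
            // The RM has been up for less than nmExpireInterval, so the container may still be reported during recovery.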
            if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
                < nmExpireInterval) {
              LOG.info(containerId + " doesn't exist. Add the container"
                  + " to the release request cache as it maybe on recovery.");
              synchronized (attempt) {
                // Set.add(e) adds the element only if the set does not already contain an element e2
                // such that (e==null ? e2==null : e.equals(e2)).
                attempt.getPendingRelease().add(containerId);
              }
            } else {
              // logFailure() writes a readable, parseable audit log entry for a failed event.
              RMAuditLogger.logFailure(attempt.getUser(),
                AuditConstants.RELEASE_CONTAINER,
                "Unauthorized access or invalid container", "Scheduler",
                "Trying to release container not owned by app or with invalid id.",
                attempt.getApplicationId(), containerId);
            }
          }
          // Clean up the completed container.
          // createAbnormalContainerStatus() is a utility that builds a ContainerStatus for exceptional circumstances.
          completedContainer(rmContainer,
            SchedulerUtils.createAbnormalContainerStatus(containerId,
              SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
        }
      }
    
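      // Returns the scheduler's view of the node with the given ID.
      // SchedulerNode represents a YARN cluster node from the scheduler's point of view.
      public SchedulerNode getSchedulerNode(NodeId nodeId) {
        // Map.get() returns the value mapped to the key, or null if the map has no mapping for it.
        return nodes.get(nodeId);
      }
    
      // Drains sourceQueue completely by moving all of its applications to destQueue.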
      @Override
      public synchronized void moveAllApps(String sourceQueue, String destQueue)
          throws YarnException {
        // check if destination queue is a valid leaf queue
        try {
          getQueueInfo(destQueue, false, false);
        } catch (IOException e) {
          LOG.warn(e);
          throw new YarnException(e);
        }
        // check if source queue is a valid
        // getAppsInQueue() returns the application attempts under the given queue.
        List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
        if (apps == null) {
          String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
          LOG.warn(errMsg);
          throw new YarnException(errMsg);
        }
        // generate move events for each pending/running app
        for (ApplicationAttemptId app : apps) {
          // Future handed to the move event; this method does not wait on it.
          SettableFuture<Object> future = SettableFuture.create();
          // The RMAppMoveEvent constructor uses the RMAppEventType.MOVE event type.
          this.rmContext
              .getDispatcher()
              .getEventHandler()
              .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
        }
      }
      
      // Terminates all applications in the specified queue.
      @Override
      public synchronized void killAllAppsInQueue(String queueName)
          throws YarnException {
        // check if queue is a valid
        // getAppsInQueue() returns the application attempts under the given queue.
        List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
        if (apps == null) {
          String errMsg = "The specified Queue: " + queueName + " doesn't exist";
          LOG.warn(errMsg);
          throw new YarnException(errMsg);
        }
        // generate kill events for each pending/running app
        for (ApplicationAttemptId app : apps) {
          this.rmContext
              .getDispatcher()
              .getEventHandler()
              .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
              "Application killed due to expiry of reservation queue " +
              queueName + "."));
        }
      }
      
      /**
       * Process resource update on a node.
       */
      // Applies a changed total resource to a node and adjusts the cluster totals.
      public synchronized void updateNodeResource(RMNode nm, 
          ResourceOption resourceOption) {
        SchedulerNode node = getSchedulerNode(nm.getNodeID());
        Resource newResource = resourceOption.getResource();
        // Total resource currently recorded for the node.
        Resource oldResource = node.getTotalResource();
        if(!oldResource.equals(newResource)) {
          // Log resource change
          LOG.info("Update resource on node: " + node.getNodeName()
              + " from: " + oldResource + ", to: "
              + newResource);
    
          // Map.remove() deletes the mapping for the key if one is present.
          nodes.remove(nm.getNodeID());
          // Recompute the maximum allocation as if this node had been removed.
          updateMaximumAllocation(node, false);
    
          // update resource to node: set the node's new total resource
          node.setTotalResource(newResource);
    
          // Map.put() associates the value with the key, replacing any previous mapping.
          nodes.put(nm.getNodeID(), (N)node);
          // Recompute the maximum allocation with the updated node added back.
          updateMaximumAllocation(node, true);
    
          // update resource to clusterResource
          // subtractFrom(clusterResource, oldResource) subtracts oldResource from clusterResource (memory and virtual cores)
          Resources.subtractFrom(clusterResource, oldResource);
          // addTo(clusterResource, newResource) adds newResource to clusterResource (memory and virtual cores)
          Resources.addTo(clusterResource, newResource);
        } else {
          // Log resource change
          LOG.warn("Update resource on node: " + node.getNodeName() 
              + " with the same resource: " + newResource);
        }
      }
    
      /** {@inheritDoc} */
      // Returns the set of resource types considered when scheduling.
      @Override
      public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
        // EnumSet.of() creates an enum set that initially contains the given elements.
        return EnumSet.of(SchedulerResourceTypes.MEMORY);
      }
    
      // Get the names of the queues managed by the reservation system.
      @Override
      public Set<String> getPlanQueues() throws YarnException {
        // getClass() returns the runtime class of this object, and getSimpleName() returns its simple name
        // as written in the source (an empty string for an anonymous class), so the message names the concrete scheduler.
        throw new YarnException(getClass().getSimpleName()
            + " does not support reservations");
      }
    
      // Update the maximum allocation when a node is added or removed.
      protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
        // Total resource on the node.
        Resource totalResource = node.getTotalResource();
        maxAllocWriteLock.lock();
        try {
          if (add) { // added node
            // Memory of the node's total resource.
            int nodeMemory = totalResource.getMemory();
            if (nodeMemory > maxNodeMemory) {
              maxNodeMemory = nodeMemory;
              // Cap the maximum allocation's memory at the configured maximum
              // (Math.min() returns the smaller of the two values).
              maximumAllocation.setMemory(Math.min(
                  configuredMaximumAllocation.getMemory(), maxNodeMemory));
            }
            // Number of virtual CPU cores of the node's total resource.
            int nodeVCores = totalResource.getVirtualCores();
            if (nodeVCores > maxNodeVCores) {
              maxNodeVCores = nodeVCores;
              maximumAllocation.setVirtualCores(Math.min(
                  configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
            }
          } else {  // removed node
            if (maxNodeMemory == totalResource.getMemory()) {
              maxNodeMemory = -1;
            }
            if (maxNodeVCores == totalResource.getVirtualCores()) {
              maxNodeVCores = -1;
            }
            // We only have to iterate through the nodes if the current max memory
            // or vcores was equal to the removed node's
            if (maxNodeMemory == -1 || maxNodeVCores == -1) {
              // entrySet() returns a Set view of the map's key-value entries.
              for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
                int nodeMemory =
                    nodeEntry.getValue().getTotalResource().getMemory();
                if (nodeMemory > maxNodeMemory) {
                  maxNodeMemory = nodeMemory;
                }
                int nodeVCores =
                    nodeEntry.getValue().getTotalResource().getVirtualCores();
                if (nodeVCores > maxNodeVCores) {
                  maxNodeVCores = nodeVCores;
                }
              }
              if (maxNodeMemory == -1) {  // no nodes
                maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
              } else {
                maximumAllocation.setMemory(
                    Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
              }
              if (maxNodeVCores == -1) {  // no nodes
                maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
              } else {
                maximumAllocation.setVirtualCores(
                    Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
              }
            }
          }
        } finally {
          maxAllocWriteLock.unlock();
        }
      }
    
      // Refresh the maximum allocation after the configured maximum changes.
      protected void refreshMaximumAllocation(Resource newMaxAlloc) {
        maxAllocWriteLock.lock();
        try {
          configuredMaximumAllocation = Resources.clone(newMaxAlloc);
          int maxMemory = newMaxAlloc.getMemory();
          if (maxNodeMemory != -1) {
            maxMemory = Math.min(maxMemory, maxNodeMemory);
          }
          int maxVcores = newMaxAlloc.getVirtualCores();
          if (maxNodeVCores != -1) {
            maxVcores = Math.min(maxVcores, maxNodeVCores);
          }
          // Build the effective maximum allocation from the capped values.
          maximumAllocation = Resources.createResource(maxMemory, maxVcores);
        } finally {
          maxAllocWriteLock.unlock();
        }
      }
    
      // Get the pending resource requests for an application attempt.
      public List<ResourceRequest> getPendingResourceRequestsForAttempt(
          ApplicationAttemptId attemptId) {
        // Look up the application attempt.
        SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId);
        if (attempt != null) {
          // getAppSchedulingInfo() returns the attempt's scheduling info; getAllResourceRequests() returns all of its resource requests.
          return attempt.getAppSchedulingInfo().getAllResourceRequests();
        }
        return null;
      }
    }
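
The bookkeeping done by updateMaximumAllocation() in the listing above can be condensed into a small standalone sketch. This is not the Hadoop class: MaxAllocationSketch, its method names, the String node IDs, and the memory-only integers are all assumptions made for illustration. It shows the same idea: remember the largest node, cap the effective maximum at the configured value, rescan only when the largest node leaves, and fall back to the configured value when no node is registered.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, memory-only stand-in for the max-allocation bookkeeping; not the Hadoop API.
final class MaxAllocationSketch {
    private final Map<String, Integer> nodeMemoryMb = new HashMap<>();
    private final int configuredMaxMb;
    private int maxNodeMemoryMb = -1; // -1 means no node is registered

    MaxAllocationSketch(int configuredMaxMb) {
        this.configuredMaxMb = configuredMaxMb;
    }

    void addNode(String nodeId, int memoryMb) {
        removeNode(nodeId); // treat a re-registration as remove + add
        nodeMemoryMb.put(nodeId, memoryMb);
        if (memoryMb > maxNodeMemoryMb) {
            maxNodeMemoryMb = memoryMb;
        }
    }

    void removeNode(String nodeId) {
        Integer removed = nodeMemoryMb.remove(nodeId);
        if (removed == null || removed.intValue() != maxNodeMemoryMb) {
            return; // unknown node, or the current maximum is unaffected
        }
        // The removed node held the maximum, so rescan the remaining nodes.
        maxNodeMemoryMb = -1;
        for (int mb : nodeMemoryMb.values()) {
            maxNodeMemoryMb = Math.max(maxNodeMemoryMb, mb);
        }
    }

    // Effective maximum: the configured cap, lowered to the largest node if one exists.
    int maximumAllocationMb() {
        return maxNodeMemoryMb == -1
            ? configuredMaxMb
            : Math.min(configuredMaxMb, maxNodeMemoryMb);
    }

    public static void main(String[] args) {
        MaxAllocationSketch sketch = new MaxAllocationSketch(8192);
        sketch.addNode("n1", 4096);
        System.out.println(sketch.maximumAllocationMb()); // capped by the only node
        sketch.addNode("n2", 16384);
        System.out.println(sketch.maximumAllocationMb()); // capped by the configured maximum
        sketch.removeNode("n2");
        System.out.println(sketch.maximumAllocationMb()); // back to the remaining node
    }
}
```

The real method does the same for virtual cores and guards the update with a write lock; both are left out here to keep the sketch short.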

    Hadoop's three schedulers are the Fifo, Capacity, and Fair schedulers, described below:

    (1)  Fifo scheduler

      The corresponding class is FifoScheduler, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java .
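
Before reading the full class, it may help to see where the first-in-first-out order comes from. A minimal sketch, under stated assumptions: FifoScheduler keeps its applications in a ConcurrentSkipListMap keyed by ApplicationId, and the sketch assumes that key sorts by cluster timestamp and then by sequence number. AppId and FifoOrderSketch are hypothetical names used only for illustration, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: a sorted map yields applications in submission order.
final class FifoOrderSketch {

    // Stand-in for ApplicationId, assumed to order by cluster timestamp, then sequence number.
    static final class AppId implements Comparable<AppId> {
        final long clusterTimestamp;
        final int id;

        AppId(long clusterTimestamp, int id) {
            this.clusterTimestamp = clusterTimestamp;
            this.id = id;
        }

        @Override
        public int compareTo(AppId other) {
            int byCluster = Long.compare(clusterTimestamp, other.clusterTimestamp);
            return byCluster != 0 ? byCluster : Integer.compare(id, other.id);
        }

        @Override
        public String toString() {
            return "application_" + clusterTimestamp + "_" + id;
        }
    }

    // Keys stay sorted regardless of insertion order.
    private final ConcurrentSkipListMap<AppId, String> applications =
        new ConcurrentSkipListMap<>();

    void addApplication(AppId appId, String user) {
        applications.put(appId, user);
    }

    // Iteration order of the sorted map is the order in which apps are offered containers.
    List<AppId> schedulingOrder() {
        return new ArrayList<>(applications.keySet());
    }

    public static void main(String[] args) {
        FifoOrderSketch sketch = new FifoOrderSketch();
        sketch.addApplication(new AppId(100L, 3), "alice");
        sketch.addApplication(new AppId(100L, 1), "bob");
        sketch.addApplication(new AppId(99L, 7), "carol");
        System.out.println(sketch.schedulingOrder());
    }
}
```

Because the order lives in the map itself, the scheduler needs no separate queue structure: iterating over the entries already visits the oldest application first.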

    package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentSkipListMap;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
    import org.apache.hadoop.classification.InterfaceStability.Evolving;
    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.QueueACL;
    import org.apache.hadoop.yarn.api.records.QueueInfo;
    import org.apache.hadoop.yarn.api.records.QueueState;
    import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
    import org.apache.hadoop.yarn.factories.RecordFactory;
    import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
    import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
    import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
    import org.apache.hadoop.yarn.server.utils.BuilderUtils;
    import org.apache.hadoop.yarn.server.utils.Lock;
    import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.Resources;
    
    import com.google.common.annotations.VisibleForTesting;
    
    @LimitedPrivate("yarn")
    @Evolving
    @SuppressWarnings("unchecked")
    public class FifoScheduler extends
        AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
        Configurable {
    
      private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
    
      private static final RecordFactory recordFactory = 
        RecordFactoryProvider.getRecordFactory(null);
    
      Configuration conf;
    
      private boolean usePortForNodeName;
    
      private ActiveUsersManager activeUsersManager;
    
      private static final String DEFAULT_QUEUE_NAME = "default";
      private QueueMetrics metrics;
      
      private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
    
      // Create the single default queue.
      private final Queue DEFAULT_QUEUE = new Queue() {
        @Override
        public String getQueueName() {
          return DEFAULT_QUEUE_NAME;
        }
    
        @Override
        public QueueMetrics getMetrics() {
          return metrics;
        }
    
        @Override
        public QueueInfo getQueueInfo( 
            boolean includeChildQueues, boolean recursive) {
          QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
          queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
          queueInfo.setCapacity(1.0f);
          if (clusterResource.getMemory() == 0) {
            queueInfo.setCurrentCapacity(0.0f);
          } else {
            queueInfo.setCurrentCapacity((float) usedResource.getMemory()
                / clusterResource.getMemory());
          }
          queueInfo.setMaximumCapacity(1.0f);
          queueInfo.setChildQueues(new ArrayList<QueueInfo>());
          queueInfo.setQueueState(QueueState.RUNNING);
          return queueInfo;
        }
    
        public Map<QueueACL, AccessControlList> getQueueAcls() {
          Map<QueueACL, AccessControlList> acls =
            new HashMap<QueueACL, AccessControlList>();
          for (QueueACL acl : QueueACL.values()) {
            acls.put(acl, new AccessControlList("*"));
          }
          return acls;
        }
    
        @Override
        public List<QueueUserACLInfo> getQueueUserAclInfo(
            UserGroupInformation unused) {
          QueueUserACLInfo queueUserAclInfo = 
            recordFactory.newRecordInstance(QueueUserACLInfo.class);
          queueUserAclInfo.setQueueName(DEFAULT_QUEUE_NAME);
          queueUserAclInfo.setUserAcls(Arrays.asList(QueueACL.values()));
          return Collections.singletonList(queueUserAclInfo);
        }
    
        @Override
        public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
          return getQueueAcls().get(acl).isUserAllowed(user);
        }
        
        @Override
        public ActiveUsersManager getActiveUsersManager() {
          return activeUsersManager;
        }
    
        @Override
        public void recoverContainer(Resource clusterResource,
            SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
          if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
            return;
          }
          increaseUsedResources(rmContainer);
          updateAppHeadRoom(schedulerAttempt);
          updateAvailableResourcesMetrics();
        }
    
        @Override
        public Set<String> getAccessibleNodeLabels() {
          // TODO add implementation for FIFO scheduler
          return null;
        }
    
        @Override
        public String getDefaultNodeLabelExpression() {
          // TODO add implementation for FIFO scheduler
          return null;
        }
      };
    
      public FifoScheduler() {
        super(FifoScheduler.class.getName());
      }
    
      // Initialize the scheduler.
      private synchronized void initScheduler(Configuration conf) {
        // Validate the configuration.
        validateConf(conf);
        //Use ConcurrentSkipListMap because applications need to be ordered
        // The applications field is declared in the abstract parent class AbstractYarnScheduler.
        this.applications =
            new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
        // createResource() creates a Resource; getInt() reads the named property as an int.
        // Here DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024.
        this.minimumAllocation =
            Resources.createResource(conf.getInt(
                YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
        // Initialize the maximum resource capability.
        // Here DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192
        // and DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4.
        initMaximumResourceCapability(
            Resources.createResource(conf.getInt(
                YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
              conf.getInt(
                YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
        // getBoolean() reads the named property as a boolean.
        // Here DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false.
        this.usePortForNodeName = conf.getBoolean(
            YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
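        // Metrics for the default queue: no parent queue, per-user metrics disabled.
        this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
            conf);
        // ActiveUsersManager tracks the active users in the system.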
        this.activeUsersManager = new ActiveUsersManager(metrics);
      }
    
      @Override
      public void serviceInit(Configuration conf) throws Exception {
        // Initialize the scheduler.
        initScheduler(conf);
        super.serviceInit(conf);
      }
    
      @Override
      public void serviceStart() throws Exception {
        super.serviceStart();
      }
    
      @Override
      public void serviceStop() throws Exception {
        super.serviceStop();
      }
    
      @Override
      public synchronized void setConf(Configuration conf) {
        this.conf = conf;
      }
      
      // Validate the configuration.
      private void validateConf(Configuration conf) {
        // validate scheduler memory allocation setting
        // Here DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024.
        int minMem = conf.getInt(
          YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
        // Here DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192.
        int maxMem = conf.getInt(
          YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
        
        if (minMem <= 0 || minMem > maxMem) {
          throw new YarnRuntimeException("Invalid resource scheduler memory"
            + " allocation configuration"
            + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
            + "=" + minMem
            + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
            + "=" + maxMem + ", min and max should be greater than 0"
            + ", max should be no smaller than min.");
        }
      }
      
      @Override
      public synchronized Configuration getConf() {
        return conf;
      }
    
      @Override
      public int getNumClusterNodes() {
        // Map.size() returns the number of key-value mappings in the map.
        return nodes.size();
      }
    
      @Override
      public synchronized void setRMContext(RMContext rmContext) {
        this.rmContext = rmContext;
      }
    
      @Override
      public synchronized void
          reinitialize(Configuration conf, RMContext rmContext) throws IOException
      {
        setConf(conf);
      }
    
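      // Handle an allocate call from an application attempt: update its requests and return newly allocated containers.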
      @Override
      public Allocation allocate(
          ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
          List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
        FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
        if (application == null) {
          LOG.error("Calling allocate on removed " +
              "or non existant application " + applicationAttemptId);
          return EMPTY_ALLOCATION;
        }
    
        // Sanity check
        SchedulerUtils.normalizeRequests(ask, resourceCalculator, 
            clusterResource, minimumAllocation, getMaximumResourceCapability());
    
        // Release containers
        releaseContainers(release, application);
    
        synchronized (application) {
    
          // make sure we aren't stopping/removing the application
          // when the allocate comes in
          if (application.isStopped()) {
            LOG.info("Calling allocate on a stopped " +
                "application " + applicationAttemptId);
            return EMPTY_ALLOCATION;
          }
    
          if (!ask.isEmpty()) {
            LOG.debug("allocate: pre-update" +
                " applicationId=" + applicationAttemptId + 
                " application=" + application);
            // Log the application's current requests.
            application.showRequests();
    
            // Update application requests
            application.updateResourceRequests(ask);
    
            LOG.debug("allocate: post-update" +
                " applicationId=" + applicationAttemptId + 
                " application=" + application);
            application.showRequests();
    
            LOG.debug("allocate:" +
                " applicationId=" + applicationAttemptId + 
                " #ask=" + ask.size());
          }
          
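          // Update the blacklist.
          application.updateBlacklist(blacklistAdditions, blacklistRemovals);
          // Create the container tokens and NMTokens together; if either fails for some reason (for example, DNS is unavailable),
          // do not return that container and keep it in newlyAllocatedContainers to be fetched again later.
          ContainersAndNMTokensAllocation allocation =
              application.pullNewlyAllocatedContainersAndNMTokens();
          // Get the available headroom, in terms of resources, for the application's user.
          Resource headroom = application.getHeadroom();
          // Record the headroom for metrics.
          application.setApplicationHeadroomForMetrics(headroom);
          // Package the containers, headroom, and NMTokens into an Allocation.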
          return new Allocation(allocation.getContainerList(), headroom, null,
              null, null, allocation.getNMTokenList());
        }
      }
    
      private FiCaSchedulerNode getNode(NodeId nodeId) {
        return nodes.get(nodeId);
      }
    
      @VisibleForTesting
      public synchronized void addApplication(ApplicationId applicationId,
          String queue, String user, boolean isAppRecovering) {
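        // Every application goes into the single default queue.
        SchedulerApplication<FiCaSchedulerApp> application =
            new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
        // Map.put() associates the value with the key in the map.
        applications.put(applicationId, application);
        // Record the submission in the queue metrics.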
        metrics.submitApp(user);
        LOG.info("Accepted application " + applicationId + " from user: " + user
            + ", currently num of applications: " + applications.size());
        if (isAppRecovering) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
          }
        } else {
          rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
        }
      }
    
      @VisibleForTesting
      public synchronized void
          addApplicationAttempt(ApplicationAttemptId appAttemptId,
              boolean transferStateFromPreviousAttempt,
              boolean isAttemptRecovering) {
        // Map.get() returns the value mapped to the key, or null if the map has no mapping for it.
        SchedulerApplication<FiCaSchedulerApp> application =
            applications.get(appAttemptId.getApplicationId());
        String user = application.getUser();
        // TODO: Fix store
        // Create a FiCaSchedulerApp, which represents an application attempt from the point of view of the FIFO or Capacity scheduler.
        FiCaSchedulerApp schedulerApp =
            new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
              activeUsersManager, this.rmContext);
    
        if (transferStateFromPreviousAttempt) {
          schedulerApp.transferStateFromPreviousAttempt(application
            .getCurrentAppAttempt());
        }
        application.setCurrentAppAttempt(schedulerApp);
    
        metrics.submitAppAttempt(user);
        LOG.info("Added Application Attempt " + appAttemptId
            + " to scheduler from user " + application.getUser());
        if (isAttemptRecovering) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(appAttemptId
                + " is recovering. Skipping notifying ATTEMPT_ADDED");
          }
        } else {
          rmContext.getDispatcher().getEventHandler().handle(
            new RMAppAttemptEvent(appAttemptId,
                RMAppAttemptEventType.ATTEMPT_ADDED));
        }
      }
    
      private synchronized void doneApplication(ApplicationId applicationId,
          RMAppState finalState) {
        SchedulerApplication<FiCaSchedulerApp> application =
            applications.get(applicationId);
        if (application == null){
          LOG.warn("Couldn't find application " + applicationId);
          return;
        }
    
        // Inform the activeUsersManager
        activeUsersManager.deactivateApplication(application.getUser(),
          applicationId);
        application.stop(finalState);
        // Map.remove() deletes the mapping for the key if one is present.
        applications.remove(applicationId);
      }
    
      private synchronized void doneApplicationAttempt(
          ApplicationAttemptId applicationAttemptId,
          RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
          throws IOException {
        FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
        SchedulerApplication<FiCaSchedulerApp> application =
            applications.get(applicationAttemptId.getApplicationId());
        if (application == null || attempt == null) {
          throw new IOException("Unknown application " + applicationAttemptId + 
          " has completed!");
        }
    
        // Kill all 'live' containers
        for (RMContainer container : attempt.getLiveContainers()) {
          if (keepContainers
              && container.getState().equals(RMContainerState.RUNNING)) {
            // do not kill the running container in the case of work-preserving AM
            // restart.
            LOG.info("Skip killing " + container.getContainerId());
            continue;
          }
          // createAbnormalContainerStatus() is a utility that creates a ContainerStatus for abnormal cases.
          completedContainer(container,
            SchedulerUtils.createAbnormalContainerStatus(
              container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
            RMContainerEventType.KILL);
        }
    
        // Clean up pending requests, metrics etc.
        attempt.stop(rmAppAttemptFinalState);
      }
      
      /**
       * Heart of the scheduler...
       * 
       * @param node node on which resources are available to be allocated
       */
      // Assigns containers on the given node to applications in FIFO order.
      private void assignContainers(FiCaSchedulerNode node) {
        LOG.debug("assignContainers:" +
            " node=" + node.getRMNode().getNodeAddress() + 
            " #applications=" + applications.size());
    
        // Try to assign containers to applications in fifo order
        for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
            .entrySet()) {
          FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
          if (application == null) {
            continue;
          }
    
          LOG.debug("pre-assignContainers");
          application.showRequests();
          synchronized (application) {
            // Check if this resource is on the blacklist
            if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
              continue;
            }
            
            for (Priority priority : application.getPriorities()) {
              // Maximum number of containers that can be allocated at this priority.
              int maxContainers =
                getMaxAllocatableContainers(application, priority, node,
                    NodeType.OFF_SWITCH);
              // Ensure the application needs containers of this priority
              if (maxContainers > 0) {
                // Assign containers on the node.
                int assignedContainers =
                  assignContainersOnNode(node, application, priority);
                // Do not assign out of order w.r.t priorities
                if (assignedContainers == 0) {
                  break;
                }
              }
            }
          }
          
          LOG.debug("post-assignContainers");
          application.showRequests();
    
          // Done: stop once the node's available resource is below the minimum allocation
          if (Resources.lessThan(resourceCalculator, clusterResource,
                  node.getAvailableResource(), minimumAllocation)) {
            break;
          }
        }
    
        // Update the applications' headroom to correctly take into
        // account the containers assigned in this update.
        for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
          // Current attempt of this application, if any.
          FiCaSchedulerApp attempt =
              (FiCaSchedulerApp) application.getCurrentAppAttempt();
          if (attempt == null) {
            continue;
          }
          // Recompute the attempt's headroom.
          updateAppHeadRoom(attempt);
        }
      }
    
      private int getMaxAllocatableContainers(FiCaSchedulerApp application,
          Priority priority, FiCaSchedulerNode node, NodeType type) {
        int maxContainers = 0;
        
        ResourceRequest offSwitchRequest = 
          application.getResourceRequest(priority, ResourceRequest.ANY);
        if (offSwitchRequest != null) {
          maxContainers = offSwitchRequest.getNumContainers();
        }
    
        // Locality types: NODE_LOCAL(0) is the same node, RACK_LOCAL(1) is a different node on the same rack, OFF_SWITCH(2) is a different rack.
        if (type == NodeType.OFF_SWITCH) {
          return maxContainers;
        }
    
        if (type == NodeType.RACK_LOCAL) {
          // getResourceRequest() returns the resource request; getRMNode() returns the RMNode, and getRackName() returns the rack name of this node manager.
          ResourceRequest rackLocalRequest = 
            application.getResourceRequest(priority, node.getRMNode().getRackName());
          if (rackLocalRequest == null) {
            return maxContainers;
          }
    
          maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers());
        }
    
        if (type == NodeType.NODE_LOCAL) {
          // getResourceRequest() looks up the resource request; getNodeAddress() returns the ContainerManager address of this node.
          ResourceRequest nodeLocalRequest = 
            application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
          if (nodeLocalRequest != null) {
            // getNumContainers() returns the number of containers of the requested specification
            maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
          }
        }
    
        return maxContainers;
      }
    
      // Assign containers on the node
      private int assignContainersOnNode(FiCaSchedulerNode node, 
          FiCaSchedulerApp application, Priority priority 
      ) {
        // Data-local
        int nodeLocalContainers = 
          assignNodeLocalContainers(node, application, priority); 
    
        // Rack-local
        int rackLocalContainers = 
          assignRackLocalContainers(node, application, priority);
    
        // Off-switch (a different rack)
        int offSwitchContainers =
          assignOffSwitchContainers(node, application, priority);
    
        
        LOG.debug("assignContainersOnNode:" +
            " node=" + node.getRMNode().getNodeAddress() + 
            " application=" + application.getApplicationId().getId() +
            " priority=" + priority.getPriority() + 
            " #assigned=" + 
            (nodeLocalContainers + rackLocalContainers + offSwitchContainers));
    
    
        return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
      }
    
      // Assign node-local containers
      private int assignNodeLocalContainers(FiCaSchedulerNode node, 
          FiCaSchedulerApp application, Priority priority) {
        int assignedContainers = 0;
        // getNodeName() returns the node name used for scheduling match decisions.
        ResourceRequest request = 
          application.getResourceRequest(priority, node.getNodeName());
        if (request != null) {
          // Don't allocate on this node if we don't need containers on this rack
          // getRackName() returns the rack name of this node manager.
          ResourceRequest rackRequest =
              application.getResourceRequest(priority, 
                  node.getRMNode().getRackName()); 
          // getNumContainers() returns the number of containers of the requested specification.
          if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
            return 0;
          }
          
          int assignableContainers = 
            Math.min(
                getMaxAllocatableContainers(application, priority, node, 
                    NodeType.NODE_LOCAL), 
                    request.getNumContainers());
          // Assign the containers
          assignedContainers = 
            assignContainer(node, application, priority, 
                assignableContainers, request, NodeType.NODE_LOCAL);
        }
        return assignedContainers;
      }
    
      // Assign rack-local containers
      private int assignRackLocalContainers(FiCaSchedulerNode node, 
          FiCaSchedulerApp application, Priority priority) {
        int assignedContainers = 0;
        ResourceRequest request = 
          application.getResourceRequest(priority, node.getRMNode().getRackName());
        if (request != null) {
          // Don't allocate on this rack if the application doesn't need containers
          ResourceRequest offSwitchRequest =
              application.getResourceRequest(priority, ResourceRequest.ANY);
          if (offSwitchRequest.getNumContainers() <= 0) {
            return 0;
          }
          
          int assignableContainers = 
            Math.min(
                getMaxAllocatableContainers(application, priority, node, 
                    NodeType.RACK_LOCAL), 
                    request.getNumContainers());
          // Assign the containers
          assignedContainers = 
            assignContainer(node, application, priority, 
                assignableContainers, request, NodeType.RACK_LOCAL);
        }
        return assignedContainers;
      }
    
      // Assign off-switch (cross-rack) containers
      private int assignOffSwitchContainers(FiCaSchedulerNode node, 
          FiCaSchedulerApp application, Priority priority) {
        int assignedContainers = 0;
        ResourceRequest request = 
          application.getResourceRequest(priority, ResourceRequest.ANY);
        if (request != null) {
          assignedContainers = 
            assignContainer(node, application, priority, 
                request.getNumContainers(), request, NodeType.OFF_SWITCH);
        }
        return assignedContainers;
      }
    
      // Assign containers
      private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, 
          Priority priority, int assignableContainers, 
          ResourceRequest request, NodeType type) {
        LOG.debug("assignContainers:" +
            " node=" + node.getRMNode().getNodeAddress() + 
            " application=" + application.getApplicationId().getId() + 
            " priority=" + priority.getPriority() + 
            " assignableContainers=" + assignableContainers +
            " request=" + request + " type=" + type);
        // Get the Resource capability of the request.
        Resource capability = request.getCapability();
    
        // TODO: A buggy application with this zero would crash the scheduler.
        int availableContainers = 
          node.getAvailableResource().getMemory() / capability.getMemory();
        int assignedContainers = 
          Math.min(assignableContainers, availableContainers);
    
        if (assignedContainers > 0) {
          for (int i=0; i < assignedContainers; ++i) {
    
            // getNodeID() returns the node ID of this node.
            NodeId nodeId = node.getRMNode().getNodeID();
            ContainerId containerId = BuilderUtils.newContainerId(application
                .getApplicationAttemptId(), application.getNewContainerId());
    
            // Create the container
            Container container =
                BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
                  .getHttpAddress(), capability, priority, null);
            
            // Allocate!
            
            // Inform the application
            RMContainer rmContainer =
                application.allocate(type, node, priority, request, container);
            
            // Inform the node
            node.allocateContainer(rmContainer);
    
            // Update usage for this container
            increaseUsedResources(rmContainer);
          }
    
        }
        
        return assignedContainers;
      }
    
      private synchronized void nodeUpdate(RMNode rmNode) {
        FiCaSchedulerNode node = getNode(rmNode.getNodeID());
        
        // Pull and clear the list of containerUpdates accumulated across NM heartbeats.
        List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
        List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
        List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
        for(UpdatedContainerInfo containerInfo : containerInfoList) {
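          // Collect the newly launched containers reported in this update
          newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
          // Collect the completed containers reported in this update
          completedContainers.addAll(containerInfo.getCompletedContainers());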
        }
        // Process the newly launched containers
        for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
          containerLaunchedOnNode(launchedContainer.getContainerId(), node);
        }
    
        // Process completed containers
        for (ContainerStatus completedContainer : completedContainers) {
          ContainerId containerId = completedContainer.getContainerId();
          LOG.debug("Container FINISHED: " + containerId);
          completedContainer(getRMContainer(containerId), 
              completedContainer, RMContainerEventType.FINISHED);
        }
    
    
        if (rmContext.isWorkPreservingRecoveryEnabled()
            && !rmContext.isSchedulerReadyForAllocatingContainers()) {
          return;
        }
    
        if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
                node.getAvailableResource(),minimumAllocation)) {
          LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
              " available resource = " + node.getAvailableResource());
    
          // Try to assign containers on this node
          assignContainers(node);
    
          LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
              + node.getAvailableResource());
        }
    
        updateAvailableResourcesMetrics();
      }
    
      // Increase the used resources
      private void increaseUsedResources(RMContainer rmContainer) {
        // addTo() adds the second resource into the first
        Resources.addTo(usedResource, rmContainer.getAllocatedResource());
      }
    
      private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
        // Headroom = total cluster resources minus the resources currently in use
        schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
          usedResource));
      }
    
      private void updateAvailableResourcesMetrics() {
        // Set the available resources; called periodically by the scheduler as resources become available.
        metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
          usedResource));
      }
    
      @Override
      public void handle(SchedulerEvent event) {
        switch(event.getType()) {
        case NODE_ADDED:
        {
          NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
          addNode(nodeAddedEvent.getAddedRMNode());
          recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
            nodeAddedEvent.getAddedRMNode());
    
        }
        break;
        case NODE_REMOVED:
        {
          NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
          removeNode(nodeRemovedEvent.getRemovedRMNode());
        }
        break;
        case NODE_RESOURCE_UPDATE:
        {
          NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
              (NodeResourceUpdateSchedulerEvent)event;
          updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
            nodeResourceUpdatedEvent.getResourceOption());
        }
        break;
        case NODE_UPDATE:
        {
          NodeUpdateSchedulerEvent nodeUpdatedEvent = 
          (NodeUpdateSchedulerEvent)event;
          nodeUpdate(nodeUpdatedEvent.getRMNode());
        }
        break;
        case APP_ADDED:
        {
          AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
          addApplication(appAddedEvent.getApplicationId(),
            appAddedEvent.getQueue(), appAddedEvent.getUser(),
            appAddedEvent.getIsAppRecovering());
        }
        break;
        case APP_REMOVED:
        {
          AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
          doneApplication(appRemovedEvent.getApplicationID(),
            appRemovedEvent.getFinalState());
        }
        break;
        case APP_ATTEMPT_ADDED:
        {
          AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
              (AppAttemptAddedSchedulerEvent) event;
          addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
            appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
            appAttemptAddedEvent.getIsAttemptRecovering());
        }
        break;
        case APP_ATTEMPT_REMOVED:
        {
          AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
              (AppAttemptRemovedSchedulerEvent) event;
          try {
            doneApplicationAttempt(
              appAttemptRemovedEvent.getApplicationAttemptID(),
              appAttemptRemovedEvent.getFinalAttemptState(),
              appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
          } catch(IOException ie) {
            LOG.error("Unable to remove application "
                + appAttemptRemovedEvent.getApplicationAttemptID(), ie);
          }
        }
        break;
        case CONTAINER_EXPIRED:
        {
          ContainerExpiredSchedulerEvent containerExpiredEvent = 
              (ContainerExpiredSchedulerEvent) event;
          ContainerId containerid = containerExpiredEvent.getContainerId();
          completedContainer(getRMContainer(containerid), 
              SchedulerUtils.createAbnormalContainerStatus(
                  containerid, 
                  SchedulerUtils.EXPIRED_CONTAINER),
              RMContainerEventType.EXPIRE);
        }
        break;
        case CONTAINER_RESCHEDULED:
        {
          ContainerRescheduledEvent containerRescheduledEvent =
              (ContainerRescheduledEvent) event;
          RMContainer container = containerRescheduledEvent.getContainer();
          recoverResourceRequestForContainer(container);
        }
        break;
        default:
          LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
        }
      }
    
      // Clean up a completed container
      @Lock(FifoScheduler.class)
      @Override
      protected synchronized void completedContainer(RMContainer rmContainer,
          ContainerStatus containerStatus, RMContainerEventType event) {
        if (rmContainer == null) {
          LOG.info("Null container completed...");
          return;
        }
    
        // Get the application for the finished container
        Container container = rmContainer.getContainer();
        // Look up the current application attempt by container ID
        FiCaSchedulerApp application =
            getCurrentAttemptForContainer(container.getId());
        ApplicationId appId =
            container.getId().getApplicationAttemptId().getApplicationId();
        
        // Get the node on which the container was allocated
        FiCaSchedulerNode node = getNode(container.getNodeId());
        
        if (application == null) {
          LOG.info("Unknown application: " + appId + 
              " released container " + container.getId() +
              " on node: " + node + 
              " with event: " + event);
          return;
        }
    
        // Inform the application
        application.containerCompleted(rmContainer, containerStatus, event);
    
        // Inform the node: release the allocated container on this node
        node.releaseContainer(container);
        
        // Update total usage
        Resources.subtractFrom(usedResource, container.getResource());
    
        LOG.info("Application attempt " + application.getApplicationAttemptId() + 
            " released container " + container.getId() +
            " on node: " + node + 
            " with event: " + event);
         
      }
      
      private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
    
      // Remove a node
      private synchronized void removeNode(RMNode nodeInfo) {
        FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
        if (node == null) {
          return;
        }
        // Kill running containers
        for(RMContainer container : node.getRunningContainers()) {
          completedContainer(container, 
              SchedulerUtils.createAbnormalContainerStatus(
                  container.getContainerId(), 
                  SchedulerUtils.LOST_CONTAINER),
                  RMContainerEventType.KILL);
        }
        
        // Remove the node
        this.nodes.remove(nodeInfo.getNodeID());
        updateMaximumAllocation(node, false);
        
        // Update cluster metrics: subtract the removed node's resources from the cluster total
        Resources.subtractFrom(clusterResource, node.getTotalResource());
      }
    
      @Override
      public QueueInfo getQueueInfo(String queueName,
          boolean includeChildQueues, boolean recursive) {
        return DEFAULT_QUEUE.getQueueInfo(false, false);
      }
    
      @Override
      public List<QueueUserACLInfo> getQueueUserAclInfo() {
        return DEFAULT_QUEUE.getQueueUserAclInfo(null); 
      }
    
      @Override
      public ResourceCalculator getResourceCalculator() {
        return resourceCalculator;
      }
    
      // Add a node
      private synchronized void addNode(RMNode nodeManager) {
        FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
            usePortForNodeName);
        this.nodes.put(nodeManager.getNodeID(), schedulerNode);
        Resources.addTo(clusterResource, schedulerNode.getTotalResource());
        updateMaximumAllocation(schedulerNode, true);
      }
    
      @Override
      public void recover(RMState state) {
        // NOT IMPLEMENTED
      }
    
      @Override
      public RMContainer getRMContainer(ContainerId containerId) {
        FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
        return (attempt == null) ? null : attempt.getRMContainer(containerId);
      }
    
      @Override
      public QueueMetrics getRootQueueMetrics() {
        return DEFAULT_QUEUE.getMetrics();
      }
    
      // Check whether the user is permitted to perform the operation.
      @Override
      public synchronized boolean checkAccess(UserGroupInformation callerUGI,
          QueueACL acl, String queueName) {
        return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
      }
    
      @Override
      public synchronized List<ApplicationAttemptId>
          getAppsInQueue(String queueName) {
        if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
          List<ApplicationAttemptId> attempts =
              new ArrayList<ApplicationAttemptId>(applications.size());
          for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
            attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
          }
          return attempts;
        } else {
          return null;
        }
      }
    
      public Resource getUsedResource() {
        return usedResource;
      }
    }

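    Inside the FIFO scheduler, the call sequence is as follows: a NODE_UPDATE event in handle() triggers nodeUpdate(...), which calls assignContainers(...), which in turn calls assignContainersOnNode(...). That function runs assignNodeLocalContainers(...), assignRackLocalContainers(...) and assignOffSwitchContainers(...) in order. Each of the three calls getResourceRequest(...) and then assignContainer(...); the node-local and rack-local variants also call getMaxAllocatableContainers(...) in between to cap the number of containers.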
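
    To make the arithmetic in this call chain concrete, here is a minimal, dependency-free sketch of the two calculations that decide how many containers are handed out: the locality cap from getMaxAllocatableContainers(...) and the memory fit from assignContainer(...). The class FifoLocalitySketch, its method names, and the use of plain Integer counts (null meaning "no request") in place of ResourceRequest are assumptions made for illustration only; they are not Hadoop APIs.

```java
// Simplified model of the counting logic in the FifoScheduler listing above.
// Nothing here is a Hadoop class; request counts are plain Integers.
class FifoLocalitySketch {
    enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    // Mirrors getMaxAllocatableContainers(): the off-switch (ANY) count is the
    // upper bound, tightened by the rack-level count for RACK_LOCAL or by the
    // node-level count for NODE_LOCAL.
    static int maxAllocatable(Integer offSwitch, Integer rack, Integer node, NodeType type) {
        int max = (offSwitch != null) ? offSwitch : 0;
        if (type == NodeType.OFF_SWITCH) {
            return max;
        }
        if (type == NodeType.RACK_LOCAL) {
            if (rack == null) {
                return max;
            }
            max = Math.min(max, rack);
        }
        if (type == NodeType.NODE_LOCAL && node != null) {
            max = Math.min(max, node);
        }
        return max;
    }

    // Mirrors the arithmetic in assignContainer(): how many containers of the
    // requested memory size fit into the node's free memory, capped by the
    // number that may be assigned.
    static int containersToAssign(int assignable, int availableMemoryMb, int requestMemoryMb) {
        if (requestMemoryMb <= 0) {
            // Guard added in this sketch only; the listing marks the
            // zero-sized request case as a TODO.
            throw new IllegalArgumentException("request memory must be positive");
        }
        int fits = availableMemoryMb / requestMemoryMb;
        return Math.min(assignable, fits);
    }

    public static void main(String[] args) {
        // An application wants 5 containers anywhere, 3 on this rack, 2 on this node.
        System.out.println(maxAllocatable(5, 3, 2, NodeType.NODE_LOCAL)); // prints 2
        System.out.println(maxAllocatable(5, 3, 2, NodeType.RACK_LOCAL)); // prints 3
        System.out.println(maxAllocatable(5, 3, 2, NodeType.OFF_SWITCH)); // prints 5
        // 3072 MB free and 1024 MB per container: only 3 of the 5 fit.
        System.out.println(containersToAssign(5, 3072, 1024));            // prints 3
    }
}
```

    The sketch only models the counting; the real methods additionally create the Container objects and update the application, node, and usedResource bookkeeping.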

    (2)  Capacity scheduler

      The corresponding class is CapacityScheduler.java, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java.

    (3)  Fair scheduler

      The corresponding class is FairScheduler.java, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.
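
      Whichever of the three classes is wanted, it is selected through the yarn.resourcemanager.scheduler.class property in yarn-site.xml, as noted at the start of this article. The sketch below simply renders that property element for a given scheduler. The class SchedulerConfigSketch, the helper yarnSiteProperty, and the short names "fifo", "capacity" and "fair" are hypothetical conveniences; only the property name and the fully qualified class names come from the article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders the yarn-site.xml property that selects a scheduler.
class SchedulerConfigSketch {
    static final String PROPERTY = "yarn.resourcemanager.scheduler.class";
    static final String PKG = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.";

    // Short names are an assumption of this sketch, not Hadoop configuration values.
    static final Map<String, String> SCHEDULERS = new LinkedHashMap<>();
    static {
        SCHEDULERS.put("fifo", PKG + "fifo.FifoScheduler");
        SCHEDULERS.put("capacity", PKG + "capacity.CapacityScheduler");
        SCHEDULERS.put("fair", PKG + "fair.FairScheduler");
    }

    // Builds the <property> element for the chosen scheduler.
    static String yarnSiteProperty(String shortName) {
        String cls = SCHEDULERS.get(shortName);
        if (cls == null) {
            throw new IllegalArgumentException("unknown scheduler: " + shortName);
        }
        return "<property>\n"
             + "  <name>" + PROPERTY + "</name>\n"
             + "  <value>" + cls + "</value>\n"
             + "</property>";
    }

    public static void main(String[] args) {
        // Prints the element to paste inside <configuration> in yarn-site.xml.
        System.out.println(yarnSiteProperty("fair"));
    }
}
```

      A custom scheduler would be configured the same way, with its own fully qualified class name as the value.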

      All three of these scheduler classes extend the abstract class AbstractYarnScheduler.

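    2  The resource management and job control functions of the JobTracker are separated and implemented by the ResourceManager and ApplicationMaster components, respectively.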

    (1)  ResourceManager is responsible for resource allocation for all applications

      The corresponding class is ResourceManager.java, in 

    (2)  ApplicationMaster is responsible for managing a single application only

      The corresponding class is ApplicationMaster.java, in 

    In addition, NodeManager

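    3  Writing a Hadoop scheduler  (see "How to Write a Hadoop Scheduler" and "A Deep Dive into Hadoop Schedulers")

      Suppose we want to write a new scheduler named MyHadoopScheduler. The following work is required:

      (1) Classes the user must implement

      

      (2) System classes the user will use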
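
      As a rough orientation for what such a scheduler has to do, the following toy model imitates the event-driven shape of the FifoScheduler listing: a single handle(...) entry point dispatches on the event type, and a node heartbeat triggers an assignment to the oldest waiting application. Everything in it (the class MyHadoopSchedulerSketch, the three-value EventType enum, the string payloads) is hypothetical and far simpler than the real thing; an actual scheduler would extend AbstractYarnScheduler and handle the full set of SchedulerEvent types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these types are Hadoop classes.
class MyHadoopSchedulerSketch {
    enum EventType { APP_ADDED, APP_REMOVED, NODE_UPDATE }

    // Waiting applications in submission order (first in, first served).
    private final Deque<String> apps = new ArrayDeque<>();
    // Record of "app@node" assignments, kept so the behaviour can be inspected.
    private final List<String> assignments = new ArrayList<>();

    // Single entry point that dispatches on the event type, like handle() in the listing.
    void handle(EventType type, String payload) {
        switch (type) {
            case APP_ADDED:
                apps.addLast(payload);
                break;
            case APP_REMOVED:
                apps.remove(payload);
                break;
            case NODE_UPDATE:
                nodeUpdate(payload);
                break;
            default:
                throw new IllegalArgumentException("Invalid event type " + type);
        }
    }

    // On a node heartbeat, give one container to the oldest waiting application.
    private void nodeUpdate(String nodeName) {
        String oldest = apps.peekFirst();
        if (oldest != null) {
            assignments.add(oldest + "@" + nodeName);
        }
    }

    List<String> assignments() {
        return assignments;
    }

    public static void main(String[] args) {
        MyHadoopSchedulerSketch scheduler = new MyHadoopSchedulerSketch();
        scheduler.handle(EventType.APP_ADDED, "app1");
        scheduler.handle(EventType.APP_ADDED, "app2");
        scheduler.handle(EventType.NODE_UPDATE, "node1");
        scheduler.handle(EventType.APP_REMOVED, "app1");
        scheduler.handle(EventType.NODE_UPDATE, "node2");
        System.out.println(scheduler.assignments()); // prints [app1@node1, app2@node2]
    }
}
```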

  • Original article: https://www.cnblogs.com/zhangchao0515/p/6955126.html