  • YARN Analysis Series, Part 3: Tracing ResourceManager Initialization from the Startup Script

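    Finding the RM main class from the scripts

      In this part we use the startup scripts as the entry point and work our way down into the ResourceManager source code.

      According to the official Hadoop documentation, the ResourceManager is started with: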

      Usage: yarn resourcemanager [-format-state-store]

    COMMAND_OPTIONS                                 Description
    -format-state-store                             Formats the RMStateStore. This will clear the RMStateStore and is useful if past applications are no longer needed. This should be run only when the ResourceManager is not running.
    -remove-application-from-state-store <appId>    Remove the application from RMStateStore. This should be run only when the ResourceManager is not running.

    Next, locate start-yarn.sh in the source tree: hadoop-yarn-project > hadoop-yarn > bin > start-yarn.sh

    # start resourceManager
    # Read the configuration to see whether ResourceManager HA is enabled
    HARM=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.enabled 2>&-)
    if [[ ${HARM} = "false" ]]; then
      # ResourceManager HA is not enabled
      echo "Starting resourcemanager"
      hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
          --config "${HADOOP_CONF_DIR}" \
          --daemon start \
          resourcemanager
      (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
    else
      # ResourceManager HA is enabled
      # yarn.resourcemanager.ha.rm-ids holds the logical RM ids, separated by commas
      logicals=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.rm-ids 2>&-)
      logicals=${logicals//,/ }   # replace commas with spaces so the ids can be iterated
      for id in ${logicals}
      do
          rmhost=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey "yarn.resourcemanager.hostname.${id}" 2>&-)
          RMHOSTS="${RMHOSTS} ${rmhost}"   # RMHOSTS ends up as a space-separated list of hostnames
      done
      echo "Starting resourcemanagers on [${RMHOSTS}]"
      # run the yarn command
      hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
          --config "${HADOOP_CONF_DIR}" \
          --daemon start \
          --workers \
          --hostnames "${RMHOSTS}" \
          resourcemanager
      (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))   # add the previous command's exit status to the counter
    fi


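    First, a note on the shell syntax used to split the string: ${aa//,/ } replaces every comma with a space, and the unquoted result is then split on whitespace by the for loop: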

    $ aa='1,2,3';for i in ${aa//,/ }; do echo $i; done;
    1
    2
    3

    The official sample configuration makes this easier to follow. In the sample below HA is enabled and the RM ids are rm1 and rm2; the hostname of rm1 is master1 and the hostname of rm2 is master2:

    <property>
       <name>yarn.resourcemanager.ha.enabled</name>
       <value>true</value>
     </property>
     <property>
       <name>yarn.resourcemanager.cluster-id</name>
       <value>cluster1</value>
     </property>
     <property>
       <name>yarn.resourcemanager.ha.rm-ids</name>
       <value>rm1,rm2</value>
     </property>
     <property>
       <name>yarn.resourcemanager.hostname.rm1</name>
       <value>master1</value>
     </property>
     <property>
       <name>yarn.resourcemanager.hostname.rm2</name>
       <value>master2</value>
     </property>
     <property>
       <name>yarn.resourcemanager.zk-address</name>
       <value>zk1:2181,zk2:2181,zk3:2181</value>
     </property>
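
    The HA branch of the script can be mirrored in a few lines of Java. This is only a sketch: it reads the keys from a plain Map instead of calling hdfs getconf, and the class name RmHostResolver is made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: mirrors the HA branch of start-yarn.sh with a Map instead of `hdfs getconf`. */
final class RmHostResolver {

    /** Returns the hosts to start ResourceManagers on; empty when HA is explicitly disabled. */
    static List<String> resolveRmHosts(Map<String, String> conf) {
        List<String> hosts = new ArrayList<>();
        // Like the script, only the literal "false" selects the non-HA branch.
        if ("false".equals(conf.get("yarn.resourcemanager.ha.enabled"))) {
            return hosts;
        }
        String ids = conf.getOrDefault("yarn.resourcemanager.ha.rm-ids", "");
        // Same effect as ${logicals//,/ } followed by the for loop in the script.
        for (String id : ids.split(",")) {
            String rmId = id.trim();
            if (rmId.isEmpty()) {
                continue;
            }
            String host = conf.get("yarn.resourcemanager.hostname." + rmId);
            if (host != null) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("yarn.resourcemanager.ha.enabled", "true");
        conf.put("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        conf.put("yarn.resourcemanager.hostname.rm1", "master1");
        conf.put("yarn.resourcemanager.hostname.rm2", "master2");
        System.out.println(resolveRmHosts(conf)); // prints [master1, master2]
    }
}
```

    With the sample configuration this yields master1 and master2, which is what the script collects into RMHOSTS.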

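    Combining this with the yarn script, we can see that the entry class of the resourcemanager subcommand is org.apache.hadoop.yarn.server.resourcemanager.ResourceManager. The script is invoked with --config "${HADOOP_CONF_DIR}" --daemon start --workers --hostnames "${RMHOSTS}", plus values passed along by the shell functions (not analyzed here). These options are handled by the shell layer; on a normal start no arguments are left over for the Java class.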

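    Analyzing the RM service initialization

    The ResourceManager class hierarchy

    Now we finally arrive at the entry class org.apache.hadoop.yarn.server.resourcemanager.ResourceManager, which lives in the hadoop-yarn-server-resourcemanager submodule.

    First look at the declaration of the RM class. It extends CompositeService, which means the RM is a composite service made up of child services, and it implements the ResourceManagerMXBean interface, so it can be managed through JMX: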

    public class ResourceManager extends CompositeService
            implements Recoverable, ResourceManagerMXBean

    The ResourceManager entry point

    Next, find the main function:

    public static void main(String argv[]) {
        Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
        StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
        try {
          Configuration conf = new YarnConfiguration();
          GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); // parse the generic options
          argv = hParser.getRemainingArgs();  // arguments left over after the generic options; on a normal start there are none
          // If -format-state-store, then delete RMStateStore; else startup normally
          if (argv.length >= 1) {
            if (argv[0].equals("-format-state-store")) {
              deleteRMStateStore(conf);
            } else if (argv[0].equals("-remove-application-from-state-store")
                && argv.length == 2) {
              removeApplication(conf, argv[1]);
            } else {
              printUsage(System.err);
            }
          } else {
            // Create the RM instance. The superclass constructor sets the service name to
            // "ResourceManager" and creates the stateModel field with the initial state
            // Service.State.NOTINITED (covered in detail later).
            ResourceManager resourceManager = new ResourceManager();
            // Register a shutdown hook that stops the composite service
            ShutdownHookManager.get().addShutdownHook(
                new CompositeServiceShutdownHook(resourceManager),
                SHUTDOWN_HOOK_PRIORITY);
            resourceManager.init(conf);  // initialize the RM service
            resourceManager.start();     // start the RM service
          }
        } catch (Throwable t) {
          LOG.fatal("Error starting ResourceManager", t);
          System.exit(-1);
        }
    }
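
    The branching in main can be reduced to a pure function that is easy to test. The sketch below only reproduces the decision logic; the class name RmStartupMode and the Mode enum are hypothetical and do not exist in Hadoop.

```java
/** Illustrative only: the argument branching of ResourceManager.main as a pure function. */
final class RmStartupMode {

    enum Mode { FORMAT_STATE_STORE, REMOVE_APPLICATION, PRINT_USAGE, START }

    /** Decides what main would do for the arguments left after generic option parsing. */
    static Mode decide(String[] remainingArgs) {
        if (remainingArgs.length >= 1) {
            if (remainingArgs[0].equals("-format-state-store")) {
                return Mode.FORMAT_STATE_STORE;
            } else if (remainingArgs[0].equals("-remove-application-from-state-store")
                    && remainingArgs.length == 2) {
                return Mode.REMOVE_APPLICATION;
            }
            // Anything else, including a missing <appId>, ends in the usage message.
            return Mode.PRINT_USAGE;
        }
        // No remaining arguments: create, init and start the ResourceManager.
        return Mode.START;
    }

    public static void main(String[] args) {
        System.out.println(decide(new String[0]));                          // prints START
        System.out.println(decide(new String[] {"-format-state-store"}));  // prints FORMAT_STATE_STORE
    }
}
```

    Note that -remove-application-from-state-store without an application id falls through to the usage message, because the length check is part of the same condition.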

    The ResourceManager initialization process

    @Override // defined in the parent class AbstractService
      public void init(Configuration conf) {
        if (conf == null) {
          throw new ServiceStateException("Cannot initialize service "
                                          + getName() + ": null configuration");
        }
        if (isInState(STATE.INITED)) {
          return;
        }
        synchronized (stateChangeLock) {
          if (enterState(STATE.INITED) != STATE.INITED) { // the previous state was not INITED, so the service has not been initialized yet
            setConfig(conf); // store the conf object
            try {
              serviceInit(config); // initialize the service
              if (isInState(STATE.INITED)) { // the service initialized correctly
                //if the service ended up here during init,
                //notify the listeners
                notifyListeners(); // notify the listeners
              }
            } catch (Exception e) {
              noteFailure(e);
              ServiceOperations.stopQuietly(LOG, this);
              throw ServiceStateException.convert(e);
            }
          }
        }
      }
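
    The idiom in init (a cheap state check, then a locked state switch that returns the previous state, then a subclass hook) can be tried out in isolation. The following is a stripped-down sketch, not Hadoop's implementation: MiniService is a made-up class, it has only three states, and it skips the transition validation that the real state model performs.

```java
/** Illustrative only: the init() idiom without Hadoop's state transition checks. */
class MiniService {

    enum State { NOTINITED, INITED, STOPPED }

    private final Object stateChangeLock = new Object();
    private volatile State state = State.NOTINITED;
    int serviceInitCalls = 0;

    /** Switches to the new state and returns the previous one. */
    private State enterState(State next) {
        State old = state;
        state = next;
        return old;
    }

    State getState() {
        return state;
    }

    final void init() {
        if (state == State.INITED) {
            return; // fast path: already initialized
        }
        synchronized (stateChangeLock) {
            if (enterState(State.INITED) != State.INITED) {
                try {
                    serviceInit(); // subclass hook; skipped if the state was already INITED
                } catch (Exception e) {
                    state = State.STOPPED; // stand-in for stopQuietly(...)
                    throw new IllegalStateException("init failed", e);
                }
            }
        }
    }

    /** Hook for subclasses, playing the role of serviceInit in the listing above. */
    protected void serviceInit() throws Exception {
        serviceInitCalls++;
    }

    public static void main(String[] args) {
        MiniService service = new MiniService();
        service.init();
        service.init(); // second call is a no-op
        System.out.println(service.getState() + " " + service.serviceInitCalls); // prints INITED 1
    }
}
```

    Calling init twice runs the hook once, and a hook that throws leaves the service in the stopped state, which is the behaviour the catch block above is after.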

    The serviceInit method is implemented in the ResourceManager class:

    @Override
      protected void serviceInit(Configuration conf) throws Exception {
        this.conf = conf;
        // 1. Initialize the service context
        // RMContextImpl holds the contexts for two kinds of services:
        // serviceContext: the always-on services, which run regardless of HA state
        // activeServiceContext: the services that must run only on the active RM
        this.rmContext = new RMContextImpl();
        rmContext.setResourceManager(this);
    
        // 2. Set up the configuration provider
        this.configurationProvider =
            ConfigurationProviderFactory.getConfigurationProvider(conf);
        this.configurationProvider.init(this.conf);
        rmContext.setConfigurationProvider(configurationProvider);
    
        // 3. Load core-site.xml
        loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
    
        // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
        // Or use RM specific configurations to overwrite the common ones first
        // if they exist
        RMServerUtils.processRMProxyUsersConf(conf);
        ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);
    
        // 4. Load yarn-site.xml
        loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
        // 5. Validate the configuration
        validateConfigs(this.conf);
    
        // 6. login
        // Set HA configuration should be done before login
        this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
        if (this.rmContext.isHAEnabled()) { // if HA is enabled for the RM, verify and set the HA configuration
          HAUtil.verifyAndSetConfiguration(this.conf);
        }
    
        // Set UGI and do login
        // If security is enabled, use login user
        // If security is not enabled, use current user
        // For example, with Kerberos enabled the Kerberos login user is used; otherwise the current user
        this.rmLoginUGI = UserGroupInformation.getCurrentUser();
        try {
          doSecureLogin();
        } catch(IOException ie) {
          throw new YarnRuntimeException("Failed to login", ie);
        }
    
        // register the handlers for all AlwaysOn services using setupDispatcher().
        // 7. Register the event handlers for all always-on services
        rmDispatcher = setupDispatcher();
        addIfService(rmDispatcher);
        rmContext.setDispatcher(rmDispatcher);
    
        // The order of services below should not be changed as services will be
        // started in same order
        // As elector service needs admin service to be initialized and started,
        // first we add admin service then elector service
        // 8. Create the AdminService
        adminService = createAdminService();
        addService(adminService);
        rmContext.setRMAdminService(adminService);
    
        // elector must be added post adminservice
        if (this.rmContext.isHAEnabled()) {
          // If the RM is configured to use an embedded leader elector,
          // initialize the leader elector.
          if (HAUtil.isAutomaticFailoverEnabled(conf)
              && HAUtil.isAutomaticFailoverEmbedded(conf)) {
            EmbeddedElector elector = createEmbeddedElector();
            addIfService(elector);
            rmContext.setLeaderElectorService(elector);
          }
        }
    
        // 9. Set the YARN configuration
        rmContext.setYarnConfiguration(conf);
        // 10. Create and initialize the active services
        createAndInitActiveServices(false);
    
        // 11. Get the YARN web app address
        webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
                          YarnConfiguration.RM_BIND_HOST,
                          WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
    
        // 12. Create the RMApplicationHistoryWriter service
        RMApplicationHistoryWriter rmApplicationHistoryWriter =
            createRMApplicationHistoryWriter();
        addService(rmApplicationHistoryWriter);
        rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
    
        // initialize the RM timeline collector first so that the system metrics
        // publisher can bind to it
        // 13. Create the RM timeline collector
        if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
          RMTimelineCollectorManager timelineCollectorManager =
              createRMTimelineCollectorManager();
          addService(timelineCollectorManager);
          rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
        }
    
        // 14. Set up the SystemMetricsPublisher
        SystemMetricsPublisher systemMetricsPublisher =
            createSystemMetricsPublisher();
        addIfService(systemMetricsPublisher);
        rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
    
        // 15. Register with JMX
        registerMXBean();
        // 16. Call the parent class's serviceInit method
        super.serviceInit(this.conf);
      }

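    Let's go through the initialization sub-steps one by one.

    Initializing the service context

    public RMContextImpl() {
        // Context for the always-on services
        this.serviceContext = new RMServiceContext();
        // Context for the services that run only on the active RM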
        this.activeServiceContext = new RMActiveServiceContext();
    }

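    Setting up the configuration provider

    A factory combined with a configuration key supplies the default ConfigurationProvider, and users can implement ConfigurationProvider to plug in their own provider.

    Providers show up in plenty of other code bases as well. Here the provider can perform some internal initialization, return an InputStream for a configuration file, and close that stream again. The class that parses the configuration only needs an input stream.

    // ConfigurationProviderFactory is a factory class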
    /**
       * Creates an instance of {@link ConfigurationProvider} using given
       * configuration.
       * @param bootstrapConf
       * @return configurationProvider
       */
      @SuppressWarnings("unchecked")
      public static ConfigurationProvider
          getConfigurationProvider(Configuration bootstrapConf) {
        Class<? extends ConfigurationProvider> defaultProviderClass;
        try {
          // The default provider class is org.apache.hadoop.yarn.LocalConfigurationProvider
          defaultProviderClass = (Class<? extends ConfigurationProvider>)
              Class.forName(
                  YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
        } catch (Exception e) {
          throw new YarnRuntimeException(
              "Invalid default configuration provider class"
                  + YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
        }
        ConfigurationProvider configurationProvider =
                // Look up the class's constructor in the cache, then create the provider instance reflectively.
                // The provider can be overridden with yarn.resourcemanager.configuration.provider-class.
            ReflectionUtils.newInstance(bootstrapConf.getClass(
                YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
                defaultProviderClass, ConfigurationProvider.class),
                bootstrapConf);
        return configurationProvider;
      }
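
    The same pattern (a default implementation class, a configuration key that can override it, and reflective instantiation) is shown below without any Hadoop dependency. Everything in it is hypothetical: the Provider interface, the two implementations and the key sketch.provider-class exist only for this example, and a plain Map stands in for the configuration object.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a reflection-based factory with a default class and a Map as configuration. */
final class ProviderFactorySketch {

    interface Provider {
        String describe();
    }

    static final class LocalProvider implements Provider {
        public String describe() {
            return "local";
        }
    }

    static final class RemoteProvider implements Provider {
        public String describe() {
            return "remote";
        }
    }

    // Hypothetical key used only by this sketch.
    static final String PROVIDER_KEY = "sketch.provider-class";

    /** Instantiates the configured provider class, falling back to LocalProvider. */
    static Provider getProvider(Map<String, String> conf) {
        String className = conf.getOrDefault(PROVIDER_KEY, LocalProvider.class.getName());
        try {
            // asSubclass rejects classes that do not implement Provider.
            Class<? extends Provider> cls = Class.forName(className).asSubclass(Provider.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Invalid provider class " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(getProvider(conf).describe()); // prints local
        conf.put(PROVIDER_KEY, RemoteProvider.class.getName());
        System.out.println(getProvider(conf).describe()); // prints remote
    }
}
```

    The caller never names a concrete class, so switching implementations is purely a configuration change.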

    Loading core-site.xml

    private void loadConfigurationXml(String configurationFile)
          throws YarnException, IOException {
        InputStream configurationInputStream =
            this.configurationProvider.getConfigurationInputStream(this.conf,
                configurationFile);
        if (configurationInputStream != null) {
          this.conf.addResource(configurationInputStream, configurationFile);
        }
      }

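    Loading yarn-site.xml

    This works the same way as loading core-site.xml.

    Validating the configuration

    Two things are checked: the global maximum number of application attempts must be positive, and the NodeManager expiry interval must be no less than the heartbeat interval.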

    protected static void validateConfigs(Configuration conf) {
        // validate max-attempts
        int globalMaxAppAttempts =
            conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
        if (globalMaxAppAttempts <= 0) {
          throw new YarnRuntimeException("Invalid global max attempts configuration"
              + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
              + "=" + globalMaxAppAttempts + ", it should be a positive integer.");
        }
    
        // validate expireIntvl >= heartbeatIntvl
        long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
        long heartbeatIntvl =
            conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
                YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
        if (expireIntvl < heartbeatIntvl) {
          throw new YarnRuntimeException("Nodemanager expiry interval should be no"
              + " less than heartbeat interval, "
              + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
              + ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
              + heartbeatIntvl);
        }
      }

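    User login

    Step 1: check whether HA is enabled. If it is, the HA configuration has to be verified and set before login, because every RM node performs its own login.

    Step 2: get the current user. If Kerberos is enabled this is the user logged in through Kerberos; otherwise it is the current user.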

    @InterfaceAudience.Public
      @InterfaceStability.Evolving
      public static UserGroupInformation getCurrentUser() throws IOException {
        AccessControlContext context = AccessController.getContext();
        Subject subject = Subject.getSubject(context);
        if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
          return getLoginUser();
        } else {
          return new UserGroupInformation(subject);
        }
      }

     Step 3: log in through the security API and obtain the login user

    protected void doSecureLogin() throws IOException {
        InetSocketAddress socAddr = getBindAddress(conf);
        SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
            YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());
    
        // if security is enable, set rmLoginUGI as UGI of loginUser
        if (UserGroupInformation.isSecurityEnabled()) {
          this.rmLoginUGI = UserGroupInformation.getLoginUser();
        }
      }

    Registering the event handlers of the always-on services

    private Dispatcher setupDispatcher() {
        // Create the dispatcher
        Dispatcher dispatcher = createDispatcher();
        // Register RMFatalEventDispatcher as the handler
        // for RMFatalEventType events
        dispatcher.register(RMFatalEventType.class,
            new ResourceManager.RMFatalEventDispatcher());
        return dispatcher;
    }
    
    protected Dispatcher createDispatcher() {
        return new AsyncDispatcher("RM Event dispatcher");
    }

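      Internally, AsyncDispatcher has a blocking event queue and a long-running dispatch thread. When an event is put on the queue, the thread takes it off, reads its type, looks up the matching EventHandler in the registry Map<Class<? extends Enum>, EventHandler>, and calls that handler's handle method. That completes one asynchronous event dispatch.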
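
    The queue, worker thread and handler registry can be sketched in a few dozen lines. This is a simplified model under stated assumptions, not the Hadoop class: MiniAsyncDispatcher, its nested Event and EventHandler interfaces and DemoEventType are invented for the example, and error handling and draining on stop are left out.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative only: blocking queue + worker thread + handler registry keyed by event type. */
final class MiniAsyncDispatcher {

    interface Event {
        Enum<?> getType();
    }

    interface EventHandler {
        void handle(Event event);
    }

    enum DemoEventType { FATAL }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, EventHandler> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::dispatchLoop, "mini-dispatcher");
    private volatile boolean stopped = false;

    /** Registers the handler for every event whose type belongs to the given enum class. */
    void register(Class<?> eventTypeClass, EventHandler handler) {
        handlers.put(eventTypeClass, handler);
    }

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    /** Enqueues the event and returns immediately; the handler runs on the worker thread. */
    void post(Event event) {
        queue.add(event);
    }

    void stop() {
        stopped = true;
        worker.interrupt();
    }

    private void dispatchLoop() {
        while (!stopped) {
            try {
                Event event = queue.take(); // blocks until an event is available
                EventHandler handler = handlers.get(event.getType().getDeclaringClass());
                if (handler != null) {
                    handler.handle(event);
                }
            } catch (InterruptedException e) {
                return; // stop() interrupts the blocked take()
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniAsyncDispatcher dispatcher = new MiniAsyncDispatcher();
        CountDownLatch handled = new CountDownLatch(1);
        dispatcher.register(DemoEventType.class, event -> {
            System.out.println("handled " + event.getType());
            handled.countDown();
        });
        dispatcher.start();
        dispatcher.post(() -> DemoEventType.FATAL);
        handled.await(2, TimeUnit.SECONDS);
        dispatcher.stop();
    }
}
```

    The poster never waits for the handler, which is why the registration step in setupDispatcher has to happen before any event of that type is posted.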

    Creating the AdminService

    protected AdminService createAdminService() {
        return new AdminService(this);
    }

    Setting the YARN configuration

    rmContext.setYarnConfiguration(conf); 
    // which calls
    public void setYarnConfiguration(Configuration yarnConfiguration) {
        serviceContext.setYarnConfiguration(yarnConfiguration);
    }

    Creating and initializing the active services

    protected void createAndInitActiveServices(boolean fromActive) {
        activeServices = new RMActiveServices(this);
        activeServices.fromActive = fromActive;
        activeServices.init(conf);
    }
    // The init method called here is the following
    @Override
      public void init(Configuration conf) {
        if (conf == null) {
          throw new ServiceStateException("Cannot initialize service "
                                          + getName() + ": null configuration");
        }
        if (isInState(STATE.INITED)) {
          return;
        }
        synchronized (stateChangeLock) {
          if (enterState(STATE.INITED) != STATE.INITED) {
            setConfig(conf);
            try {
              serviceInit(config);
              if (isInState(STATE.INITED)) {
                //if the service ended up here during init,
                //notify the listeners
                notifyListeners();
              }
            } catch (Exception e) {
              noteFailure(e);
              ServiceOperations.stopQuietly(LOG, this);
              throw ServiceStateException.convert(e);
            }
          }
        }
      }
    // The serviceInit method it calls is shown below; it will be analyzed in detail later
    
    @Override
        protected void serviceInit(Configuration configuration) throws Exception {
          standByTransitionRunnable = new StandByTransitionRunnable();
    
          rmSecretManagerService = createRMSecretManagerService();
          addService(rmSecretManagerService);
    
          containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
          addService(containerAllocationExpirer);
          rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
    
          AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
          addService(amLivelinessMonitor);
          rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
    
          AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
          addService(amFinishingMonitor);
          rmContext.setAMFinishingMonitor(amFinishingMonitor);
          
          RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
          addService(rmAppLifetimeMonitor);
          rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
    
          RMNodeLabelsManager nlm = createNodeLabelManager();
          nlm.setRMContext(rmContext);
          addService(nlm);
          rmContext.setNodeLabelManager(nlm);
    
          AllocationTagsManager allocationTagsManager =
              createAllocationTagsManager();
          rmContext.setAllocationTagsManager(allocationTagsManager);
    
          PlacementConstraintManagerService placementConstraintManager =
              createPlacementConstraintManager();
          addService(placementConstraintManager);
          rmContext.setPlacementConstraintManager(placementConstraintManager);
    
          // add resource profiles here because it's used by AbstractYarnScheduler
          ResourceProfilesManager resourceProfilesManager =
              createResourceProfileManager();
          resourceProfilesManager.init(conf);
          rmContext.setResourceProfilesManager(resourceProfilesManager);
    
          RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
              createRMDelegatedNodeLabelsUpdater();
          if (delegatedNodeLabelsUpdater != null) {
            addService(delegatedNodeLabelsUpdater);
            rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
          }
    
          recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
              YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
    
          RMStateStore rmStore = null;
          if (recoveryEnabled) {
            rmStore = RMStateStoreFactory.getStore(conf);
            boolean isWorkPreservingRecoveryEnabled =
                conf.getBoolean(
                  YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
                  YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
            rmContext
                .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
          } else {
            rmStore = new NullRMStateStore();
          }
    
          try {
            rmStore.setResourceManager(rm);
            rmStore.init(conf);
            rmStore.setRMDispatcher(rmDispatcher);
          } catch (Exception e) {
            // the Exception from stateStore.init() needs to be handled for
            // HA and we need to give up master status if we got fenced
            LOG.error("Failed to init state store", e);
            throw e;
          }
          rmContext.setStateStore(rmStore);
    
          if (UserGroupInformation.isSecurityEnabled()) {
            delegationTokenRenewer = createDelegationTokenRenewer();
            rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
          }
    
          // Register event handler for NodesListManager
          nodesListManager = new NodesListManager(rmContext);
          rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
          addService(nodesListManager);
          rmContext.setNodesListManager(nodesListManager);
    
          // Initialize the scheduler
          scheduler = createScheduler();
          scheduler.setRMContext(rmContext);
          addIfService(scheduler);
          rmContext.setScheduler(scheduler);
    
          schedulerDispatcher = createSchedulerEventDispatcher();
          addIfService(schedulerDispatcher);
          rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
    
          // Register event handler for RmAppEvents
          rmDispatcher.register(RMAppEventType.class,
              new ApplicationEventDispatcher(rmContext));
    
          // Register event handler for RmAppAttemptEvents
          rmDispatcher.register(RMAppAttemptEventType.class,
              new ApplicationAttemptEventDispatcher(rmContext));
    
          // Register event handler for RmNodes
          rmDispatcher.register(
              RMNodeEventType.class, new NodeEventDispatcher(rmContext));
    
          nmLivelinessMonitor = createNMLivelinessMonitor();
          addService(nmLivelinessMonitor);
    
          resourceTracker = createResourceTrackerService();
          addService(resourceTracker);
          rmContext.setResourceTrackerService(resourceTracker);
    
          MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
          if (fromActive) {
            JvmMetrics.reattach(ms, jvmMetrics);
            UserGroupInformation.reattachMetrics();
          } else {
            jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
          }
    
          JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
          addService(pauseMonitor);
          jvmMetrics.setPauseMonitor(pauseMonitor);
    
          // Initialize the Reservation system
          if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
              YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
            reservationSystem = createReservationSystem();
            if (reservationSystem != null) {
              reservationSystem.setRMContext(rmContext);
              addIfService(reservationSystem);
              rmContext.setReservationSystem(reservationSystem);
              LOG.info("Initialized Reservation system");
            }
          }
    
          masterService = createApplicationMasterService();
          createAndRegisterOpportunisticDispatcher(masterService);
          addService(masterService) ;
          rmContext.setApplicationMasterService(masterService);
    
    
          applicationACLsManager = new ApplicationACLsManager(conf);
    
          queueACLsManager = createQueueACLsManager(scheduler, conf);
    
          rmAppManager = createRMAppManager();
          // Register event handler for RMAppManagerEvents
          rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
    
          clientRM = createClientRMService();
          addService(clientRM);
          rmContext.setClientRMService(clientRM);
    
          applicationMasterLauncher = createAMLauncher();
          rmDispatcher.register(AMLauncherEventType.class,
              applicationMasterLauncher);
    
          addService(applicationMasterLauncher);
          if (UserGroupInformation.isSecurityEnabled()) {
            addService(delegationTokenRenewer);
            delegationTokenRenewer.setRMContext(rmContext);
          }
    
          if(HAUtil.isFederationEnabled(conf)) {
            String cId = YarnConfiguration.getClusterId(conf);
            if (cId.isEmpty()) {
              String errMsg =
                  "Cannot initialize RM as Federation is enabled"
                      + " but cluster id is not configured.";
              LOG.error(errMsg);
              throw new YarnRuntimeException(errMsg);
            }
            federationStateStoreService = createFederationStateStoreService();
            addIfService(federationStateStoreService);
            LOG.info("Initialized Federation membership.");
          }
    
          new RMNMInfo(rmContext, scheduler);
    
          if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
              false)) {
            SystemServiceManager systemServiceManager = createServiceManager();
            addIfService(systemServiceManager);
          }
    
          super.serviceInit(conf);
        }

    Getting the YARN web app address

    // The yarn.resourcemanager.bind-host parameter can be used to set the host the RM binds to
    webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
                          YarnConfiguration.RM_BIND_HOST,
                          WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));

    Creating the RMApplicationHistoryWriter service

    protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
        return new RMApplicationHistoryWriter();
    }
    
        RMApplicationHistoryWriter rmApplicationHistoryWriter =
            createRMApplicationHistoryWriter();
        addService(rmApplicationHistoryWriter);
        rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

    Creating the RM timeline collector

    private RMTimelineCollectorManager createRMTimelineCollectorManager() {
        return new RMTimelineCollectorManager(this);
    }
    
    if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
          RMTimelineCollectorManager timelineCollectorManager =
              createRMTimelineCollectorManager();
          addService(timelineCollectorManager);
          rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
        }

    Setting up the SystemMetricsPublisher

    protected SystemMetricsPublisher createSystemMetricsPublisher() {
        List<SystemMetricsPublisher> publishers =
            new ArrayList<SystemMetricsPublisher>();
        // Timeline service v1
        if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
          SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
          publishers.add(publisherV1);
        }
        // Timeline service v2
        if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
          // we're dealing with the v.2.x publisher
          LOG.info("system metrics publisher with the timeline service V2 is "
              + "configured");
          SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
              rmContext.getRMTimelineCollectorManager());
          publishers.add(publisherV2);
        }
        // If no publisher is configured, add a no-op publisher. This is the null object pattern; it avoids null checks and NullPointerExceptions.
        if (publishers.isEmpty()) {
          LOG.info("TimelineServicePublisher is not configured");
          SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
          publishers.add(noopPublisher);
        }
    
        for (SystemMetricsPublisher publisher : publishers) {
          addIfService(publisher);
        }
    
        SystemMetricsPublisher combinedPublisher =
            new CombinedSystemMetricsPublisher(publishers);
        return combinedPublisher;
      }
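
    The combination of a null object and a composite can be shown on a reduced interface. This is a sketch under simplifying assumptions: the Publisher interface with a single appCreated method and all class names below are hypothetical and much smaller than the real publisher API.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: null object plus composite, on a one-method publisher interface. */
final class PublisherSketch {

    interface Publisher {
        void appCreated(String appId);
    }

    /** Null object: accepts every call and does nothing. */
    static final class NoOpPublisher implements Publisher {
        public void appCreated(String appId) {
        }
    }

    /** Records what it was told, so the forwarding can be observed. */
    static final class RecordingPublisher implements Publisher {
        final List<String> seen = new ArrayList<>();

        public void appCreated(String appId) {
            seen.add(appId);
        }
    }

    /** Composite: forwards each call to all delegates. */
    static final class CombinedPublisher implements Publisher {
        private final List<Publisher> delegates;

        CombinedPublisher(List<Publisher> delegates) {
            this.delegates = new ArrayList<>(delegates);
        }

        public void appCreated(String appId) {
            for (Publisher delegate : delegates) {
                delegate.appCreated(appId);
            }
        }

        int size() {
            return delegates.size();
        }
    }

    /** Wraps the configured publishers; falls back to a single no-op publisher when there are none. */
    static CombinedPublisher create(List<Publisher> configured) {
        List<Publisher> publishers = new ArrayList<>(configured);
        if (publishers.isEmpty()) {
            publishers.add(new NoOpPublisher());
        }
        return new CombinedPublisher(publishers);
    }

    public static void main(String[] args) {
        CombinedPublisher none = create(new ArrayList<>());
        none.appCreated("app_1");        // safe: handled by the no-op publisher
        System.out.println(none.size()); // prints 1
    }
}
```

    Callers always get a usable publisher and never have to test for null, whichever timeline versions are enabled.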

    Registering with JMX

    /**
       * Register ResourceManagerMXBean.
       */
      private void registerMXBean() {
        MBeans.register("ResourceManager", "ResourceManager", this);
      }

    Calling the parent class's serviceInit method

    // Every service that was added to the service list during the steps above is initialized here.
    protected void serviceInit(Configuration conf) throws Exception {
        List<Service> services = getServices();
        if (LOG.isDebugEnabled()) {
          LOG.debug(getName() + ": initing services, size=" + services.size());
        }
        for (Service service : services) {
          service.init(conf);
        }
        super.serviceInit(conf);
    }
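    // Why not return the list directly? The ArrayList copy constructor makes a shallow copy (essentially what Arrays.copyOf does), so callers cannot add to or remove from the internal service list. This is a recommended practice; returning an iterator would be another option.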
    public List<Service> getServices() {
        synchronized (serviceList) {
          return new ArrayList<Service>(serviceList);
        }
      }
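
    The effect of returning a copy can be checked with a small model. The sketch below is an assumption-laden simplification: MiniCompositeService and its one-method Service interface are invented here, and the child that registers a sibling during init is a contrived case chosen to show why iterating over a snapshot is safer.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a composite that hands out snapshots of its child list. */
final class MiniCompositeService {

    interface Service {
        void init();
    }

    private final List<Service> serviceList = new ArrayList<>();

    void addService(Service service) {
        synchronized (serviceList) {
            serviceList.add(service);
        }
    }

    /** Returns a shallow copy, so callers cannot change the internal list. */
    List<Service> getServices() {
        synchronized (serviceList) {
            return new ArrayList<>(serviceList);
        }
    }

    /** Initializes the children that were registered when the snapshot was taken. */
    void init() {
        for (Service service : getServices()) {
            service.init();
        }
    }

    public static void main(String[] args) {
        MiniCompositeService parent = new MiniCompositeService();
        // A child that registers another service while it is being initialized.
        parent.addService(() -> parent.addService(() -> { }));
        parent.init(); // no ConcurrentModificationException: the loop walks a snapshot

        List<Service> snapshot = parent.getServices();
        snapshot.clear(); // only the copy is emptied
        System.out.println(parent.getServices().size()); // prints 2
    }
}
```

    In this model a service added during init is not initialized in the same pass, since it is missing from the snapshot; that is a property of the sketch and is not a claim about every Hadoop version.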

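    That completes a first pass over the initialization code. We will come back to the individual parts in more detail when later analysis touches on them.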

  • Original article: https://www.cnblogs.com/johnny666888/p/11055651.html