  • YARN Source Code Analysis (3): ResourceManager HA, Application State Storage and Recovery

    Preface

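    No matter how large a system grows, unexpected failures can still happen. You may have handled every exceptional case in software, but when the hardware or the physical layer fails, a few more lines of code will not fix things right away. All of this is to stress the importance of HA, high availability. Many readers are probably already familiar with how NameNode HA works in HDFS, so this article walks through HA for the RM, the resource manager in YARN. It is not about basic RM HA configuration; more precisely, it is about how the RM stores and recovers application state.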


    Using RM application state storage

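    What does RM application state storage mean? The RM, short for ResourceManager, acts like a head steward: it communicates with the ApplicationMasters running on the nodes and also exchanges heartbeats with the NodeManagers. Many applications therefore register with the RM, and each application has one ApplicationMaster that manages its whole life cycle. Because the RM plays such a central role, its state needs to be saved. Otherwise, if the RM process exits abnormally, the application state is lost and a restarted RM cannot rerun the applications it was managing.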


    What application information is saved

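    With the goal clear, which data does YARN actually save? "Application state" is only a general term; the figure below breaks it down.

    [Figure: RMState at the root, ApplicationState entries beneath it, ApplicationAttemptState entries beneath each application]

    As the figure shows, the state forms a layered multi-way tree, similar to the layered execution state of a running MapReduce job. At the top is a single RMState. It contains a number of ApplicationState entries, and each ApplicationState in turn contains many ApplicationAttemptState entries, one per application attempt.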


    How application state is saved

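    The RM can save application state in the following ways:

    1. MemoryRMStateStore: an implementation that keeps the state in memory.

    2. FileSystemRMStateStore: keeps the state in the HDFS file system, so the state is persisted.

    3. NullRMStateStore: does nothing; application state is not saved at all.

    4. ZKRMStateStore: keeps the state in ZooKeeper.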

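    The source I analyzed does not yet include the ZKRMStateStore class, so I only cover the first three. The classes listed above are all concrete implementations, so there must be a higher-level class that defines the common fields and methods. That class is RMStateStore, and the inheritance relationship is shown in the figure below.

    [Figure: MemoryRMStateStore, FileSystemRMStateStore and NullRMStateStore inheriting from RMStateStore, with blue arrows to the object each one relies on]

    The blue arrows in the figure point to the object each implementation relies on for storage. The source analysis that follows makes this concrete. First, the RMStateStore class: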

    /**
     * Base class to implement storage of ResourceManager state.
     * Takes care of asynchronous notifications and interfacing with YARN objects.
     * Real store implementations need to derive from it and implement blocking
     * store and load methods to actually store and load the state.
     * Base class for storing RM state; also a service (it extends AbstractService)
     */
    public abstract class RMStateStore extends AbstractService {
     
      ....
    
    
      /**
       * State of an application attempt
       * One instance per application attempt
       */
      public static class ApplicationAttemptState {
        // application attempt ID
        final ApplicationAttemptId attemptId;
        // master container of the attempt
        final Container masterContainer;
        // credentials
        final Credentials appAttemptCredentials;
    
        ....
      }
      
      /**
       * State of an application
       * One instance per application
       */
      public static class ApplicationState {
        // application submission context
        final ApplicationSubmissionContext context;
        // submission time
        final long submitTime;
        // submitting user
        final String user;
        // attempt ID -> attempt state
        Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
                      new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
        
        ....
      }
    
      public static class RMDTSecretManagerState {
        // DTIdentifier -> renewDate
        // maps each RM delegation token identifier to its renew date
        Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
            new HashMap<RMDelegationTokenIdentifier, Long>();
    
        Set<DelegationKey> masterKeyState =
            new HashSet<DelegationKey>();
    
        int dtSequenceNumber = 0;
    
        ....
      }
    
      /**
       * State of the ResourceManager
       * Root of the saved RM state
       */
      public static class RMState {
        // application ID -> application state
        Map<ApplicationId, ApplicationState> appState =
            new HashMap<ApplicationId, ApplicationState>();
    
        RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
    
        ....
      }
    Note the state classes defined here and compare them with the first figure above. Next, the application-saving methods defined in this parent class:

    /**
       * Non-Blocking API
       * ResourceManager services use this to store the application's state
       * This does not block the dispatcher threads
       * RMAppStoredEvent will be sent on completion to notify the RMApp
       * Saves the application state by firing a store event; this method is non-blocking
       */
      @SuppressWarnings("unchecked")
      public synchronized void storeApplication(RMApp app) {
        ApplicationSubmissionContext context = app
                                                .getApplicationSubmissionContext();
        assert context instanceof ApplicationSubmissionContextPBImpl;
        ApplicationState appState = new ApplicationState(
            app.getSubmitTime(), context, app.getUser());
        // fire a store event; the central dispatcher delivers it for handling
        dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
      }
        
      /**
       * Blocking API
       * Derived classes must implement this method to store the state of an 
       * application.
       * Blocking method that saves the application state; implemented by subclasses
       */
      protected abstract void storeApplicationState(String appId,
                                          ApplicationStateDataPBImpl appStateData) 
                                          throws Exception;
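    Saving application state is split into a non-blocking method and a blocking method: the non-blocking method is event-driven, while the blocking method is implemented by each concrete subclass. Removing an application works slightly differently: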

    /**
       * Non-blocking API
       * ResourceManager services call this to remove an application from the state
       * store
       * This does not block the dispatcher threads
       * There is no notification of completion for this operation.
       * Removes an application's state from the RM store, mainly its list of application attempts
       */
      public synchronized void removeApplication(RMApp app) {
        ApplicationState appState = new ApplicationState(
                app.getSubmitTime(), app.getApplicationSubmissionContext(),
                app.getUser());
        // collect the state of every attempt of this application
        for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
          Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
          ApplicationAttemptState attemptState =
              new ApplicationAttemptState(appAttempt.getAppAttemptId(),
                appAttempt.getMasterContainer(), credentials);
          appState.attempts.put(attemptState.getAttemptId(), attemptState);
        }
        // perform the removal
        removeApplication(appState);
      }
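    Removing an application first gathers every attempt state that belongs to it and then performs the removal. removeApplication in turn splits into the same two kinds of methods as above: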

    /**
       * Non-Blocking API
       */
      public synchronized void removeApplication(ApplicationState appState) {
        dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
      }
    
      /**
       * Blocking API
       * Derived classes must implement this method to remove the state of an 
       * application and its attempts
       */
      protected abstract void removeApplicationState(ApplicationState appState) 
                                                                 throws Exception;
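
    The split between a non-blocking public method and a blocking hook implemented by subclasses can be sketched without any Hadoop dependency. Everything here is hypothetical: StateStore and InMemoryStore are simplified stand-ins, a single-threaded executor plays the role of the dispatcher, and application IDs are plain strings. It is only meant to illustrate the pattern, not how RMStateStore is actually wired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncStoreSketch {

  /** Hypothetical stand-in for RMStateStore; not the Hadoop class. */
  abstract static class StateStore {
    // One worker thread plays the dispatcher: events run in order, off the caller's thread.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

    /** Non-blocking API: queue a store event and return immediately. */
    public void storeApplication(String appId) {
      dispatcher.execute(() -> run("store", appId));
    }

    /** Non-blocking API: queue a remove event and return immediately. */
    public void removeApplication(String appId) {
      dispatcher.execute(() -> run("remove", appId));
    }

    // Event handler: calls the blocking hook that the subclass provides.
    private void run(String op, String appId) {
      try {
        if (op.equals("store")) {
          storeApplicationState(appId);
        } else {
          removeApplicationState(appId);
        }
      } catch (Exception e) {
        System.err.println("Error during " + op + " for app " + appId + ": " + e);
      }
    }

    /** Blocking API, implemented by each concrete store. */
    protected abstract void storeApplicationState(String appId) throws Exception;

    protected abstract void removeApplicationState(String appId) throws Exception;

    /** Stops accepting events and waits for queued ones; false means the wait timed out. */
    public boolean close() throws InterruptedException {
      dispatcher.shutdown();
      return dispatcher.awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  /** Hypothetical in-memory store that just keeps the application IDs. */
  static class InMemoryStore extends StateStore {
    private final List<String> saved = new ArrayList<>();

    @Override
    protected synchronized void storeApplicationState(String appId) {
      saved.add(appId);
    }

    @Override
    protected synchronized void removeApplicationState(String appId) {
      saved.remove(appId);
    }

    synchronized List<String> snapshot() {
      return new ArrayList<>(saved);
    }
  }

  public static void main(String[] args) throws Exception {
    InMemoryStore store = new InMemoryStore();
    store.storeApplication("application_1");
    store.storeApplication("application_2");
    store.removeApplication("application_1");
    store.close();
    System.out.println(store.snapshot()); // prints [application_2]
  }
}
```

    The callers never wait on storage; only the worker thread does. The price is that a caller learns about completion or failure only if the store reports it back, which the original does for stores (RMAppStoredEvent) but not for removals.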
    One class inside RMStateStore deserves a closer look:

    public static class RMDTSecretManagerState {
        // DTIdentifier -> renewDate
        // maps each RM delegation token identifier to its renew date
        Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
            new HashMap<RMDelegationTokenIdentifier, Long>();
    
        Set<DelegationKey> masterKeyState =
            new HashSet<DelegationKey>();
    
        int dtSequenceNumber = 0;
    
        ....
      }
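    It holds a map from RM delegation token identifiers to their renew dates, along with the master keys and the token sequence number. Each RMDelegationTokenIdentifier identifies a delegation token issued by the RM, so saving this state lets a restarted RM still recognize tokens that were handed out before the restart. Now for the three concrete implementations.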


    MemoryRMStateStore

    The in-memory implementation. RMStateStore already abstracts the RM's application state into the RMState class, so MemoryRMStateStore naturally has a field of that type:

    // in-memory implementation of the RM state store
    public class MemoryRMStateStore extends RMStateStore {
      
      RMState state = new RMState();
      
      @VisibleForTesting
      public RMState getState() {
        return state;
      }
      ...
    Initially, state is an instance with nothing in it. The class then defines the method that saves an application's state:
    @Override
      public void storeApplicationState(String appId, 
                                         ApplicationStateDataPBImpl appStateData)
          throws Exception {
        // build a new ApplicationState instance
        ApplicationState appState = new ApplicationState(
            appStateData.getSubmitTime(),
            appStateData.getApplicationSubmissionContext(), appStateData.getUser());
        if (state.appState.containsKey(appState.getAppId())) {
          Exception e = new IOException("App: " + appId + " is already stored.");
          LOG.info("Error storing info for app: " + appId, e);
          throw e;
        }
        // add it to the state object
        state.appState.put(appState.getAppId(), appState);
      }
    The method that saves an application attempt's state:

    @Override
      public synchronized void storeApplicationAttemptState(String attemptIdStr, 
                                ApplicationAttemptStateDataPBImpl attemptStateData)
                                throws Exception {
        ApplicationAttemptId attemptId = ConverterUtils
                                            .toApplicationAttemptId(attemptIdStr);
        ...
        ApplicationAttemptState attemptState =
            new ApplicationAttemptState(attemptId,
              attemptStateData.getMasterContainer(), credentials);
    
        ApplicationState appState = state.getApplicationState().get(
            attemptState.getAttemptId().getApplicationId());
        if (appState == null) {
          throw new YarnRuntimeException("Application doesn't exist");
        }
    
        if (appState.attempts.containsKey(attemptState.getAttemptId())) {
          Exception e = new IOException("Attempt: " +
              attemptState.getAttemptId() + " is already stored.");
          LOG.info("Error storing info for attempt: " +
              attemptState.getAttemptId(), e);
          throw e;
        }
        // add it to the application's map of attempt states
        appState.attempts.put(attemptState.getAttemptId(), attemptState);
      }
    Once application state has been saved, how is it loaded back from memory? The loadState() method handles that:

    // effectively returns a copy of the RM state kept in memory
      @Override
      public synchronized RMState loadState() throws Exception {
        // return a copy of the state to allow for modification of the real state
        // create a new RMState and copy the in-memory RMState into it
        RMState returnState = new RMState();
        // copy appState
        returnState.appState.putAll(state.appState);
        returnState.rmSecretManagerState.getMasterKeyState()
          .addAll(state.rmSecretManagerState.getMasterKeyState());
        returnState.rmSecretManagerState.getTokenState().putAll(
          state.rmSecretManagerState.getTokenState());
        returnState.rmSecretManagerState.dtSequenceNumber =
            state.rmSecretManagerState.dtSequenceNumber;
        return returnState;
      }
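    This hands the caller a copy of the RMState held by MemoryRMStateStore. It is a shallow copy: putAll and addAll fill new top-level containers, but the ApplicationState objects inside them are shared with the live state.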
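
    To see what kind of copy a putAll-based loadState() produces, here is a minimal sketch. AppState is a hypothetical, much-simplified stand-in for ApplicationState, and IDs are plain strings; only the copying pattern matches the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

class ShallowCopySketch {

  /** Hypothetical stand-in for ApplicationState: just a map of attempts. */
  static class AppState {
    final Map<String, String> attempts = new HashMap<>();
  }

  /** Same pattern as the loadState() excerpt: a new map filled with putAll. */
  static Map<String, AppState> copyOf(Map<String, AppState> live) {
    Map<String, AppState> copy = new HashMap<>();
    copy.putAll(live);
    return copy;
  }

  public static void main(String[] args) {
    Map<String, AppState> live = new HashMap<>();
    live.put("application_1", new AppState());

    Map<String, AppState> copy = copyOf(live);

    // Adding an entry to the copy does not touch the live map.
    copy.put("application_2", new AppState());
    System.out.println(live.containsKey("application_2")); // prints false

    // The values are shared, so a change made through the copy is visible in the live map.
    copy.get("application_1").attempts.put("appattempt_1", "state");
    System.out.println(live.get("application_1").attempts.containsKey("appattempt_1")); // prints true
  }
}
```

    In other words, the caller can add or remove applications in the returned map without affecting the store, but should treat the individual state objects as shared.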


    FileSystemRMStateStore

    The file-system implementation. Its core job is to persist application state to HDFS.

    /**
     * A simple class for storing RM state in any storage that implements a basic
     * FileSystem interface. Does not use directories so that simple key-value
     * stores can be used. The retry policy for the real filesystem client must be
     * configured separately to enable retry of filesystem operations when needed.
     * File-system based RM state store
     */
    public class FileSystemRMStateStore extends RMStateStore {
    
      public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
    
      private static final String ROOT_DIR_NAME = "FSRMStateRoot";
      private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
      private static final String RM_APP_ROOT = "RMAppRoot";
      private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
      private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
      private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
          "RMDTSequenceNumber_";
      // file system handle
      protected FileSystem fs;
      
      // paths under which the RM state is stored
      private Path rootDirPath;
      private Path rmDTSecretManagerRoot;
      private Path rmAppRoot;
      private Path dtSequenceNumberPath = null;
    
      @VisibleForTesting
      Path fsWorkingPath;
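    The class declares several paths, one for each kind of stored object, plus a single FileSystem object used for all operations. Next, the core method that saves an application: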

    @Override
      public synchronized void storeApplicationState(String appId,
          ApplicationStateDataPBImpl appStateDataPB) throws Exception {
        Path appDirPath = getAppDir(rmAppRoot, appId);
        fs.mkdirs(appDirPath);
        // path of the node (file) to write
        Path nodeCreatePath = getNodePath(appDirPath, appId);
    
        LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
        // serialize the state to be written
        byte[] appStateData = appStateDataPB.getProto().toByteArray();
        try {
          // currently throw all exceptions. May need to respond differently for HA
          // based on whether we have lost the right to write to FS
          // write the state
          writeFile(nodeCreatePath, appStateData);
        } catch (Exception e) {
          LOG.info("Error storing info for app: " + appId, e);
          throw e;
        }
      }
    The corresponding method that loads the RM state:

    @Override
      public synchronized RMState loadState() throws Exception {
        // create a new RMState
        RMState rmState = new RMState();
        // restore each part from files
        // recover DelegationTokenSecretManager
        loadRMDTSecretManagerState(rmState);
        // recover RM applications
        loadRMAppState(rmState);
        return rmState;
      }
    Loading the applications:

    private void loadRMAppState(RMState rmState) throws Exception {
        try {
          List<ApplicationAttemptState> attempts =
              new ArrayList<ApplicationAttemptState>();
    
          for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
            for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
              assert childNodeStatus.isFile();
              String childNodeName = childNodeStatus.getPath().getName();
              // read the file contents
              byte[] childData =
                  readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
              // application state node
              if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
                // application
                LOG.info("Loading application from node: " + childNodeName);
                ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
                ApplicationStateDataPBImpl appStateData =
                    new ApplicationStateDataPBImpl(
                      ApplicationStateDataProto.parseFrom(childData));
                ApplicationState appState =
                    new ApplicationState(appStateData.getSubmitTime(),
                      appStateData.getApplicationSubmissionContext(),
                      appStateData.getUser());
                // assert child node name is same as actual applicationId
                assert appId.equals(appState.context.getApplicationId());
                rmState.appState.put(appId, appState);
              } else if (childNodeName
                .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
                // attempt
                // application attempt state node
                LOG.info("Loading application attempt from node: " + childNodeName);
                ApplicationAttemptId attemptId =
                    ConverterUtils.toApplicationAttemptId(childNodeName);
                ApplicationAttemptStateDataPBImpl attemptStateData =
                    new ApplicationAttemptStateDataPBImpl(
                      ApplicationAttemptStateDataProto.parseFrom(childData));
                Credentials credentials = null;
                if (attemptStateData.getAppAttemptTokens() != null) {
                  credentials = new Credentials();
                  DataInputByteBuffer dibb = new DataInputByteBuffer();
                  dibb.reset(attemptStateData.getAppAttemptTokens());
                  credentials.readTokenStorageStream(dibb);
                }
                ApplicationAttemptState attemptState =
                    new ApplicationAttemptState(attemptId,
                      attemptStateData.getMasterContainer(), credentials);
    
                // assert child node name is same as application attempt id
                assert attemptId.equals(attemptState.getAttemptId());
                attempts.add(attemptState);
              } else {
                LOG.info("Unknown child node with name: " + childNodeName);
              }
            }
          }
          ....
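
    The store and load paths above boil down to "one directory per application, one file per node, dispatch on the file-name prefix when loading". Below is a minimal sketch of that layout using java.nio.file on the local disk instead of Hadoop's FileSystem. The class, the string IDs and the way attempts are attached to their application after the scan are assumptions made for illustration; the excerpt above is cut off before that step. The two prefixes are assumed to match the values used by YARN's ID classes.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FileStoreSketch {
  static final String APP_PREFIX = "application_";    // assumed application node prefix
  static final String ATTEMPT_PREFIX = "appattempt_"; // assumed attempt node prefix

  private final Path appRoot;

  FileStoreSketch(Path root) throws IOException {
    this.appRoot = Files.createDirectories(root.resolve("RMAppRoot"));
  }

  /** Writes one node file inside the application's directory. */
  void store(String appId, String nodeName, byte[] data) throws IOException {
    Path appDir = Files.createDirectories(appRoot.resolve(appId));
    Files.write(appDir.resolve(nodeName), data);
  }

  /** Scans every application directory and rebuilds appId -> attempt node names. */
  Map<String, List<String>> load() throws IOException {
    Map<String, List<String>> apps = new TreeMap<>();
    List<String[]> attempts = new ArrayList<>(); // pairs of {appId, attemptNodeName}
    try (DirectoryStream<Path> appDirs = Files.newDirectoryStream(appRoot)) {
      for (Path appDir : appDirs) {
        String appId = appDir.getFileName().toString();
        try (DirectoryStream<Path> nodes = Files.newDirectoryStream(appDir)) {
          for (Path node : nodes) {
            String name = node.getFileName().toString();
            if (name.startsWith(APP_PREFIX)) {
              apps.put(name, new ArrayList<>());
            } else if (name.startsWith(ATTEMPT_PREFIX)) {
              attempts.add(new String[] {appId, name});
            } // other files are ignored, like the "Unknown child node" branch
          }
        }
      }
    }
    // Attach each attempt to its application, skipping attempts whose application node is missing.
    for (String[] attempt : attempts) {
      List<String> list = apps.get(attempt[0]);
      if (list != null) {
        list.add(attempt[1]);
      }
    }
    for (List<String> list : apps.values()) {
      Collections.sort(list); // directory listing order is unspecified
    }
    return apps;
  }

  public static void main(String[] args) throws IOException {
    FileStoreSketch store = new FileStoreSketch(Files.createTempDirectory("rmstate"));
    store.store("application_1_0001", "application_1_0001", new byte[] {1});
    store.store("application_1_0001", "appattempt_1_0001_000001", new byte[] {2});
    store.store("application_1_0001", "appattempt_1_0001_000002", new byte[] {3});
    System.out.println(store.load());
  }
}
```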

    NullRMStateStore

    The no-op implementation: it stores no state at all. It is very simple; it overrides the methods but leaves their bodies empty.

    // no-op RM state store; none of the save methods do anything
    @Unstable
    public class NullRMStateStore extends RMStateStore {
      ....
      
      // loading state is not supported
      @Override
      public RMState loadState() throws Exception {
        throw new UnsupportedOperationException("Cannot load state from null store");
      }
      
      // the concrete save methods do nothing either
      @Override
      protected void storeApplicationState(String appId,
          ApplicationStateDataPBImpl appStateData) throws Exception {
        // Do nothing
      }
    
      @Override
      protected void storeApplicationAttemptState(String attemptId,
          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
        // Do nothing
      }
    
      @Override
      protected void removeApplicationState(ApplicationState appState)
          throws Exception {
        // Do nothing
      }
    
      .....
    }

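    How do you choose among these classes? In the YARN configuration, set the property yarn.resourcemanager.store.class to the fully qualified name of the store class. The RM only recovers saved state when yarn.resourcemanager.recovery.enabled is set to true.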
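
    Selecting an implementation by class name usually comes down to reading the name from the configuration and instantiating it reflectively. The sketch below shows that idea with java.util.Properties and two hypothetical store classes; it does not use Hadoop's Configuration class and is not the RM's actual factory code.

```java
import java.util.Properties;

class StoreSelectionSketch {
  /** Hypothetical minimal store interface. */
  interface Store {
    String name();
  }

  static class MemoryStore implements Store {
    public String name() {
      return "memory";
    }
  }

  static class NullStore implements Store {
    public String name() {
      return "null";
    }
  }

  static final String STORE_CLASS_KEY = "yarn.resourcemanager.store.class";

  /** Reads the class name from the configuration and instantiates it reflectively. */
  static Store create(Properties conf) throws ReflectiveOperationException {
    String className = conf.getProperty(STORE_CLASS_KEY);
    if (className == null) {
      throw new IllegalArgumentException(STORE_CLASS_KEY + " is not set");
    }
    // asSubclass fails fast if the configured class is not a Store.
    Class<? extends Store> cls = Class.forName(className).asSubclass(Store.class);
    return cls.getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws Exception {
    Properties conf = new Properties();
    conf.setProperty(STORE_CLASS_KEY, MemoryStore.class.getName());
    System.out.println(create(conf).name()); // prints memory
  }
}
```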


    The full annotated code is available at https://github.com/linyiqun/hadoop-yarn. Analyses of other parts of YARN will follow.



  • Original article: https://www.cnblogs.com/bianqi/p/12183864.html