  • Fixing the Yarn RM bug where ZNode writes exceed the data-size limit

    Background

    Our production clusters have hit several incidents in which the Yarn RM wrote more data to a ZooKeeper ZNode than the ZNode size limit allows. Both RM instances then fell into Standby, users could no longer submit jobs, and the whole cluster hung. Investigation showed that an abnormal job wrote an oversized state record to its ZNode, exceeding the limit and preventing the state of other submitted jobs from being written to ZK. To stop a single bad job from snowballing into a cluster-wide outage, we optimized the RM's ZNode-writing logic.

    1. Reproducing the Problem

    The most direct way to reproduce the issue is to lower ZK's jute maximum buffer to 512 B (jute.maxbuffer is a Java system property read by the ZK server) and restart the ZK and Yarn services. Both then misbehave: ZK logs java.io.IOException: Len error 614 because clients try to write more than 512 B, while the RM reports "code:CONNECTIONLOSS" and cannot connect to ZK. Both RMs sit in Standby and the cluster is unusable.

    Leader ZK exception log:

    2020-12-07 16:00:11,869 INFO org.apache.zookeeper.server.ZooKeeperServer: Client attempting to renew session 0x1763c3707800002 at /10.197.1.96:32892
    2020-12-07 16:00:11,869 INFO org.apache.zookeeper.server.ZooKeeperServer: Established session 0x1763c3707800002 with negotiated timeout 40000 for client /10.197.1.96:32892
    2020-12-07 16:00:11,870 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x1763c3707800002 due to java.io.IOException: Len error 614
    2020-12-07 16:00:11,870 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /10.197.1.96:32892 which had sessionid 0x1763c3707800002
    2020-12-07 16:00:12,216 INFO org.apache.zookeeper.server.NIOServerCnxnFactory: Accepted socket connection from /10.197.1.141:56492
    2020-12-07 16:00:12,216 INFO org.apache.zookeeper.server.ZooKeeperServer: Client attempting to establish new session at /10.197.1.141:56492
    2020-12-07 16:00:12,218 INFO org.apache.zookeeper.server.ZooKeeperServer: Established session 0x3763c3707830001 with negotiated timeout 40000 for client /10.197.1.141:56492
    2020-12-07 16:00:12,219 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x3763c3707830001 due to java.io.IOException: Len error 614
    2020-12-07 16:00:12,220 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /10.197.1.141:56492 which had sessionid 0x3763c3707830001
    2020-12-07 16:00:14,275 INFO org.apache.zookeeper.server.NIOServerCnxnFactory: Accepted socket connection from /10.197.1.141:56510
    2020-12-07 16:00:14,275 INFO org.apache.zookeeper.server.ZooKeeperServer: Client attempting to renew session 0x3763c3707830001 at /10.197.1.141:56510
    2020-12-07 16:00:14,276 INFO org.apache.zookeeper.server.ZooKeeperServer: Established session 0x3763c3707830001 with negotiated timeout 40000 for client /10.197.1.141:56510
    2020-12-07 16:00:14,276 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x3763c3707830001 due to java.io.IOException: Len error 614
    2020-12-07 16:00:14,276 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /10.197.1.141:56510 which had sessionid 0x3763c3707830001
    2020-12-07 16:00:16,000 INFO org.apache.zookeeper.server.ZooKeeperServer: Expiring session 0x1763c3707800000, timeout of 5000ms exceeded

    Yarn RM log:

    2020-12-07 16:00:10,938 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
    2020-12-07 16:00:10,938 INFO org.apache.hadoop.ha.ActiveStandbyElector: Ignore duplicate monitor lock-node request.
    2020-12-07 16:00:11,038 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
    2020-12-07 16:00:11,647 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server slave-prd-10-197-1-236.v-bj-5.kwang.lan/10.197.1.236:2181. Will not attempt to authenticate using SASL (unknown error)
    2020-12-07 16:00:11,647 INFO org.apache.zookeeper.ClientCnxn: Socket connection established, initiating session, client: /10.197.1.141:56854, server: slave-prd-10-197-1-236.v-bj-5.kwang.lan/10.197.1.236:2181
    2020-12-07 16:00:11,649 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server slave-prd-10-197-1-236.v-bj-5.kwang.lan/10.197.1.236:2181, sessionid = 0x1763c3707800001, negotiated timeout = 40000
    2020-12-07 16:00:11,649 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
    2020-12-07 16:00:11,650 INFO org.apache.hadoop.ha.ActiveStandbyElector: Ignore duplicate monitor lock-node request.
    2020-12-07 16:00:11,650 INFO org.apache.zookeeper.ClientCnxn: Unable to read additional data from server sessionid 0x1763c3707800001, likely server has closed socket, closing socket connection and attempting reconnect
    2020-12-07 16:00:11,750 FATAL org.apache.hadoop.ha.ActiveStandbyElector: Received create error from Zookeeper. code:CONNECTIONLOSS for path /yarn-leader-election/yarnRM/ActiveStandbyElectorLock. Not retrying further znode create connection errors.
    2020-12-07 16:00:12,210 INFO org.apache.zookeeper.ZooKeeper: Session: 0x1763c3707800001 closed
    2020-12-07 16:00:12,212 WARN org.apache.hadoop.ha.ActiveStandbyElector: Ignoring stale result from old client with sessionId 0x1763c3707800001
    2020-12-07 16:00:12,212 WARN org.apache.hadoop.ha.ActiveStandbyElector: Ignoring stale result from old client with sessionId 0x1763c3707800001
    2020-12-07 16:00:12,212 INFO org.apache.zookeeper.ClientCnxn: EventThread shut down
    2020-12-07 16:00:12,213 ERROR org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Received RMFatalEvent of type EMBEDDED_ELECTOR_FAILED, caused by Received create error from Zookeeper. code:CONNECTIONLOSS for path /yarn-leader-election/yarnRM/ActiveStandbyElectorLock. Not retrying further znode create connection errors.
    2020-12-07 16:00:12,213 WARN org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Transitioning the resource manager to standby.
    2020-12-07 16:00:12,214 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Transitioning RM to Standby mode
    2020-12-07 16:00:12,214 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Already in standby state
    2020-12-07 16:00:12,214 INFO org.apache.hadoop.ha.ActiveStandbyElector: Yielding from election
    2020-12-07 16:00:12,214 INFO org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=slave-prd-10-197-1-236.v-bj-5.kwang.lan:2181,slave-prd-10-197-1-96.v-bj-5.kwang.lan:2181,slave-prd-10-197-1-141.v-bj-5.kwang.lan:2181 sessionTimeout=60000 watcher=org.apache.hadoop.ha.ActiveStandbyElector$WatcherWithClientRef@67b6359c
    2020-12-07 16:00:12,215 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server slave-prd-10-197-1-141.v-bj-5.kwang.lan/10.197.1.141:2181. Will not attempt to authenticate using SASL (unknown error)
    2020-12-07 16:00:12,216 INFO org.apache.zookeeper.ClientCnxn: Socket connection established, initiating session, client: /10.197.1.141:56492, server: slave-prd-10-197-1-141.v-bj-5.kwang.lan/10.197.1.141:2181
    2020-12-07 16:00:12,218 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server slave-prd-10-197-1-141.v-bj-5.kwang.lan/10.197.1.141:2181, sessionid = 0x3763c3707830001, negotiated timeout = 40000
    2020-12-07 16:00:12,219 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
    2020-12-07 16:00:12,220 INFO org.apache.zookeeper.ClientCnxn: Unable to read additional data from server sessionid 0x3763c3707830001, likely server has closed socket, closing socket connection and attempting reconnect
    2020-12-07 16:00:12,320 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
    2020-12-07 16:00:12,320 WARN org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService: Lost contact with Zookeeper. Transitioning to standby in 60000 ms if connection is not reestablished.

    2. How the RM Interacts with ZNodes

    2.1 How RM state is stored in ZK

    Whether or not high availability is enabled, the RM is Yarn's core service component: it communicates with the ApplicationMaster of every application and exchanges heartbeats with the NodeManagers. Many applications register with the RM, and each one has an ApplicationMaster managing its entire lifecycle. Because the RM is this important, its state must be persisted; otherwise an abnormal RM exit would lose all application state, and a restarted RM could not recover the jobs that were in flight.

    Now that the goal of persisting application state is clear, how is that state stored and what data does it contain?

    Yarn offers four implementations for storing RM application state:

    • MemoryRMStateStore: keeps the state in memory.

    • FileSystemRMStateStore: persists the state to an HDFS file system.

    • NullRMStateStore: does nothing; no application state is saved.

    • ZKRMStateStore: keeps the state in ZooKeeper.

    Because our Yarn deployment runs RM HA, ZKRMStateStore is the only one of the four that can be used.
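
    For context, choosing the ZK-backed store is itself a yarn-site.xml setting. A minimal sketch (standard Yarn property names; the values shown are what a typical HA setup with recovery enabled would use):

    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.resourcemanager.store.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>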

    So what state does the RM actually keep in ZK? The layout below shows the directory structure of the RM state in ZK; it mainly stores Application data (job state) and SECRET_MANAGER data (job token information).

        ROOT_DIR_PATH
          |--- VERSION_INFO
          |--- EPOCH_NODE
          |--- RM_ZK_FENCING_LOCK
          |--- RM_APP_ROOT
          |     |----- (#ApplicationId1)
          |     |        |----- (#ApplicationAttemptIds)
          |     |
          |     |----- (#ApplicationId2)
          |     |       |----- (#ApplicationAttemptIds)
          |     ....
          |
          |--- RM_DT_SECRET_MANAGER_ROOT
          |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
          |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
          |       |----- Token_1
          |       |----- Token_2
          |       ....
          |
          |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
          |      |----- Key_1
          |      |----- Key_2
          ....
          |--- AMRMTOKEN_SECRET_MANAGER_ROOT
          |----- currentMasterKey
          |----- nextMasterKey
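
    To see what this tree looks like on a live cluster, the following standalone sketch (not part of Hadoop; it assumes the default yarn.resourcemanager.zk-state-store.parent-path of /rmstore, under which the application znodes live at /rmstore/ZKRMStateRoot/RMAppRoot) lists each application znode and prints the size of its state data, which helps spot applications whose state is close to the ZNode limit:

    import java.util.List;
    import org.apache.zookeeper.ZooKeeper;

    public class RMStateZNodeSizeChecker {
      public static void main(String[] args) throws Exception {
        // ZK quorum and path are assumptions; adjust them to your cluster layout
        String quorum = args.length > 0 ? args[0] : "localhost:2181";
        String appRoot = "/rmstore/ZKRMStateRoot/RMAppRoot";

        ZooKeeper zk = new ZooKeeper(quorum, 30000, event -> { });
        try {
          List<String> apps = zk.getChildren(appRoot, false);
          for (String app : apps) {
            // each child is an application znode; its data is the serialized application state
            byte[] data = zk.getData(appRoot + "/" + app, false, null);
            System.out.println(app + " -> " + (data == null ? 0 : data.length) + " bytes");
          }
        } finally {
          zk.close();
        }
      }
    }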

    2.2 How RM state is stored and updated in ZK

    Every job submitted to Yarn enters through the YarnClient API, specifically its submitApplication() method.

    // Location: org/apache/hadoop/yarn/client/api/YarnClient.java
      public abstract ApplicationId submitApplication(
          ApplicationSubmissionContext appContext) throws YarnException,
          IOException;
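
    For orientation, here is a minimal client-side sketch of how callers typically reach submitApplication(). The AM container spec, resources, and queue are omitted, so this is only a skeleton of the call flow, not a working job:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.client.api.YarnClientApplication;

    public class SubmitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
        context.setApplicationName("znode-size-demo");
        // ... set the AM ContainerLaunchContext, Resource and queue here before a real submit

        // the entry point discussed above: hands the submission context to the RM
        ApplicationId appId = yarnClient.submitApplication(context);
        System.out.println("Submitted " + appId);
        yarnClient.stop();
      }
    }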

    After submission, the job goes through a series of event transitions that are routed to different state-machine handlers. The transition that persists the job, StoreAppTransition, saves the application state by writing its metadata to ZK.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
      public void storeNewApplication(RMApp app) {
        ApplicationSubmissionContext context = app
                                                .getApplicationSubmissionContext();
        assert context instanceof ApplicationSubmissionContextPBImpl;
        ApplicationStateData appState =
            ApplicationStateData.newInstance(
                app.getSubmitTime(), app.getStartTime(), context, app.getUser());
        // send an RMStateStoreEventType.STORE_APP event to the dispatcher
        dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
      }

    Here an RMStateStoreEventType.STORE_APP event is sent to the dispatcher, with StoreAppTransition registered as the transition that handles it.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
        .addTransition(RMStateStoreState.ACTIVE,
              EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
              RMStateStoreEventType.STORE_APP, new StoreAppTransition())
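
    The dispatcher/event pattern at work here is Yarn's generic AsyncDispatcher. A tiny, self-contained illustration with a hypothetical event type (not the RM's real STORE_APP wiring) shows the shape of it: a handler is registered for an event type, and handle() queues the event for asynchronous processing.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.event.AbstractEvent;
    import org.apache.hadoop.yarn.event.AsyncDispatcher;
    import org.apache.hadoop.yarn.event.EventHandler;

    public class DispatcherSketch {
      // hypothetical event type, for illustration only
      enum DemoEventType { STORE_APP }

      static class DemoEvent extends AbstractEvent<DemoEventType> {
        DemoEvent() { super(DemoEventType.STORE_APP); }
      }

      public static void main(String[] args) throws Exception {
        AsyncDispatcher dispatcher = new AsyncDispatcher();
        dispatcher.init(new Configuration());
        // register a handler for the event type
        dispatcher.register(DemoEventType.class,
            (EventHandler<DemoEvent>) event -> System.out.println("handling " + event.getType()));
        dispatcher.start();

        dispatcher.getEventHandler().handle(new DemoEvent()); // analogous to the STORE_APP event above
        Thread.sleep(200);  // give the dispatcher thread time to drain its queue
        dispatcher.stop();
      }
    }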

    StoreAppTransition ultimately calls ZKRMStateStore#storeApplicationStateInternal(), which persists the RM's application metadata to ZK.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
      @Override
      public synchronized void storeApplicationStateInternal(ApplicationId appId,
          ApplicationStateData appStateDataPB) throws Exception {
        String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
        }
        byte[] appStateData = appStateDataPB.getProto().toByteArray();
        createWithRetries(nodeCreatePath, appStateData, zkAcl,
                  CreateMode.PERSISTENT);
      }

    After the application state has been saved to ZK, the application eventually moves to the ACCEPTED state, which triggers the StartAppAttemptTransition transition to persist the AppAttempt state as well.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
      @Override
      public synchronized void storeApplicationAttemptStateInternal(
          ApplicationAttemptId appAttemptId,
          ApplicationAttemptStateData attemptStateDataPB)
          throws Exception {
        String appDirPath = getNodePath(rmAppRoot,
            appAttemptId.getApplicationId().toString());
        String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
              + nodeCreatePath);
        }
        byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
        createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
                        CreateMode.PERSISTENT);
      }

    When a job finishes, the Application and AppAttempt states are updated, and this update path is where problems tend to appear. The two methods below update (or create, if missing) the job and attempt state in ZK. While scheduling a job, Yarn may retry it several times depending on network, hardware, and resource conditions; if persisting the retry state to ZK fails, org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.ZKAction.runWithRetries() retries the operation.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
      // update the Application state
      @Override
      public synchronized void updateApplicationStateInternal(ApplicationId appId,
          ApplicationStateData appStateDataPB) throws Exception {
        String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing final state info for app: " + appId + " at: "
              + nodeUpdatePath);
        }
        byte[] appStateData = appStateDataPB.getProto().toByteArray();
    
        if (existsWithRetries(nodeUpdatePath, false) != null) {
          setDataWithRetries(nodeUpdatePath, appStateData, -1);
        } else {
          createWithRetries(nodeUpdatePath, appStateData, zkAcl,
                  CreateMode.PERSISTENT);
          LOG.debug(appId + " znode didn't exist. Created a new znode to"
                  + " update the application state.");
        }
      }
    
      // update the AppAttempt state
      @Override
      public synchronized void updateApplicationAttemptStateInternal(
          ApplicationAttemptId appAttemptId,
          ApplicationAttemptStateData attemptStateDataPB)
          throws Exception {
        String appIdStr = appAttemptId.getApplicationId().toString();
        String appAttemptIdStr = appAttemptId.toString();
        String appDirPath = getNodePath(rmAppRoot, appIdStr);
        String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
              + " at: " + nodeUpdatePath);
        }
        byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
    
        if (existsWithRetries(nodeUpdatePath, false) != null) {
          setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
        } else {
          createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
                  CreateMode.PERSISTENT);
          LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
                  + " update the application attempt state.");
        }
      }

    With Yarn HA enabled, the retry interval is derived from yarn.resourcemanager.zk-timeout-ms (the ZK session timeout, 60000 ms, i.e. 1 minute, in our production environment) and yarn.resourcemanager.zk-num-retries (the number of retries after a failed operation, 1000 in production):

    retry interval (yarn.resourcemanager.zk-retry-interval-ms) = yarn.resourcemanager.zk-timeout-ms (ZK session timeout) / yarn.resourcemanager.zk-num-retries (number of retries)

    In production this works out to 60000 ms / 1000 retries = 60 ms per retry. In other words, a failing operation is retried 1000 times at 60 ms intervals, which can also push the RM toward a heap out-of-memory error. Reference: https://my.oschina.net/dabird/blog/3089265

    The retry interval is determined in the following code:

    // Location: src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
      @Override
      public synchronized void initInternal(Configuration conf) throws Exception {
        zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
        if (zkHostPort == null) {
          throw new YarnRuntimeException("No server address specified for " +
              "zookeeper state store for Resource Manager recovery. " +
              YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
        }
        // number of ZK connection retries
        numRetries =
            conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
                YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
        znodeWorkingPath =
            conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
                YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
    
        // ZK session timeout
        zkSessionTimeout =
            conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
                YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
        zknodeLimit =
            conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
                YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);
    
        if (HAUtil.isHAEnabled(conf)) {
          zkRetryInterval = zkSessionTimeout / numRetries;
        } else {
          zkRetryInterval =
              conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
                  YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
        }
     }
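
    Putting the interval and the retry count together, a simplified sketch of the retry behaviour (an illustration only, not the actual ZKAction.runWithRetries() code) looks like this:

    import org.apache.zookeeper.KeeperException;

    public class RetrySketch {
      /** A ZK operation that may fail with a KeeperException. */
      interface ZkOp<T> {
        T run() throws KeeperException, InterruptedException;
      }

      /** Retry op up to numRetries times, sleeping retryIntervalMs between attempts on connection loss. */
      static <T> T runWithRetries(ZkOp<T> op, int numRetries, long retryIntervalMs)
          throws Exception {
        for (int attempt = 0; ; attempt++) {
          try {
            return op.run();
          } catch (KeeperException.ConnectionLossException e) {
            if (attempt >= numRetries) {
              throw e;                      // give up after numRetries attempts
            }
            Thread.sleep(retryIntervalMs);  // e.g. 60000 ms / 1000 retries = 60 ms in our setup
          }
        }
      }
    }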

    At this point it is clear how the RM's job state is saved to ZK and how it is updated.

    2.3 How RM state is deleted from ZK

    Having seen how job state gets into ZK, a natural question follows: does that state stay in ZK forever? It does not; the RM also deletes job state from ZK. So what does the deletion logic look like?

    The core of the deletion logic is the removeApplication() call made from RMAppManager#checkAppNumCompletedLimit(): when the number of completed applications kept in the ZK state store (or in memory) exceeds the corresponding limit, the oldest application's state is removed.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
      /*
       * check to see if hit the limit for max # completed apps kept
       */
      protected synchronized void checkAppNumCompletedLimit() {
        // check apps kept in state store.
        while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
          ApplicationId removeId =
              completedApps.get(completedApps.size() - completedAppsInStateStore);
          RMApp removeApp = rmContext.getRMApps().get(removeId);
          LOG.info("Max number of completed apps kept in state store met:"
              + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
              + ", removing app " + removeApp.getApplicationId()
              + " from state store.");
          rmContext.getStateStore().removeApplication(removeApp);
          completedAppsInStateStore--;
        }
    
        // check apps kept in memory.
        while (completedApps.size() > this.maxCompletedAppsInMemory) {
          ApplicationId removeId = completedApps.remove();
          LOG.info("Application should be expired, max number of completed apps"
              + " kept in memory met: maxCompletedAppsInMemory = "
              + this.maxCompletedAppsInMemory + ", removing app " + removeId
              + " from memory: ");
          rmContext.getRMApps().remove(removeId);
          this.applicationACLsManager.removeApplication(removeId);
        }
      }

    Let's see how the relevant limits are configured. The maximum number of completed applications kept in the ZK state store defaults to the same value as the in-memory maximum, 10000 (our production environment also uses 10000), and the number kept in the ZK state store can never exceed the number kept in memory.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
      public RMAppManager(RMContext context,
          YarnScheduler scheduler, ApplicationMasterService masterService,
          ApplicationACLsManager applicationACLsManager, Configuration conf) {
        ...
        // maximum number of completed apps kept in memory
        this.maxCompletedAppsInMemory = conf.getInt(
            YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
            YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
        // maximum number of completed apps kept in the ZK state store; defaults to the in-memory maximum
        this.maxCompletedAppsInStateStore =
            conf.getInt(
              YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
              YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);
    
        // the number of apps kept in the ZK state store must not exceed the number kept in memory
        if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
          this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
        }
      }
    
    // Location: org/apache/hadoop/yarn/conf/YarnConfiguration.java
      // definition of maxCompletedAppsInMemory
      /** The maximum number of completed applications RM keeps. */ 
      public static final String RM_MAX_COMPLETED_APPLICATIONS =
        RM_PREFIX + "max-completed-applications";
      public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
    
      // definition of maxCompletedAppsInStateStore; defaults to maxCompletedAppsInMemory
      /**
       * The maximum number of completed applications RM state store keeps, by
       * default equals to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS
       */
      public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
          RM_PREFIX + "state-store.max-completed-applications";
      public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
          DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
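
    If the state store itself grows too large, these two limits can also be lowered. An illustrative yarn-site.xml snippet (the property names are the ones defined above; the values here are only examples, not recommendations):

    <property>
        <name>yarn.resourcemanager.max-completed-applications</name>
        <value>5000</value>
    </property>
    <property>
        <name>yarn.resourcemanager.state-store.max-completed-applications</name>
        <value>2000</value>
    </property>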

    The actual removal then deletes from ZK the state of the applications that exceed the limit.

    // Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
      @Override
      public synchronized void removeApplicationStateInternal(
          ApplicationStateData  appState)
          throws Exception {
        String appId = appState.getApplicationSubmissionContext().getApplicationId()
            .toString();
        String appIdRemovePath = getNodePath(rmAppRoot, appId);
        ArrayList<Op> opList = new ArrayList<Op>();
    
        // delete the AppAttempt info stored in ZK
        for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
          String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
          opList.add(Op.delete(attemptRemovePath, -1));
        }
        opList.add(Op.delete(appIdRemovePath, -1));
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
              + " and its attempts.");
        }
        // delete the Application info stored in ZK
        doDeleteMultiWithRetries(opList);
      }

    3. Solutions

    3.1 Fix before Hadoop 2.9.0

    When RM state is written to ZK, the RM acts as the client and ZK as the server. Before Hadoop 2.9.0, the usual way to handle this failure was to raise the jute.maxbuffer value on the ZK side so the RM could write larger znodes. That approach has three drawbacks:

    1. Allowing very large znodes on the ZK server degrades ZK read/write performance and increases ZK memory pressure;

    2. Both the ZK servers and the RM clients must be restarted, which is operationally expensive (and even more so if other services depend on the same ZK ensemble and also need restarting);

    3. The amount of data an abnormal job writes is unbounded, so in some cases writes will still exceed the new ZNode limit.

    Q: Why limit the size of a ZNode in ZK at all?

    A: ZK is designed for high throughput. To keep reads fast it never serves requests from disk; every server in the ensemble holds the full data set and loads it into memory. ZNode data also cannot be appended to: every write replaces the entire payload. If znodes grow large, reads and writes suffer unpredictable latency (for example, slow data synchronization between servers) and every ZK server's memory consumption rises, which is why ZK is not suited to storing large blobs.
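
    A tiny standalone sketch of the 'replace, not append' point (quorum and path are placeholders): every setData() rewrites the znode's entire payload, so a znode holding a large application state is rewritten wholesale on every update.

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class ReplaceNotAppend {
      public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        try {
          String path = "/demo-replace";
          zk.create(path, "first".getBytes("UTF-8"),
              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
          zk.setData(path, "second".getBytes("UTF-8"), -1);  // replaces the whole payload
          System.out.println(new String(zk.getData(path, false, null), "UTF-8")); // prints "second"
          zk.delete(path, -1);
        } finally {
          zk.close();
        }
      }
    }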

    3.2 Fix in Hadoop 2.9.0 and later

    Hadoop 2.9.0 added the yarn.resourcemanager.zk-max-znode-size.bytes property to yarn-site.xml. It caps the amount of data the RM may write to a single ZNode, in bytes, and defaults to 1024*1024 bytes (1 MB). With this property we no longer need to change the ZK server configuration; updating the Yarn configuration and restarting the RM services is enough to bound what the RM writes to ZK, which also improves the availability of the ZK service.

    The core of the fix adds a check against the ZNode size limit to storeApplicationStateInternal(), updateApplicationStateInternal(), storeApplicationAttemptStateInternal(), and updateApplicationAttemptStateInternal() in ZKRMStateStore, so a single job writing too much data can no longer make the RM and ZK services unavailable. Part of the code:

    // Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
      // check the ZNode size limit when storing the Application state
        @Override
      public synchronized void storeApplicationStateInternal(ApplicationId appId,
          ApplicationStateData appStateDataPB) throws Exception {
        String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
        }
        byte[] appStateData = appStateDataPB.getProto().toByteArray();
        if (appStateData.length <= zknodeLimit) {
          createWithRetries(nodeCreatePath, appStateData, zkAcl,
                  CreateMode.PERSISTENT);
          LOG.debug("Store application state data size for " + appId + " is " + appStateData.length);
        } else {
          LOG.info("Store application state data size for " + appId + " is " + appStateData.length +
            ". exceeds the maximum allowed size " + zknodeLimit + " for application data.");
        }
      }
    
      // check the ZNode size limit when updating the Application state
      @Override
      public synchronized void updateApplicationStateInternal(ApplicationId appId,
          ApplicationStateData appStateDataPB) throws Exception {
        String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing final state info for app: " + appId + " at: "
              + nodeUpdatePath);
        }
        byte[] appStateData = appStateDataPB.getProto().toByteArray();
    
        if (appStateData.length <= zknodeLimit) {
          if (existsWithRetries(nodeUpdatePath, false) != null) {
            setDataWithRetries(nodeUpdatePath, appStateData, -1);
          } else {
            createWithRetries(nodeUpdatePath, appStateData, zkAcl,
                    CreateMode.PERSISTENT);
            LOG.debug(appId + " znode didn't exist. Created a new znode to"
                    + " update the application state.");
          }
          LOG.debug("Update application state data size for " + appId + " is " + appStateData.length);
        } else {
          LOG.info("Update application state data size for " + appId + " is " + appStateData.length +
                  ". exceeds the maximum allowed size " + zknodeLimit + " for application data.");
        }
      }

    3.3 Testing

    Set the maximum ZNode size a Yarn app may write and restart the active RM.

    Property: ResourceManager Advanced Configuration Snippet (Safety Valve) for yarn-site.xml
    Value:
    <property>
        <name>yarn.resourcemanager.zk-max-znode-size.bytes</name>
        <value>512</value>
    </property>

    Test job:

    hadoop jar /opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/hadoop-mapreduce-examples-2.6.0-cdh5.14.4.jar  pi -Dmapred.job.queue.name=root.exquery 20 10

    When the job fails, the RM log looks like the following: the job's state data exceeded the ZNode limit, so ZK does not store that job's state, but both the ZK and RM services keep serving normally and the cluster remains usable.

    # tailf hadoop-cmf-yarn-RESOURCEMANAGER-slave-prd-10-197-1-141.v-bj-5.vivo.lan.log.out  |grep "the maximum allowed size"
    2020-12-10 16:53:37,544 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: Application state data size for application_1607589684539_0001 is 1515. exceeds the maximum allowed size 512 for application data.
    2020-12-10 16:53:48,086 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: Application state data size for application_1607590418121_0001 is 1515. exceeds the maximum allowed size 512 for application data.
    
    # RM 具体 Warn 信息:
    2020-12-10 16:53:49,377 WARN org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:kwang (auth:SIMPLE) cause:org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException: Application with id 'application_1607590418121_0001' doesn't exist in RM.
    2020-12-10 16:53:49,377 INFO org.apache.hadoop.ipc.Server: IPC Server handler 0 on 8032, call org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport from 10.197.1.141:56026 Call#63 Retry#0
    org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException: Application with id 'application_1607590418121_0001' doesn't exist in RM.
            at org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:324)
            at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:170)
            at org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:401)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:422)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)

    4. Applied Fix

    4.1 Apply the fix patch and change the parameter

    Set the maximum ZNode size a Yarn app may write to 4*1024*1024 B (4 MB) and restart the RMs.

    Property: ResourceManager Advanced Configuration Snippet (Safety Valve) for yarn-site.xml
    Value:
    <property>
        <name>yarn.resourcemanager.zk-max-znode-size.bytes</name>
        <value>4194304</value>
    </property>

    4.2 Recommended parameter change

    As analyzed in section 2.2, updating Application or AppAttempt state writes to ZK with retries; with our production defaults that means 1000 retries at 60 ms intervals. Once a job misbehaves, such high-frequency writes put noticeable pressure on ZK and the RM, so we recommend lowering the retry count. For example, with zk-num-retries reduced to 100 and the 60000 ms session timeout unchanged, the retry interval becomes 60000 / 100 = 600 ms.

    Property: ResourceManager Advanced Configuration Snippet (Safety Valve) for yarn-site.xml
    Value:
    <property>
        <name>yarn.resourcemanager.zk-num-retries</name>
        <value>100</value>
    </property>

    References

    1. https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

    2. https://issues.apache.org/jira/browse/YARN-2368

    3. https://cloud.tencent.com/developer/article/1629687

    4. https://blog.csdn.net/Androidlushangderen/article/details/48224707

     