某日 收到告警 线上集群rm切换 观察resourcemanager 日志报错如下
这行不明显 再看看其他日志报错
在 app attempt_removed 时候发生了空指针错误
break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = (AppAttemptRemovedSchedulerEvent) event; removeApplicationAttempt( appAttemptRemovedEvent.getApplicationAttemptID(), appAttemptRemovedEvent.getFinalAttemptState(), appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); break; case CONTAINER_EXPIRED:
定位到代码问题在这里 新增标记内容
private void removeApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); try { writeLock.lock(); SchedulerApplication<FSAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); FSAppAttempt attempt = getSchedulerApp(applicationAttemptId); if (attempt == null || application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); return; }
//已经停止了就不用再次停止了 新增 // Check if the attempt is already stopped and don't stop it twice. if (attempt.isStopped()) { LOG.info("Application " + applicationAttemptId + " has already been " + "stopped!"); return; } // Release all the running containers for (RMContainer rmContainer : attempt.getLiveContainers()) { if (keepContainers && rmContainer.getState().equals(RMContainerState.RUNNING)) { // do not kill the running container in the case of work-preserving AM // restart. LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } super.completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); }
增加如下代码
@Override public String moveApplication(ApplicationId appId, String queueName) throws YarnException { try { writeLock.lock(); SchedulerApplication<FSAppAttempt> app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); } FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt(); // To serialize with FairScheduler#allocate, synchronize on app attempt try { attempt.getWriteLock().lock(); FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); // Check if the attempt is already stopped: don't move stopped app // attempt. The attempt has already been removed from all queues. if (attempt.isStopped()) { LOG.info("Application " + appId + " is stopped and can't be moved!"throw new YarnException("Application " ++ " is stopped and can't be moved!"); } String destQueueName = handleMoveToPlanQueue(queueName); FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); if (targetQueue == null) { throw new YarnException("Target queue " + queueName + " not found or is not a leaf queue."); } if (targetQueue == oldQueue) { return oldQueue.getQueueName(); }
private void executeMove(SchedulerApplication<FSAppAttempt> app, FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { // Check current runs state. Do not remove the attempt from the queue until // after the check has been performed otherwise it could remove the app // from a queue without moving it to a new queue. boolean wasRunnable = oldQueue.isRunnableApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, attempt.getUser()); if (wasRunnable && !nowRunnable) { throw new IllegalStateException("Should have already verified that app " + attempt.getApplicationId() + " would be runnable in new queue"); } // Now it is safe to remove from the queue. oldQueue.removeApp(attempt); if (wasRunnable) { maxRunningEnforcer.untrackRunnableApp(attempt); } else if (nowRunnable) { // App has changed from non-runnable to runnable maxRunningEnforcer.untrackNonRunnableApp(attempt); } attempt.move(newQueue); // This updates all the metrics app.setQueue(newQueue); newQueue.addApp(attempt, nowRunnable); if (nowRunnable) { maxRunningEnforcer.trackRunnableApp(attempt); } if (wasRunnable) { maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue); } }
问题解决
参考 https://issues.apache.org/jira/secure/attachment/12841441/YARN-5136.2.patch