  • [Yarn Source Code Analysis] Source Code Analysis of the Container Launch Process

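    The article "Source Code Analysis of ApplicationMaster Startup and Resource Requests" covered what happens after the AM registers with the RM. From then on, the AM talks to the RM periodically through the RPC function ApplicationMasterProtocol#allocate(). These calls request resources, fetch newly allocated resources, and act as a periodic heartbeat. This article looks at what happens once the AM has obtained Container resources from the RM: how a Container is launched on an NM node. It walks through the source code for the whole process, from the resource request through the launch to resource cleanup.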

    1. Overview of the Container Launch Process

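    The ApplicationMaster initiates a Container launch by calling the RPC function ContainerManagementProtocol#startContainers() on the NM. The ContainerManagerImpl component in the NM receives and handles the request. A Container launch goes through three main phases: resource localization, launching and running the Container, and resource cleanup.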

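    • Resource localization is the work performed by the distributed cache mechanism. It initializes various service components, creates working directories, and downloads the resources needed at runtime from HDFS (text files, JAR packages, executables, and so on). Localization has two parts: application initialization and Container localization. Application initialization sets up the service components that later Containers need, such as the log component LogHandler and the resource state tracker LocalResourcesTrackerImpl. It is usually performed by the application's first Container. Container localization creates the working directories and downloads the various file resources from HDFS.
    • The ContainersLauncher service launches the Container, and it delegates to the pluggable component ContainerExecutor. Yarn provides three ContainerExecutor implementations: DefaultContainerExecutor, LinuxContainerExecutor, and DockerContainerExecutor. The parameter yarn.nodemanager.container-executor.class selects which one is used.
    • Resource cleanup is the reverse of resource localization: it removes the various resources. The ResourceLocalizationService performs all of this work.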
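    Because the executor is pluggable, the NodeManager reads the implementation from yarn.nodemanager.container-executor.class and falls back to DefaultContainerExecutor when the key is unset. In the Hadoop sources this goes through Configuration#getClass plus reflective instantiation. The plain-Java sketch below imitates only that selection step. ContainerExecutorSketch, ExecutorSelector and launch() are hypothetical names. The registry is also keyed by short names, whereas the real setting holds a fully qualified class name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, heavily simplified stand-in for Yarn's ContainerExecutor.
interface ContainerExecutorSketch {
    String launch(String containerId);
}

class ExecutorSelector {
    static final String EXECUTOR_KEY = "yarn.nodemanager.container-executor.class";
    static final String DEFAULT_EXECUTOR = "DefaultContainerExecutor";

    // Name -> factory; replaces the reflective instantiation used by the real NM.
    private static final Map<String, Supplier<ContainerExecutorSketch>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("DefaultContainerExecutor", () -> id -> "default:" + id);
        REGISTRY.put("LinuxContainerExecutor", () -> id -> "linux:" + id);
        REGISTRY.put("DockerContainerExecutor", () -> id -> "docker:" + id);
    }

    // Returns the executor named in conf, or the default one if the key is absent.
    static ContainerExecutorSketch create(Map<String, String> conf) {
        String name = conf.getOrDefault(EXECUTOR_KEY, DEFAULT_EXECUTOR);
        Supplier<ContainerExecutorSketch> factory = REGISTRY.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown container executor: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(create(conf).launch("container_1")); // default:container_1
        conf.put(EXECUTOR_KEY, "LinuxContainerExecutor");
        System.out.println(create(conf).launch("container_1")); // linux:container_1
    }
}
```

    Keeping the choice behind one interface is what lets the NM swap isolation mechanisms through configuration alone.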

    2. Source Code Analysis of the Container Launch

    2.1 The AM Calls the API to Request a Container Launch

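    Before looking at the launch itself, let's see how the AM, during its heartbeat, requests Container launches for the resources it has been granted. The heartbeat thread of AMRMClientAsyncImpl periodically requests resources from the RM through AMRMClient#allocate(), which wraps the RPC function ApplicationMasterProtocol#allocate(). It stores each response in the blocking queue responseQueue.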

    // Location: org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
      private class HeartbeatThread extends Thread {
        public HeartbeatThread() {
          super("AMRM Heartbeater thread");
        }
         
        public void run() {
          while (true) {   // the heartbeat thread loops until it is stopped
            AllocateResponse response = null;
            // synchronization ensures we don't send heartbeats after unregistering
            synchronized (unregisterHeartbeatLock) {
              if (!keepRunning) {
                return;
              }
     
              try {
                // Key point: the heartbeat thread simply calls allocate() periodically; newly allocated Containers come back in the AllocateResponse
                response = client.allocate(progress);
              } catch (ApplicationAttemptNotFoundException e) {
                handler.onShutdownRequest();
                LOG.info("Shutdown requested. Stopping callback.");
                return;
              } catch (Throwable ex) {
                LOG.error("Exception on heartbeat", ex);
                savedException = ex;
                // interrupt handler thread in case it waiting on the queue
                handlerThread.interrupt();
                return;
              }
              if (response != null) {
                while (true) {
                  try {
                    // put the AllocateResponse returned by the RM into the blocking queue responseQueue
                    responseQueue.put(response);
                    break;
                  } catch (InterruptedException ex) {
                    LOG.debug("Interrupted while waiting to put on response queue", ex);
                  }
                }
              }
            }
            try {
              Thread.sleep(heartbeatIntervalMs.get());
            } catch (InterruptedException ex) {
              LOG.debug("Heartbeater interrupted", ex);
            }
          }
        }
      }

    How are the Container resources stored in responseQueue consumed? Searching for callers of responseQueue.take() shows that CallbackHandlerThread, a separate thread in AMRMClientAsyncImpl, keeps taking AllocateResponse objects off the queue and processing them.

    // Location: org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
      private class CallbackHandlerThread extends Thread {
        public CallbackHandlerThread() {
          super("AMRM Callback Handler Thread");
        }
        
        public void run() {
          while (true) {    // loop forever, taking responses off the queue and processing them
            if (!keepRunning) {
              return;
            }
            try {
              AllocateResponse response;
              if(savedException != null) {
                LOG.error("Stopping callback due to: ", savedException);
                handler.onError(savedException);
                return;
              }
              try {
                // take the next AllocateResponse from the blocking queue responseQueue
                response = responseQueue.take();
              } catch (InterruptedException ex) {
                LOG.info("Interrupted while waiting for queue", ex);
                continue;
              }
              List<NodeReport> updatedNodes = response.getUpdatedNodes();
              if (!updatedNodes.isEmpty()) {
                handler.onNodesUpdated(updatedNodes);
              }
    
              List<ContainerStatus> completed =
                  response.getCompletedContainersStatuses();
              if (!completed.isEmpty()) {
                handler.onContainersCompleted(completed);
              }
    
              List<Container> allocated = response.getAllocatedContainers();
              if (!allocated.isEmpty()) {
                // Key point: hand the newly allocated Containers to the handler
                handler.onContainersAllocated(allocated);
              }
    
              // refresh the application progress, which the next heartbeat reports to the RM
              progress = handler.getProgress();
            } catch (Throwable ex) {
              handler.onError(ex);
              // re-throw exception to end the thread
              throw new YarnRuntimeException(ex);
            }
          }
        }
      }
    }
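    Together, the heartbeat thread and the callback thread form a classic producer/consumer pair around responseQueue. The sketch below reproduces only that hand-off using java.util.concurrent. FakeResponse, the fixed number of heartbeats and the poison-pill shutdown are simplifications invented for the sketch. AMRMClientAsyncImpl itself stops through the keepRunning flag and interrupts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HeartbeatQueueSketch {
    // Hypothetical stand-in for AllocateResponse: only the allocated container ids.
    static final class FakeResponse {
        final List<String> allocated;
        FakeResponse(List<String> allocated) { this.allocated = allocated; }
    }

    // Sentinel that stops the consumer (the real code uses keepRunning and interrupts).
    private static final FakeResponse POISON = new FakeResponse(Collections.<String>emptyList());

    static List<String> run(int heartbeats) {
        BlockingQueue<FakeResponse> responseQueue = new LinkedBlockingQueue<>();
        List<String> handled = Collections.synchronizedList(new ArrayList<String>());

        // Producer: plays the heartbeat thread, one "allocate()" result per beat.
        Thread heartbeat = new Thread(() -> {
            try {
                for (int i = 0; i < heartbeats; i++) {
                    responseQueue.put(new FakeResponse(Collections.singletonList("container_" + i)));
                    Thread.sleep(10); // stands in for heartbeatIntervalMs
                }
                responseQueue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "AMRM Heartbeater thread");

        // Consumer: plays CallbackHandlerThread and "handles" allocated containers.
        Thread callback = new Thread(() -> {
            try {
                while (true) {
                    FakeResponse response = responseQueue.take(); // blocks until a beat arrives
                    if (response == POISON) {
                        return;
                    }
                    if (!response.allocated.isEmpty()) {
                        handled.addAll(response.allocated); // onContainersAllocated(...)
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "AMRM Callback Handler Thread");

        heartbeat.start();
        callback.start();
        try {
            heartbeat.join();
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [container_0, container_1, container_2]
    }
}
```

    The queue decouples the two rhythms. The heartbeat keeps its fixed interval even when user callbacks are slow, and the callbacks run on a thread that never blocks the RPC.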

    handler.onContainersAllocated(allocated) processes the allocated Containers. The implementation below comes from the ApplicationMaster of the distributedshell example.

    // Location: org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
        @Override
        public void onContainersAllocated(List<Container> allocatedContainers) {
          LOG.info("Got response from RM for container ask, allocatedCnt="
              + allocatedContainers.size());
          numAllocatedContainers.addAndGet(allocatedContainers.size());
          for (Container allocatedContainer : allocatedContainers) {
            String yarnShellId = Integer.toString(yarnShellIdCounter);
            yarnShellIdCounter++;
            LOG.info("Launching shell command on a new container."
                + ", containerId=" + allocatedContainer.getId()
                + ", yarnShellId=" + yarnShellId
                + ", containerNode=" + allocatedContainer.getNodeId().getHost()
                + ":" + allocatedContainer.getNodeId().getPort()
                + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
                + ", containerResourceMemory"
                + allocatedContainer.getResource().getMemory()
                + ", containerResourceVirtualCores"
                + allocatedContainer.getResource().getVirtualCores());
            // + ", containerToken"
            // +allocatedContainer.getContainerToken().getIdentifier().toString());
    
            // create a thread that runs a LaunchContainerRunnable for this Container
            Thread launchThread = createLaunchContainerThread(allocatedContainer,
                yarnShellId);
    
            // launch and start the container on a separate thread to keep
            // the main thread unblocked
            // as all containers may not be allocated at one go.
            launchThreads.add(launchThread);
            launchedContainers.add(allocatedContainer.getId());
            // start the LaunchContainerRunnable thread
            launchThread.start();
          }
        }
    
        @VisibleForTesting
        Thread createLaunchContainerThread(Container allocatedContainer,
            String shellId) {
          LaunchContainerRunnable runnableLaunchContainer =
              new LaunchContainerRunnable(allocatedContainer, containerListener,
                  shellId);
          return new Thread(runnableLaunchContainer);
        }
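    The comment in the code explains why each Container gets its own thread: launching involves an NM RPC, and running it on the callback thread would delay processing of later allocations. Below is a stripped-down sketch of the same pattern. LaunchThreadsSketch, launchOnNm() and awaitLaunched() are hypothetical, and the shell-id counter starting at 1 is an assumption of the sketch. awaitLaunched() joins the threads only so that the result can be inspected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class LaunchThreadsSketch {
    private final AtomicInteger numAllocatedContainers = new AtomicInteger();
    private final List<Thread> launchThreads = new ArrayList<>();
    private final ConcurrentLinkedQueue<String> launched = new ConcurrentLinkedQueue<>();
    private int shellIdCounter = 1; // hypothetical starting value

    // Mirrors onContainersAllocated(): one launch thread per allocated container.
    void onContainersAllocated(List<String> containerIds) {
        numAllocatedContainers.addAndGet(containerIds.size());
        for (String containerId : containerIds) {
            String shellId = Integer.toString(shellIdCounter++);
            Thread launchThread = new Thread(() -> launchOnNm(containerId, shellId));
            launchThreads.add(launchThread);
            launchThread.start(); // the caller is not blocked by the (slow) launch
        }
    }

    // Hypothetical placeholder for LaunchContainerRunnable.run().
    private void launchOnNm(String containerId, String shellId) {
        launched.add(containerId + "#" + shellId);
    }

    // Demo helper: wait for all launch threads, then return what was launched, sorted.
    List<String> awaitLaunched() {
        for (Thread t : launchThreads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<String> result = new ArrayList<>(launched);
        Collections.sort(result);
        return result;
    }

    int allocatedCount() {
        return numAllocatedContainers.get();
    }

    public static void main(String[] args) {
        LaunchThreadsSketch am = new LaunchThreadsSketch();
        am.onContainersAllocated(Arrays.asList("container_1", "container_2"));
        System.out.println(am.awaitLaunched()); // [container_1#1, container_2#2]
    }
}
```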

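    The code above starts a LaunchContainerRunnable thread. LaunchContainerRunnable is an inner class of ApplicationMaster that implements the Runnable interface. Its run() method shows that it does two things:

    • Sets up the Container's local resources and builds the Container's launch command.
    • Calls the NMClientAsync#startContainerAsync() API to launch the Container.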
    // Location: org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
        public void run() {
          LOG.info("Setting up container launch container for containerid="
              + container.getId() + " with shellid=" + shellId);
    
          // logic that builds the Container launch command omitted
    
          // Set up ContainerLaunchContext, setting local resource, environment,
          // command and token for constructor.
    
          // Note for tokens: Set up tokens for the container too. Today, for normal
          // shell commands, the container in distribute-shell doesn't need any
          // tokens. We are populating them mainly for NodeManagers to be able to
          // download anyfiles in the distributed file-system. The tokens are
          // otherwise also useful in cases, for e.g., when one is running a
          // "hadoop dfs" command inside the distributed shell.
          Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
          myShellEnv.put(YARN_SHELL_ID, shellId);
          ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
            localResources, myShellEnv, commands, null, allTokens.duplicate(),
              null);
          containerListener.addContainer(container.getId(), container);
    
          // 2. Key point: launch the allocated Container through the NMClientAsync API
          nmClientAsync.startContainerAsync(container, ctx);
        }
      }

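    nmClientAsync.startContainerAsync() does not actually launch the Container. It first registers a StatefulContainer for the Container ID in the containers map; if one is already registered, it reports an error through onStartContainerError(). It then wraps the ContainerEventType.START_CONTAINER event in a ContainerEvent object (StartContainerEvent extends ContainerEvent) and puts it into events, the blocking queue of Container events. Whatever consumes the events queue does the actual work.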

    // Location: org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
      public void startContainerAsync(
          Container container, ContainerLaunchContext containerLaunchContext) {
        if (containers.putIfAbsent(container.getId(),
            new StatefulContainer(this, container.getId())) != null) {
          callbackHandler.onStartContainerError(container.getId(),
              RPCUtil.getRemoteException("Container " + container.getId() +
                  " is already started or scheduled to start"));
        }
        try {
          events.put(new StartContainerEvent(container, containerLaunchContext));
        } catch (InterruptedException e) {
          LOG.warn("Exception when scheduling the event of starting Container " +
              container.getId());
          callbackHandler.onStartContainerError(container.getId(), e);
        }
      }
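    startContainerAsync() returns right away because the caller only records the Container and enqueues an event. The sketch below keeps just that mechanism: an atomic putIfAbsent on a concurrent map to detect repeated starts, plus a LinkedBlockingQueue of events that a dispatcher thread would drain later. AsyncStartSketch and its string state are hypothetical. One difference from the method quoted above: the sketch returns early on a duplicate ID, whereas that method reports the error and still enqueues the event.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncStartSketch {
    enum EventType { START_CONTAINER, STOP_CONTAINER, QUERY_CONTAINER }

    static final class ContainerEvent {
        final String containerId;
        final EventType type;
        ContainerEvent(String containerId, EventType type) {
            this.containerId = containerId;
            this.type = type;
        }
    }

    // containerId -> state label; stands in for the map of StatefulContainer objects.
    final ConcurrentMap<String, String> containers = new ConcurrentHashMap<>();
    // Plays the role of NMClientAsyncImpl's events queue.
    final BlockingQueue<ContainerEvent> events = new LinkedBlockingQueue<>();

    // Returns true if a START_CONTAINER event was queued, false for a duplicate id.
    boolean startContainerAsync(String containerId) {
        if (containers.putIfAbsent(containerId, "PREP") != null) {
            return false; // the real method calls onStartContainerError(...) here
        }
        try {
            events.put(new ContainerEvent(containerId, EventType.START_CONTAINER));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AsyncStartSketch client = new AsyncStartSketch();
        System.out.println(client.startContainerAsync("container_1")); // true
        System.out.println(client.startContainerAsync("container_1")); // false
        System.out.println(client.events.size());                      // 1
    }
}
```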

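    How is the events queue consumed? Searching for events.take() again shows that NMClientAsyncImpl#serviceStart() starts a thread that consumes the queue. Here, each event taken from the queue is a ContainerEvent of type ContainerEventType.START_CONTAINER. getContainerEventProcessor(event) returns the matching processor, a ContainerEventProcessor, which is then run in a thread pool. The dispatcher also raises the pool's core size as the number of distinct NMs it talks to grows, up to maxThreadPoolSize.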

    // Location: org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
      protected void serviceStart() throws Exception {
        client.start();
    
        ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
            this.getClass().getName() + " #%d").setDaemon(true).build();
    
        // Start with a default core-pool size and change it dynamically.
        int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
        threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
            TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
    
        eventDispatcherThread = new Thread() {
          @Override
          public void run() {
            ContainerEvent event = null;
            Set<String> allNodes = new HashSet<String>();
    
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
              try {
                // take the next ContainerEvent from the blocking queue events
                event = events.take();
              } catch (InterruptedException e) {
                if (!stopped.get()) {
                  LOG.error("Returning, thread interrupted", e);
                }
                return;
              }
    
              allNodes.add(event.getNodeId().toString());
    
              int threadPoolSize = threadPool.getCorePoolSize();
    
              // We can increase the pool size only if haven't reached the maximum
              // limit yet.
              if (threadPoolSize != maxThreadPoolSize) {
    
                // nodes where containers will run at *this* point of time. This is
                // *not* the cluster size and doesn't need to be.
                int nodeNum = allNodes.size();
                int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
    
                if (threadPoolSize < idealThreadPoolSize) {
                  // Bump up the pool size to idealThreadPoolSize +
                  // INITIAL_POOL_SIZE, the later is just a buffer so we are not
                  // always increasing the pool-size
                  int newThreadPoolSize = Math.min(maxThreadPoolSize,
                      idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
                  LOG.info("Set NMClientAsync thread pool size to " +
                      newThreadPoolSize + " as the number of nodes to talk to is "
                      + nodeNum);
                  threadPool.setCorePoolSize(newThreadPoolSize);
                }
              }
    
              // Key point: for a START_CONTAINER event, getContainerEventProcessor(event)
              // returns a ContainerEventProcessor (a Runnable), which is executed in the thread pool
              threadPool.execute(getContainerEventProcessor(event));
            }
          }
        };
        // start the dispatcher thread
        eventDispatcherThread.setName("Container  Event Dispatcher");
        eventDispatcherThread.setDaemon(false);
        eventDispatcherThread.start();
    
        super.serviceStart();
      }
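    The pool-sizing rule is easy to miss. With an unbounded LinkedBlockingQueue, a ThreadPoolExecutor never creates threads beyond its core size, because extra threads are only started when the queue is full. The Integer.MAX_VALUE maximum therefore has no effect, and the dispatcher has to raise the core size itself. The sketch below isolates that rule as a pure function and applies it to a real ThreadPoolExecutor. nextCorePoolSize() is a hypothetical helper, and the value 10 for INITIAL_THREAD_POOL_SIZE is an assumption for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {
    static final int INITIAL_THREAD_POOL_SIZE = 10; // assumed value for this sketch

    // Core pool size to use after the dispatcher has seen nodeNum distinct NMs.
    static int nextCorePoolSize(int current, int nodeNum, int maxThreadPoolSize) {
        if (current == maxThreadPoolSize) {
            return current; // already at the cap
        }
        int ideal = Math.min(maxThreadPoolSize, nodeNum); // roughly one thread per NM
        if (current >= ideal) {
            return current; // enough threads already
        }
        // grow past the ideal size by a buffer so the pool is not resized on every new node
        return Math.min(maxThreadPoolSize, ideal + INITIAL_THREAD_POOL_SIZE);
    }

    public static void main(String[] args) {
        int max = 500;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            Math.min(INITIAL_THREAD_POOL_SIZE, max), Integer.MAX_VALUE, 1, TimeUnit.HOURS,
            new LinkedBlockingQueue<Runnable>());
        pool.setCorePoolSize(nextCorePoolSize(pool.getCorePoolSize(), 25, max));
        System.out.println(pool.getCorePoolSize()); // 35
        pool.shutdown();
    }
}
```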

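    The processor class ContainerEventProcessor is an inner class of NMClientAsyncImpl that implements Runnable. Its run() method branches on the event type. For ContainerEventType.START_CONTAINER, it looks up the StatefulContainer and passes the event to that Container's state machine through handle().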

    // Location: org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
        public void run() {
          ContainerId containerId = event.getContainerId();
          LOG.info("Processing Event " + event + " for Container " + containerId);
          // QUERY_CONTAINER events are handled separately, without the state machine
          if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
            try {
              ContainerStatus containerStatus = client.getContainerStatus(
                  containerId, event.getNodeId());
              try {
                callbackHandler.onContainerStatusReceived(
                    containerId, containerStatus);
              } catch (Throwable thr) {
                // Don't process user created unchecked exception
                LOG.info(
                    "Unchecked exception is thrown from onContainerStatusReceived" +
                        " for Container " + event.getContainerId(), thr);
              }
            } catch (YarnException e) {
              onExceptionRaised(containerId, e);
            } catch (IOException e) {
              onExceptionRaised(containerId, e);
            } catch (Throwable t) {
              onExceptionRaised(containerId, t);
            }
          } else {
            // START_CONTAINER and STOP_CONTAINER events go through the Container's state machine
            StatefulContainer container = containers.get(containerId);
            if (container == null) {
              LOG.info("Container " + containerId + " is already stopped or failed");
            } else {
              // hand the event to the state machine, which picks the transition by event type
              container.handle(event);
              if (isCompletelyDone(container)) {
                containers.remove(containerId);
              }
            }
          }
        }

    In the PREP state, the transition registered for ContainerEventType.START_CONTAINER is StartContainerTransition. It can end in either RUNNING or FAILED.

    // Location: org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
        // Transitions from PREP state
        .addTransition(ContainerState.PREP,
            EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
            ContainerEventType.START_CONTAINER,
            new StartContainerTransition())

    The transition() method of StartContainerTransition:

    // Location: org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
        @Override
          public ContainerState transition(
              StatefulContainer container, ContainerEvent event) {
            ContainerId containerId = event.getContainerId();
            try {
              StartContainerEvent scEvent = null;
              if (event instanceof StartContainerEvent) {
                scEvent = (StartContainerEvent) event;
              }
              assert scEvent != null;
    
              // Key point: call NMClient#startContainer() to launch the Container
              Map<String, ByteBuffer> allServiceResponse =
                  container.nmClientAsync.getClient().startContainer(
                      scEvent.getContainer(), scEvent.getContainerLaunchContext());
              try {
                // report the started Container through the callback handler
                container.nmClientAsync.getCallbackHandler().onContainerStarted(
                    containerId, allServiceResponse);
              } catch (Throwable thr) {
                // Don't process user created unchecked exception
                LOG.info("Unchecked exception is thrown from onContainerStarted for "
                    + "Container " + containerId, thr);
              }
              // the Container moves to RUNNING
              return ContainerState.RUNNING;
            } catch (YarnException e) {
              return onExceptionRaised(container, event, e);
            } catch (IOException e) {
              return onExceptionRaised(container, event, e);
            } catch (Throwable t) {
              return onExceptionRaised(container, event, t);
            }
          }
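    StartContainerTransition is a multi-arc transition. The state machine allows PREP to move to either RUNNING or FAILED on START_CONTAINER, and the transition code picks one depending on whether the NM call throws. The sketch below models that idea in plain Java; it is not Hadoop's StateMachineFactory. StartTransitionSketch, handle() and startContainer() are hypothetical names. The real code catches YarnException, IOException and Throwable separately, while the sketch treats only a RuntimeException as failure.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

class StartTransitionSketch {
    enum State { PREP, RUNNING, FAILED }
    enum EventType { START_CONTAINER }

    private State state = State.PREP;
    // (current state, event) -> set of states the transition is allowed to return
    private final Map<State, Map<EventType, Set<State>>> arcs = new EnumMap<>(State.class);

    StartTransitionSketch() {
        Map<EventType, Set<State>> fromPrep = new EnumMap<>(EventType.class);
        fromPrep.put(EventType.START_CONTAINER, EnumSet.of(State.RUNNING, State.FAILED));
        arcs.put(State.PREP, fromPrep);
    }

    // Runs the transition hook and moves to the state it returns, if that arc is allowed.
    State handle(EventType type, Supplier<State> transition) {
        Map<EventType, Set<State>> byEvent = arcs.get(state);
        Set<State> allowed = byEvent == null ? null : byEvent.get(type);
        if (allowed == null) {
            throw new IllegalStateException(type + " is not valid in state " + state);
        }
        State next = transition.get();
        if (!allowed.contains(next)) {
            throw new IllegalStateException("Illegal target state " + next);
        }
        state = next;
        return state;
    }

    // Like StartContainerTransition: RUNNING if the NM call succeeds, FAILED if it throws.
    static Supplier<State> startContainer(Runnable nmCall) {
        return () -> {
            try {
                nmCall.run();
                return State.RUNNING;
            } catch (RuntimeException e) {
                return State.FAILED;
            }
        };
    }

    public static void main(String[] args) {
        StartTransitionSketch ok = new StartTransitionSketch();
        System.out.println(ok.handle(EventType.START_CONTAINER, startContainer(() -> { })));
        StartTransitionSketch bad = new StartTransitionSketch();
        System.out.println(bad.handle(EventType.START_CONTAINER,
            startContainer(() -> { throw new RuntimeException("NM unreachable"); })));
    }
}
```

    Running main prints RUNNING and then FAILED. A second START_CONTAINER on a RUNNING container is rejected because no arc is registered for it.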

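    Here we finally meet a startContainer() method, but the Container is still not actually launched at this point. The code first obtains NMClient, the client the AM really uses to talk to the NM, and calls startContainer() on its implementation NMClientImpl. That method gets a proxy for the NM's RPC protocol ContainerManagementProtocol and calls the protocol's startContainers() method remotely to launch the Container.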

    // Location: org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
      public Map<String, ByteBuffer> startContainer(
          Container container, ContainerLaunchContext containerLaunchContext)
              throws YarnException, IOException {
        // create a StartedContainer object to track this launch
        StartedContainer startingContainer =
            new StartedContainer(container.getId(), container.getNodeId());
        synchronized (startingContainer) {
          addStartingContainer(startingContainer);
          
          Map<String, ByteBuffer> allServiceResponse;
          ContainerManagementProtocolProxyData proxy = null;
          try {
            proxy =
                cmProxy.getProxy(container.getNodeId().toString(),
                    container.getId());
            StartContainerRequest scRequest =
                StartContainerRequest.newInstance(containerLaunchContext,
                  container.getContainerToken());
            List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
            list.add(scRequest);
            StartContainersRequest allRequests =
                StartContainersRequest.newInstance(list);
    
            // Key point: get the ContainerManagementProtocol proxy and launch the Container via the RPC function startContainers()
            StartContainersResponse response =
                proxy
                    .getContainerManagementProtocol().startContainers(allRequests);
            if (response.getFailedRequests() != null
                && response.getFailedRequests().containsKey(container.getId())) {
              Throwable t =
                  response.getFailedRequests().get(container.getId()).deSerialize();
              parseAndThrowException(t);
            }
            allServiceResponse = response.getAllServicesMetaData();
            startingContainer.state = ContainerState.RUNNING;
          } catch (YarnException e) {
            // exception handling and state update omitted
          } finally {
            if (proxy != null) {
              cmProxy.mayBeCloseProxy(proxy);
            }
          }
          return allServiceResponse;
        }
      }
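    NMClientImpl wraps a single StartContainerRequest in a batch StartContainersRequest and then looks up its own Container ID in getFailedRequests(). Because the RPC is batched, a per-Container failure arrives inside a normal response rather than as an RPC error. Below is a minimal sketch of the client side of that contract. StartResponse and ContainerManager are hypothetical types, and a String message stands in for SerializedException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchClientSketch {
    // Hypothetical response: ids that started plus failed id -> error message.
    static final class StartResponse {
        final List<String> succeeded;
        final Map<String, String> failed;
        StartResponse(List<String> succeeded, Map<String, String> failed) {
            this.succeeded = succeeded;
            this.failed = failed;
        }
    }

    // Hypothetical stand-in for the ContainerManagementProtocol proxy.
    interface ContainerManager {
        StartResponse startContainers(List<String> containerIds);
    }

    // Wraps one container in a batch, then turns a per-container failure into an exception.
    static void startContainer(ContainerManager proxy, String containerId) {
        List<String> batch = new ArrayList<>();
        batch.add(containerId);
        StartResponse response = proxy.startContainers(batch);
        if (response.failed != null && response.failed.containsKey(containerId)) {
            throw new IllegalStateException(response.failed.get(containerId));
        }
    }

    // A fake NM that rejects any container whose id starts with "bad".
    static final ContainerManager FAKE_NM = ids -> {
        List<String> ok = new ArrayList<>();
        Map<String, String> failed = new HashMap<>();
        for (String id : ids) {
            if (id.startsWith("bad")) {
                failed.put(id, "Container " + id + " rejected");
            } else {
                ok.add(id);
            }
        }
        return new StartResponse(ok, failed);
    };

    public static void main(String[] args) {
        startContainer(FAKE_NM, "container_1"); // returns normally
        try {
            startContainer(FAKE_NM, "bad_container");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Container bad_container rejected
        }
    }
}
```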

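    On the NM side, ContainerManagerImpl implements the RPC function ContainerManagementProtocol#startContainers() that NMClient calls. It authorizes the caller, then starts each request in the batch and records success or failure per Container: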

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
      @Override
      public StartContainersResponse
          startContainers(StartContainersRequest requests) throws YarnException,
              IOException {
        if (blockNewContainerRequests.get()) {
          throw new NMNotYetReadyException(
            "Rejecting new containers as NodeManager has not"
                + " yet connected with ResourceManager");
        }
        UserGroupInformation remoteUgi = getRemoteUgi();
        NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
        authorizeUser(remoteUgi,nmTokenIdentifier);
        List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
        Map<ContainerId, SerializedException> failedContainers =
            new HashMap<ContainerId, SerializedException>();
        for (StartContainerRequest request : requests.getStartContainerRequests()) {
          ContainerId containerId = null;
          try {
            ContainerTokenIdentifier containerTokenIdentifier =
                BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
            verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
              containerTokenIdentifier);
            containerId = containerTokenIdentifier.getContainerID();
    
            // internal logic that starts the Container
            startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
              request);
            succeededContainers.add(containerId);
          } catch (YarnException e) {
            failedContainers.put(containerId, SerializedException.newInstance(e));
          } catch (InvalidToken ie) {
            failedContainers.put(containerId, SerializedException.newInstance(ie));
            throw ie;
          } catch (IOException e) {
            throw RPCUtil.getRemoteException(e);
          }
        }
    
        return StartContainersResponse.newInstance(getAuxServiceMetaData(),
          succeededContainers, failedContainers);
      }
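    The loop above isolates failures per request. A YarnException for one Container is serialized into failedContainers and the remaining requests still run. An InvalidToken, by contrast, aborts the whole call, and any other IOException is rethrown as a remote exception. The sketch below keeps only the isolation part. BatchServerSketch and its token check are hypothetical, and IllegalArgumentException stands in for YarnException.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchServerSketch {
    static final class Result {
        final List<String> succeeded = new ArrayList<>();
        final Map<String, String> failed = new LinkedHashMap<>();
    }

    // Hypothetical per-container check standing in for startContainerInternal().
    static void startContainerInternal(String containerId, String token) {
        if (token == null || token.isEmpty()) {
            throw new IllegalArgumentException("No valid token for " + containerId);
        }
    }

    // requests: containerId -> container token, processed in insertion order.
    static Result startContainers(Map<String, String> requests) {
        Result result = new Result();
        for (Map.Entry<String, String> request : requests.entrySet()) {
            try {
                startContainerInternal(request.getKey(), request.getValue());
                result.succeeded.add(request.getKey());
            } catch (IllegalArgumentException e) {
                // record the failure and keep going with the next request
                result.failed.put(request.getKey(), e.getMessage());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> requests = new LinkedHashMap<>();
        requests.put("container_1", "token-1");
        requests.put("container_2", "");
        requests.put("container_3", "token-3");
        Result r = startContainers(requests);
        System.out.println(r.succeeded); // [container_1, container_3]
        System.out.println(r.failed);    // {container_2=No valid token for container_2}
    }
}
```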

    This completes the interaction between the AM and the NM: the AM launches the Container through the RPC function ContainerManagementProtocol#startContainers(). How the Container is actually started on the NM is covered next.

    2.2 Container Resource Localization

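    In the previous part, the AM started the launch by calling the RPC function ContainerManagementProtocol#startContainers(). This part examines the concrete launch logic in startContainerInternal(), which does two things:

    • It sends an ApplicationEventType.INIT_APPLICATION event to initialize the application's resources. Mostly this means initializing the service components that later Container launches need, such as the log component LogHandler and the resource state tracker LocalResourcesTrackerImpl. Only the first Container of the application that reaches this NM triggers it; later Containers skip application initialization.
    • It sends an ApplicationEventType.INIT_CONTAINER event to initialize the Container. This event is covered later, in the Container launch part.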
    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
      private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
          ContainerTokenIdentifier containerTokenIdentifier,
          StartContainerRequest request) throws YarnException, IOException {
    
        // token verification and ContainerLaunchContext initialization omitted
    
        this.readLock.lock();
        try {
          if (!serviceStopped) {
            // Create the application
            Application application =
                new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
            
            // Application initialization, used by later Containers; it runs once per application on this NM, normally for the first Container of that application to arrive here
            if (null == context.getApplications().putIfAbsent(applicationID,
              application)) {
              LOG.info("Creating a new application reference for app " + applicationID);
              LogAggregationContext logAggregationContext =
                  containerTokenIdentifier.getLogAggregationContext();
              Map<ApplicationAccessType, String> appAcls =
                  container.getLaunchContext().getApplicationACLs();
              context.getNMStateStore().storeApplication(applicationID,
                  buildAppProto(applicationID, user, credentials, appAcls,
                    logAggregationContext));
    
    
              // 1. send an ApplicationEventType.INIT_APPLICATION event to ApplicationImpl
              dispatcher.getEventHandler().handle(
                new ApplicationInitEvent(applicationID, appAcls,
                  logAggregationContext));
            }
    
            // 2. send an ApplicationEventType.INIT_CONTAINER event to ApplicationImpl
            this.context.getNMStateStore().storeContainer(containerId, request);
            dispatcher.getEventHandler().handle(
              new ApplicationContainerInitEvent(container));
    
            this.context.getContainerTokenSecretManager().startContainerSuccessful(
              containerTokenIdentifier);
            NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
              "ContainerManageImpl", applicationID, containerId);
            // TODO launchedContainer misplaced -> doesn't necessarily mean a container
            // launch. A finished Application will not launch containers.
            metrics.launchedContainer();
            metrics.allocateContainer(containerTokenIdentifier.getResource());
          } else {
            throw new YarnException(
                "Container start failed as the NodeManager is " +
                "in the process of shutting down");
          }
        } finally {
          this.readLock.unlock();
        }
      }
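    The "only once per application" behavior comes entirely from ConcurrentMap#putIfAbsent on the NM's application map. Only the caller that actually inserts the ApplicationImpl gets null back and sends INIT_APPLICATION. Every Container, first or not, sends INIT_CONTAINER. Because the map is local to one NodeManager, this happens once per application on each NM that runs its Containers. Here is a plain-Java sketch; AppInitOnceSketch and its string events are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AppInitOnceSketch {
    // appId -> application object (a placeholder string), like context.getApplications()
    private final ConcurrentMap<String, String> applications = new ConcurrentHashMap<>();
    final List<String> dispatched = Collections.synchronizedList(new ArrayList<String>());

    void startContainerInternal(String appId, String containerId) {
        // Only the first container of appId on this NM sees null and initializes the app.
        if (applications.putIfAbsent(appId, "ApplicationImpl(" + appId + ")") == null) {
            dispatched.add("INIT_APPLICATION " + appId);
        }
        // Every container is handed to its application.
        dispatched.add("INIT_CONTAINER " + containerId);
    }

    public static void main(String[] args) {
        AppInitOnceSketch nm = new AppInitOnceSketch();
        nm.startContainerInternal("app_1", "container_1_1");
        nm.startContainerInternal("app_1", "container_1_2");
        nm.startContainerInternal("app_2", "container_2_1");
        nm.dispatched.forEach(System.out::println);
    }
}
```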

    On ApplicationEventType.INIT_APPLICATION, the application moves from NEW to INITING, and the transition that runs is AppInitTransition.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
               // Transitions from NEW state
         .addTransition(ApplicationState.NEW, ApplicationState.INITING,
             ApplicationEventType.INIT_APPLICATION, new AppInitTransition())

    AppInitTransition sets the ACLs and then sends a LogHandlerEventType.APPLICATION_STARTED event to the LogHandler. There are currently two LogHandler implementations: LogAggregationService and NonAggregatingLogHandler.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
      static class AppInitTransition implements
          SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
          ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
          // set the application ACLs
          app.applicationACLs = initEvent.getApplicationACLs();
          app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
          // Inform the logAggregator
          app.logAggregationContext = initEvent.getLogAggregationContext();
    
          // send a LogHandlerEventType.APPLICATION_STARTED event to the LogHandler
          app.dispatcher.getEventHandler().handle(
              new LogHandlerAppStartedEvent(app.appId, app.user,
                  app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
                  app.applicationACLs, app.logAggregationContext)); 
        }
      }

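    Take LogAggregationService as an example. When the LogHandler receives the LogHandlerEventType.APPLICATION_STARTED event, it creates the application's log directory, sets the directory permissions, and so on. It then sends an ApplicationEventType.APPLICATION_LOG_HANDLING_INITED event back to ApplicationImpl, or APPLICATION_LOG_HANDLING_FAILED if initialization fails.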

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
      @Override
      public void handle(LogHandlerEvent event) {
        switch (event.getType()) {
          case APPLICATION_STARTED:
            LogHandlerAppStartedEvent appStartEvent =
                (LogHandlerAppStartedEvent) event;
            // handle the event
            initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
                appStartEvent.getCredentials(),
                appStartEvent.getLogRetentionPolicy(),
                appStartEvent.getApplicationAcls(),
                appStartEvent.getLogAggregationContext());
            break;
          case CONTAINER_FINISHED:  // omitted
          case APPLICATION_FINISHED:  // omitted
          default:
            ; // Ignore
        }
      }
    
      private void initApp(final ApplicationId appId, String user,
          Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
          Map<ApplicationAccessType, String> appAcls,
          LogAggregationContext logAggregationContext) {
        ApplicationEvent eventResponse;
        try {
          // create the application log directory, set directory permissions, etc.
          verifyAndCreateRemoteLogDir(getConfig());
          initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
              logAggregationContext);
          eventResponse = new ApplicationEvent(appId,
              ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
        } catch (YarnRuntimeException e) {
          LOG.warn("Application failed to init aggregation", e);
          eventResponse = new ApplicationEvent(appId,
              ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
        }
        // send APPLICATION_LOG_HANDLING_INITED (or _FAILED on error) to ApplicationImpl
        this.dispatcher.getEventHandler().handle(eventResponse);
      }

    On ApplicationEventType.APPLICATION_LOG_HANDLING_INITED, ApplicationImpl sends a LocalizationEventType.INIT_APPLICATION_RESOURCES event to ResourceLocalizationService. ApplicationImpl stays in the INITING state. ResourceLocalizationService handles the request in handle(). It creates LocalResourcesTrackerImpl objects to prepare for the downloads: one per user for PRIVATE resources and one per application for APPLICATION resources. It then sends an ApplicationEventType.APPLICATION_INITED event to ApplicationImpl.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
      public void handle(LocalizationEvent event) {
        // TODO: create log dir as $logdir/$user/$appId
        switch (event.getType()) {
        case INIT_APPLICATION_RESOURCES:  // handle LocalizationEventType.INIT_APPLICATION_RESOURCES
          handleInitApplicationResources(
              ((ApplicationLocalizationEvent)event).getApplication());
          break;
        case INIT_CONTAINER_RESOURCES: // omitted
        case CONTAINER_RESOURCES_LOCALIZED:  // omitted
        case CACHE_CLEANUP:  // omitted
        case CLEANUP_CONTAINER_RESOURCES: // omitted
        case DESTROY_APPLICATION_RESOURCES: // omitted
        default:
          throw new YarnRuntimeException("Unknown localization event: " + event);
        }
      }
    
      private void handleInitApplicationResources(Application app) {
        String userName = app.getUser();
        
        // create LocalResourcesTrackerImpl objects to prepare for the resource downloads
        privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
            null, dispatcher, true, super.getConfig(), stateStore));
        String appIdStr = ConverterUtils.toString(app.getAppId());
        appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
            app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
        
        // send an ApplicationEventType.APPLICATION_INITED event to ApplicationImpl
        dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
              app.getAppId()));
      }
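    handleInitApplicationResources() fills two maps of trackers. privateRsrc is keyed by user name, so all applications of one user share a PRIVATE tracker. appRsrc is keyed by application ID, so each application gets its own APPLICATION tracker. Because both maps use putIfAbsent, repeated initialization is harmless. The sketch below mirrors only that keying; TrackerSketch and its Tracker class are hypothetical placeholders for LocalResourcesTrackerImpl.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class TrackerSketch {
    // Hypothetical placeholder for LocalResourcesTrackerImpl.
    static final class Tracker {
        final String user;
        final String appId; // null for a per-user (PRIVATE) tracker
        Tracker(String user, String appId) {
            this.user = user;
            this.appId = appId;
        }
    }

    final ConcurrentMap<String, Tracker> privateRsrc = new ConcurrentHashMap<>(); // user -> tracker
    final ConcurrentMap<String, Tracker> appRsrc = new ConcurrentHashMap<>();     // appId -> tracker

    void handleInitApplicationResources(String user, String appId) {
        privateRsrc.putIfAbsent(user, new Tracker(user, null));
        appRsrc.putIfAbsent(appId, new Tracker(user, appId));
        // the real method then dispatches an ApplicationInitedEvent for appId
    }

    public static void main(String[] args) {
        TrackerSketch localizer = new TrackerSketch();
        localizer.handleInitApplicationResources("alice", "app_1");
        localizer.handleInitApplicationResources("alice", "app_2");
        localizer.handleInitApplicationResources("bob", "app_3");
        System.out.println(localizer.privateRsrc.size() + " private, "
            + localizer.appRsrc.size() + " application trackers"); // 2 private, 3 application trackers
    }
}
```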

    On ApplicationEventType.APPLICATION_INITED, ApplicationImpl (in AppInitDoneTransition) sends an INIT_CONTAINER event to every Container the application currently holds, telling each to initialize. ApplicationImpl then moves from INITING to RUNNING.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
      static class AppInitDoneTransition implements
          SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
          // Start all the containers waiting for ApplicationInit
          for (Container container : app.containers.values()) {
            // send INIT_CONTAINER to each Container held by the application
            app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
                  container.getContainerId()));
          }
        }
      }

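    On receiving the INIT_CONTAINER event, ContainerImpl first notifies the auxiliary services (AuxServices) that a new Container is starting (a CONTAINER_INIT event, plus one APPLICATION_INIT event per entry of the Container's service data). It then reads the local resources of each visibility from the ContainerLaunchContext, records them in dedicated ContainerImpl data structures, and sends a LocalizationEventType.INIT_CONTAINER_RESOURCES event to ResourceLocalizationService. ContainerImpl has now moved from NEW to LOCALIZING.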

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
      static class RequestResourcesTransition implements
          MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
        @Override
        public ContainerState transition(ContainerImpl container,
            ContainerEvent event) {
          final ContainerLaunchContext ctxt = container.getLaunchContext();
    
          // Send an AuxServicesEventType.CONTAINER_INIT event to the AuxServices
          container.dispatcher.getEventHandler().handle(new AuxServicesEvent
              (AuxServicesEventType.CONTAINER_INIT, container));
    
          // Inform the AuxServices about the opaque serviceData
          Map<String,ByteBuffer> csd = ctxt.getServiceData();
          if (csd != null) {
            // This can happen more than once per Application as each container may
            // have distinct service data
            for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
              container.dispatcher.getEventHandler().handle(
                  new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
                      container.user, container.containerId
                          .getApplicationAttemptId().getApplicationId(),
                      service.getKey().toString(), service.getValue()));
            }
          }
    
          container.containerLocalizationStartTime = clock.getTime();
          // Read the local resources from the ContainerLaunchContext and record them by visibility
          Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
          if (!cntrRsrc.isEmpty()) {
            try {
              for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
                try {
                  LocalResourceRequest req =
                      new LocalResourceRequest(rsrc.getValue());
                  List<String> links = container.pendingResources.get(req);
                  if (links == null) {
                    links = new ArrayList<String>();
                    container.pendingResources.put(req, links);
                  }
                  links.add(rsrc.getKey());
                  switch (rsrc.getValue().getVisibility()) {
                  case PUBLIC:
                    container.publicRsrcs.add(req);
                    break;
                  case PRIVATE:
                    container.privateRsrcs.add(req);
                    break;
                  case APPLICATION:
                    container.appRsrcs.add(req);
                    break;
                  }
                } catch (URISyntaxException e) {
                  LOG.info("Got exception parsing " + rsrc.getKey()
                      + " and value " + rsrc.getValue());
                  throw e;
                }
              }
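            } catch (URISyntaxException e) {
              // Malformed resource: abort the container launch
              LOG.warn("Failed to parse resource-request", e);
              container.cleanup();
              container.metrics.endInitingContainer();
              return ContainerState.LOCALIZATION_FAILED;
            }
    
            // Group the pending requests by visibility
            Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
                new LinkedHashMap<LocalResourceVisibility,
                    Collection<LocalResourceRequest>>();
            if (!container.publicRsrcs.isEmpty()) {
              req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
            }
            if (!container.privateRsrcs.isEmpty()) {
              req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
            }
            if (!container.appRsrcs.isEmpty()) {
              req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
            }
    
            // Send a LocalizationEventType.INIT_CONTAINER_RESOURCES event to ResourceLocalizationService
            container.dispatcher.getEventHandler().handle(
                  new ContainerLocalizationRequestEvent(container, req));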
            return ContainerState.LOCALIZING;
          } else {
            // No resources to localize: launch the Container directly
            container.sendLaunchEvent();
            container.metrics.endInitingContainer();
            return ContainerState.LOCALIZED;
          }
        }
      }
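The core decision in RequestResourcesTransition is: group the requested resources by visibility, then go to LOCALIZING if anything needs downloading, or to LOCALIZED (launching immediately) if nothing does. This Java sketch reproduces just that decision with simplified, hypothetical types (`Visibility`, `NextState`, `Resource`, `LocalizationPlan`); it is not the Hadoop code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical copies of the concepts used by RequestResourcesTransition.
enum Visibility { PUBLIC, PRIVATE, APPLICATION }

enum NextState { LOCALIZING, LOCALIZED }

final class Resource {
  final String url;
  final Visibility visibility;

  Resource(String url, Visibility visibility) {
    this.url = url;
    this.visibility = visibility;
  }
}

final class LocalizationPlan {
  // Resource URLs grouped by visibility; only non-empty groups are present.
  final Map<Visibility, List<String>> requests = new LinkedHashMap<>();
  NextState next;

  // localResources maps a link name (as in ContainerLaunchContext) to a resource.
  static LocalizationPlan of(Map<String, Resource> localResources) {
    LocalizationPlan plan = new LocalizationPlan();
    if (localResources.isEmpty()) {
      // Nothing to download: the Container can be launched right away.
      plan.next = NextState.LOCALIZED;
      return plan;
    }
    // Walk the visibilities in enum order so groups come out PUBLIC, PRIVATE, APPLICATION.
    for (Visibility v : Visibility.values()) {
      List<String> urls = new ArrayList<>();
      for (Resource r : localResources.values()) {
        if (r.visibility == v) {
          urls.add(r.url);
        }
      }
      if (!urls.isEmpty()) {
        plan.requests.put(v, urls);
      }
    }
    plan.next = NextState.LOCALIZING;
    return plan;
  }
}
```

The LinkedHashMap keeps the groups in a fixed order, much like the LinkedHashMap that RequestResourcesTransition fills before sending INIT_CONTAINER_RESOURCES.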

    On receiving the LocalizationEventType.INIT_CONTAINER_RESOURCES event, ResourceLocalizationService wraps each resource the Container needs in its own REQUEST event and sends it to the corresponding resource tracker, LocalResourcesTrackerImpl.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
      public void handle(LocalizationEvent event) {
        // TODO: create log dir as $logdir/$user/$appId
        switch (event.getType()) {
        case INIT_APPLICATION_RESOURCES:  // omitted
        case INIT_CONTAINER_RESOURCES:
          // Wrap each resource the Container needs in a REQUEST event and send it to the matching LocalResourcesTrackerImpl
          handleInitContainerResources((ContainerLocalizationRequestEvent) event);
          break;
        case CONTAINER_RESOURCES_LOCALIZED:  // omitted
        case CACHE_CLEANUP:  // omitted
        case CLEANUP_CONTAINER_RESOURCES:  // omitted
        case DESTROY_APPLICATION_RESOURCES:  // omitted
        default:
          throw new YarnRuntimeException("Unknown localization event: " + event);
        }
      }

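    On receiving a REQUEST event, LocalResourcesTrackerImpl creates a LocalizedResource state machine for the resource (if one does not exist yet) to track its life cycle, and forwards the REQUEST event to that LocalizedResource.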

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
        // From INIT (ref == 0, awaiting req)
        .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
            ResourceEventType.REQUEST, new FetchResourceTransition())

    On receiving the REQUEST event, LocalizedResource sends the details of the resource to download to the download service, ResourceLocalizationService, in a LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION event, and moves from INIT to DOWNLOADING.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
      private static class FetchResourceTransition extends ResourceTransition {
        @Override
        public void transition(LocalizedResource rsrc, ResourceEvent event) {
          ResourceRequestEvent req = (ResourceRequestEvent) event;
          LocalizerContext ctxt = req.getContext();
          ContainerId container = ctxt.getContainerId();
          rsrc.ref.add(container);
          rsrc.dispatcher.getEventHandler().handle(
              new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt, 
                  req.getLocalResourceRequest().getPattern()));
        }
      }

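    ResourceLocalizationService hands the LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION event to its LocalizerTracker service. PUBLIC resources all go to the shared PublicLocalizer. For other resources, LocalizerTracker checks whether a LocalizerRunner thread already exists for the Container: if not, it creates and starts one; either way, the resource is added to that thread's download queue. The thread calls ContainerExecutor#startLocalizer() to download the resources; the localizer talks to ResourceLocalizationService over the LocalizationProtocol to obtain, one by one, the location of each resource to download. When a download completes, PublicLocalizer or LocalizerRunner sends a LOCALIZED event to the LocalizedResource.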

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
        public void handle(LocalizerEvent event) {
          String locId = event.getLocalizerId();
          switch (event.getType()) {
          case REQUEST_RESOURCE_LOCALIZATION:
            // 0) find running localizer or start new thread
            LocalizerResourceRequestEvent req =
              (LocalizerResourceRequestEvent)event;
            // Dispatch on the visibility of the requested resource
            switch (req.getVisibility()) {
            // PUBLIC resources go to the PublicLocalizer thread
            case PUBLIC:
              publicLocalizer.addResource(req);
              break;
            case PRIVATE:
            case APPLICATION:
              synchronized (privLocalizers) {
                LocalizerRunner localizer = privLocalizers.get(locId);
                // Check whether a LocalizerRunner thread already exists for this localizer
                if (null == localizer) {
                  LOG.info("Created localizer for " + locId);
                  localizer = new LocalizerRunner(req.getContext(), locId);
                  privLocalizers.put(locId, localizer);
                  localizer.start();
                }
                // 1) propagate event
                localizer.addResource(req);
              }
              break;
            }
            break;
          }
        }
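The routing rule above (one shared downloader for PUBLIC resources, one lazily created downloader per localizer id for PRIVATE and APPLICATION resources) can be modeled in a few lines of Java. `LocalizerRouter`, `Runner` and `Vis` are hypothetical names; unlike the real LocalizerRunner, a `Runner` here only queues resource names and starts no thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum Vis { PUBLIC, PRIVATE, APPLICATION }

// Hypothetical per-localizer downloader; it only records what it was asked to fetch.
final class Runner {
  final String localizerId;
  final List<String> queue = new ArrayList<>();

  Runner(String localizerId) {
    this.localizerId = localizerId;
  }
}

final class LocalizerRouter {
  // Plays the role of PublicLocalizer: one queue shared by all PUBLIC resources.
  final List<String> publicQueue = new ArrayList<>();
  // Plays the role of privLocalizers: localizer id -> its runner.
  final Map<String, Runner> privLocalizers = new HashMap<>();
  int runnersCreated = 0;

  void route(String localizerId, Vis visibility, String resource) {
    switch (visibility) {
      case PUBLIC:
        publicQueue.add(resource);
        break;
      case PRIVATE:
      case APPLICATION:
        synchronized (privLocalizers) {
          Runner runner = privLocalizers.get(localizerId);
          if (runner == null) {
            // First private/application resource for this localizer: create its runner.
            runner = new Runner(localizerId);
            privLocalizers.put(localizerId, runner);
            runnersCreated++;
          }
          runner.queue.add(resource);
        }
        break;
    }
  }
}
```

Only the private map is guarded by a lock here, matching the excerpt; `publicQueue` is a plain list, which is safe only because this demo is single-threaded.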

    On receiving the LOCALIZED event, LocalizedResource sends a ContainerEventType.RESOURCE_LOCALIZED event to ContainerImpl and moves from DOWNLOADING to LOCALIZED. ContainerImpl then checks whether all the resources it depends on have been downloaded; once they have, it sends a LAUNCH_CONTAINER event to the ContainersLauncher service to launch the Container.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
      private static class FetchSuccessTransition extends ResourceTransition {
        @Override
        public void transition(LocalizedResource rsrc, ResourceEvent event) {
          ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
          rsrc.localPath =
              Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
          rsrc.size = locEvent.getSize();
          for (ContainerId container : rsrc.ref) {
            // Send a ContainerEventType.RESOURCE_LOCALIZED event to every Container that references this resource
            rsrc.dispatcher.getEventHandler().handle(
                new ContainerResourceLocalizedEvent(
                  container, rsrc.rsrc, rsrc.localPath));
          }
        }
      }
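Together, LocalizedResource and ContainerImpl form a simple reference handshake: every Container that requests a resource is added to its `ref` set, a successful download notifies all of them, and each Container launches once its own pending set is empty. The sketch below models that handshake with hypothetical classes (`TrackedResource`, `WaitingContainer`, `ResState`); in this model a second request for a resource that is already downloading just adds another reference.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

enum ResState { INIT, DOWNLOADING, LOCALIZED }

// Hypothetical Container that waits until all of its resources are local.
final class WaitingContainer {
  final String id;
  final Set<String> pending;
  boolean launchRequested = false;

  WaitingContainer(String id, Collection<String> pending) {
    this.id = id;
    this.pending = new HashSet<>(pending);
  }

  // Plays the role of ContainerImpl handling RESOURCE_LOCALIZED.
  void onResourceLocalized(String resource) {
    pending.remove(resource);
    if (pending.isEmpty() && !launchRequested) {
      launchRequested = true; // where the real code would call sendLaunchEvent()
    }
  }
}

// Hypothetical, simplified LocalizedResource: INIT -> DOWNLOADING -> LOCALIZED.
final class TrackedResource {
  final String name;
  final Set<WaitingContainer> ref = new LinkedHashSet<>();
  ResState state = ResState.INIT;
  String localPath;

  TrackedResource(String name) {
    this.name = name;
  }

  // REQUEST: remember the requester; the first request starts the download.
  void request(WaitingContainer container) {
    ref.add(container);
    if (state == ResState.INIT) {
      state = ResState.DOWNLOADING;
    }
  }

  // LOCALIZED: record where the file landed and notify every referencing Container.
  void localized(String path) {
    localPath = path;
    state = ResState.LOCALIZED;
    for (WaitingContainer c : ref) {
      c.onResourceLocalized(name);
    }
  }
}
```

Sharing one TrackedResource between Containers means a file needed by several Containers is downloaded once and announced to all of them.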

    At this point all of the Container's resources have been localized, and the Container can be launched and run.

    2.3 Launching and running the Container

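    Containers are run by the ContainersLauncher service. In outline: the environment and command the Container needs are written to the shell script launch_container.sh, the command that starts this script is written to default_container_executor.sh, and the Container is then started by running that script. The commands go through scripts rather than being executed directly mainly because executing them directly could cause some special characters to be escaped incorrectly.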
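To see why a script helps, consider an environment value containing a quote, or a command with `$VAR` references and redirections: written into a file, they are interpreted by bash exactly once, with no extra layer of argument splitting or escaping. The following Java sketch builds a launch_container.sh-style script under simplifying assumptions: `LaunchScript` is hypothetical, values are single-quoted, and the real NodeManager script contains more (and differently formatted) content.

```java
import java.util.Map;

// Hypothetical generator for a launch_container.sh-style script.
final class LaunchScript {
  // Single-quote a value so bash takes it literally; an embedded ' becomes '\''
  // (close the quote, emit an escaped quote, reopen the quote).
  static String shellQuote(String value) {
    return "'" + value.replace("'", "'\\''") + "'";
  }

  // Environment exports first, then the command as the last line, which bash
  // interprets itself ($VAR expansion, redirections such as 1>stdout).
  static String build(Map<String, String> env, String command) {
    StringBuilder sb = new StringBuilder("#!/bin/bash\n");
    for (Map.Entry<String, String> e : env.entrySet()) {
      sb.append("export ").append(e.getKey()).append('=')
          .append(shellQuote(e.getValue())).append('\n');
    }
    sb.append(command).append('\n');
    return sb.toString();
  }
}
```

For example, an APP_NAME of `it's` becomes the line `export APP_NAME='it'\''s'`, and running the file with bash applies only bash's own quoting rules.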

    The sections above mainly followed the first event sent by startContainerInternal(). Next we follow the second event and see how the Container is launched and run.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
      private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
          ContainerTokenIdentifier containerTokenIdentifier,
          StartContainerRequest request) throws YarnException, IOException {
    
        // Omitted: token authentication and ContainerLaunchContext initialization
    
        this.readLock.lock();
        try {
          if (!serviceStopped) {
            // Create the application
            Application application =
                new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
            
            // Initialize the application for later Containers; this runs only once per application, normally for the first Container started by the ApplicationMaster
            if (null == context.getApplications().putIfAbsent(applicationID,
              application)) {
              LOG.info("Creating a new application reference for app " + applicationID);
              LogAggregationContext logAggregationContext =
                  containerTokenIdentifier.getLogAggregationContext();
              Map<ApplicationAccessType, String> appAcls =
                  container.getLaunchContext().getApplicationACLs();
              context.getNMStateStore().storeApplication(applicationID,
                  buildAppProto(applicationID, user, credentials, appAcls,
                    logAggregationContext));
    
    
              // 1. Send an ApplicationEventType.INIT_APPLICATION event to ApplicationImpl
              dispatcher.getEventHandler().handle(
                new ApplicationInitEvent(applicationID, appAcls,
                  logAggregationContext));
            }
    
            // 2. Send an ApplicationEventType.INIT_CONTAINER event to ApplicationImpl
            this.context.getNMStateStore().storeContainer(containerId, request);
            dispatcher.getEventHandler().handle(
              new ApplicationContainerInitEvent(container));
    
            this.context.getContainerTokenSecretManager().startContainerSuccessful(
              containerTokenIdentifier);
            NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
              "ContainerManageImpl", applicationID, containerId);
            // TODO launchedContainer misplaced -> doesn't necessarily mean a container
            // launch. A finished Application will not launch containers.
            metrics.launchedContainer();
            metrics.allocateContainer(containerTokenIdentifier.getResource());
          } else {
            throw new YarnException(
                "Container start failed as the NodeManager is " +
                "in the process of shutting down");
          }
        } finally {
          this.readLock.unlock();
        }
      }
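The putIfAbsent() check is what makes "initialize the application once" work: only the first Container of an application sees null and fires INIT_APPLICATION, while every Container, the first one included, fires INIT_CONTAINER. A minimal Java sketch of that pattern; `StartContainerModel` is hypothetical and records the events as strings instead of dispatching them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the two events sent by startContainerInternal().
final class StartContainerModel {
  // Plays the role of context.getApplications().
  final ConcurrentMap<String, Object> applications = new ConcurrentHashMap<>();
  // Events "sent" to the dispatcher, in order (not thread-safe; single-threaded demo).
  final List<String> events = new ArrayList<>();

  void startContainer(String appId, String containerId) {
    Object application = new Object(); // stands in for new ApplicationImpl(...)
    // putIfAbsent returns null only for the first Container of this application.
    if (applications.putIfAbsent(appId, application) == null) {
      events.add("INIT_APPLICATION " + appId);
    }
    // Every Container, the first one included, is handed to the application.
    events.add("INIT_CONTAINER " + containerId);
  }
}
```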

    This fires the Application's ApplicationEventType.INIT_CONTAINER event. The state transition registered for that event and its transition handler are shown below.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
        // Transitions from NEW state
        .addTransition(ApplicationState.NEW, ApplicationState.NEW,
            ApplicationEventType.INIT_CONTAINER,
            new InitContainerTransition())

    The handling logic of InitContainerTransition:

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
      static class InitContainerTransition implements
          SingleArcTransition<ApplicationImpl, ApplicationEvent> {
        @Override
        public void transition(ApplicationImpl app, ApplicationEvent event) {
          ApplicationContainerInitEvent initEvent =
            (ApplicationContainerInitEvent) event;
          Container container = initEvent.getContainer();
          app.containers.put(container.getContainerId(), container);
          LOG.info("Adding " + container.getContainerId()
              + " to application " + app.toString());
          
          switch (app.getApplicationState()) {
          case RUNNING:
            // The application is already RUNNING: send a ContainerEventType.INIT_CONTAINER event through the dispatcher right away
            app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
                container.getContainerId()));
            break;
          case INITING:
          case NEW:
            // these get queued up and sent out in AppInitDoneTransition
            break;
          default:
            assert false : "Invalid state for InitContainerTransition: " +
                app.getApplicationState();
          }
        }
      }
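InitContainerTransition and AppInitDoneTransition together implement a small "hold until ready" queue: Containers that arrive while the application is NEW or INITING are only remembered, and APPLICATION_INITED releases all of them at once. The Java sketch below models that behavior; `AppModel` and `AppState` are hypothetical simplifications of ApplicationImpl.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

enum AppState { NEW, INITING, RUNNING }

// Hypothetical model of how ApplicationImpl holds back INIT_CONTAINER until it is initialized.
final class AppModel {
  AppState state = AppState.NEW;
  final Set<String> containers = new LinkedHashSet<>();
  // Container ids for which INIT_CONTAINER has been "sent", in order.
  final List<String> initSent = new ArrayList<>();

  // InitContainerTransition: remember the Container; forward it only when RUNNING.
  void addContainer(String containerId) {
    containers.add(containerId);
    if (state == AppState.RUNNING) {
      initSent.add(containerId);
    }
    // In NEW or INITING the Container just waits in "containers".
  }

  // INIT_APPLICATION moves the application to INITING.
  void initApplication() {
    state = AppState.INITING;
  }

  // AppInitDoneTransition: become RUNNING and release every waiting Container.
  void applicationInited() {
    state = AppState.RUNNING;
    initSent.addAll(containers);
  }
}
```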

    The state transition registered for the ContainerEventType.INIT_CONTAINER event:

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
        // From NEW State
        .addTransition(ContainerState.NEW,
            EnumSet.of(ContainerState.LOCALIZING,
                ContainerState.LOCALIZED,
                ContainerState.LOCALIZATION_FAILED,
                ContainerState.DONE),
            ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())

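    The key step in RequestResourcesTransition is the call to sendLaunchEvent(), which requests the Container launch by sending a ContainersLauncherEventType.LAUNCH_CONTAINER event through the dispatcher. It is called directly, as below, when there is nothing to localize; otherwise it is called once all of the Container's resources have been localized.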

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
      static class RequestResourcesTransition implements
          MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
        @Override
        public ContainerState transition(ContainerImpl container,
            ContainerEvent event) {
          // Omitted: some checking logic
    
          // Key step: request the Container launch
          container.sendLaunchEvent();
          container.metrics.endInitingContainer();
          return ContainerState.LOCALIZED;
        }
      }
    
      private void sendLaunchEvent() {
        ContainersLauncherEventType launcherEvent =
            ContainersLauncherEventType.LAUNCH_CONTAINER;
        if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
          // try to recover a container that was previously launched
          launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
        }
        containerLaunchStartTime = clock.getTime();
        // Send a ContainersLauncherEventType.LAUNCH_CONTAINER (or RECOVER_CONTAINER) event through the dispatcher
        dispatcher.getEventHandler().handle(
            new ContainersLauncherEvent(this, launcherEvent));
      }

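    Here a ContainersLauncherEventType.LAUNCH_CONTAINER event is sent through the dispatcher. Unlike the events sent earlier, it has no state transition registered in a state machine, so who handles it? To find out, we need to look at where the ContainersLauncherEventType event class is registered.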

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
      public ContainerManagerImpl(Context context, ContainerExecutor exec,
          DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
          NodeManagerMetrics metrics, ApplicationACLsManager aclsManager,
          LocalDirsHandlerService dirsHandler) {
    
        dispatcher.register(ContainerEventType.class,
            new ContainerEventDispatcher());
        dispatcher.register(ApplicationEventType.class,
            new ApplicationEventDispatcher());
        dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
        dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
        dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
    
        // Register the handler for the ContainersLauncherEventType event class
        dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
        
        addService(dispatcher);
    
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.readLock = lock.readLock();
        this.writeLock = lock.writeLock();
      }
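The pattern here is routing by event-type class: each event enum (ContainerEventType, ApplicationEventType, ContainersLauncherEventType, ...) is registered with a handler, and an event goes to the handler registered for its type's class. This is a minimal, synchronous Java sketch of that idea with hypothetical names (`MiniDispatcher`, `LauncherEventType`, `MonitorEventType`); the real AsyncDispatcher puts events on a queue and delivers them from a separate thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical event-type enums standing in for the YARN ones.
enum LauncherEventType { LAUNCH_CONTAINER, CLEANUP_CONTAINER }

enum MonitorEventType { START_MONITORING_CONTAINER }

// Synchronous sketch of "one handler per event-type class".
final class MiniDispatcher {
  private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();

  void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Enum<?>> handler) {
    handlers.put(eventTypeClass, handler);
  }

  void dispatch(Enum<?> eventType) {
    // getDeclaringClass() yields the enum class even for constants with bodies.
    Consumer<Enum<?>> handler = handlers.get(eventType.getDeclaringClass());
    if (handler == null) {
      throw new IllegalStateException("No handler for " + eventType.getDeclaringClass());
    }
    handler.accept(eventType);
  }
}
```

Keying on the enum class is why a LAUNCH_CONTAINER event needs no state-machine entry: whoever registered for ContainersLauncherEventType receives it.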

    The handler registered for the ContainersLauncherEventType event class is ContainersLauncher. So how does ContainersLauncher handle the ContainersLauncherEventType.LAUNCH_CONTAINER event?

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
      public void handle(ContainersLauncherEvent event) {
        // TODO: ContainersLauncher launches containers one by one!!
        Container container = event.getContainer();
        ContainerId containerId = container.getContainerId();
        switch (event.getType()) {
          case LAUNCH_CONTAINER:
            Application app =
              context.getApplications().get(
                  containerId.getApplicationAttemptId().getApplicationId());
    
            // Handle LAUNCH_CONTAINER: create a ContainerLaunch task and submit it to the containerLauncher thread pool
            ContainerLaunch launch =
                new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
                  event.getContainer(), dirsHandler, containerManager);
            containerLauncher.submit(launch);
    
    
            // Record it in "running", the map of running Containers
            running.put(containerId, launch);
            break;
          case RECOVER_CONTAINER: // omitted
          case CLEANUP_CONTAINER: // omitted
        }
      }
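Because launching goes through an ExecutorService, handle() returns as soon as the task is queued, so the dispatcher thread is not blocked while the Container runs. A minimal Java sketch of that pattern; `MiniLauncher` and `FakeLaunch` are hypothetical, FakeLaunch only returns a fixed exit code, and the cached thread pool is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for ContainerLaunch: call() "runs" the Container and returns its exit code.
final class FakeLaunch implements Callable<Integer> {
  private final int exitCode;

  FakeLaunch(int exitCode) {
    this.exitCode = exitCode;
  }

  @Override
  public Integer call() {
    return exitCode;
  }
}

// Sketch of the LAUNCH_CONTAINER branch: submit the task, then record it in "running".
final class MiniLauncher {
  final ExecutorService containerLauncher = Executors.newCachedThreadPool();
  final Map<String, FakeLaunch> running = new ConcurrentHashMap<>();

  Future<Integer> launch(String containerId, FakeLaunch launch) {
    // submit() returns at once; call() runs later on a pool thread.
    Future<Integer> result = containerLauncher.submit(launch);
    running.put(containerId, launch);
    return result;
  }
}
```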

    ContainerLaunch is the class that actually launches the Container. It implements the Callable interface, so it is submitted with submit(), and the thread pool then runs its call() method. The launch does three main things:

    • Prepares the Container's execution environment;
    • Updates the Container's state from LOCALIZED to RUNNING;
    • Calls the ContainerExecutor to start the Container on the NM node.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
      public Integer call() {
        final ContainerLaunchContext launchContext = container.getLaunchContext();
        
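        /*
         * Preparation before launching the Container (omitted):
         * 1. wrap and extend the shell launch script (adding custom scripts)
         * 2. create the local working directories
         * 3. set the path where the tokens are stored
         */
    
        try {
          // call() blocks while the Container runs, so first send a ContainerEventType.CONTAINER_LAUNCHED
          // event to move the Container from LOCALIZED to RUNNING
          dispatcher.getEventHandler().handle(new ContainerEvent(
                containerID,
                ContainerEventType.CONTAINER_LAUNCHED));
          context.getNMStateStore().storeContainerLaunched(containerID);
    
          // Check if the container is signalled to be killed.
          if (!shouldLaunchContainer.compareAndSet(false, true)) {
            LOG.info("Container " + containerIdStr + " not launched as "
                + "cleanup already called");
            ret = ExitCode.TERMINATED.getExitCode();
          } else {
            // Key step: start the Container through the ContainerExecutor
            exec.activateContainer(containerID, pidFilePath);
            ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
                    nmPrivateTokensPath, user, appIdStr, containerWorkDir,
                    localDirs, logDirs);
          }
        } catch (Throwable e) {
          // Launch failed: report CONTAINER_EXITED_WITH_FAILURE
          LOG.warn("Failed to launch container.", e);
          dispatcher.getEventHandler().handle(new ContainerExitEvent(
              containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
              e.getMessage()));
          return ret;
        }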
    
        // Omitted: check the exit code returned by the Container to decide whether it ran successfully
        
        LOG.info("Container " + containerIdStr + " succeeded ");
        dispatcher.getEventHandler().handle(
            new ContainerEvent(containerID,
                ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
        return 0;
      }
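The `shouldLaunchContainer.compareAndSet(false, true)` check settles a race between launching and cleanup: both sides try to flip the same flag, and only the first succeeds. Here is a minimal Java sketch of that handshake; `LaunchGuard` is hypothetical, and treating the cleanup path as doing the same compareAndSet is an assumption of this model.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the launch/cleanup handshake around shouldLaunchContainer.
final class LaunchGuard {
  private final AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);

  // Launch path (as in call()): true means "go ahead and start the process".
  boolean tryLaunch() {
    return shouldLaunchContainer.compareAndSet(false, true);
  }

  // Cleanup path (assumed): true means cleanup won the race and the launch is skipped.
  boolean preventLaunch() {
    return shouldLaunchContainer.compareAndSet(false, true);
  }
}
```

Because compareAndSet is atomic, exactly one of the two calls can return true for a given guard, so a Container is never both skipped and started.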

    With the Container's runtime environment ready, what remains is actually starting the Container on the NM, which is done by calling ContainerExecutor#launchContainer(). Running the Container is the job of the pluggable ContainerExecutor component. Yarn provides three ContainerExecutor implementations, DefaultContainerExecutor, LinuxContainerExecutor and DockerContainerExecutor, and the parameter yarn.nodemanager.container-executor.class selects which one is used.
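Selecting the executor is a standard pluggable-component pattern: read a class name from the configuration, fall back to a default, and instantiate it. Hadoop does this by reflection; the Java sketch below swaps in a small registry so it stays self-contained. `ExecutorPlugin` and `ExecutorFactory` are hypothetical, a plain Map stands in for the Hadoop Configuration, and the default value used is DefaultContainerExecutor's class name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical plug-in interface standing in for ContainerExecutor.
interface ExecutorPlugin {
  String name();
}

final class ExecutorFactory {
  static final String KEY = "yarn.nodemanager.container-executor.class";
  static final String DEFAULT =
      "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";

  // Registry used instead of reflection; each supplier returns a trivial plugin reporting its name.
  private static final Map<String, Supplier<ExecutorPlugin>> REGISTRY = new HashMap<>();

  static {
    REGISTRY.put(DEFAULT, () -> () -> "default");
    REGISTRY.put("org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor",
        () -> () -> "linux");
  }

  // Look up the configured class name, falling back to the default executor.
  static ExecutorPlugin create(Map<String, String> conf) {
    String cls = conf.getOrDefault(KEY, DEFAULT);
    Supplier<ExecutorPlugin> supplier = REGISTRY.get(cls);
    if (supplier == null) {
      throw new IllegalArgumentException("Unknown executor: " + cls);
    }
    return supplier.get();
  }
}
```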

    2.4 Container resource cleanup

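    Container resource cleanup means that after a Container finishes (successfully or not), the NM reclaims the resources it used. These are mainly the temporary files the Container used at run time, which come from two services/components, ResourceLocalizationService and ContainerExecutor: ResourceLocalizationService downloads HDFS files to the local disk, and ContainerExecutor creates a private working directory for the Container and stores some temporary files in it (such as the Container process's pid file). Container resource cleanup is therefore mainly a matter of telling these two components to delete their temporary directories.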

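    At the end of ContainerLaunch#call(), when the Container has run successfully, a ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS event is sent through the dispatcher. The transition registered for this event, shown below, moves the Container from RUNNING to EXITED_WITH_SUCCESS and invokes ExitedWithSuccessTransition.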

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
        // From RUNNING State
        .addTransition(ContainerState.RUNNING,
            ContainerState.EXITED_WITH_SUCCESS,
            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
            new ExitedWithSuccessTransition(true))

    ExitedWithSuccessTransition sends two cleanup events (the first only when clCleanupRequired is true):

    • a ContainersLauncherEventType.CLEANUP_CONTAINER cleanup event to ContainersLauncher;

    • a LocalizationEventType.CLEANUP_CONTAINER_RESOURCES cleanup event to ResourceLocalizationService.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
      static class ExitedWithSuccessTransition extends ContainerTransition {
    
        boolean clCleanupRequired;
    
        public ExitedWithSuccessTransition(boolean clCleanupRequired) {
          this.clCleanupRequired = clCleanupRequired;
        }
    
        @Override
        public void transition(ContainerImpl container, ContainerEvent event) {
          // Set exit code to 0 on success        
          container.exitCode = 0;
            
          // TODO: Add containerWorkDir to the deletion service.
    
          if (clCleanupRequired) {
            // Send a ContainersLauncherEventType.CLEANUP_CONTAINER cleanup event to ContainersLauncher
            container.dispatcher.getEventHandler().handle(
                new ContainersLauncherEvent(container,
                    ContainersLauncherEventType.CLEANUP_CONTAINER));
          }
    
          // Send a LocalizationEventType.CLEANUP_CONTAINER_RESOURCES cleanup event to ResourceLocalizationService
          container.cleanup();
        }
      }

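    First, the ContainersLauncher side. The ContainersLauncherEventType.CLEANUP_CONTAINER event ends up in ContainersLauncher#handle(), which removes the Container from the map of running Containers and calls ContainerLaunch#cleanupContainer() to clean up after it: Container processes that are still alive are killed, and temporary files such as the pid file are deleted.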

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
      public void handle(ContainersLauncherEvent event) {
        // TODO: ContainersLauncher launches containers one by one!!
        Container container = event.getContainer();
        ContainerId containerId = container.getContainerId();
        switch (event.getType()) {
          case LAUNCH_CONTAINER:  // omitted
          case RECOVER_CONTAINER: // omitted
          case CLEANUP_CONTAINER:
            // Remove the Container from the map of running Containers
            ContainerLaunch launcher = running.remove(containerId);
            if (launcher == null) {
              // Container not launched. So nothing needs to be done.
              return;
            }
    
            // Cleanup a container whether it is running/killed/completed, so that
            // no sub-processes are alive.
            try {
              // Clean up the Container (remaining processes and temporary files)
              launcher.cleanupContainer();
            } catch (IOException e) {
              LOG.warn("Got exception while cleaning container " + containerId
                  + ". Ignoring.");
            }
            break;
        }
      }
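Removing the entry from `running` before cleaning up makes the CLEANUP_CONTAINER branch idempotent: a second cleanup for the same Container, or a cleanup for a Container that never launched, finds nothing and returns. A minimal Java sketch of that behavior, with a hypothetical `CleanupModel` class and a counter standing in for cleanupContainer():

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the CLEANUP_CONTAINER branch of ContainersLauncher#handle().
final class CleanupModel {
  // containerId -> launch handle (a String stands in for ContainerLaunch).
  final Map<String, String> running = new ConcurrentHashMap<>();
  int cleanedUp = 0;

  void cleanup(String containerId) {
    // Remove first, so a repeated cleanup for the same id finds nothing.
    String launcher = running.remove(containerId);
    if (launcher == null) {
      return; // never launched (or already cleaned up): nothing to do
    }
    cleanedUp++; // stands in for launcher.cleanupContainer()
  }
}
```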

    Next, ResourceLocalizationService removes the Container's directories under the user working directory and under the NM private directory. The LocalizationEventType.CLEANUP_CONTAINER_RESOURCES cleanup event leads to handleCleanupContainerResources(), which performs the actual cleanup. It deletes the user working directory ${yarn.nodemanager.local-dirs}/usercache/<user>/appcache/${appid}/${containerid} (the data downloaded from HDFS) and the private directory ${yarn.nodemanager.local-dirs}/nmPrivate/${appid}/${containerid}; both directories hold the Tokens file and the shell launch scripts.

    // Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
      public void handle(LocalizationEvent event) {
        switch (event.getType()) {
        case INIT_APPLICATION_RESOURCES:  // omitted
        case INIT_CONTAINER_RESOURCES:  // omitted
        case CONTAINER_RESOURCES_LOCALIZED:  // omitted
        case CACHE_CLEANUP:  // omitted
        case CLEANUP_CONTAINER_RESOURCES:
          handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
          break;
        case DESTROY_APPLICATION_RESOURCES: // omitted
        default:
          throw new YarnRuntimeException("Unknown localization event: " + event);
        }
      }
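The two directories named above can be derived from a local dir, the user, the application id and the container id. A minimal Java sketch using java.nio.file; `ContainerDirs` is a hypothetical helper, and a NodeManager configured with several yarn.nodemanager.local-dirs entries is expected to apply the same layout to each of them.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for the per-container directories removed on cleanup.
final class ContainerDirs {
  // <localDir>/usercache/<user>/appcache/<appId>/<containerId>: data localized for the Container.
  static Path userCacheDir(String localDir, String user, String appId, String containerId) {
    return Paths.get(localDir, "usercache", user, "appcache", appId, containerId);
  }

  // <localDir>/nmPrivate/<appId>/<containerId>: NM-private files such as tokens and scripts.
  static Path nmPrivateDir(String localDir, String appId, String containerId) {
    return Paths.get(localDir, "nmPrivate", appId, containerId);
  }
}
```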

    This completes the Container resource cleanup process.

    Original article: https://www.cnblogs.com/lemonu/p/13901050.html