  • YARN Job Submission Flow (Source Code Analysis)

    Keywords: yarn, rm, mapreduce, submission

    Based on Hadoop 2.7.1

    JobSubmitter

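    • addMRFrameworkToDistributedCache(Configuration conf): reads mapreduce.application.framework.path, which configures the HDFS path of an alternative framework; with the default YARN setup it can be ignored.
    • Token-related methods: read the credentials (binary and JSON formats are supported) and add them to the corresponding FileSystem, so that the file system is accessed with the same permissions.
    • copyAndConfigureFiles(Job job, Path jobSubmitDir): uploads the configuration, jar, files, libjars, archives, and so on.
    • submitJobInternal: the method that actually submits the job.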

    Core submission call chain

    1. JobSubmitter -> 
    2. ClientProtocol(YARNRunner) -> 
    3. ResourceMgrDelegate -> 
    4. YarnClient(YarnClientImpl).submitApplication( ApplicationSubmissionContext appContext) -> 
    5. [RM] ApplicationClientProtocol(ClientRMService).submitApplication( SubmitApplicationRequest request) -> // fills the ApplicationSubmissionContext (ASC) with default values
    6. RMAppManager.submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, String user) -> 

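    • ApplicationSubmissionContext: the submission context, which holds the application's metadata.
    • SubmitApplicationRequest: the request object for the submission.

    RMAppManager.submitApplication then dispatches a START event for the new application:
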
    // Dispatcher is not yet started at this time, so these START events
    // enqueued should be guaranteed to be first processed when dispatcher
    // gets started.
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
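
    The ordering guarantee described in the comment above can be illustrated with a small sketch. This is a toy model in plain Java, not Hadoop's dispatcher: ToyDispatcher, its string events, and the stop sentinel are hypothetical names chosen for illustration. It assumes a single FIFO queue drained by one worker thread, so anything enqueued before start() is handled ahead of anything enqueued later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model only: a hypothetical stand-in, not Hadoop's dispatcher implementation.
class ToyDispatcher {
    private static final String STOP = "__STOP__"; // hypothetical sentinel event
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();
    private Thread worker;

    // Only enqueues; nothing is processed until start() has been called.
    void handle(String event) {
        queue.add(event);
    }

    // Starts the single worker thread that drains the queue in FIFO order.
    void start() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take();
                    if (STOP.equals(event)) {
                        return;
                    }
                    handled.add(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    // Enqueues the sentinel, waits for the worker to exit, returns the handling order.
    List<String> stopAndGetHandled() {
        queue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }
}

class ToyDispatcherDemo {
    static List<String> run() {
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.handle("START app_1"); // enqueued before the dispatcher starts
        dispatcher.handle("START app_2");
        dispatcher.start();
        dispatcher.handle("KILL app_1");  // enqueued after start, so handled last
        return dispatcher.stopAndGetHandled();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [START app_1, START app_2, KILL app_1]
    }
}
```

    The point of the sketch is only the ordering: because handle() never processes anything itself, the START events wait in the queue and are the first ones the worker sees.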
    

    START -> APP_NEW_SAVED 

     stateMachineFactory.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
            RMAppEventType.START, new RMAppNewlySavingTransition())
            
    //...
    
    private static final class RMAppNewlySavingTransition extends RMAppTransition {
        @Override
        public void transition(RMAppImpl app, RMAppEvent event) {
    
          // If recovery is enabled then store the application information in a
          // non-blocking call so make sure that RM has stored the information
          // needed to restart the AM after RM restart without further client
          // communication
          LOG.info("Storing application with id " + app.applicationId);
          app.rmContext.getStateStore().storeNewApplication(app);
        }
      }
      
      public synchronized void storeNewApplication(RMApp app) {
        ApplicationSubmissionContext context = app
                                                .getApplicationSubmissionContext();
        assert context instanceof ApplicationSubmissionContextPBImpl;
        ApplicationStateData appState =
            ApplicationStateData.newInstance(
                app.getSubmitTime(), app.getStartTime(), context, app.getUser());
        dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
      }
      
      
       private static final class AddApplicationToSchedulerTransition extends
          RMAppTransition {
        @Override
        public void transition(RMAppImpl app, RMAppEvent event) {
          app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
            app.submissionContext.getQueue(), app.user,
            app.submissionContext.getReservationID()));
        }
      }

    AppAddedSchedulerEvent is handled by whichever Scheduler is configured.

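    P.S. A method for reading the event-handling code:

    1. Pick an event type, for example APP_NEW_SAVED.
    2. Find the transition class registered to handle it.
    3. Find the concrete logic that processes the event (this is usually the most complex part).
    4. Find the next event that this logic emits.
    5. Repeat.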
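
    The reading method above amounts to walking a transition table. The sketch below is a toy model in plain Java and does not use Hadoop's StateMachineFactory. ToyAppStateMachine and its methods are hypothetical. The NEW -> NEW_SAVING transition on START comes from the snippet earlier in this post; the SUBMITTED and ACCEPTED state names are my assumption about how the chain continues and should be checked against RMAppImpl.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Toy model only: a hypothetical, much-reduced (state, event) -> next-state table.
class ToyAppStateMachine {
    // SUBMITTED and ACCEPTED are assumed state names, not taken from the post.
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }
    enum Event { START, APP_NEW_SAVED, APP_ACCEPTED }

    private final Map<State, Map<Event, State>> table = new EnumMap<>(State.class);
    private State current = State.NEW;

    // Registers one transition; returns this so registrations can be chained.
    ToyAppStateMachine addTransition(State from, State to, Event on) {
        table.computeIfAbsent(from, k -> new EnumMap<Event, State>(Event.class)).put(on, to);
        return this;
    }

    // Looks up (current state, event) and moves to the registered next state.
    State handle(Event event) {
        Map<Event, State> row = table.get(current);
        State next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        current = next;
        return current;
    }

    static ToyAppStateMachine newAppMachine() {
        return new ToyAppStateMachine()
            .addTransition(State.NEW, State.NEW_SAVING, Event.START)
            .addTransition(State.NEW_SAVING, State.SUBMITTED, Event.APP_NEW_SAVED)
            .addTransition(State.SUBMITTED, State.ACCEPTED, Event.APP_ACCEPTED);
    }

    // Feeds the events to a fresh machine and records the state reached after each one.
    static List<State> trace(Event... events) {
        ToyAppStateMachine machine = newAppMachine();
        List<State> visited = new ArrayList<>();
        for (Event e : events) {
            visited.add(machine.handle(e));
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(trace(Event.START, Event.APP_NEW_SAVED, Event.APP_ACCEPTED));
        // [NEW_SAVING, SUBMITTED, ACCEPTED]
    }
}
```

    In the real code each transition also carries a transition object (such as RMAppNewlySavingTransition) whose body is where step 3 of the method happens; the toy table leaves that part out.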

    ApplicationMaster

    START -> APP_NEW_SAVED -> APP_ACCEPTED ....

    What follows is a repeated cycle of events such as starting the application attempt, so we skip straight to the AM part.

    ResourceManager contains createApplicationMasterLauncher() and createApplicationMasterService(). The AM launch logic (AMLauncher.launch()) looks like this:

    private void launch() throws IOException, YarnException {
        connect();
        ContainerId masterContainerID = masterContainer.getId();
        ApplicationSubmissionContext applicationContext =
          application.getSubmissionContext();
        LOG.info("Setting up container " + masterContainer
            + " for AM " + application.getAppAttemptId());  
        ContainerLaunchContext launchContext =
            createAMContainerLaunchContext(applicationContext, masterContainerID);
    
        StartContainerRequest scRequest =
            StartContainerRequest.newInstance(launchContext,
              masterContainer.getContainerToken());
        List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
        list.add(scRequest);
        StartContainersRequest allRequests =
            StartContainersRequest.newInstance(list);
    
        StartContainersResponse response =
            containerMgrProxy.startContainers(allRequests);
        if (response.getFailedRequests() != null
            && response.getFailedRequests().containsKey(masterContainerID)) {
          Throwable t =
              response.getFailedRequests().get(masterContainerID).deSerialize();
          parseAndThrowException(t);
        } else {
          LOG.info("Done launching container " + masterContainer + " for AM "
              + application.getAppAttemptId());
        }
      }
      
      
       private ContainerLaunchContext createAMContainerLaunchContext(
          ApplicationSubmissionContext applicationMasterContext,
          ContainerId containerID) throws IOException {
    
        // Construct the actual Container
        ContainerLaunchContext container = 
            applicationMasterContext.getAMContainerSpec();
        LOG.info("Command to launch container "
            + containerID
            + " : "
            + StringUtils.arrayToString(container.getCommands().toArray(
                new String[0])));
        
        // Finalize the container
        setupTokens(container, containerID);
        
        return container;
      }
      

    Note two lines in the code above:

    • ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID); builds the AM launch request.
    • StartContainersResponse response = containerMgrProxy.startContainers(allRequests); starts the AM's container and launches the AM inside it.
      @Override
      public ContainerLaunchContext getAMContainerSpec() {
        ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
        if (this.amContainer != null) {
          return amContainer;
        } // Else via proto
        if (!p.hasAmContainerSpec()) {
          return null;
        }
        amContainer = convertFromProtoFormat(p.getAmContainerSpec());
        return amContainer;
      }
      
      public class ApplicationSubmissionContextPBImpl 
    extends ApplicationSubmissionContext {
      ApplicationSubmissionContextProto proto = 
          ApplicationSubmissionContextProto.getDefaultInstance();
      ApplicationSubmissionContextProto.Builder builder = null;
      boolean viaProto = false;
      
      private ApplicationId applicationId = null;
      private Priority priority = null;
      private ContainerLaunchContext amContainer = null;
      private Resource resource = null;
      private Set<String> applicationTags = null;
      private ResourceRequest amResourceRequest = null;
      private LogAggregationContext logAggregationContext = null;
      private ReservationId reservationId = null;
    
      /// ...
      }
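
    The getter shown above follows a lazy conversion pattern: return the cached object if there is one, return null if the serialized form does not contain the field, otherwise convert once and cache the result. A minimal sketch of that pattern, with no protobuf involved, might look like this. ToySubmissionContext, wireSpec, and convertFromWire are hypothetical names, and the string "conversion" is only a placeholder for the real proto-to-object step.

```java
// Toy model only: hypothetical names, no protobuf involved.
class ToySubmissionContext {
    private final String wireSpec; // stand-in for the serialized field; null means "not set"
    private String cachedSpec;     // local object view, filled on first access
    private int conversions = 0;   // counts how often the conversion actually ran

    ToySubmissionContext(String wireSpec) {
        this.wireSpec = wireSpec;
    }

    String getSpec() {
        if (cachedSpec != null) {
            return cachedSpec;     // already converted, reuse the cached object
        }
        if (wireSpec == null) {
            return null;           // field absent in the serialized form
        }
        cachedSpec = convertFromWire(wireSpec);
        return cachedSpec;
    }

    int getConversions() {
        return conversions;
    }

    // Placeholder for the real conversion step.
    private String convertFromWire(String wire) {
        conversions++;
        return "spec(" + wire + ")";
    }

    public static void main(String[] args) {
        ToySubmissionContext ctx = new ToySubmissionContext("launch_am.sh");
        System.out.println(ctx.getSpec());        // spec(launch_am.sh)
        System.out.println(ctx.getSpec());        // same cached object, no second conversion
        System.out.println(ctx.getConversions()); // 1
    }
}
```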

    After that, the launched AppMaster creates the job and requests resources from the ResourceManager through AMRMClient, and so on.

  • Original article: https://www.cnblogs.com/lhfcws/p/7127990.html