  • How a client submits a MapReduce job to YARN

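    In MapReduce v1, job submission goes through JobClient, which talks to the JobTracker. The user creates a Job, sets its parameters through JobConf, and then submits the job and monitors its progress through JobClient. JobClient holds an internal member variable of type JobSubmissionProtocol, an interface that JobTracker implements; the client and the JobTracker communicate over this protocol to submit the job.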
      public void init(JobConf conf) throws IOException {
        String tracker = conf.get("mapred.job.tracker", "local");
        tasklogtimeout = conf.getInt(
          TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
        this.ugi = UserGroupInformation.getCurrentUser();
        // If mapred.job.tracker is set to "local", create a LocalJobRunner; otherwise create an RPC proxy to the JobTracker
        if ("local".equals(tracker)) {
          conf.setNumMapTasks(1);
          this.jobSubmitClient = new LocalJobRunner(conf);
        } else {
          this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
        }        
      }
    The following calls are made in order:
    Job.waitForCompletion()
    Job.submit()
    jobClient.submitJobInternal()
    jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials())
    The last call completes the job submission.

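    YARN, in contrast, uses ClientRMProtocol as its job submission protocol. When an MRv2 job is submitted, a Cluster object holding the cluster information is created first. It has an internal variable, frameworkLoader, that loads the ClientProtocolProvider implementations named in configuration files; here these are LocalClientProtocolProvider and YarnClientProtocolProvider. In its initialize method, Cluster iterates over frameworkLoader and lets each ClientProtocolProvider create a concrete ClientProtocol. YarnClientProtocolProvider, for example, checks whether mapreduce.framework.name in the JobConf is yarn and, if so, creates a YARNRunner.
    The create method of YarnClientProtocolProvider: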
      @Override
      public ClientProtocol create(Configuration conf) throws IOException {
        if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
          return new YARNRunner(conf);
        }
        return null;
      }
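
    The provider lookup described above can be sketched without any Hadoop dependency. This is a simplified model, not Hadoop's code: SketchProtocol, SketchProvider, LocalSketchProvider, YarnSketchProvider and ClusterSketch are hypothetical stand-ins, a plain Map replaces Configuration, and the "unset means local" default is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ClientProtocol. */
interface SketchProtocol {
    String name();
}

/** Hypothetical stand-in for ClientProtocolProvider. */
interface SketchProvider {
    /** Returns a protocol if this provider handles the configuration, otherwise null. */
    SketchProtocol create(Map<String, String> conf);
}

class LocalSketchProvider implements SketchProvider {
    @Override
    public SketchProtocol create(Map<String, String> conf) {
        // Assumption of this sketch: an unset framework name falls back to "local".
        String framework = conf.getOrDefault("mapreduce.framework.name", "local");
        return "local".equals(framework) ? () -> "LocalJobRunner" : null;
    }
}

class YarnSketchProvider implements SketchProvider {
    @Override
    public SketchProtocol create(Map<String, String> conf) {
        return "yarn".equals(conf.get("mapreduce.framework.name")) ? () -> "YARNRunner" : null;
    }
}

class ClusterSketch {
    /** Asks each provider in turn and keeps the first non-null protocol. */
    static SketchProtocol initialize(List<SketchProvider> providers, Map<String, String> conf) {
        for (SketchProvider provider : providers) {
            SketchProtocol protocol = provider.create(conf);
            if (protocol != null) {
                return protocol;
            }
        }
        throw new IllegalStateException(
            "No provider accepted mapreduce.framework.name=" + conf.get("mapreduce.framework.name"));
    }

    public static void main(String[] args) {
        List<SketchProvider> providers =
            Arrays.asList(new LocalSketchProvider(), new YarnSketchProvider());
        Map<String, String> conf = new HashMap<>();
        conf.put("mapreduce.framework.name", "yarn");
        System.out.println(initialize(providers, conf).name());
    }
}
```

    Returning null from a provider that does not recognise the framework name is what lets the loop move on to the next provider.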

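    ClientProtocol currently has two implementations, YARNRunner and LocalJobRunner. LocalJobRunner (used when mapreduce.framework.name is local) runs the MapReduce job locally, which makes programs easy to debug. YARNRunner submits the job to YARN.
    When YARNRunner is initialized, it opens an RPC connection to the ResourceManager (port 8032 by default). The protocol that actually talks to the RM is ClientRMProtocol: every operation between the client and the RM, such as killApplication, getNodeReports and getJobCounters, goes out through YARNRunner's member variable rmClient (a ClientRMProtocol).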
      public synchronized void start() {
        YarnRPC rpc = YarnRPC.create(getConfig());
        this.rmClient = (ClientRMProtocol) rpc.getProxy(
            ClientRMProtocol.class, rmAddress, getConfig());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to ResourceManager at " + rmAddress);
        }
        super.start();
      }
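
    How rmAddress might be derived from configuration can be sketched as follows. This is a minimal illustration, not YARN's own code: RmAddressSketch is a hypothetical class, java.util.Properties stands in for Configuration, and the 0.0.0.0 default host is an assumption. The key yarn.resourcemanager.address and the default port 8032 are the ones the text refers to.

```java
import java.net.InetSocketAddress;
import java.util.Properties;

/** Hypothetical helper that resolves the ResourceManager address. */
class RmAddressSketch {
    static final String RM_ADDRESS_KEY = "yarn.resourcemanager.address";
    static final String DEFAULT_RM_HOST = "0.0.0.0"; // assumed default host for this sketch
    static final int DEFAULT_RM_PORT = 8032;

    /** Parses "host:port"; a missing port falls back to DEFAULT_RM_PORT. */
    static InetSocketAddress resolve(Properties conf) {
        String value = conf
            .getProperty(RM_ADDRESS_KEY, DEFAULT_RM_HOST + ":" + DEFAULT_RM_PORT)
            .trim();
        int colon = value.lastIndexOf(':');
        String host = colon < 0 ? value : value.substring(0, colon);
        int port = colon < 0 ? DEFAULT_RM_PORT : Integer.parseInt(value.substring(colon + 1));
        // createUnresolved avoids a DNS lookup, which keeps the sketch side-effect free.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

    A client that sets yarn.resourcemanager.address to rm.example.com:9000 would get that host and port back; an empty configuration yields port 8032.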

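    Once the Cluster class is initialized, the next step is to create the Application. The client first asks the RM for a new application (getNewApplication) and receives a GetNewApplicationResponse, which carries the ApplicationId together with the minimum and maximum Resource capability the RM can provide.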
    public interface GetNewApplicationResponse {
      public abstract ApplicationId getApplicationId();
      public Resource getMinimumResourceCapability();
      public Resource getMaximumResourceCapability();
      public void setMaximumResourceCapability(Resource capability); 
    }
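    Resource describes a set of cluster compute resources. At present only memory and CPU are included. CPU here means virtual cores: one physical core can be presented as several virtual cores, so the mapping is not one-to-one.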
    public abstract class Resource implements Comparable<Resource> {
      public abstract int getMemory();
      public abstract void setMemory(int memory);
      public abstract int getVirtualCores();
      public abstract void setVirtualCores(int vCores);
    }
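
    One way a client could use the minimum and maximum capability is to check a request against them before submitting. The sketch below is only an illustration under assumptions: ResourceSketch is a hypothetical value class rather than YARN's Resource, and the rounding rule (round memory up to a multiple of the minimum, reject anything above the maximum) is a simplification that a real scheduler need not follow.

```java
/** Hypothetical, immutable stand-in for a memory/vcore resource pair. */
final class ResourceSketch {
    final int memoryMb;
    final int virtualCores;

    ResourceSketch(int memoryMb, int virtualCores) {
        this.memoryMb = memoryMb;
        this.virtualCores = virtualCores;
    }

    /** Rounds value up to the nearest multiple of step (step must be positive). */
    static int roundUp(int value, int step) {
        return ((value + step - 1) / step) * step;
    }

    /**
     * Fits a request between the minimum and maximum capability.
     * Assumed rule: memory is raised to a multiple of the minimum, vcores are
     * raised to the minimum, and a request above the maximum is rejected.
     */
    static ResourceSketch fit(ResourceSketch request, ResourceSketch min, ResourceSketch max) {
        int memory = roundUp(Math.max(request.memoryMb, min.memoryMb), min.memoryMb);
        int cores = Math.max(request.virtualCores, min.virtualCores);
        if (memory > max.memoryMb || cores > max.virtualCores) {
            throw new IllegalArgumentException(
                "Request exceeds maximum capability: " + memory + " MB, " + cores + " vcores");
        }
        return new ResourceSketch(memory, cores);
    }
}
```

    With a minimum of 1024 MB and 1 vcore, a request for 1536 MB becomes 2048 MB under this rule.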

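    Next, the client builds an ApplicationSubmissionContext, which contains the information needed to launch the MR AM: the path of the job's staging directory on HDFS (job.xml, job.split, job.splitmetainfo, libjars, files, archives and so on), the user's UGI information, and the security tokens. Once the context is built, resMgrDelegate.submitApplication(appContext) is called.
    The submitJob method of YARNRunner: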
      @Override
      public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
      throws IOException, InterruptedException {
        // Construct necessary information to start the MR AM
        ApplicationSubmissionContext appContext =
          createApplicationSubmissionContext(conf, jobSubmitDir, ts);
    
        // Submit to ResourceManager
        ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
    
        ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
        String diagnostics = (appMaster == null ?
                "application report is null" : appMaster.getDiagnostics());
        if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
            || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
          throw new IOException("Failed to run job : " +
            diagnostics);
        }
        return clientCache.getClient(jobId).getJobStatus(jobId);
      }
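
    The order of the client-side steps (request an application ID, submit, then check the report) can be modelled with an in-memory fake. This is a sketch under stated assumptions, not Hadoop code: FakeResourceManager and SubmitFlowSketch are hypothetical, application IDs are plain ints, and the rule that an empty staging directory produces a FAILED application exists only to exercise the error path.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the ResourceManager side of the protocol. */
class FakeResourceManager {
    enum State { SUBMITTED, FAILED, KILLED }

    private final Map<Integer, State> apps = new HashMap<>();
    private int nextId = 1;

    /** Step 1: hand out a fresh application ID. */
    int getNewApplication() {
        return nextId++;
    }

    /** Step 2: record the submission; an empty staging dir fails in this model. */
    void submitApplication(int appId, String stagingDir) {
        boolean valid = stagingDir != null && !stagingDir.isEmpty();
        apps.put(appId, valid ? State.SUBMITTED : State.FAILED);
    }

    /** Step 3: report the state, or null for an unknown application. */
    State getApplicationReport(int appId) {
        return apps.get(appId);
    }
}

class SubmitFlowSketch {
    /** Runs the three steps and fails the same way submitJob does on a bad report. */
    static int submitJob(FakeResourceManager rm, String stagingDir) throws IOException {
        int appId = rm.getNewApplication();
        rm.submitApplication(appId, stagingDir);
        FakeResourceManager.State state = rm.getApplicationReport(appId);
        if (state == null
                || state == FakeResourceManager.State.FAILED
                || state == FakeResourceManager.State.KILLED) {
            String diagnostics = state == null ? "application report is null" : "state=" + state;
            throw new IOException("Failed to run job : " + diagnostics);
        }
        return appId;
    }
}
```

    Checking the report immediately after submission means a job that the RM rejects surfaces as an IOException in the client rather than as a silent hang.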

    Finally, the job's status information is obtained through the getJobStatus method:
        org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
          TypeConverter.toYarn(oldJobID);
        GetJobReportRequest request =
            recordFactory.newRecordInstance(GetJobReportRequest.class);
        request.setJobId(jobId);
        JobReport report = ((GetJobReportResponse) invoke("getJobReport",
            GetJobReportRequest.class, request)).getJobReport();
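
    The invoke("getJobReport", ...) call in the excerpt suggests that the method is looked up by name. A minimal sketch of that kind of dispatch with Java reflection is shown here. Every type below (JobReportRequestSketch, JobReportResponseSketch, ClientProtocolSketch, ReflectiveInvokerSketch) is hypothetical, and the real delegate may add behaviour, such as retries, that this sketch leaves out.

```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** Hypothetical request record. */
class JobReportRequestSketch {
    final String jobId;
    JobReportRequestSketch(String jobId) { this.jobId = jobId; }
}

/** Hypothetical response record. */
class JobReportResponseSketch {
    final String jobId;
    final String state;
    JobReportResponseSketch(String jobId, String state) {
        this.jobId = jobId;
        this.state = state;
    }
}

/** Hypothetical protocol endpoint that always reports RUNNING. */
class ClientProtocolSketch {
    public JobReportResponseSketch getJobReport(JobReportRequestSketch request) {
        return new JobReportResponseSketch(request.jobId, "RUNNING");
    }
}

class ReflectiveInvokerSketch {
    private final Object proxy;

    ReflectiveInvokerSketch(Object proxy) { this.proxy = proxy; }

    /** Looks up a one-argument public method by name and calls it on the proxy. */
    Object invoke(String method, Class<?> argClass, Object arg) throws IOException {
        try {
            Method target = proxy.getClass().getMethod(method, argClass);
            return target.invoke(proxy, arg);
        } catch (NoSuchMethodException | IllegalAccessException e) {
            throw new IllegalStateException("Cannot call " + method, e);
        } catch (InvocationTargetException e) {
            // Unwrap the exception thrown by the target method itself.
            throw new IOException(e.getCause());
        }
    }
}
```

    The caller casts the returned Object to the response type, just as the excerpt casts to GetJobReportResponse.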


  • Original article: https://www.cnblogs.com/keanuyaoo/p/3271412.html