zoukankan      html  css  js  c++  java
  • Job提交流程

    1.【Driver.class】-- Job job = Job.getInstance(conf);
        -->【job.class】getInstance(conf)
        --> new JobConf(conf) //构建一个空集群配置对象
      说明:将默认configuration(4个配置文件)包装成Jobconf
    2.设置相关参数项:
      job.setJarByClass(AirMapper.class); --> 【MRJobConfig】JobContext.JAR:mapreduce.job.jar
      job.setJobName("Text local"); --> 【MRJobConfig】JobContext.JOB_NAME:mapreduce.job.name
      job.setMapperClass(AirMapper.class); --> 【MRJobConfig】MAP_CLASS_ATTR:mapreduce.job.map.class
      job.setReducerClass(AirReducer.class); --> 【MRJobConfig】REDUCE_CLASS_ATTR:mapreduce.job.reduce.class
      job.setOutputKeyClass(Text.class); --> 【MRJobConfig】JobContext.OUTPUT_KEY_CLASS:mapreduce.job.output.key.class
      job.setOutputValueClass(IntWritable.class); --> 【MRJobConfig】JobContext.OUTPUT_VALUE_CLASS:mapreduce.job.output.value.class
      job.setCombinerClass(AirCombiner.class); --> 【MRJobConfig】COMBINE_CLASS_ATTR:mapreduce.job.combine.class

    FileInputFormat.addInputPath(job, new Path("file:///D:/airdata"));
      -->【FileInputFormat.class】 INPUT_DIR:"mapreduce.input.fileinputformat.inputdir";
    FileOutputFormat.setOutputPath(job,outfile);
      -->【FileOutputFormat.class】FileOutputFormat.OUTDIR :mapreduce.output.fileoutputformat.outputdir
    3.job.waitForCompletion(true)
    -->【job.class】waitForCompletion() //作用:提交job至cluster,并等待完成
      -->判定当前State是否为JobState.DEFINE()定义阶段,如果定义阶段调用submit()
        -->【job.class】submit()
          -->【job.class】connect() //说明:通过UGI(用户组信息对象)构建集群对象(cluster)
            -->【job.class】new Cluster(getConfiguration());
              -->【 Cluster.class】通过静态代码块,加载【mapred-site.xml和yarn-site.xml】和默认的default文件
    通过conf的mapreduce.framework.name的值来返回构建cluster集群对象的客户端协议:local=LocalJobRunner;yarn=YARNRunner
        -->【job.class】JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
          说明:构建job提交器,然后执行submitter.submitJobInternal()
        -->【JobSubmitter.class】submitter.submitJobInternal()
    作用:

      1.检查作业的输入和输出规范
      2.计算作业的InputSplit(逻辑分片)。
      3.如果需要,设置必要的缓存信息;
      4.将作业的jar和配置复制到map-reduce系统分布文件系统上的目录。
      5.提交job至ResourceManager,并监控其状态
        说明:检查;生成JobID,设置job相关参数:
      a.MRJobConfig.JOB_SUBMITHOST :mapreduce.job.submithostname
      b.MRJobConfig.JOB_SUBMITHOSTADDR :mapreduce.job.submithostaddress
      c.MRJobConfig.USER_NAME :mapreduce.job.user.name
      d.MRJobConfig.MAPREDUCE_JOB_DIR : mapreduce.job.dir 【/tmp/hadoop-centos/mapred/staging/centos1104417307/.staging/job_local1104417307_0001】
      e.MRJobConfig.NUM_MAPS : mapreduce.job.maps

    -->【JobSubmitter.class】copyAndConfigureFiles(job, submitJobDir);
    -->【JobResourceUploader.class】 rUploader.uploadFiles(job, jobSubmitDir);
    说明:上传jobjar,配置信息等内容;
    -->【JobSubmitter.class】writeSplits(job, submitJobDir);
    说明:计算split;
    -->【JobSubmitter.class】 writeConf(conf, submitJobFile);
    说明:上传job.xml文件
    -->【JobSubmitter.class】 submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
    说明:真正提交Job
    -->
    -->如果State为JobState.RUNNING运行阶段,轮询job,按5000ms时间间隔
    【mapreduce.client.completion.pollinterval=5000】

    JOB提交流程:
      创建一个job-->resourcemanager获得一个应用-->将运行作业所需要的资源复制到文件系统中-->提交作业-->
    调度器分配一个容器-->启动APPMaster-->初始化作业-->接受输入分片-->若作业不适合作为uber任务运行,Appmaster向资源管理器请求容器-->
    Appmaster启动容器-->资源本地化-->运行map与reduce

    资源调度的相关配置属性:
    ------------------------------------------------
      1.mapreduce.map.memory.mb = 1024 //每个map任务的调度程序请求的内存数量
      2.mapreduce.map.cpu.vcores = 1 //每个map任务从调度程序请求的虚拟内核的数量。
      3.mapreduce.reduce.memory.mb = 1024 //每个reduce任务的调度程序请求的内存数量
      4.mapreduce.reduce.cpu.vcores = 1 //每个reduce任务从调度程序请求的虚拟内核的数量。
      5.mapreduce.tasktracker.map.tasks.maximum = 2 //Nodemanager同时运行的map任务的最大数量。
      6.mapreduce.tasktracker.reduce.tasks.maximum = 2 //Nodemanager同时运行的reduce任务的最大数量。
      7.hadoop中每个守护进程(5个)默认分配1000m内存大小;
    修改【hadoop-env.sh】中,export HADOOP_HEAPSIZE=1000(单位MB)
    修改ResourceManager堆大小:【yarn-env.sh】export YARN_RESOURCEMANAGER_HEAPSIZE=1000
    namenode内存计算原则:参照【P291】
    修改namenode:export HADOOP_NAMENODE_OPTS="-Xmx2000m -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_NAMENODE_OPTS"
    修改Sencondarynamenode:export HADOOP_SECONDARYNAMENODE_OPTS="-Xmx2000m -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_SECONDARYNAMENODE_OPTS"
    修改datanode:export HADOOP_DATANODE_OPTS=

      8.yarn.nodemanager.resource.memory-mb = 8192m //每个节点可用物理内存,单位MB,为容器
      9.yarn.scheduler.minimum-allocation-mb = 1024m //单个任务可申请最少内存,默认1024MB
      10.yarn.scheduler.maximum-allocation-mb = 8192m //单个任务可申请最大内存,默认8192MB
      11.yarn.scheduler.minimum-allocation-vcores = 4 //单个任务可申请最小cpu核数,默认1
      12.yarn.scheduler.maximum-allocation-vcores = 32 //单个任务可申请最大cpu核数,默认32
    注意:yarn.scheduler.minimum-allocation-mb <= mapreduce.map.memory.mb <= yarn.scheduler.maximum-allocation-mb,否则抛InvalidResourceRequestException
    一个nodemanager节点运行的map数量 <= yarn.nodemanager.resource.memory-mb / mapreduce.map.memory.mb = 8
    一个nodemanager节点运行的reduce数量 <= yarn.nodemanager.resource.memory-mb / mapreduce.reduce.memory.mb = 8

  • 相关阅读:
    说说移动端web开发中的点击穿透问题
    将博客搬至CSDN
    IIS(4)
    IIS(2)
    IIS(3)
    IIS(1)
    链表
    常用到的关键字
    进程与线程
    文件系统的原理
  • 原文地址:https://www.cnblogs.com/lyr999736/p/9381688.html
Copyright © 2011-2022 走看看