zoukankan      html  css  js  c++  java
  • Hadoop源码分析23:MapReduce的Job提交过程

    命令为:

    hadoop_debugjar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount/user/admin/in/yellow.txt /user/admin/out/555

    首先调用org.apache.hadoop.util.runJar.main

    public static void main(String[]args){

      // 加载Jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar

     JarFile jarFile = new JarFile(fileName); 

      //根据META-INF得知主Classorg/apache/hadoop/examples/ExampleDriver

     Manifest manifest = jarFile.getManifest();

     if (manifest !=null){

         mainClassName =manifest.getMainAttributes().getValue("Main-Class");

     }

     

       //建立本地临时文件夹 /tmp/hadoop-admin

      File tmpDir = newFile(newConfiguration().get("hadoop.tmp.dir"));

      tmpDir.mkdirs();

     

       //建立本地工作文件夹 /tmp/hadoop-admin/hadoop-unjar4705742737164408087               finalFile workDir = File.createTempFile("hadoop-unjar", "", tmpDir);

       workDir.delete();

       workDir.mkdirs();

    //JVM退出时将tmp/hadoop-admin/hadoop-unjar4705742737164408087删除

    Runtime.getRuntime().addShutdownHook(newThread(){

           publicvoidrun(){

             try{

               FileUtil.fullyDelete(workDir);

             } catch(IOExceptione) {

             }

           }

         });

      //Jar包解压到/tmp/hadoop-admin/hadoop-unjar4705742737164408087               

       unJar(file, workDir);

     

      ///tmp/hadoop-admin/hadoop-unjar4705742737164408087/tmp/hadoop-admin/hadoop-unjar4705742737164408087/classes/,/tmp/hadoop-admin/hadoop-unjar4705742737164408087/lib全部添加到classpath

       classPath.add(newFile(workDir+"/").toURL());

       classPath.add(file.toURL());

       classPath.add(newFile(workDir,"classes/").toURL());

       File[] libs = newFile(workDir,"lib").listFiles();

       if(libs!= null){

         for(inti = 0;i libs.length;i++) {

           classPath.add(libs[i].toURL());

         }

       }

     

      //运行主函数

     main.invoke(null,newObject[]{ newArgs });

    }  

    设置属性:

    job.setJarByClass(WordCount.class);         //mapred.jar

    job.setMapperClass(WordCountMap.class);     //mapreduce.map.class

    job.setReducerClass(WordCountReduce.class); //mapreduce.reduce.class

    job.setCombinerClass(WordCountReduce.class);//mapreduce.combine.class

    job.setMapOutputKeyClass(Text.class);       //mapred.mapoutput.key.class

    job.setMapOutputValueClass(IntWritable.class);//mapred.mapoutput.value.class

    job.setOutputKeyClass(Text.class);            //mapred.output.key.class

    job.setOutputValueClass(IntWritable.class);   //mapred.output.value.class

    job.setJobName("WordCount");                 //mapred.job.name

     

    FileInputFormat.addInputPath(job,input);    //mapred.input.dir

    FileOutputFormat.setOutputPath(job,output);  //mapred.output.dir

     

     

    job.submit()

     

     publicvoidsubmit()throwsIOException,InterruptedException,

                                 ClassNotFoundException {

       ......

       // Connect tothe JobTracker and submit the job

       connect();

       info=jobClient.submitJobInternal(conf);

       ......

      }

     

     

    连接JobTracker

     

    privatevoidconnect()throwsIOException,InterruptedException {

           ......

           jobClient=newJobClient((JobConf)getConfiguration());   

           ......

          

     }

     

    其中:

     publicJobClient(JobConfconf) throwsIOException{

       ......

       init(conf);

     }

    publicvoidinit(JobConfconf) throwsIOException{

        ......

        this.jobSubmitClient=createRPCProxy(JobTracker.getAddress(conf),conf);

     }

     privatestaticJobSubmissionProtocolcreateRPCProxy(InetSocketAddress addr,

         Configuration conf) throwsIOException{

       return(JobSubmissionProtocol)RPC.getProxy(JobSubmissionProtocol.class,

           JobSubmissionProtocol.versionID,addr,

           UserGroupInformation.getCurrentUser(), conf,

           NetUtils.getSocketFactory(conf,JobSubmissionProtocol.class));

     }

     

    此时获得一个实现JobSubmissionProtocolRPC调用,即JobTracker的代理。

     

    获取job StagingArea

     

    PathjobStagingArea =JobSubmissionFiles.getStagingDir(JobClient.this,

               jobCopy);

    RPC请求:JobSubmissionProtocol.getStagingAreaDir()

    返回:hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging

     

    RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)

    返回:org.apache.hadoop.hdfs.protocol.HdfsFileStatus@5521691b,即存在

     

    RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)

    返回:org.apache.hadoop.hdfs.protocol.HdfsFileStatus@726c554,用以判断权限

     

    获得 NewJobId

    JobIDjobId = jobSubmitClient.getNewJobId();

     

    RPC请求:JobSubmissionProtocol.getNewJobId()

    返回:job_201404010621_0004

     

    建立 submitJob Dir

    PathsubmitJobDir = newPath(jobStagingArea,jobId.toString());

     

    hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004

     

    复制JarHDFS

    copyAndConfigureFiles(jobCopy,submitJobDir);

     

    RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004)

    返回:null

     

    RPC请求:ClientProtocol.mkdirs(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,rwxr-xr-x)

    返回:true

     

    RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,rwx------)

    返回:null

     

    RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar

    返回:null,即不存在

     

    RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)

    返回:输出流

     

    RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,DFSClient_-1317833261, null)

    返回:org.apache.hadoop.hdfs.protocol.LocatedBlock@1a9b701

    Blockblk_6689254996395759186_2720

    BlockTokenIdent: ,Pass: , Kind: , Service:

    DataNode[10.1.1.103:50010,10.1.1.102:50010]

     

    RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,DFSClient_-1317833261

    返回:true

     

    RPC请求:ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,10)

    返回:true

     

    RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,rw-r--r--)

    返回:null

     

    RPC请求:ClientProtocol.renewLease(DFSClient_-1317833261)

    返回:null

    此后有1个守护线程会不断发送 renewLease请求

     

    此时本地文件/opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar被复制到HDFS文件系统/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml

     

    Reduce数目:

    int reduces= jobCopy.getNumReduceTasks();

    reduce数目为2

     

    检查输出目录

    RPC请求:ClientProtocol.getFileInfo(/user/admin/out/555)

    返回:null,即不存在

     

    获取输入分片信息:

    int maps =writeSplits(context, submitJobDir);

    其中:

     privateT extendsInputSplit intwriteNewSplits(JobContextjob, Path jobSubmitDir) throwsIOException,

       InterruptedException, ClassNotFoundException {

       Configuration conf = job.getConfiguration();

       InputFormat?, ? input =

         ReflectionUtils.newInstance(job.getInputFormatClass(),conf);

     

       ListInputSplit splits =input.getSplits(job);

       T[] array = (T[]) splits.toArray(newInputSplit[splits.size()]);

     

       // sort thesplits into order based on size, so that the biggest

       // gofirst

       Arrays.sort(array, newSplitComparator());

       JobSplitWriter.createSplitFiles(jobSubmitDir,conf,

           jobSubmitDir.getFileSystem(conf), array);

       returnarray.length;

     }

      

    其中:

     publicListInputSplitgetSplits(JobContextjob

                                       ) throwsIOException{

      ...........

     }

     

    RPC请求:ClientProtocol.getFileInfo(/user/admin/in/yellow.txt)

    返回:path="hdfs://server1:9000/user/admin/in/yellow.txt",length=201000000,isdir=false,block_replication=3, blocksize=67108864, permission=rw-r--r--,owner=Admin, group=supergroup

     

    RPC请求:ClientProtocol.getBlockLocations(/user/admin/in/yellow.txt,0, 201000000)

    返回:3BlockLocation

    offset={0},        length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}

    offset={67108864}, length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}

    offset={134217728},length={66782272}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]},topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}

     

    最终确定的分片信息 3Filespit

    Filespit file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 67108864 },start={0}

    Filespit file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 67108864 },start={67108864}

    Filespit file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 66782272}, start={134217728}

     

    map数量为3

    jobCopy.setNumMapTasks(maps);

     

    建立分片文件:

    JobSplitWriter.createSplitFiles(jobSubmitDir,conf,

           jobSubmitDir.getFileSystem(conf), array);

    RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864);

    返回:输出流

     

    RPC请求:ClientProtocolsetPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,rw-r--r--)

    返回:null

     

    RPC请求:ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,10)

    返回:true

     

    RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,DFSClient_-1317833261, null)

    返回:LocatedBlock对象为

     

    Block blockid=-921399365952861077,generationStamp=2714numBytes=0

    BlockTokenIdentifierIdent: ,Pass: , Kind: , Service:

    DatanodeInfo[][10.1.1.103:50010,10.1.1.102:50010]

    offset0

     

    RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,DFSClient_-1317833261)

    返回:true

     

    写入的 SplitMetaInfo

    [data-size :67108864 start-offset : 7 locations :  server3  server2]

    [data-size :67108864 start-offset : 116 locations:   server2 server3]

    [data-size :66782272 start-offset : 225 locations : server2  server3 ]

     

     RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)

    返回:输出流

     

    RPC请求: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,rw-r--r--)

    返回:null

     

    RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,DFSClient_-1317833261, null)

    返回:LocatedBlock对象为

     

    Block blockid=789965327875207186,generationStamp=2715numBytes=0

    BlockTokenIdentifierIdent: ,Pass: , Kind: , Service:

    DatanodeInfo[][10.1.1.103:50010,10.1.1.102:50010]

    offset0

     

    RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,DFSClient_-1317833261)

    返回:true

     

    设置AccessControl

    RPC请求:JobSubmissionProtocol.getQueueAdmins(default)

    返回:All usersare allowed

     

    Write jobfile to JobTracker'sfs  

    RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)

    返回:输出流

     

    RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,rw-r--r--)

    返回:null

     

    RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xmlDFSClient_-1317833261,null)

    返回:LocatedBlock对象为

     

    Block blockid= -7725157033540829125,generationStamp= 2716numBytes=0

    BlockTokenIdentifierIdent: ,Pass: , Kind: , Service:

    DatanodeInfo[][10.1.1.103:50010,10.1.1.102:50010]

    offset0

     

    RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,DFSClient_-1317833261)

    返回:true

     

    此时"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/"下生成文件 job.xml,包含了所有的配置信息.

    此时HDFS目录"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/"下面文件为:

    -rw-r--r--  10 admin supergroup    142465 2014-04-08 00:20  job.jar

    -rw-r--r--  10 admin supergroup       334 2014-04-08 00:45    job.split

    -rw-r--r--  3 admin supergroup        80 2014-04-08 00:50      job.splitmetainfo

    -rw-r--r--  3 admin supergroup  20416 2014-04-08 00:55job.xml

    job.jar 为运行的Jar,  job.split内容 为(FileSplit 对象), job.splitmetainfo内容 为(SplitMetaInfo对象),job.xml job的配置文件

     

    提交作业:

    status= jobSubmitClient.submitJob(

                 jobId, submitJobDir.toString(),jobCopy.getCredentials());

     

    RPC请求:JobSubmissionProtocol.submitJob(job_201404010621_0004,hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,org.apache.hadoop.security.Credentials@70677770)

    返回:JobStatus setProgress=0mapProgress=0reduceProgress=0cleanProgress=0runstate=4priority=NOMAL..

     

    RPC请求:JobSubmissionProtocol.getJobProfile(job_201404010621_0004

    返回:JobProfilejobFile=hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xmljobID=job_201404010621_0004name=WordCountqueue=defaulturl=http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004user=Admin

     

    综合JobStatusJobProfile

    Job:job_201404010621_0004

    file:hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml

    tracking URL:http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004

    map()completion: 0.0

    reduce()completion: 0.0

     

    监控Job状态:

    jobClient.monitorAndPrintJob(conf,info);

     

    RPC请求:JobSubmissionProtocol.getJobStatus(job_201404010621_0004)

    返回:   setProgress=1mapProgress=1reduceProgress=0.22222224cleanProgress=1runstate=1priority=NOMAL

     

    RPC请求:JobSubmissionProtocol.getJobStatus(job_201404010621_0004)

    返回:   setProgress=1mapProgress=1reduceProgress=1cleanProgress=1runstate=2priority=NOMAL

     

    map 100%reduce 100%

    之后会多次发送JobSubmissionProtocol.getJobStatus(job_201404010621_0004)请求

     

    RPC请求:JobSubmissionProtocol.getTaskCompletionEvents(job_201404010621_0004,0, 10)

    返回: [Task Id :attempt_201404010621_0004_m_000004_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000002_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000000_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000001_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000000_1, Status : KILLED, Task Id :attempt_201404010621_0004_r_000000_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_r_000001_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000003_0, Status :SUCCEEDED]

     

     

    RPC请求:JobSubmissionProtocol.getJobCounters(job_201404010621_0004)

    返回:OW[class=classorg.apache.hadoop.mapred.Counters,value=Counters: 29

          Job Counters

                 Launched reduce tasks=2

                 SLOTS_MILLIS_MAPS=293879

                 Total time spent by all reduces waiting after reserving slots(ms)=0

                 Total time spent by all maps waiting after reserving slots(ms)=0

                 Launched map tasks=4

                 Data-local map tasks=4

                 SLOTS_MILLIS_REDUCES=74342

          File Output Format Counters

                 Bytes Written=933

          FileSystemCounters

                 FILE_BYTES_READ=316152

                 HDFS_BYTES_READ=201008521

                 FILE_BYTES_WRITTEN=370366

                 HDFS_BYTES_WRITTEN=933

          File Input Format Counters

                 Bytes Read=201008194

          Map-Reduce Framework

                 Map output materialized bytes=2574

                 Map input records=15600000

                 Reduce shuffle bytes=2574

                 Spilled Records=23025

                 Map output bytes=356000000

                 Total committed heap usage (bytes)=378023936

                 CPU time spent (ms)=158350

                 Combine input records=41011850

                 SPLIT_RAW_BYTES=327

                 Reduce input records=225

                 Reduce input groups=75

                 Combine output records=12075

                 Physical memory (bytes) snapshot=650371072

                 Reduce output records=75

                 Virtual memory (bytes) snapshot=5300277248

                 Map output records=41000000]

     

     

     

  • 相关阅读:
    强关联二维材料1T—TaS2晶体
    超薄二维Mo2C晶体
    稀有的二维狄拉克材料
    大自然中的几何植物
    字符串的内建函数
    python数据模型(特殊方法)
    插入排序
    下载谷歌浏览器(Chrome)扩展离线安装包crx文件最简单的方法
    33. 高精度练习之乘法
    32. 整数加法
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276499.html
Copyright © 2011-2022 走看看