命令为:
hadoop_debug jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow.txt /user/admin/out/555
首先调用org.apache.hadoop.util.RunJar.main
public static void main(String[]args){
// 加载Jar包 /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar
JarFile jarFile = new JarFile(fileName);
//根据META-INF得知主Class为org/apache/hadoop/examples/ExampleDriver
Manifest manifest = jarFile.getManifest();
if (manifest !=null){
mainClassName =manifest.getMainAttributes().getValue("Main-Class");
}
//建立本地临时文件夹 /tmp/hadoop-admin
File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
tmpDir.mkdirs();
//建立本地工作文件夹 /tmp/hadoop-admin/hadoop-unjar4705742737164408087
final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
workDir.delete();
workDir.mkdirs();
//JVM退出时将tmp/hadoop-admin/hadoop-unjar4705742737164408087删除
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
FileUtil.fullyDelete(workDir);
} catch (IOException e) {
}
}
});
//将Jar包解压到/tmp/hadoop-admin/hadoop-unjar4705742737164408087
unJar(file, workDir);
//将/tmp/hadoop-admin/hadoop-unjar4705742737164408087,/tmp/hadoop-admin/hadoop-unjar4705742737164408087/classes/,/tmp/hadoop-admin/hadoop-unjar4705742737164408087/lib全部添加到classpath
classPath.add(new File(workDir + "/").toURL());
classPath.add(file.toURL());
classPath.add(new File(workDir, "classes/").toURL());
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.add(libs[i].toURL());
}
}
//运行主函数
main.invoke(null, new Object[] { newArgs });
}
设置属性:
job.setJarByClass(WordCount.class); //mapred.jar
job.setMapperClass(WordCountMap.class); //mapreduce.map.class
job.setReducerClass(WordCountReduce.class); //mapreduce.reduce.class
job.setCombinerClass(WordCountReduce.class);//mapreduce.combine.class
job.setMapOutputKeyClass(Text.class); //mapred.mapoutput.key.class
job.setMapOutputValueClass(IntWritable.class);//mapred.mapoutput.value.class
job.setOutputKeyClass(Text.class); //mapred.output.key.class
job.setOutputValueClass(IntWritable.class); //mapred.output.value.class
job.setJobName("WordCount"); //mapred.job.name
FileInputFormat.addInputPath(job,input); //mapred.input.dir
FileOutputFormat.setOutputPath(job,output); //mapred.output.dir
job.submit()
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
......
// Connect to the JobTracker and submit the job
connect();
info=jobClient.submitJobInternal(conf);
......
}
连接JobTracker:
private void connect() throws IOException, InterruptedException {
......
jobClient = new JobClient((JobConf) getConfiguration());
......
}
其中:
public JobClient(JobConf conf) throws IOException {
......
init(conf);
}
public void init(JobConf conf) throws IOException {
......
this.jobSubmitClient=createRPCProxy(JobTracker.getAddress(conf),conf);
}
private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID,addr,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getSocketFactory(conf,JobSubmissionProtocol.class));
}
此时获得一个实现JobSubmissionProtocol的RPC调用,即JobTracker的代理。
获取job StagingArea
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
jobCopy);
RPC请求:JobSubmissionProtocol.getStagingAreaDir()
返回:hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging
RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)
返回:org.apache.hadoop.hdfs.protocol.HdfsFileStatus@5521691b,即存在
RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)
返回:org.apache.hadoop.hdfs.protocol.HdfsFileStatus@726c554,用以判断权限
获得 NewJobId
JobID jobId = jobSubmitClient.getNewJobId();
RPC请求:JobSubmissionProtocol.getNewJobId()
返回:job_201404010621_0004
建立 submitJob Dir:
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004
复制Jar到HDFS
copyAndConfigureFiles(jobCopy,submitJobDir);
RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004)
返回:null
RPC请求:ClientProtocol.mkdirs(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,rwxr-xr-x)
返回:true
RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,rwx------)
返回:null
RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar)
返回:null,即不存在
RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)
返回:输出流
RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,DFSClient_-1317833261, null)
返回:org.apache.hadoop.hdfs.protocol.LocatedBlock@1a9b701
Block:blk_6689254996395759186_2720
BlockToken:Ident: ,Pass: , Kind: , Service:
DataNode:[10.1.1.103:50010,10.1.1.102:50010]
RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,DFSClient_-1317833261)
返回:true
RPC请求:ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,10)
返回:true
RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,rw-r--r--)
返回:null
RPC请求:ClientProtocol.renewLease(DFSClient_-1317833261)
返回:null
此后有1个守护线程会不断发送 renewLease请求
此时本地文件/opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar被复制到HDFS文件系统/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar
Reduce数目:
int reduces= jobCopy.getNumReduceTasks();
reduce数目为2
检查输出目录
RPC请求:ClientProtocol.getFileInfo(/user/admin/out/555)
返回:null,即不存在
获取输入分片信息:
int maps =writeSplits(context, submitJobDir);
其中:
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
其中:
public List<InputSplit> getSplits(JobContext job
) throws IOException {
...........
}
RPC请求:ClientProtocol.getFileInfo(/user/admin/in/yellow.txt)
返回:path="hdfs://server1:9000/user/admin/in/yellow.txt",length=201000000,isdir=false,block_replication=3, blocksize=67108864, permission=rw-r--r--,owner=Admin, group=supergroup
RPC请求:ClientProtocol.getBlockLocations(/user/admin/in/yellow.txt,0, 201000000)
返回:3个BlockLocation
offset={0}, length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}
offset={67108864}, length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}
offset={134217728},length={66782272}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]},topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}
最终确定的分片信息为3个FileSplit
FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 67108864 },start={0}
FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 67108864 },start={67108864}
FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 66782272}, start={134217728}
map数量为3
jobCopy.setNumMapTasks(maps);
建立分片文件:
JobSplitWriter.createSplitFiles(jobSubmitDir,conf,
jobSubmitDir.getFileSystem(conf), array);
RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864);
返回:输出流
RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,rw-r--r--)
返回:null
RPC请求:ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,10)
返回:true
RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,DFSClient_-1317833261, null)
返回:LocatedBlock对象为
Block: blockid=-921399365952861077,generationStamp=2714,numBytes=0
BlockTokenIdentifier:Ident: ,Pass: , Kind: , Service:
DatanodeInfo[]:[10.1.1.103:50010,10.1.1.102:50010]
offset:0
RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,DFSClient_-1317833261)
返回:true
写入的 SplitMetaInfo为
[data-size :67108864 start-offset : 7 locations : server3 server2]
[data-size :67108864 start-offset : 116 locations: server2 server3]
[data-size :66782272 start-offset : 225 locations : server2 server3 ]
RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)
返回:输出流
RPC请求: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,rw-r--r--)
返回:null
RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,DFSClient_-1317833261, null)
返回:LocatedBlock对象为
Block: blockid=789965327875207186,generationStamp=2715,numBytes=0
BlockTokenIdentifier:Ident: ,Pass: , Kind: , Service:
DatanodeInfo[]:[10.1.1.103:50010,10.1.1.102:50010]
offset:0
RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,DFSClient_-1317833261)
返回:true
设置AccessControl
RPC请求:JobSubmissionProtocol.getQueueAdmins(default)
返回:All users are allowed
Write job file to JobTracker's fs
RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)
返回:输出流
RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,rw-r--r--)
返回:null
RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,DFSClient_-1317833261,null)
返回:LocatedBlock对象为
Block: blockid= -7725157033540829125,generationStamp= 2716,numBytes=0
BlockTokenIdentifier:Ident: ,Pass: , Kind: , Service:
DatanodeInfo[]:[10.1.1.103:50010,10.1.1.102:50010]
offset:0
RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,DFSClient_-1317833261)
返回:true
此时"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/"下生成文件 job.xml,包含了所有的配置信息.
此时HDFS目录"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/"下面文件为:
-rw-r--r-- 10 admin supergroup 142465 2014-04-08 00:20 job.jar
-rw-r--r-- 10 admin supergroup 334 2014-04-08 00:45 job.split
-rw-r--r-- 3 admin supergroup 80 2014-04-08 00:50 job.splitmetainfo
-rw-r--r-- 3 admin supergroup 20416 2014-04-08 00:55 job.xml
job.jar 为运行的Jar包, job.split内容 为(FileSplit 对象), job.splitmetainfo内容 为(SplitMetaInfo对象),job.xml 为job的配置文件
提交作业:
status= jobSubmitClient.submitJob(
jobId, submitJobDir.toString(),jobCopy.getCredentials());
RPC请求:JobSubmissionProtocol.submitJob(job_201404010621_0004,hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,org.apache.hadoop.security.Credentials@70677770)
返回:JobStatus: setProgress=0,mapProgress=0,reduceProgress=0,cleanProgress=0,runstate=4,priority=NORMAL,..
RPC请求:JobSubmissionProtocol.getJobProfile(job_201404010621_0004)
返回:JobProfile:jobFile=hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,jobID=job_201404010621_0004,name=WordCount,queue=default,url=http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004,user=Admin
综合JobStatus和JobProfile
Job:job_201404010621_0004
file:hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml
tracking URL:http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004
map() completion: 0.0
reduce() completion: 0.0
监控Job状态:
jobClient.monitorAndPrintJob(conf,info);
RPC请求:JobSubmissionProtocol.getJobStatus(job_201404010621_0004)
返回: setProgress=1,mapProgress=1,reduceProgress=0.22222224,cleanProgress=1,runstate=1,priority=NORMAL
RPC请求:JobSubmissionProtocol.getJobStatus(job_201404010621_0004)
返回: setProgress=1,mapProgress=1,reduceProgress=1,cleanProgress=1,runstate=2,priority=NORMAL,
即map 100% reduce 100%
之后会多次发送JobSubmissionProtocol.getJobStatus(job_201404010621_0004)请求
RPC请求:JobSubmissionProtocol.getTaskCompletionEvents(job_201404010621_0004,0, 10)
返回: [Task Id :attempt_201404010621_0004_m_000004_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000002_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000000_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000001_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000000_1, Status : KILLED, Task Id :attempt_201404010621_0004_r_000000_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_r_000001_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000003_0, Status :SUCCEEDED]
RPC请求:JobSubmissionProtocol.getJobCounters(job_201404010621_0004)
返回:OW[class=class org.apache.hadoop.mapred.Counters,value=Counters: 29
Job Counters
Launched reduce tasks=2
SLOTS_MILLIS_MAPS=293879
Total time spent by all reduces waiting after reserving slots(ms)=0
Total time spent by all maps waiting after reserving slots(ms)=0
Launched map tasks=4
Data-local map tasks=4
SLOTS_MILLIS_REDUCES=74342
File Output Format Counters
Bytes Written=933
FileSystemCounters
FILE_BYTES_READ=316152
HDFS_BYTES_READ=201008521
FILE_BYTES_WRITTEN=370366
HDFS_BYTES_WRITTEN=933
File Input Format Counters
Bytes Read=201008194
Map-Reduce Framework
Map output materialized bytes=2574
Map input records=15600000
Reduce shuffle bytes=2574
Spilled Records=23025
Map output bytes=356000000
Total committed heap usage (bytes)=378023936
CPU time spent (ms)=158350
Combine input records=41011850
SPLIT_RAW_BYTES=327
Reduce input records=225
Reduce input groups=75
Combine output records=12075
Physical memory (bytes) snapshot=650371072
Reduce output records=75
Virtual memory (bytes) snapshot=5300277248
Map output records=41000000]