Submit the job:
hadoop jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow2.txt /user/admin/out/128
This generates 2 Map tasks and 2 Reduce tasks.
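For reference, the wordcount driver inside hadoop-examples-1.0.0.jar does roughly the following (a sketch from memory, not the exact shipped source; it assumes the example's public nested TokenizerMapper/IntSumReducer classes):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Rough sketch of the wordcount driver (paraphrased, not the shipped source).
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);    // the mapper traced below
    job.setCombinerClass(WordCount.IntSumReducer.class);    // also used as the map-side combiner
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. /user/admin/in/yellow2.txt
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. /user/admin/out/128
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}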
Execute Maps[0]:
args=[127.0.0.1, 40996, attempt_201404282305_0001_m_000000_0,/opt/hadoop-1.0.0/logs/userlogs/job_201404282305_0001/attempt_201404282305_0001_m_000000_0,-518526792]
cwd =/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/work
jobTokenFile= /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/jobToken
jvmContext=JvmContext(jvmId=jvm_201404282305_0001_m_,pid=29184)
myTask=JvmTask{shouldDie=false,t=MapTask{taskId=attempt_201404282305_0001_m_000000_0,jobFile="/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml" ,jobSetup=false, jobCleanup=false, taskCleanup =false, taskStatus=MapTaskStatus { UNASSIGNED},splitMetaInfo=JobSplit$TaskSplitIndex={splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404282305_0001/job.split",startOffset=7 }}
job=JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}
The Task's jobContext=JobContext{job=Configuration:core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001}
The Task's taskContext=TaskAttemptContext(job=Configuration:core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001, taskId=attempt_201404282305_0001_m_000000_0, reporter=org.apache.hadoop.mapred.Task$TaskReporter@67323b17)
The Task's taskStatus=RUNNING
The Task's outputFormat=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat@a27b2d9
The Task's committer=FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0,ugi=admin]],outputPath=/user/admin/out/128,workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0}
Variables inside runNewMapper(job, splitMetaInfo, umbilical, reporter):
taskContext=TaskAttemptContext{ conf=JobConf {Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml, hdfs-default.xml, hdfs-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml} ,jobId=job_201404282305_0001,taskId=attempt_201404282305_0001_m_000000_0};
mapper=org.apache.hadoop.examples.WordCount$TokenizerMapper@3e4a762a;
inputFormat=org.apache.hadoop.mapreduce.lib.input.TextInputFormat@21833d8a;
split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864;
input=NewTrackingRecordReader{
inputSplit=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864,
job= JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml, hdfs-default.xml, hdfs-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}
real= org.apache.hadoop.mapreduce.lib.input.LineRecordReader@5afbee67
};
output= NewOutputCollector{
collector = new MapOutputBuffer{............},
partitions=2,
partitioner=org.apache.hadoop.mapreduce.lib.partition.HashPartitioner@5c66b7ea,
};
Members of output's collector (the MapOutputBuffer inside the NewOutputCollector):
job= JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml, hdfs-default.xml, hdfs-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml},
localFs= LocalFileSystem@fdcb254,
partitions =2,
rfs= rawLocalFileSystem,
PARTITION = 0;  // partition offset in acct
KEYSTART = 1;   // key offset in acct
VALSTART = 2;   // val offset in acct
ACCTSIZE = 3;   // total #fields in acct
RECSIZE = (ACCTSIZE + 1) * 4 = 16;  // acct bytes per record
spillper = 0.8,
recper = 0.05,
sortmb = 100,
sorter = org.apache.hadoop.util.QuickSort@aa9502d,
maxMemUsage = sortmb << 20;  // 104857600
recordCapacity = (int)(maxMemUsage * recper);  // 5242880
recordCapacity -= recordCapacity % RECSIZE;  // 5242880
kvbuffer = new byte[maxMemUsage - recordCapacity];  // byte[99614720]
bufvoid = kvbuffer.length;  // 99614720
recordCapacity /= RECSIZE;  // 327680
kvoffsets = new int[recordCapacity];  // int[327680]
kvindices = new int[recordCapacity * ACCTSIZE];  // int[983040]
softBufferLimit = (int)(kvbuffer.length * spillper);  // 79691776
softRecordLimit = (int)(kvoffsets.length * spillper);  // 262144 (this sizing arithmetic is checked in the sketch after this listing)
comparator = job.getOutputKeyComparator();  // Text$Comparator@42b5e6a1
keyClass = job.getMapOutputKeyClass();  // org.apache.hadoop.io.Text
valClass = job.getMapOutputValueClass();  // org.apache.hadoop.io.IntWritable
combinerRunner = CombinerRunner.create(job, getTaskID(),
                                       combineInputCounter,
                                       reporter, null);
// NewCombinerRunner{reducerClass=org.apache.hadoop.examples.WordCount$IntSumReducer,taskId=attempt_201404282305_0001_m_000000_0,keyClass=org.apache.hadoop.io.Text,valueClass=org.apache.hadoop.io.IntWritable,comparator=org.apache.hadoop.io.Text$Comparator@42b5e6a1,committer=null}
combineCollector = new CombineOutputCollector(combineOutputCounter, reporter, conf);
minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);  // 3
spillThread = new SpillThread();
spillLock = new ReentrantLock();
spillDone = spillLock.newCondition();
spillReady = spillLock.newCondition();
mapperContext = Mapper$Context{taskId=attempt_201404282305_0001_m_000000_0,status="",split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864,jobId=job_201404282305_0001,committer=FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0,ugi=admin]],outputPath=/user/admin/out/128,workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0},output=MapTask$NewOutputCollector{...same as output above...},reader=MapTask$NewTrackingRecordReader{...same as input above...},
}
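The buffer-sizing arithmetic listed above can be reproduced with a small standalone sketch (the class name and printout are mine; the formulas mirror the assignments in the listing, with io.sort.mb=100, io.sort.record.percent=0.05, io.sort.spill.percent=0.8):

// Hypothetical re-computation of the MapOutputBuffer sizes shown above.
public class BufferSizing {
  public static void main(String[] args) {
    final int ACCTSIZE = 3, RECSIZE = (ACCTSIZE + 1) * 4; // 16 accounting bytes per record
    final int sortmb = 100;
    final float recper = 0.05f, spillper = 0.8f;

    int maxMemUsage = sortmb << 20;                     // 104857600
    int recordCapacity = (int) (maxMemUsage * recper);  // 5242880
    recordCapacity -= recordCapacity % RECSIZE;         // still 5242880 (already a multiple of 16)
    int kvbufferLen = maxMemUsage - recordCapacity;     // 99614720
    recordCapacity /= RECSIZE;                          // 327680 records
    System.out.println("kvbuffer bytes   = " + kvbufferLen);
    System.out.println("kvoffsets length = " + recordCapacity);
    System.out.println("kvindices length = " + recordCapacity * ACCTSIZE);
    System.out.println("softBufferLimit  = " + (int) (kvbufferLen * spillper));
    System.out.println("softRecordLimit  = " + (int) (recordCapacity * spillper));
  }
}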
input.initialize(split, mapperContext)
// Set up the LineRecordReader fields and open the file
start = split.getStart();
end = start + split.getLength();
pos = start;
FSDataInputStream fileIn = fs.open(split.getPath());
in = new LineReader(fileIn, job);
bufferSize = 65536;
buffer = new byte[this.bufferSize];
mapper.run(mapperContext)
// Run the map program
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}
// The nextKeyValue() method reads one line. Before the read: pos=0; after the read: key=0, value="Yellow", pos=7.
......
// Before the read: pos=20; after the read: key=20, value="Look at the stars; look how they shine for you", pos=67.
......
// Before the read: pos=68; after the read: key=68, value="And everything you do", pos=90.
......
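To make the key/pos values above concrete, here is a hypothetical standalone demo (plain Java, not Hadoop code) that prints what LineRecordReader hands to the mapper: the key is the byte offset of the start of each line, the value is the line text:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: print (byte offset, line) pairs for a local text file,
// mimicking the (key, value) pairs LineRecordReader produces for a split.
public class LineOffsetDemo {
  public static void main(String[] args) throws IOException {
    try (BufferedReader r = new BufferedReader(
        new InputStreamReader(new FileInputStream(args[0]), StandardCharsets.UTF_8))) {
      long pos = 0;
      String line;
      while ((line = r.readLine()) != null) {
        System.out.println(pos + "\t" + line);
        pos += line.getBytes(StandardCharsets.UTF_8).length + 1; // assumes '\n' line endings
      }
    }
  }
}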
The map method:
public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
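For context, the TokenizerMapper class looks roughly like the following (paraphrased from the Hadoop 1.0 examples, where it is a nested class of org.apache.hadoop.examples.WordCount; it declares the word and one fields used in map() above):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of the WordCount mapper (paraphrased, not copied verbatim from 1.0.0).
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken()); // emit (token, 1) for every token in the line
      context.write(word, one);
    }
  }
}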
Mapper$Context.write(word, one)
--> TaskInputOutputContext.write(word, one)
--> MapTask$NewOutputCollector.write(word, one)
--> (MapTask.MapOutputCollector) collector.collect(key, value, partitioner.getPartition(key, value, partitions))
HashPartitioner.getPartition() is defined as:
public int getPartition(K key, V value,
                        int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
That is, the remainder of the key's hashCode divided by the number of Reduce tasks determines which Reduce the record belongs to.
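As an illustration, the same formula can be applied to a few sample words with numReduceTasks=2 (the class name and word list are mine; the resulting partition numbers depend on Text.hashCode and are not taken from the trace):

import org.apache.hadoop.io.Text;

// Hypothetical check of HashPartitioner's formula for a 2-reduce job.
public class PartitionDemo {
  public static void main(String[] args) {
    int numReduceTasks = 2;
    for (String w : new String[] {"Yellow", "Look", "at", "the", "stars"}) {
      int p = (new Text(w).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      System.out.println(w + " -> reduce " + p);
    }
  }
}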
The processing flow of collect() is:
keySerializer.serialize(key);    // write the key into the BlockingBuffer's kvbuffer
valSerializer.serialize(value);  // write the value into the BlockingBuffer's kvbuffer
int ind = kvindex * ACCTSIZE;
kvoffsets[kvindex] = ind;                // primary index: the record's slot in kvindices
kvindices[ind + PARTITION] = partition;  // secondary index: which Reduce it belongs to
kvindices[ind + KEYSTART] = keystart;    // secondary index: where the key starts
kvindices[ind + VALSTART] = valstart;    // secondary index: where the value starts
kvindex = kvnext;
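A minimal simulation of this two-level accounting, assuming a single record destined for partition 1 whose key was serialized at byte 0 and whose value at byte 8 (all values made up for illustration):

// Hypothetical simulation of the kvoffsets/kvindices bookkeeping done by collect().
public class KvIndexDemo {
  static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, ACCTSIZE = 3;

  public static void main(String[] args) {
    int capacity = 4;                               // tiny capacity, just for illustration
    int[] kvoffsets = new int[capacity];            // primary index
    int[] kvindices = new int[capacity * ACCTSIZE]; // secondary index, 3 ints per record
    int kvindex = 0;
    int partition = 1, keystart = 0, valstart = 8;  // pretend values for one record

    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;                       // primary index -> slot in kvindices
    kvindices[ind + PARTITION] = partition;         // which Reduce the record goes to
    kvindices[ind + KEYSTART] = keystart;           // where the key starts in kvbuffer
    kvindices[ind + VALSTART] = valstart;           // where the value starts in kvbuffer
    kvindex = (kvindex + 1) % capacity;             // advance like the circular index

    int slot = kvoffsets[0];
    System.out.println("record 0 -> partition " + kvindices[slot + PARTITION]
        + ", key@" + kvindices[slot + KEYSTART] + ", val@" + kvindices[slot + VALSTART]);
  }
}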
The SpillThread sorts the in-memory data and writes it to local disk.
MapTask.MapOutputBuffer.sortAndSpill() handles this:
size=2276473,partitions=2,numSpills=0,
filename=/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/output/spill0.out,
endPosition=262144
Sort: sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter) (an index sort; see the illustration after this spill step)
Combine: combinerRunner.combine(kvIter, combineCollector);
This runs the combiner.
The resulting keys and values are written to spill0.out, spill1.out, spill2.out, ...:
keySerializer.serialize(key);
valueSerializer.serialize(value);
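The sort here is an index sort: the serialized records stay where they are in kvbuffer, and only the kvoffsets entries are reordered by (partition, key). A simplified standalone illustration of that idea, using Arrays.sort instead of Hadoop's QuickSort/IndexedSortable and made-up data:

import java.util.Arrays;
import java.util.Comparator;

// Illustration of sorting an index array instead of the records themselves.
public class IndexSortDemo {
  public static void main(String[] args) {
    String[] keys = {"look", "at", "the", "stars"}; // serialized keys, in arrival order
    int[] partition = {1, 0, 0, 1};                 // destination reduce of each record
    Integer[] kvoffsets = {0, 1, 2, 3};             // index entries pointing at the records
    Arrays.sort(kvoffsets, Comparator
        .comparingInt((Integer i) -> partition[i])  // group by destination reduce first
        .thenComparing(i -> keys[i]));              // then sort by key within each partition
    for (int i : kvoffsets) {
      System.out.println("partition " + partition[i] + " : " + keys[i]);
    }
  }
}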
mergeParts() merges the results.
After the merge completes, the combiner runs again:
combinerRunner.combine(kvIter, combineCollector);
Finally, two files are produced:
file.out
file.out.index
The sortAndSpill and mergeParts process:
Map phase: the main thread writes all map output into the in-memory kvbuffer; the SpillThread sorts each chunk of the kvbuffer (quicksort), runs the combiner on it, and writes it out as spill0.out, spill1.out, spill2.out, ..., spill51.out, each file containing data for both Reduce tasks.
output.close() phase: the main thread sorts (quicksort) and combines the last in-memory chunk and writes it to spill52.out, which likewise contains data for both Reduce tasks.
mergeParts phase: the main thread adds spill0.out, spill1.out, ..., spill52.out to the priority queue MergeQueue (actually a min-heap) and merges them, using at most 10 files per merge pass; this produces temporary files intermediate.0, intermediate.1, intermediate.2, intermediate.3, ..., which are themselves added back into the MergeQueue. Because spill0.out, spill1.out, ..., spill52.out are each already sorted internally (locally sorted), the merge repeatedly takes the element pointed to by the root file of the min-heap MergeQueue (the smallest element), advances that file's pointer, appends the element to the target file, and then re-adjusts the min-heap. The merged file is therefore sorted as well.
The combiner then runs once more, and the result is written to file.out, which contains data for both Reduce tasks. (A simplified sketch of this min-heap k-way merge follows.)
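A simplified sketch of that min-heap k-way merge, with plain Java lists and a PriorityQueue standing in for the spill files and Hadoop's MergeQueue (data made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified k-way merge: each sorted "spill" is a list; the min-heap always
// exposes the smallest head element among all inputs.
public class KWayMergeDemo {
  public static void main(String[] args) {
    List<List<String>> spills = Arrays.asList(
        Arrays.asList("and", "look", "yellow"),
        Arrays.asList("at", "stars", "the"),
        Arrays.asList("how", "shine", "they"));
    // heap entry = {spill index, position within that spill}, ordered by the element it points to
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        Comparator.comparing((int[] e) -> spills.get(e[0]).get(e[1])));
    for (int i = 0; i < spills.size(); i++) heap.add(new int[] {i, 0});
    List<String> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();                    // smallest head element across all spills
      merged.add(spills.get(top[0]).get(top[1]));
      if (top[1] + 1 < spills.get(top[0]).size()) {
        heap.add(new int[] {top[0], top[1] + 1}); // advance that spill's pointer, heap re-adjusts
      }
    }
    System.out.println(merged);                   // globally sorted result
  }
}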
Note that quicksort (Quick Sort) and the min-heap are the main data structures and algorithms used here.
Note also that i << 1 is equivalent to 2*i, and i >>> 1 is equivalent to i/2 (for non-negative i); see the quick check below.
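A quick check of those identities (values chosen arbitrarily):

public class ShiftDemo {
  public static void main(String[] args) {
    int sortmb = 100;
    System.out.println(sortmb << 20); // 104857600 = 100 * 2^20, matching maxMemUsage above
    System.out.println(7 << 1);       // 14, same as 7 * 2
    System.out.println(7 >>> 1);      // 3, same as 7 / 2 for non-negative ints
  }
}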