  • Hadoop Source Code Analysis 34: The Child's Map Task

    Submit the job:

    hadoop  jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow2.txt /user/admin/out/128

     

    This generates 2 map tasks and 2 reduce tasks.

     

    Executing maps[0]:

    args=[127.0.0.1, 40996, attempt_201404282305_0001_m_000000_0,/opt/hadoop-1.0.0/logs/userlogs/job_201404282305_0001/attempt_201404282305_0001_m_000000_0,-518526792]

     

    cwd =/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/work

     

    jobTokenFile= /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/jobToken

    jvmContext=JvmContext(jvmId=jvm_201404282305_0001_m_,pid=29184)

     

    myTask=JvmTask{shouldDie=false, t=MapTask{taskId=attempt_201404282305_0001_m_000000_0, jobFile="/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml", jobSetup=false, jobCleanup=false, taskCleanup=false, taskStatus=MapTaskStatus{UNASSIGNED}, splitMetaInfo=JobSplit$TaskSplitIndex={splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404282305_0001/job.split", startOffset=7}}}

     

    job=JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}

     

    Task.jobContext=JobContext{job=Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001}

     

    Task.taskContext=TaskAttemptContext(job=Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001, taskId=attempt_201404282305_0001_m_000000_0, reporter=org.apache.hadoop.mapred.Task$TaskReporter@67323b17)

     

    Task.taskStatus=RUNNING

     

    Task.outputFormat=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat@a27b2d9

     

    Task.committer=FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0, ugi=admin]], outputPath=/user/admin/out/128, workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0}

     

    Variables inside runNewMapper(job, splitMetaInfo, umbilical, reporter):

     

    taskContext=TaskAttemptContext{conf=JobConf{Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}, jobId=job_201404282305_0001, taskId=attempt_201404282305_0001_m_000000_0}

     

    mapper=org.apache.hadoop.examples.WordCount$TokenizerMapper@3e4a762a

     

    inputFormat=org.apache.hadoop.mapreduce.lib.input.TextInputFormat@21833d8a

     

    split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864

     

    input=NewTrackingRecordReader{  

    inputSplit=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864

    job= JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml, hdfs-default.xml, hdfs-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}

    real= org.apache.hadoop.mapreduce.lib.input.LineRecordReader@5afbee67

    }

     

    output= NewOutputCollector{

    collector = new MapOutputBuffer{............}

    partitions=2

    partitioner=org.apache.hadoop.mapreduce.lib.partition.HashPartitioner@5c66b7ea

    }

     

    The collector member of output (output being of type NewOutputCollector):

    job= JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml, hdfs-default.xml, hdfs-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}

    localFs= LocalFileSystem@fdcb254

    partitions =2

    rfs= rawLocalFileSystem

    PARTITION = 0;   // partition offset in acct
    KEYSTART = 1;    // key offset in acct
    VALSTART = 2;    // val offset in acct
    ACCTSIZE = 3;    // total #fields in acct
    RECSIZE = (ACCTSIZE + 1) * 4 = 16;   // acct bytes per record

    spillper = 0.8
    recper = 0.05
    sortmb = 100
    sorter = org.apache.hadoop.util.QuickSort@aa9502d
    maxMemUsage = sortmb << 20;                           // 104857600
    recordCapacity = (int)(maxMemUsage * recper);         // 5242880
    recordCapacity -= recordCapacity % RECSIZE;           // 5242880
    kvbuffer = new byte[maxMemUsage - recordCapacity];    // byte[99614720]
    bufvoid = kvbuffer.length;                            // 99614720
    recordCapacity /= RECSIZE;                            // 327680
    kvoffsets = new int[recordCapacity];                  // int[327680]
    kvindices = new int[recordCapacity * ACCTSIZE];       // int[983040]
    softBufferLimit = (int)(kvbuffer.length * spillper);  // 79691776
    softRecordLimit = (int)(kvoffsets.length * spillper); // 262144
    comparator = job.getOutputKeyComparator();            // Text$Comparator@42b5e6a1
    keyClass = job.getMapOutputKeyClass();                // org.apache.hadoop.io.Text
    valClass = job.getMapOutputValueClass();              // org.apache.hadoop.io.IntWritable
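
    The sizing arithmetic above is easy to reproduce outside Hadoop. Below is a minimal standalone sketch (plain Java, not Hadoop code) that recomputes the same numbers from the observed settings sortmb=100 (io.sort.mb), recper=0.05 (io.sort.record.percent) and spillper=0.8 (io.sort.spill.percent):

    // Standalone sketch: reproduces the MapOutputBuffer sizing arithmetic above.
    public class SortBufferSizing {
      public static void main(String[] args) {
        final int   sortmb   = 100;    // io.sort.mb
        final float recper   = 0.05f;  // io.sort.record.percent
        final float spillper = 0.8f;   // io.sort.spill.percent
        final int ACCTSIZE = 3;
        final int RECSIZE  = (ACCTSIZE + 1) * 4;               // 16 accounting bytes per record

        int maxMemUsage    = sortmb << 20;                     // 104857600
        int recordCapacity = (int) (maxMemUsage * recper);     // 5242880
        recordCapacity    -= recordCapacity % RECSIZE;         // 5242880 (already a multiple of 16)
        int kvbufferLen    = maxMemUsage - recordCapacity;     // 99614720 bytes for serialized key/value data
        recordCapacity    /= RECSIZE;                          // 327680 record slots
        int kvoffsetsLen   = recordCapacity;                   // 327680 ints (one per record)
        int kvindicesLen   = recordCapacity * ACCTSIZE;        // 983040 ints (partition, keystart, valstart)
        int softBufferLimit = (int) (kvbufferLen * spillper);  // 79691776: spill once the data exceeds this
        int softRecordLimit = (int) (kvoffsetsLen * spillper); // 262144: spill once the record count exceeds this

        System.out.printf("kvbuffer=%d kvoffsets=%d kvindices=%d softBuf=%d softRec=%d%n",
            kvbufferLen, kvoffsetsLen, kvindicesLen, softBufferLimit, softRecordLimit);
      }
    }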

     

    combinerRunner = CombinerRunner.create(job, getTaskID(),
                                           combineInputCounter,
                                           reporter, null);

    //NewCombinerRunner{reducerClass=org.apache.hadoop.examples.WordCount$IntSumReducer,taskId=attempt_201404282305_0001_m_000000_0,keyClass=org.apache.hadoop.io.Text,valueClass=org.apache.hadoop.io.IntWritable,comparator=org.apache.hadoop.io.Text$Comparator@42b5e6a1,committer=null}

    combineCollector = new CombineOutputCollector(combineOutputCounter, reporter, conf);
    minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);   // 3

     

    spillThread = new SpillThread();
    spillLock = new ReentrantLock();
    spillDone = spillLock.newCondition();
    spillReady = spillLock.newCondition();

     

     

    mapperContext=Mapper$Context{taskId=attempt_201404282305_0001_m_000000_0, status="", split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864, jobId=job_201404282305_0001, committer=FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0, ugi=admin]], outputPath=/user/admin/out/128, workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0}, output=MapTask$NewOutputCollector{...same as above...}, reader=MapTask$NewTrackingRecordReader{...same as input above...}

    }

     

    input.initialize(split, mapperContext)
    // sets the members of LineRecordReader and opens the file:

    start = split.getStart();
    end = start + split.getLength();
    pos = start;
    FSDataInputStream fileIn = fs.open(split.getPath());
    in = new LineReader(fileIn, job);
    bufferSize = 65536;
    buffer = new byte[this.bufferSize];
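
    For this split, start is 0, so reading begins at the top of the file. Worth noting: a split that does not start at byte 0 first skips the partial line at its beginning, because the reader of the previous split always reads one line past its own end. A simplified, hypothetical sketch of that rule (not the actual LineRecordReader source):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;

    // Hypothetical helper illustrating the split-boundary rule only.
    class SplitBoundarySketch {
      static long skipPartialFirstLine(long splitStart, LineReader in) throws IOException {
        long start = splitStart;
        if (start != 0) {
          // Discard the partial first line; the previous split's reader already emitted it.
          start += in.readLine(new Text(), 0, Integer.MAX_VALUE);
        }
        return start;   // reading for this split begins at the next line boundary
      }
    }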

     

    mapper.run(mapperContext)
    // runs the Map program:

     public void run(Context context) throws IOException, InterruptedException {
       setup(context);
       while (context.nextKeyValue()) {
         map(context.getCurrentKey(), context.getCurrentValue(), context);
       }
       cleanup(context);
     }

    // The nextKeyValue() method reads one line. Before the read pos=0; after the read key=0, value="Yellow", pos=7.
    ......
    // Before the read pos=20; after the read key=20, value="Look at the stars; look how they shine for you", pos=67.
    ......
    // Before the read pos=68; after the read key=68, value="And everything you do", pos=90.

    ......

    The map method:

     public void map(Object key, Text value, Context context
                     ) throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString());
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
         context.write(word, one);
       }
     }

    Mapper$Context.write(word, one)
    --> TaskInputOutputContext.write(word, one)
    --> MapTask$NewOutputCollector.write(word, one)
    --> (MapTask.MapOutputCollector) collector.collect(key, value, partitioner.getPartition(key, value, partitions))

     

     

    HashPartitioner.getPartition() is defined as:

     

     public int getPartition(K key, V value,
                             int numReduceTasks) {
       return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
     }

    That is, the remainder of the key's hashCode divided by the number of reduces determines which reduce the record is sent to.
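
    A quick standalone illustration of that rule (using String keys instead of Hadoop's Text, so the hash values, and hence the partition each word lands in, are only illustrative):

    public class PartitionDemo {
      // Same formula as HashPartitioner: masking with Integer.MAX_VALUE clears the
      // sign bit so a negative hashCode cannot yield a negative partition number.
      static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }

      public static void main(String[] args) {
        for (String word : new String[] {"Yellow", "Look", "at", "the", "stars"}) {
          System.out.println(word + " -> reduce " + getPartition(word, 2));
        }
      }
    }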

     

     

    The processing flow of collect is:

     

    keySerializer.serialize(key);    // writes the key into the BlockingBuffer, i.e. kvbuffer
    valSerializer.serialize(value);  // writes the value into the BlockingBuffer, i.e. kvbuffer

     

     

    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;                // first-level index: position in kvindices

    kvindices[ind + PARTITION] = partition;  // second-level index: which reduce it belongs to
    kvindices[ind + KEYSTART] = keystart;    // second-level index: key position in kvbuffer
    kvindices[ind + VALSTART] = valstart;    // second-level index: value position in kvbuffer

    kvindex = kvnext;
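
    The two-level index can be mimicked with plain arrays. A toy sketch of that bookkeeping (the real MapOutputBuffer also handles buffer wrap-around, spill triggering and synchronization, all omitted here):

    // Toy model of MapOutputBuffer's accounting arrays: kvoffsets is the
    // first-level index that gets sorted during a spill; kvindices holds
    // (partition, keystart, valstart) triples pointing into kvbuffer.
    class AccountingSketch {
      static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, ACCTSIZE = 3;

      final int[] kvoffsets;
      final int[] kvindices;
      int kvindex = 0;

      AccountingSketch(int recordCapacity) {
        kvoffsets = new int[recordCapacity];
        kvindices = new int[recordCapacity * ACCTSIZE];
      }

      // Record one serialized key/value pair whose bytes start at keystart and
      // valstart inside kvbuffer, destined for the given reduce partition.
      void collect(int partition, int keystart, int valstart) {
        int ind = kvindex * ACCTSIZE;
        kvoffsets[kvindex] = ind;                // first-level index: slot in kvindices
        kvindices[ind + PARTITION] = partition;  // which reduce the record goes to
        kvindices[ind + KEYSTART]  = keystart;   // where the key bytes begin in kvbuffer
        kvindices[ind + VALSTART]  = valstart;   // where the value bytes begin in kvbuffer
        kvindex++;                               // the real code wraps around (kvnext) instead
      }
    }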

     

     

    The SpillThread sorts the in-memory data and writes it to local disk.

     

    Processing in MapTask.MapOutputBuffer.sortAndSpill():

     

    size=2276473, partitions=2, numSpills=0

    filename=/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/output/spill0.out

     

    endPosition=262144

     

     

    Sort: sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter)

     

    Combine: combinerRunner.combine(kvIter, combineCollector);

     

    Run the combiner.
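
    The combiner here is WordCount$IntSumReducer (see the NewCombinerRunner dump above). In the stock example it is simply the job's reducer reused as the combiner, summing the counts per word within one spill block; roughly:

    // WordCount's IntSumReducer (reproduced in spirit from the stock example):
    // sums the per-word counts so far fewer (word, 1) records reach the reducers.
    public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
          sum += val.get();   // add up the 1s (or partial sums) for this word
        }
        result.set(sum);
        context.write(key, result);
      }
    }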

     

    Write the resulting keys and values to spill0.out, spill1.out, spill2.out, ......

     

    keySerializer.serialize(key);

    valueSerializer.serialize(value);

     

    mergeParts() merges the results.

     

    After the merge completes, the combiner is run again:

    combinerRunner.combine(kvIter, combineCollector);

     

    Finally, two files are produced:

    file.out

    file.out.index

     

     

    The sortAndSpill and mergeParts process:

     

    Map phase: the main thread writes all map output into the in-memory kvbuffer. The SpillThread sorts (quicksort) and combines each block of kvbuffer and writes it to the files spill0.out, spill1.out, spill2.out, ......, spill51.out; each file contains the data for both (2) reduces.

     

    output.close() phase: the main thread sorts (quicksort) and combines the last in-memory block and writes it to spill52.out; this file, too, contains the data for both (2) reduces.

     

    mergeParts phase: the main thread adds the files spill0.out, spill1.out, ......, spill52.out to the priority queue MergeQueue (really a min-heap) for merging. Each merge pass uses at most 10 files and produces temporary files intermediate.0, intermediate.1, intermediate.2, intermediate.3, ......, which are themselves added back into the MergeQueue. Because spill0.out, spill1.out, ......, spill52.out are each internally sorted (locally sorted), the merge repeatedly takes from the root of the min-heap MergeQueue the element its file cursor points to (the current minimum), advances that cursor, appends the element to the target file, and then re-adjusts the min-heap order of MergeQueue. The merged file is therefore also sorted.

    The combiner is then run once more, and the result is written to the file file.out, which contains the data for both (2) reduces.
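
    The merge step is an ordinary k-way merge over already-sorted runs. A self-contained sketch of the idea, using java.util.PriorityQueue in place of Hadoop's Merger/MergeQueue and int arrays in place of the sorted spill files:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Sketch of the k-way merge idea behind mergeParts(): each sorted "spill"
    // contributes a cursor to a min-heap keyed by the cursor's current element;
    // repeatedly pop the smallest, emit it, advance that cursor, re-insert it.
    public class KWayMergeSketch {
      static List<Integer> merge(List<int[]> sortedSpills) {
        class Cursor {                       // a spill file's read position
          final Iterator<Integer> it;
          int current;
          Cursor(Iterator<Integer> it) { this.it = it; this.current = it.next(); }
        }
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.current, b.current));
        for (int[] spill : sortedSpills) {
          List<Integer> boxed = new ArrayList<>();
          for (int v : spill) boxed.add(v);
          if (!boxed.isEmpty()) heap.add(new Cursor(boxed.iterator()));
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
          Cursor c = heap.poll();            // smallest current element across all spills
          out.add(c.current);
          if (c.it.hasNext()) {              // advance this spill's cursor and re-heapify
            c.current = c.it.next();
            heap.add(c);
          }
        }
        return out;                          // globally sorted, like file.out
      }

      public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            new int[] {1, 4, 9}, new int[] {2, 3, 8}, new int[] {5, 6, 7})));
        // -> [1, 2, 3, 4, 5, 6, 7, 8, 9]
      }
    }

    In Hadoop 1.x the number of segments merged per pass is controlled by io.sort.factor (10 by default), which matches the "at most 10 files" described above.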


    Note that this stage relies on data structures and algorithms such as quicksort (Quick Sort) and the min-heap.

    Note that i << 1 is equivalent to 2*i, and i >>> 1 is equivalent to i/2 (for non-negative i).
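
    A two-line check of those identities (the second holds for non-negative i; these shifts appear in the heap's index arithmetic):

    int i = 13;
    System.out.println((i << 1) == 2 * i);   // true: shifting left by one bit doubles i
    System.out.println((i >>> 1) == i / 2);  // true for i >= 0: unsigned right shift halves i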


      

  • Original post: https://www.cnblogs.com/leeeee/p/7276481.html